diff --git a/ConflictSet.cpp b/ConflictSet.cpp index 3c583a4..7e35b57 100644 --- a/ConflictSet.cpp +++ b/ConflictSet.cpp @@ -17,9 +17,9 @@ limitations under the License. #include "ConflictSet.h" #include "Internal.h" #include "LongestCommonPrefix.h" +#include "Metrics.h" #include -#include #include #include #include @@ -578,39 +578,6 @@ static_assert(kBytesPerKey - sizeof(Node0) >= kMinNodeSurplus); constexpr int64_t kFreeListMaxMemory = 1 << 20; -struct Metric { - Metric *prev; - const char *name; - const char *help; - ConflictSet::MetricsV1::Type type; - std::atomic value; - -protected: - Metric(ConflictSet::Impl *impl, const char *name, const char *help, - ConflictSet::MetricsV1::Type type); -}; - -struct Gauge : private Metric { - Gauge(ConflictSet::Impl *impl, const char *name, const char *help) - : Metric(impl, name, help, ConflictSet::MetricsV1::Gauge) {} - - void set(int64_t value) { - this->value.store(value, std::memory_order_relaxed); - } -}; - -struct Counter : private Metric { - Counter(ConflictSet::Impl *impl, const char *name, const char *help) - : Metric(impl, name, help, ConflictSet::MetricsV1::Counter) {} - // Expensive. Accumulate locally and then call add instead of repeatedly - // calling add. - void add(int64_t value) { - assert(value >= 0); - static_assert(std::atomic::is_always_lock_free); - this->value.fetch_add(value, std::memory_order_relaxed); - } -}; - template struct BoundedFreeListAllocator { static_assert(sizeof(T) >= sizeof(void *)); @@ -3242,7 +3209,6 @@ struct __attribute__((visibility("hidden"))) ConflictSet::Impl { 0) * 2; - memory_bytes.set(totalBytes); point_writes_total.add(tls.accum.point_writes); range_writes_total.add(tls.accum.range_writes); nodes_allocated_total.add(tls.accum.nodes_allocated); @@ -3331,7 +3297,6 @@ struct __attribute__((visibility("hidden"))) ConflictSet::Impl { #endif keyUpdates = gcScanStep(keyUpdates); - memory_bytes.set(totalBytes); nodes_allocated_total.add(std::exchange(tls.accum.nodes_allocated, 0)); nodes_released_total.add(std::exchange(tls.accum.nodes_released, 0)); entries_inserted_total.add(std::exchange(tls.accum.entries_inserted, 0)); @@ -3379,7 +3344,7 @@ struct __attribute__((visibility("hidden"))) ConflictSet::Impl { explicit Impl(int64_t oldestVersion) { assert(oldestVersion >= 0); init(oldestVersion); - initMetrics(); + metrics = initMetrics(metricsList, metricsCount); } ~Impl() { eraseTree(root, &tls); @@ -3402,23 +3367,12 @@ struct __attribute__((visibility("hidden"))) ConflictSet::Impl { MetricsV1 *metrics; int metricsCount = 0; - void initMetrics() { - metrics = (MetricsV1 *)safe_malloc(metricsCount * sizeof(metrics[0])); - for (auto [i, m] = std::make_tuple(metricsCount - 1, metricList); i >= 0; - --i, m = m->prev) { - metrics[i].name = m->name; - metrics[i].help = m->help; - metrics[i].p = m; - metrics[i].type = m->type; - } - } - - Metric *metricList = nullptr; + Metric *metricsList = nullptr; #define GAUGE(name, help) \ - Gauge name { this, #name, help } + Gauge name { metricsList, metricsCount, #name, help } #define COUNTER(name, help) \ - Counter name { this, #name, help } + Counter name { metricsList, metricsCount, #name, help } // ==================== METRICS DEFINITIONS ==================== COUNTER(point_read_total, "Total number of point reads checked"); COUNTER(point_read_short_circuit_total, @@ -3484,13 +3438,6 @@ struct __attribute__((visibility("hidden"))) ConflictSet::Impl { } }; -Metric::Metric(ConflictSet::Impl *impl, const char *name, const char *help, - ConflictSet::MetricsV1::Type type) - : prev(std::exchange(impl->metricList, this)), name(name), help(help), - type(type), value(0) { - ++impl->metricsCount; -} - Node *&getInTree(Node *n, ConflictSet::Impl *impl) { return n->parent == nullptr ? impl->root : getChildExists(n->parent, n->parentsIndex); @@ -3509,6 +3456,7 @@ void internal_addWrites(ConflictSet::Impl *impl, mallocBytesDelta = 0; impl->addWrites(writes, count, writeVersion); impl->totalBytes += mallocBytesDelta; + impl->memory_bytes.set(impl->totalBytes); #if SHOW_MEMORY if (impl->totalBytes != mallocBytes) { abort(); @@ -3520,6 +3468,7 @@ void internal_setOldestVersion(ConflictSet::Impl *impl, int64_t oldestVersion) { mallocBytesDelta = 0; impl->setOldestVersion(oldestVersion); impl->totalBytes += mallocBytesDelta; + impl->memory_bytes.set(impl->totalBytes); #if SHOW_MEMORY if (impl->totalBytes != mallocBytes) { abort(); diff --git a/Jenkinsfile b/Jenkinsfile index 22f8f63..a5287fc 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -129,7 +129,7 @@ pipeline { } steps { script { - filter_args = "-f ConflictSet.cpp -f LongestCommonPrefix.h" + filter_args = "-f ConflictSet.cpp -f LongestCommonPrefix.h -f Metrics.h" } CleanBuildAndTest("-DCMAKE_C_COMPILER=gcc -DCMAKE_CXX_COMPILER=g++ -DCMAKE_C_FLAGS=--coverage -DCMAKE_CXX_FLAGS=--coverage -DCMAKE_BUILD_TYPE=Debug -DDISABLE_TSAN=ON") sh """ diff --git a/Metrics.h b/Metrics.h new file mode 100644 index 0000000..b093bcd --- /dev/null +++ b/Metrics.h @@ -0,0 +1,64 @@ +#pragma once + +#include "ConflictSet.h" +#include "Internal.h" + +#include +#include +#include + +struct Metric { + Metric *prev; + const char *name; + const char *help; + weaselab::ConflictSet::MetricsV1::Type type; + std::atomic value; + +protected: + Metric(Metric *&metricList, int &metricsCount, const char *name, + const char *help, weaselab::ConflictSet::MetricsV1::Type type) + : prev(std::exchange(metricList, this)), name(name), help(help), + type(type), value(0) { + ++metricsCount; + } +}; + +struct Gauge : private Metric { + Gauge(Metric *&metricList, int &metricsCount, const char *name, + const char *help) + : Metric(metricList, metricsCount, name, help, + weaselab::ConflictSet::MetricsV1::Gauge) {} + + void set(int64_t value) { + this->value.store(value, std::memory_order_relaxed); + } +}; + +struct Counter : private Metric { + Counter(Metric *&metricList, int &metricsCount, const char *name, + const char *help) + : Metric(metricList, metricsCount, name, help, + weaselab::ConflictSet::MetricsV1::Counter) {} + // Expensive. Accumulate locally and then call add instead of repeatedly + // calling add. + void add(int64_t value) { + assert(value >= 0); + static_assert(std::atomic::is_always_lock_free); + this->value.fetch_add(value, std::memory_order_relaxed); + } +}; + +inline weaselab::ConflictSet::MetricsV1 *initMetrics(Metric *metricsList, + int metricsCount) { + weaselab::ConflictSet::MetricsV1 *metrics = + (weaselab::ConflictSet::MetricsV1 *)safe_malloc(metricsCount * + sizeof(metrics[0])); + for (auto [i, m] = std::make_tuple(metricsCount - 1, metricsList); i >= 0; + --i, m = m->prev) { + metrics[i].name = m->name; + metrics[i].help = m->help; + metrics[i].p = m; + metrics[i].type = m->type; + } + return metrics; +} \ No newline at end of file diff --git a/ServerBench.cpp b/ServerBench.cpp index 96e519a..c7c9af8 100644 --- a/ServerBench.cpp +++ b/ServerBench.cpp @@ -21,7 +21,7 @@ std::atomic transactions; -constexpr int kBaseSearchDepth = 32; +constexpr int kBaseSearchDepth = 115; constexpr int kWindowSize = 10000000; std::string numToKey(int64_t num) { diff --git a/SkipList.cpp b/SkipList.cpp index e0bf4e1..e2e1a4e 100644 --- a/SkipList.cpp +++ b/SkipList.cpp @@ -22,6 +22,7 @@ #include "ConflictSet.h" #include "Internal.h" +#include "Metrics.h" #include #include @@ -434,13 +435,14 @@ public: return result; } - void detectConflicts(ReadConflictRange *ranges, int count, - ConflictSet::Result *transactionConflictStatus) const { + // Return number of iterations of main loop + int detectConflicts(ReadConflictRange *ranges, int count, + ConflictSet::Result *transactionConflictStatus) const { const int M = 16; int nextJob[M]; CheckMax inProgress[M]; if (!count) - return; + return 0; int started = std::min(M, count); for (int i = 0; i < started; i++) { @@ -451,8 +453,9 @@ public: int prevJob = started - 1; int job = 0; + int iters = 0; // vtune: 340 parts - while (true) { + for (;; ++iters) { if (inProgress[job].advance()) { if (started == count) { if (prevJob == job) @@ -468,6 +471,7 @@ public: prevJob = job; job = nextJob[job]; } + return iters; } void find(const StringRef *values, Finger *results, int *temp, int count) { @@ -702,15 +706,27 @@ private: }; }; +struct ReadContext { + int64_t commits_accum = 0; + int64_t conflicts_accum = 0; + int64_t too_olds_accum = 0; + int64_t check_bytes_accum = 0; +}; + struct __attribute__((visibility("hidden"))) ConflictSet::Impl { Impl(int64_t oldestVersion) : oldestVersion(oldestVersion), newestVersion(oldestVersion), - skipList(oldestVersion) {} + skipList(oldestVersion) { + metrics = initMetrics(metricsList, metricsCount); + } + ~Impl() { safe_free(metrics, metricsCount * sizeof(metrics[0])); } void check(const ConflictSet::ReadRange *reads, ConflictSet::Result *results, - int count) const { + int count) { + ReadContext tls; Arena arena; auto *ranges = new (arena) ReadConflictRange[count]; for (int i = 0; i < count; ++i) { + tls.check_bytes_accum += reads[i].begin.len + reads[i].end.len; ranges[i].begin = {reads[i].begin.p, size_t(reads[i].begin.len)}; ranges[i].end = reads[i].end.len > 0 ? StringRef{reads[i].end.p, size_t(reads[i].end.len)} @@ -718,13 +734,22 @@ struct __attribute__((visibility("hidden"))) ConflictSet::Impl { ranges[i].version = reads[i].readVersion; results[i] = ConflictSet::Commit; } - skipList.detectConflicts(ranges, count, results); + int iters = skipList.detectConflicts(ranges, count, results); for (int i = 0; i < count; ++i) { if (reads[i].readVersion < oldestVersion || reads[i].readVersion < newestVersion - 2e9) { results[i] = TooOld; } + tls.commits_accum += results[i] == Commit; + tls.conflicts_accum += results[i] == Conflict; + tls.too_olds_accum += results[i] == TooOld; } + range_read_iterations_total.add(iters); + range_read_total.add(count); + commits_total.add(tls.commits_accum); + conflicts_total.add(tls.conflicts_accum); + too_olds_total.add(tls.too_olds_accum); + check_bytes_total.add(tls.check_bytes_accum); } void addWrites(const ConflictSet::WriteRange *writes, int count, @@ -788,6 +813,9 @@ struct __attribute__((visibility("hidden"))) ConflictSet::Impl { } void setOldestVersion(int64_t oldestVersion) { + // This isn't 100% accurate. It overcounts if you hit the end + gc_iterations_total.add(keyUpdates); + assert(oldestVersion >= this->oldestVersion); this->oldestVersion = oldestVersion; SkipList::Finger finger; @@ -802,6 +830,54 @@ struct __attribute__((visibility("hidden"))) ConflictSet::Impl { int64_t totalBytes = 0; + MetricsV1 *metrics; + int metricsCount = 0; + Metric *metricsList = nullptr; + +#define GAUGE(name, help) \ + Gauge name { metricsList, metricsCount, #name, help } +#define COUNTER(name, help) \ + Counter name { metricsList, metricsCount, #name, help } + // ==================== METRICS DEFINITIONS ==================== + COUNTER(range_read_total, "Total number of range reads checked"); + COUNTER(range_read_iterations_total, + "Total number of iterations of the main loops for range read checks"); + COUNTER(commits_total, + "Total number of checks where the result is \"commit\""); + COUNTER(conflicts_total, + "Total number of checks where the result is \"conflict\""); + COUNTER(too_olds_total, + "Total number of checks where the result is \"too old\""); + COUNTER(check_bytes_total, "Total number of key bytes checked"); + GAUGE(memory_bytes, "Total number of bytes in use"); + COUNTER(nodes_allocated_total, + "The total number of physical tree nodes allocated"); + COUNTER(nodes_released_total, + "The total number of physical tree nodes released"); + COUNTER(insert_iterations_total, + "The total number of iterations of the main loop for insertion. " + "Includes searches where the entry already existed, and so insertion " + "did not take place"); + COUNTER(entries_inserted_total, + "The total number of entries inserted in the tree"); + COUNTER(entries_erased_total, + "The total number of entries erased from the tree"); + COUNTER( + gc_iterations_total, + "The total number of iterations of the main loop for garbage collection"); + COUNTER(write_bytes_total, "Total number of key bytes in calls to addWrites"); + GAUGE(oldest_version, + "The lowest version that doesn't result in \"TooOld\" for checks"); + GAUGE(newest_version, "The version of the most recent call to addWrites"); + // ==================== END METRICS DEFINITIONS ==================== +#undef GAUGE +#undef COUNTER + + void getMetricsV1(MetricsV1 **metrics, int *count) { + *metrics = this->metrics; + *count = metricsCount; + } + private: int64_t keyUpdates = 0; Arena removalArena; @@ -824,6 +900,7 @@ void internal_addWrites(ConflictSet::Impl *impl, mallocBytesDelta = 0; impl->addWrites(writes, count, writeVersion); impl->totalBytes += mallocBytesDelta; + impl->memory_bytes.set(impl->totalBytes); #if SHOW_MEMORY if (impl->totalBytes != mallocBytes) { abort(); @@ -835,6 +912,7 @@ void internal_setOldestVersion(ConflictSet::Impl *impl, int64_t oldestVersion) { mallocBytesDelta = 0; impl->setOldestVersion(oldestVersion); impl->totalBytes += mallocBytesDelta; + impl->memory_bytes.set(impl->totalBytes); #if SHOW_MEMORY if (impl->totalBytes != mallocBytes) { abort(); @@ -858,12 +936,11 @@ int64_t internal_getBytes(ConflictSet::Impl *impl) { return impl->totalBytes; } void internal_getMetricsV1(ConflictSet::Impl *impl, ConflictSet::MetricsV1 **metrics, int *count) { - *metrics = nullptr; - *count = 0; + return impl->getMetricsV1(metrics, count); } double internal_getMetricValue(const ConflictSet::MetricsV1 *metric) { - return 0; + return ((Metric *)metric->p)->value.load(std::memory_order_relaxed); } void ConflictSet::check(const ReadRange *reads, Result *results,