11 Commits

Author SHA1 Message Date
56893f9702 Update stale comments
All checks were successful
Tests / Clang total: 2879, passed: 2879
Clang |Total|New|Outstanding|Fixed|Trend |:-:|:-:|:-:|:-:|:-: |0|0|0|0|:clap:
Tests / 64 bit versions total: 2879, passed: 2879
Tests / Debug total: 2877, passed: 2877
Tests / SIMD fallback total: 2879, passed: 2879
Tests / Release [gcc] total: 2879, passed: 2879
GNU C Compiler (gcc) |Total|New|Outstanding|Fixed|Trend |:-:|:-:|:-:|:-:|:-: |0|0|0|0|:clap:
Tests / Release [gcc,aarch64] total: 2146, passed: 2146
Tests / Coverage total: 2163, passed: 2163
Code Coverage #### Project Overview No changes detected, that affect the code coverage. * Line Coverage: 99.23% (1803/1817) * Branch Coverage: 68.36% (1426/2086) * Complexity Density: 0.00 * Lines of Code: 1817 #### Quality Gates Summary Output truncated.
weaselab/conflict-set/pipeline/head This commit looks good
2024-09-05 10:06:02 -07:00
e2234be10f Update README.md 2024-09-04 13:08:20 -07:00
ce853680f2 Finish checkRangeRightSide conversion 2024-09-04 12:36:43 -07:00
5c39c1d64f CheckRangeRightSide conversion WIP 2024-09-04 12:25:16 -07:00
55b73c8ddb Convert CheckRangeRightSide WIP 2024-09-04 12:22:36 -07:00
b9503f8258 Converting CheckRangeRightSide WIP 2024-09-04 12:19:53 -07:00
c4c4531bd3 Simplify 2024-09-04 12:12:54 -07:00
2037d37c66 checkRangeLeftSide 2024-09-04 12:11:16 -07:00
6fe6a244af Get CheckRangeLeftSide closer, WIP 2024-09-04 12:08:16 -07:00
8a4b370e2a Make CheckRange{Left,Right}Side into normal functions, WIP 2024-09-04 12:04:58 -07:00
394f09f9fb WIP adding metrics to skiplist 2024-08-30 16:06:43 -07:00
6 changed files with 233 additions and 242 deletions

View File

@@ -17,9 +17,9 @@ limitations under the License.
#include "ConflictSet.h"
#include "Internal.h"
#include "LongestCommonPrefix.h"
#include "Metrics.h"
#include <algorithm>
#include <atomic>
#include <bit>
#include <cassert>
#include <cstddef>
@@ -578,39 +578,6 @@ static_assert(kBytesPerKey - sizeof(Node0) >= kMinNodeSurplus);
constexpr int64_t kFreeListMaxMemory = 1 << 20;
struct Metric {
Metric *prev;
const char *name;
const char *help;
ConflictSet::MetricsV1::Type type;
std::atomic<int64_t> value;
protected:
Metric(ConflictSet::Impl *impl, const char *name, const char *help,
ConflictSet::MetricsV1::Type type);
};
struct Gauge : private Metric {
Gauge(ConflictSet::Impl *impl, const char *name, const char *help)
: Metric(impl, name, help, ConflictSet::MetricsV1::Gauge) {}
void set(int64_t value) {
this->value.store(value, std::memory_order_relaxed);
}
};
struct Counter : private Metric {
Counter(ConflictSet::Impl *impl, const char *name, const char *help)
: Metric(impl, name, help, ConflictSet::MetricsV1::Counter) {}
// Expensive. Accumulate locally and then call add instead of repeatedly
// calling add.
void add(int64_t value) {
assert(value >= 0);
static_assert(std::atomic<int64_t>::is_always_lock_free);
this->value.fetch_add(value, std::memory_order_relaxed);
}
};
template <class T> struct BoundedFreeListAllocator {
static_assert(sizeof(T) >= sizeof(void *));
@@ -2543,38 +2510,19 @@ downLeftSpine:
namespace {
// Return true if the max version among all keys that start with key[:prefixLen]
// that are >= key is <= readVersion
struct CheckRangeLeftSide {
CheckRangeLeftSide(Node *n, std::span<const uint8_t> key, int prefixLen,
InternalVersionT readVersion, ReadContext *tls)
: n(n), remaining(key), prefixLen(prefixLen), readVersion(readVersion),
impl(tls->impl), tls(tls) {
#if DEBUG_VERBOSE && !defined(NDEBUG)
fprintf(stderr, "Check range left side from %s for keys starting with %s\n",
printable(key).c_str(),
printable(key.subspan(0, prefixLen)).c_str());
#endif
}
Node *n;
std::span<const uint8_t> remaining;
int prefixLen;
InternalVersionT readVersion;
ConflictSet::Impl *impl;
ReadContext *tls;
bool checkRangeLeftSide(Node *n, std::span<const uint8_t> key, int prefixLen,
InternalVersionT readVersion, ReadContext *tls) {
auto remaining = key;
int searchPathLen = 0;
bool ok;
bool step() {
for (;; ++tls->range_read_iterations_accum) {
if (remaining.size() == 0) {
assert(searchPathLen >= prefixLen);
ok = maxVersion(n) <= readVersion;
return true;
return maxVersion(n) <= readVersion;
}
if (searchPathLen >= prefixLen) {
if (!checkMaxBetweenExclusive(n, remaining[0], 256, readVersion, tls)) {
ok = false;
return true;
return false;
}
}
@@ -2584,18 +2532,16 @@ struct CheckRangeLeftSide {
if (c != nullptr) {
if (searchPathLen < prefixLen) {
n = c;
return downLeftSpine();
goto downLeftSpine;
}
n = c;
ok = maxVersion(n) <= readVersion;
return true;
return maxVersion(n) <= readVersion;
} else {
n = nextSibling(n);
if (n == nullptr) {
ok = true;
return true;
}
return downLeftSpine();
goto downLeftSpine;
}
}
@@ -2611,21 +2557,18 @@ struct CheckRangeLeftSide {
auto c = n->partialKey()[i] <=> remaining[i];
if (c > 0) {
if (searchPathLen < prefixLen) {
return downLeftSpine();
goto downLeftSpine;
}
if (n->entryPresent && n->entry.rangeVersion > readVersion) {
ok = false;
return true;
return false;
}
ok = maxVersion(n) <= readVersion;
return true;
return maxVersion(n) <= readVersion;
} else {
n = nextSibling(n);
if (n == nullptr) {
ok = true;
return true;
}
return downLeftSpine();
goto downLeftSpine;
}
}
if (commonLen == n->partialKeyLen) {
@@ -2634,83 +2577,47 @@ struct CheckRangeLeftSide {
} else if (n->partialKeyLen > int(remaining.size())) {
assert(searchPathLen >= prefixLen);
if (n->entryPresent && n->entry.rangeVersion > readVersion) {
ok = false;
return true;
return false;
}
ok = maxVersion(n) <= readVersion;
return true;
return maxVersion(n) <= readVersion;
}
}
if (maxV <= readVersion) {
ok = true;
return true;
}
return false;
}
bool downLeftSpine() {
for (; !n->entryPresent; n = getFirstChildExists(n)) {
}
ok = n->entry.rangeVersion <= readVersion;
return true;
downLeftSpine:
for (; !n->entryPresent; n = getFirstChildExists(n)) {
}
};
return n->entry.rangeVersion <= readVersion;
}
// Return true if the max version among all keys that start with key[:prefixLen]
// that are < key is <= readVersion
struct CheckRangeRightSide {
CheckRangeRightSide(Node *n, std::span<const uint8_t> key, int prefixLen,
InternalVersionT readVersion, ReadContext *tls)
: n(n), key(key), remaining(key), prefixLen(prefixLen),
readVersion(readVersion), impl(tls->impl), tls(tls) {
#if DEBUG_VERBOSE && !defined(NDEBUG)
fprintf(stderr, "Check range right side to %s for keys starting with %s\n",
printable(key).c_str(),
printable(key.subspan(0, prefixLen)).c_str());
#endif
}
Node *n;
std::span<const uint8_t> key;
std::span<const uint8_t> remaining;
int prefixLen;
InternalVersionT readVersion;
ConflictSet::Impl *impl;
ReadContext *tls;
bool checkRangeRightSide(Node *n, std::span<const uint8_t> key, int prefixLen,
InternalVersionT readVersion, ReadContext *tls) {
auto remaining = key;
int searchPathLen = 0;
bool ok;
bool step() {
#if DEBUG_VERBOSE && !defined(NDEBUG)
fprintf(stderr,
"Search path: %s, searchPathLen: %d, prefixLen: %d, remaining: "
"%s\n",
getSearchPathPrintable(n).c_str(), searchPathLen, prefixLen,
printable(remaining).c_str());
#endif
for (;; ++tls->range_read_iterations_accum) {
assert(searchPathLen <= int(key.size()));
if (remaining.size() == 0) {
return downLeftSpine();
goto downLeftSpine;
}
if (searchPathLen >= prefixLen) {
if (n->entryPresent && n->entry.pointVersion > readVersion) {
ok = false;
return true;
return false;
}
if (!checkMaxBetweenExclusive(n, -1, remaining[0], readVersion, tls)) {
ok = false;
return true;
return false;
}
}
if (searchPathLen > prefixLen && n->entryPresent &&
n->entry.rangeVersion > readVersion) {
ok = false;
return true;
return false;
}
auto *child = getChild(n, remaining[0]);
@@ -2718,9 +2625,9 @@ struct CheckRangeRightSide {
auto c = getChildGeq(n, remaining[0]);
if (c != nullptr) {
n = c;
return downLeftSpine();
goto downLeftSpine;
} else {
return backtrack();
goto backtrack;
}
}
@@ -2736,57 +2643,48 @@ struct CheckRangeRightSide {
++searchPathLen;
auto c = n->partialKey()[i] <=> remaining[i];
if (c > 0) {
return downLeftSpine();
goto downLeftSpine;
} else {
if (searchPathLen > prefixLen && n->entryPresent &&
n->entry.rangeVersion > readVersion) {
ok = false;
return true;
return false;
}
return backtrack();
goto backtrack;
}
}
if (commonLen == n->partialKeyLen) {
// partial key matches
remaining = remaining.subspan(commonLen, remaining.size() - commonLen);
} else if (n->partialKeyLen > int(remaining.size())) {
return downLeftSpine();
}
}
return false;
}
bool backtrack() {
for (;;) {
// searchPathLen > prefixLen implies n is not the root
if (searchPathLen > prefixLen && maxVersion(n) > readVersion) {
ok = false;
return true;
}
if (n->parent == nullptr) {
ok = true;
return true;
}
auto next = getChildGeq(n->parent, n->parentsIndex + 1);
if (next == nullptr) {
searchPathLen -= 1 + n->partialKeyLen;
n = n->parent;
} else {
searchPathLen -= n->partialKeyLen;
n = next;
searchPathLen += n->partialKeyLen;
return downLeftSpine();
goto downLeftSpine;
}
}
}
bool downLeftSpine() {
for (; !n->entryPresent; n = getFirstChildExists(n)) {
backtrack:
for (;;) {
// searchPathLen > prefixLen implies n is not the root
if (searchPathLen > prefixLen && maxVersion(n) > readVersion) {
return false;
}
if (n->parent == nullptr) {
return true;
}
auto next = getChildGeq(n->parent, n->parentsIndex + 1);
if (next == nullptr) {
searchPathLen -= 1 + n->partialKeyLen;
n = n->parent;
} else {
searchPathLen -= n->partialKeyLen;
n = next;
searchPathLen += n->partialKeyLen;
goto downLeftSpine;
}
ok = n->entry.rangeVersion <= readVersion;
return true;
}
};
downLeftSpine:
for (; !n->entryPresent; n = getFirstChildExists(n)) {
}
return n->entry.rangeVersion <= readVersion;
}
} // namespace
bool checkRangeRead(Node *n, std::span<const uint8_t> begin,
@@ -2807,8 +2705,8 @@ bool checkRangeRead(Node *n, std::span<const uint8_t> begin,
auto remaining = begin.subspan(0, lcp);
Arena arena;
// If the common prefix isn't a prefix of any physical entry in the tree, we
// can go to "downLeftSpine"
// Advance down common prefix, but stay on a physical path in the tree
for (;; ++tls->range_read_iterations_accum) {
assert(getSearchPath(arena, n) <=>
begin.subspan(0, lcp - remaining.size()) ==
@@ -2849,47 +2747,17 @@ bool checkRangeRead(Node *n, std::span<const uint8_t> begin,
lcp -= consumed;
if (lcp == int(begin.size())) {
CheckRangeRightSide checkRangeRightSide{n, end, lcp, readVersion, tls};
while (!checkRangeRightSide.step())
;
return checkRangeRightSide.ok;
return checkRangeRightSide(n, end, lcp, readVersion, tls);
}
if (!checkRangeStartsWith(n, begin.subspan(0, lcp), begin[lcp], end[lcp],
readVersion, tls)) {
return false;
}
// This makes it safe to check maxVersion within CheckRangeLeftSide. If this
// This makes it safe to check maxVersion within checkRangeLeftSide. If this
// were false, then we would have returned above since lcp == begin.size().
assert(!(n->parent == nullptr && begin.size() == 0));
CheckRangeLeftSide checkRangeLeftSide{n, begin, lcp + 1, readVersion, tls};
CheckRangeRightSide checkRangeRightSide{n, end, lcp + 1, readVersion, tls};
for (;;) {
bool leftDone = checkRangeLeftSide.step();
bool rightDone = checkRangeRightSide.step();
if (!leftDone && !rightDone) {
tls->range_read_iterations_accum += 2;
continue;
}
if (leftDone && rightDone) {
break;
} else if (leftDone) {
while (!checkRangeRightSide.step()) {
++tls->range_read_iterations_accum;
}
break;
} else {
assert(rightDone);
while (!checkRangeLeftSide.step()) {
++tls->range_read_iterations_accum;
}
}
break;
}
return checkRangeLeftSide.ok && checkRangeRightSide.ok;
return checkRangeStartsWith(n, begin.subspan(0, lcp), begin[lcp], end[lcp],
readVersion, tls) &&
checkRangeLeftSide(n, begin, lcp + 1, readVersion, tls) &&
checkRangeRightSide(n, end, lcp + 1, readVersion, tls);
}
#ifdef __x86_64__
@@ -3242,7 +3110,6 @@ struct __attribute__((visibility("hidden"))) ConflictSet::Impl {
0) *
2;
memory_bytes.set(totalBytes);
point_writes_total.add(tls.accum.point_writes);
range_writes_total.add(tls.accum.range_writes);
nodes_allocated_total.add(tls.accum.nodes_allocated);
@@ -3331,7 +3198,6 @@ struct __attribute__((visibility("hidden"))) ConflictSet::Impl {
#endif
keyUpdates = gcScanStep(keyUpdates);
memory_bytes.set(totalBytes);
nodes_allocated_total.add(std::exchange(tls.accum.nodes_allocated, 0));
nodes_released_total.add(std::exchange(tls.accum.nodes_released, 0));
entries_inserted_total.add(std::exchange(tls.accum.entries_inserted, 0));
@@ -3379,7 +3245,7 @@ struct __attribute__((visibility("hidden"))) ConflictSet::Impl {
explicit Impl(int64_t oldestVersion) {
assert(oldestVersion >= 0);
init(oldestVersion);
initMetrics();
metrics = initMetrics(metricsList, metricsCount);
}
~Impl() {
eraseTree(root, &tls);
@@ -3402,23 +3268,12 @@ struct __attribute__((visibility("hidden"))) ConflictSet::Impl {
MetricsV1 *metrics;
int metricsCount = 0;
void initMetrics() {
metrics = (MetricsV1 *)safe_malloc(metricsCount * sizeof(metrics[0]));
for (auto [i, m] = std::make_tuple(metricsCount - 1, metricList); i >= 0;
--i, m = m->prev) {
metrics[i].name = m->name;
metrics[i].help = m->help;
metrics[i].p = m;
metrics[i].type = m->type;
}
}
Metric *metricList = nullptr;
Metric *metricsList = nullptr;
#define GAUGE(name, help) \
Gauge name { this, #name, help }
Gauge name { metricsList, metricsCount, #name, help }
#define COUNTER(name, help) \
Counter name { this, #name, help }
Counter name { metricsList, metricsCount, #name, help }
// ==================== METRICS DEFINITIONS ====================
COUNTER(point_read_total, "Total number of point reads checked");
COUNTER(point_read_short_circuit_total,
@@ -3484,13 +3339,6 @@ struct __attribute__((visibility("hidden"))) ConflictSet::Impl {
}
};
Metric::Metric(ConflictSet::Impl *impl, const char *name, const char *help,
ConflictSet::MetricsV1::Type type)
: prev(std::exchange(impl->metricList, this)), name(name), help(help),
type(type), value(0) {
++impl->metricsCount;
}
Node *&getInTree(Node *n, ConflictSet::Impl *impl) {
return n->parent == nullptr ? impl->root
: getChildExists(n->parent, n->parentsIndex);
@@ -3509,6 +3357,7 @@ void internal_addWrites(ConflictSet::Impl *impl,
mallocBytesDelta = 0;
impl->addWrites(writes, count, writeVersion);
impl->totalBytes += mallocBytesDelta;
impl->memory_bytes.set(impl->totalBytes);
#if SHOW_MEMORY
if (impl->totalBytes != mallocBytes) {
abort();
@@ -3520,6 +3369,7 @@ void internal_setOldestVersion(ConflictSet::Impl *impl, int64_t oldestVersion) {
mallocBytesDelta = 0;
impl->setOldestVersion(oldestVersion);
impl->totalBytes += mallocBytesDelta;
impl->memory_bytes.set(impl->totalBytes);
#if SHOW_MEMORY
if (impl->totalBytes != mallocBytes) {
abort();

2
Jenkinsfile vendored
View File

@@ -129,7 +129,7 @@ pipeline {
}
steps {
script {
filter_args = "-f ConflictSet.cpp -f LongestCommonPrefix.h"
filter_args = "-f ConflictSet.cpp -f LongestCommonPrefix.h -f Metrics.h"
}
CleanBuildAndTest("-DCMAKE_C_COMPILER=gcc -DCMAKE_CXX_COMPILER=g++ -DCMAKE_C_FLAGS=--coverage -DCMAKE_CXX_FLAGS=--coverage -DCMAKE_BUILD_TYPE=Debug -DDISABLE_TSAN=ON")
sh """

64
Metrics.h Normal file
View File

@@ -0,0 +1,64 @@
#pragma once
#include "ConflictSet.h"
#include "Internal.h"
#include <assert.h>
#include <atomic>
#include <tuple>
struct Metric {
Metric *prev;
const char *name;
const char *help;
weaselab::ConflictSet::MetricsV1::Type type;
std::atomic<int64_t> value;
protected:
Metric(Metric *&metricList, int &metricsCount, const char *name,
const char *help, weaselab::ConflictSet::MetricsV1::Type type)
: prev(std::exchange(metricList, this)), name(name), help(help),
type(type), value(0) {
++metricsCount;
}
};
struct Gauge : private Metric {
Gauge(Metric *&metricList, int &metricsCount, const char *name,
const char *help)
: Metric(metricList, metricsCount, name, help,
weaselab::ConflictSet::MetricsV1::Gauge) {}
void set(int64_t value) {
this->value.store(value, std::memory_order_relaxed);
}
};
struct Counter : private Metric {
Counter(Metric *&metricList, int &metricsCount, const char *name,
const char *help)
: Metric(metricList, metricsCount, name, help,
weaselab::ConflictSet::MetricsV1::Counter) {}
// Expensive. Accumulate locally and then call add instead of repeatedly
// calling add.
void add(int64_t value) {
assert(value >= 0);
static_assert(std::atomic<int64_t>::is_always_lock_free);
this->value.fetch_add(value, std::memory_order_relaxed);
}
};
inline weaselab::ConflictSet::MetricsV1 *initMetrics(Metric *metricsList,
int metricsCount) {
weaselab::ConflictSet::MetricsV1 *metrics =
(weaselab::ConflictSet::MetricsV1 *)safe_malloc(metricsCount *
sizeof(metrics[0]));
for (auto [i, m] = std::make_tuple(metricsCount - 1, metricsList); i >= 0;
--i, m = m->prev) {
metrics[i].name = m->name;
metrics[i].help = m->help;
metrics[i].p = m;
metrics[i].type = m->type;
}
return metrics;
}

View File

@@ -24,15 +24,15 @@ Hardware for all benchmarks is an AMD Ryzen 9 7900 with (2x32GB) 5600MT/s CL28-3
| ns/op | op/s | err% | ins/op | cyc/op | IPC | bra/op | miss% | total | benchmark
|--------------------:|--------------------:|--------:|----------------:|----------------:|-------:|---------------:|--------:|----------:|:----------
| 11.18 | 89,455,125.34 | 0.6% | 185.37 | 57.08 | 3.248 | 41.51 | 0.4% | 0.01 | `point reads`
| 14.53 | 68,800,688.89 | 0.4% | 282.41 | 74.80 | 3.776 | 55.06 | 0.3% | 0.01 | `prefix reads`
| 36.54 | 27,367,576.87 | 0.2% | 798.06 | 188.90 | 4.225 | 141.69 | 0.2% | 0.01 | `range reads`
| 16.69 | 59,912,106.02 | 0.6% | 314.57 | 86.29 | 3.645 | 39.84 | 0.4% | 0.01 | `point writes`
| 30.09 | 33,235,744.07 | 0.5% | 591.33 | 155.92 | 3.793 | 82.69 | 0.2% | 0.01 | `prefix writes`
| 35.77 | 27,956,388.03 | 1.4% | 682.25 | 187.63 | 3.636 | 96.12 | 0.1% | 0.01 | `range writes`
| 74.04 | 13,505,408.41 | 2.7% | 1,448.95 | 392.10 | 3.695 | 260.53 | 0.1% | 0.01 | `monotonic increasing point writes`
| 330,984.50 | 3,021.29 | 1.9% | 3,994,153.50 | 1,667,309.00 | 2.396 | 806,019.50 | 0.0% | 0.01 | `worst case for radix tree`
| 92.46 | 10,814,961.65 | 0.5% | 1,800.00 | 463.41 | 3.884 | 297.00 | 0.0% | 0.01 | `create and destroy`
| 12.88 | 77,653,350.77 | 0.5% | 185.37 | 64.45 | 2.876 | 41.51 | 0.4% | 0.01 | `point reads`
| 14.67 | 68,179,354.49 | 0.1% | 271.44 | 73.40 | 3.698 | 53.70 | 0.3% | 0.01 | `prefix reads`
| 34.84 | 28,701,444.36 | 0.3% | 715.74 | 175.27 | 4.084 | 127.30 | 0.2% | 0.01 | `range reads`
| 17.12 | 58,422,988.28 | 0.2% | 314.30 | 86.11 | 3.650 | 39.82 | 0.4% | 0.01 | `point writes`
| 31.42 | 31,830,804.65 | 0.1% | 591.06 | 158.07 | 3.739 | 82.67 | 0.2% | 0.01 | `prefix writes`
| 37.37 | 26,759,432.70 | 2.2% | 681.98 | 188.95 | 3.609 | 96.10 | 0.1% | 0.01 | `range writes`
| 76.72 | 13,035,140.63 | 2.3% | 1,421.28 | 387.17 | 3.671 | 257.76 | 0.1% | 0.01 | `monotonic increasing point writes`
| 297,452.00 | 3,361.89 | 0.9% | 3,508,083.00 | 1,500,834.67 | 2.337 | 727,525.33 | 0.1% | 0.01 | `worst case for radix tree`
| 87.70 | 11,402,490.60 | 1.0% | 1,795.00 | 442.09 | 4.060 | 297.00 | 0.0% | 0.01 | `create and destroy`
# "Real data" test

View File

@@ -21,7 +21,7 @@
std::atomic<int64_t> transactions;
constexpr int kBaseSearchDepth = 32;
constexpr int kBaseSearchDepth = 115;
constexpr int kWindowSize = 10000000;
std::string numToKey(int64_t num) {

View File

@@ -22,6 +22,7 @@
#include "ConflictSet.h"
#include "Internal.h"
#include "Metrics.h"
#include <algorithm>
#include <span>
@@ -434,13 +435,14 @@ public:
return result;
}
void detectConflicts(ReadConflictRange *ranges, int count,
ConflictSet::Result *transactionConflictStatus) const {
// Return number of iterations of main loop
int detectConflicts(ReadConflictRange *ranges, int count,
ConflictSet::Result *transactionConflictStatus) const {
const int M = 16;
int nextJob[M];
CheckMax inProgress[M];
if (!count)
return;
return 0;
int started = std::min(M, count);
for (int i = 0; i < started; i++) {
@@ -451,8 +453,9 @@ public:
int prevJob = started - 1;
int job = 0;
int iters = 0;
// vtune: 340 parts
while (true) {
for (;; ++iters) {
if (inProgress[job].advance()) {
if (started == count) {
if (prevJob == job)
@@ -468,6 +471,7 @@ public:
prevJob = job;
job = nextJob[job];
}
return iters;
}
void find(const StringRef *values, Finger *results, int *temp, int count) {
@@ -702,15 +706,27 @@ private:
};
};
struct ReadContext {
int64_t commits_accum = 0;
int64_t conflicts_accum = 0;
int64_t too_olds_accum = 0;
int64_t check_bytes_accum = 0;
};
struct __attribute__((visibility("hidden"))) ConflictSet::Impl {
Impl(int64_t oldestVersion)
: oldestVersion(oldestVersion), newestVersion(oldestVersion),
skipList(oldestVersion) {}
skipList(oldestVersion) {
metrics = initMetrics(metricsList, metricsCount);
}
~Impl() { safe_free(metrics, metricsCount * sizeof(metrics[0])); }
void check(const ConflictSet::ReadRange *reads, ConflictSet::Result *results,
int count) const {
int count) {
ReadContext tls;
Arena arena;
auto *ranges = new (arena) ReadConflictRange[count];
for (int i = 0; i < count; ++i) {
tls.check_bytes_accum += reads[i].begin.len + reads[i].end.len;
ranges[i].begin = {reads[i].begin.p, size_t(reads[i].begin.len)};
ranges[i].end = reads[i].end.len > 0
? StringRef{reads[i].end.p, size_t(reads[i].end.len)}
@@ -718,13 +734,22 @@ struct __attribute__((visibility("hidden"))) ConflictSet::Impl {
ranges[i].version = reads[i].readVersion;
results[i] = ConflictSet::Commit;
}
skipList.detectConflicts(ranges, count, results);
int iters = skipList.detectConflicts(ranges, count, results);
for (int i = 0; i < count; ++i) {
if (reads[i].readVersion < oldestVersion ||
reads[i].readVersion < newestVersion - 2e9) {
results[i] = TooOld;
}
tls.commits_accum += results[i] == Commit;
tls.conflicts_accum += results[i] == Conflict;
tls.too_olds_accum += results[i] == TooOld;
}
range_read_iterations_total.add(iters);
range_read_total.add(count);
commits_total.add(tls.commits_accum);
conflicts_total.add(tls.conflicts_accum);
too_olds_total.add(tls.too_olds_accum);
check_bytes_total.add(tls.check_bytes_accum);
}
void addWrites(const ConflictSet::WriteRange *writes, int count,
@@ -788,6 +813,9 @@ struct __attribute__((visibility("hidden"))) ConflictSet::Impl {
}
void setOldestVersion(int64_t oldestVersion) {
// This isn't 100% accurate. It overcounts if you hit the end
gc_iterations_total.add(keyUpdates);
assert(oldestVersion >= this->oldestVersion);
this->oldestVersion = oldestVersion;
SkipList::Finger finger;
@@ -802,6 +830,54 @@ struct __attribute__((visibility("hidden"))) ConflictSet::Impl {
int64_t totalBytes = 0;
MetricsV1 *metrics;
int metricsCount = 0;
Metric *metricsList = nullptr;
#define GAUGE(name, help) \
Gauge name { metricsList, metricsCount, #name, help }
#define COUNTER(name, help) \
Counter name { metricsList, metricsCount, #name, help }
// ==================== METRICS DEFINITIONS ====================
COUNTER(range_read_total, "Total number of range reads checked");
COUNTER(range_read_iterations_total,
"Total number of iterations of the main loops for range read checks");
COUNTER(commits_total,
"Total number of checks where the result is \"commit\"");
COUNTER(conflicts_total,
"Total number of checks where the result is \"conflict\"");
COUNTER(too_olds_total,
"Total number of checks where the result is \"too old\"");
COUNTER(check_bytes_total, "Total number of key bytes checked");
GAUGE(memory_bytes, "Total number of bytes in use");
COUNTER(nodes_allocated_total,
"The total number of physical tree nodes allocated");
COUNTER(nodes_released_total,
"The total number of physical tree nodes released");
COUNTER(insert_iterations_total,
"The total number of iterations of the main loop for insertion. "
"Includes searches where the entry already existed, and so insertion "
"did not take place");
COUNTER(entries_inserted_total,
"The total number of entries inserted in the tree");
COUNTER(entries_erased_total,
"The total number of entries erased from the tree");
COUNTER(
gc_iterations_total,
"The total number of iterations of the main loop for garbage collection");
COUNTER(write_bytes_total, "Total number of key bytes in calls to addWrites");
GAUGE(oldest_version,
"The lowest version that doesn't result in \"TooOld\" for checks");
GAUGE(newest_version, "The version of the most recent call to addWrites");
// ==================== END METRICS DEFINITIONS ====================
#undef GAUGE
#undef COUNTER
void getMetricsV1(MetricsV1 **metrics, int *count) {
*metrics = this->metrics;
*count = metricsCount;
}
private:
int64_t keyUpdates = 0;
Arena removalArena;
@@ -824,6 +900,7 @@ void internal_addWrites(ConflictSet::Impl *impl,
mallocBytesDelta = 0;
impl->addWrites(writes, count, writeVersion);
impl->totalBytes += mallocBytesDelta;
impl->memory_bytes.set(impl->totalBytes);
#if SHOW_MEMORY
if (impl->totalBytes != mallocBytes) {
abort();
@@ -835,6 +912,7 @@ void internal_setOldestVersion(ConflictSet::Impl *impl, int64_t oldestVersion) {
mallocBytesDelta = 0;
impl->setOldestVersion(oldestVersion);
impl->totalBytes += mallocBytesDelta;
impl->memory_bytes.set(impl->totalBytes);
#if SHOW_MEMORY
if (impl->totalBytes != mallocBytes) {
abort();
@@ -858,12 +936,11 @@ int64_t internal_getBytes(ConflictSet::Impl *impl) { return impl->totalBytes; }
void internal_getMetricsV1(ConflictSet::Impl *impl,
ConflictSet::MetricsV1 **metrics, int *count) {
*metrics = nullptr;
*count = 0;
return impl->getMetricsV1(metrics, count);
}
double internal_getMetricValue(const ConflictSet::MetricsV1 *metric) {
return 0;
return ((Metric *)metric->p)->value.load(std::memory_order_relaxed);
}
void ConflictSet::check(const ReadRange *reads, Result *results,