Compare commits
2 Commits
ed274c24d7
...
ffd1dfe74d
Author | SHA1 | Date | |
---|---|---|---|
ffd1dfe74d | |||
c39af9117f |
143
ConflictSet.cpp
143
ConflictSet.cpp
@@ -587,8 +587,6 @@ struct Counter : private Metric {
|
|||||||
};
|
};
|
||||||
|
|
||||||
template <class T> struct BoundedFreeListAllocator {
|
template <class T> struct BoundedFreeListAllocator {
|
||||||
Counter *allocated_total = nullptr;
|
|
||||||
Counter *released_total = nullptr;
|
|
||||||
|
|
||||||
static_assert(sizeof(T) >= sizeof(void *));
|
static_assert(sizeof(T) >= sizeof(void *));
|
||||||
static_assert(std::derived_from<T, Node>);
|
static_assert(std::derived_from<T, Node>);
|
||||||
@@ -623,7 +621,6 @@ template <class T> struct BoundedFreeListAllocator {
|
|||||||
}
|
}
|
||||||
|
|
||||||
T *allocate(int partialKeyCapacity) {
|
T *allocate(int partialKeyCapacity) {
|
||||||
allocated_total->add(1);
|
|
||||||
T *result = allocate_helper(partialKeyCapacity);
|
T *result = allocate_helper(partialKeyCapacity);
|
||||||
if constexpr (!std::is_same_v<T, Node0>) {
|
if constexpr (!std::is_same_v<T, Node0>) {
|
||||||
memset(result->children, 0, sizeof(result->children));
|
memset(result->children, 0, sizeof(result->children));
|
||||||
@@ -640,7 +637,6 @@ template <class T> struct BoundedFreeListAllocator {
|
|||||||
}
|
}
|
||||||
|
|
||||||
void release(T *p) {
|
void release(T *p) {
|
||||||
released_total->add(1);
|
|
||||||
if (freeListBytes >= kFreeListMaxMemory) {
|
if (freeListBytes >= kFreeListMaxMemory) {
|
||||||
removeNode(p);
|
removeNode(p);
|
||||||
return safe_free(p, sizeof(T) + p->partialKeyCapacity);
|
return safe_free(p, sizeof(T) + p->partialKeyCapacity);
|
||||||
@@ -1715,23 +1711,31 @@ struct SearchStepWise {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
thread_local double accume_point_read = 0;
|
thread_local double point_read_accum = 0;
|
||||||
thread_local double accume_prefix_read = 0;
|
thread_local double prefix_read_accum = 0;
|
||||||
thread_local double accume_range_read = 0;
|
thread_local double range_read_accum = 0;
|
||||||
thread_local double accume_max_between = 0;
|
thread_local double point_read_short_circuit_accum = 0;
|
||||||
|
thread_local double prefix_read_short_circuit_accum = 0;
|
||||||
|
thread_local double range_read_short_circuit_accum = 0;
|
||||||
|
thread_local double point_read_iterations_accum = 0;
|
||||||
|
thread_local double prefix_read_iterations_accum = 0;
|
||||||
|
thread_local double range_read_iterations_accum = 0;
|
||||||
|
thread_local double range_read_node_scan_accum = 0;
|
||||||
|
|
||||||
// Logically this is the same as performing firstGeq and then checking against
|
// Logically this is the same as performing firstGeq and then checking against
|
||||||
// point or range version according to cmp, but this version short circuits as
|
// point or range version according to cmp, but this version short circuits as
|
||||||
// soon as it can prove that there's no conflict.
|
// soon as it can prove that there's no conflict.
|
||||||
bool checkPointRead(Node *n, const std::span<const uint8_t> key,
|
bool checkPointRead(Node *n, const std::span<const uint8_t> key,
|
||||||
InternalVersionT readVersion, ConflictSet::Impl *impl) {
|
InternalVersionT readVersion, ConflictSet::Impl *impl) {
|
||||||
++accume_point_read;
|
++point_read_accum;
|
||||||
#if DEBUG_VERBOSE && !defined(NDEBUG)
|
#if DEBUG_VERBOSE && !defined(NDEBUG)
|
||||||
fprintf(stderr, "Check point read: %s\n", printable(key).c_str());
|
fprintf(stderr, "Check point read: %s\n", printable(key).c_str());
|
||||||
#endif
|
#endif
|
||||||
auto remaining = key;
|
auto remaining = key;
|
||||||
for (;;) {
|
for (;;) {
|
||||||
|
++point_read_iterations_accum;
|
||||||
if (maxVersion(n, impl) <= readVersion) {
|
if (maxVersion(n, impl) <= readVersion) {
|
||||||
|
++point_read_short_circuit_accum;
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
if (remaining.size() == 0) {
|
if (remaining.size() == 0) {
|
||||||
@@ -1803,18 +1807,20 @@ downLeftSpine:
|
|||||||
// short circuits as soon as it can prove that there's no conflict.
|
// short circuits as soon as it can prove that there's no conflict.
|
||||||
bool checkPrefixRead(Node *n, const std::span<const uint8_t> key,
|
bool checkPrefixRead(Node *n, const std::span<const uint8_t> key,
|
||||||
InternalVersionT readVersion, ConflictSet::Impl *impl) {
|
InternalVersionT readVersion, ConflictSet::Impl *impl) {
|
||||||
++accume_prefix_read;
|
++prefix_read_accum;
|
||||||
#if DEBUG_VERBOSE && !defined(NDEBUG)
|
#if DEBUG_VERBOSE && !defined(NDEBUG)
|
||||||
fprintf(stderr, "Check prefix read: %s\n", printable(key).c_str());
|
fprintf(stderr, "Check prefix read: %s\n", printable(key).c_str());
|
||||||
#endif
|
#endif
|
||||||
auto remaining = key;
|
auto remaining = key;
|
||||||
for (;;) {
|
for (;;) {
|
||||||
|
++prefix_read_iterations_accum;
|
||||||
auto m = maxVersion(n, impl);
|
auto m = maxVersion(n, impl);
|
||||||
if (remaining.size() == 0) {
|
if (remaining.size() == 0) {
|
||||||
return m <= readVersion;
|
return m <= readVersion;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (m <= readVersion) {
|
if (m <= readVersion) {
|
||||||
|
++prefix_read_short_circuit_accum;
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -2045,7 +2051,7 @@ scan16(const InternalVersionT *vs, int begin, int end,
|
|||||||
template <bool kAVX512>
|
template <bool kAVX512>
|
||||||
bool checkMaxBetweenExclusive(Node *n, int begin, int end,
|
bool checkMaxBetweenExclusive(Node *n, int begin, int end,
|
||||||
InternalVersionT readVersion) {
|
InternalVersionT readVersion) {
|
||||||
++accume_max_between;
|
++range_read_node_scan_accum;
|
||||||
assume(-1 <= begin);
|
assume(-1 <= begin);
|
||||||
assume(begin <= 256);
|
assume(begin <= 256);
|
||||||
assume(-1 <= end);
|
assume(-1 <= end);
|
||||||
@@ -2643,7 +2649,7 @@ template <bool kAVX512>
|
|||||||
bool checkRangeReadImpl(Node *n, std::span<const uint8_t> begin,
|
bool checkRangeReadImpl(Node *n, std::span<const uint8_t> begin,
|
||||||
std::span<const uint8_t> end,
|
std::span<const uint8_t> end,
|
||||||
InternalVersionT readVersion, ConflictSet::Impl *impl) {
|
InternalVersionT readVersion, ConflictSet::Impl *impl) {
|
||||||
++accume_range_read;
|
++range_read_accum;
|
||||||
int lcp = longestCommonPrefix(begin.data(), end.data(),
|
int lcp = longestCommonPrefix(begin.data(), end.data(),
|
||||||
std::min(begin.size(), end.size()));
|
std::min(begin.size(), end.size()));
|
||||||
if (lcp == int(begin.size()) && end.size() == begin.size() + 1 &&
|
if (lcp == int(begin.size()) && end.size() == begin.size() + 1 &&
|
||||||
@@ -2658,10 +2664,12 @@ bool checkRangeReadImpl(Node *n, std::span<const uint8_t> begin,
|
|||||||
SearchStepWise search{n, begin.subspan(0, lcp)};
|
SearchStepWise search{n, begin.subspan(0, lcp)};
|
||||||
Arena arena;
|
Arena arena;
|
||||||
for (;;) {
|
for (;;) {
|
||||||
|
++range_read_iterations_accum;
|
||||||
assert(getSearchPath(arena, search.n) <=>
|
assert(getSearchPath(arena, search.n) <=>
|
||||||
begin.subspan(0, lcp - search.remaining.size()) ==
|
begin.subspan(0, lcp - search.remaining.size()) ==
|
||||||
0);
|
0);
|
||||||
if (maxVersion(search.n, impl) <= readVersion) {
|
if (maxVersion(search.n, impl) <= readVersion) {
|
||||||
|
++range_read_short_circuit_accum;
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
if (search.step()) {
|
if (search.step()) {
|
||||||
@@ -2700,20 +2708,24 @@ bool checkRangeReadImpl(Node *n, std::span<const uint8_t> begin,
|
|||||||
|
|
||||||
for (;;) {
|
for (;;) {
|
||||||
bool leftDone = checkRangeLeftSide.step();
|
bool leftDone = checkRangeLeftSide.step();
|
||||||
|
++range_read_iterations_accum;
|
||||||
bool rightDone = checkRangeRightSide.step();
|
bool rightDone = checkRangeRightSide.step();
|
||||||
|
++range_read_iterations_accum;
|
||||||
if (!leftDone && !rightDone) {
|
if (!leftDone && !rightDone) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if (leftDone && rightDone) {
|
if (leftDone && rightDone) {
|
||||||
break;
|
break;
|
||||||
} else if (leftDone) {
|
} else if (leftDone) {
|
||||||
while (!checkRangeRightSide.step())
|
while (!checkRangeRightSide.step()) {
|
||||||
;
|
++range_read_iterations_accum;
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
} else {
|
} else {
|
||||||
assert(rightDone);
|
assert(rightDone);
|
||||||
while (!checkRangeLeftSide.step())
|
while (!checkRangeLeftSide.step()) {
|
||||||
;
|
++range_read_iterations_accum;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@@ -3061,10 +3073,9 @@ Node *firstGeqPhysical(Node *n, const std::span<const uint8_t> key) {
|
|||||||
struct __attribute__((visibility("hidden"))) ConflictSet::Impl {
|
struct __attribute__((visibility("hidden"))) ConflictSet::Impl {
|
||||||
|
|
||||||
void check(const ReadRange *reads, Result *result, int count) {
|
void check(const ReadRange *reads, Result *result, int count) {
|
||||||
assert(accume_point_read == 0);
|
int commits_accum = 0;
|
||||||
assert(accume_prefix_read == 0);
|
int conflicts_accum = 0;
|
||||||
assert(accume_range_read == 0);
|
int too_olds_accum = 0;
|
||||||
assert(accume_max_between == 0);
|
|
||||||
for (int i = 0; i < count; ++i) {
|
for (int i = 0; i < count; ++i) {
|
||||||
const auto &r = reads[i];
|
const auto &r = reads[i];
|
||||||
auto begin = std::span<const uint8_t>(r.begin.p, r.begin.len);
|
auto begin = std::span<const uint8_t>(r.begin.p, r.begin.len);
|
||||||
@@ -3080,11 +3091,30 @@ struct __attribute__((visibility("hidden"))) ConflictSet::Impl {
|
|||||||
InternalVersionT(reads[i].readVersion), this))
|
InternalVersionT(reads[i].readVersion), this))
|
||||||
? Commit
|
? Commit
|
||||||
: Conflict;
|
: Conflict;
|
||||||
|
commits_accum += result[i] == Commit;
|
||||||
|
conflicts_accum += result[i] == Conflict;
|
||||||
|
too_olds_accum += result[i] == TooOld;
|
||||||
}
|
}
|
||||||
read_point_total.add(std::exchange(accume_point_read, 0));
|
point_read_total.add(std::exchange(point_read_accum, 0));
|
||||||
read_prefix_total.add(std::exchange(accume_prefix_read, 0));
|
prefix_read_total.add(std::exchange(prefix_read_accum, 0));
|
||||||
read_range_total.add(std::exchange(accume_range_read, 0));
|
range_read_total.add(std::exchange(range_read_accum, 0));
|
||||||
read_range_node_total.add(std::exchange(accume_max_between, 0));
|
range_read_node_scan_total.add(
|
||||||
|
std::exchange(range_read_node_scan_accum, 0));
|
||||||
|
point_read_short_circuit_total.add(
|
||||||
|
std::exchange(point_read_short_circuit_accum, 0));
|
||||||
|
prefix_read_short_circuit_total.add(
|
||||||
|
std::exchange(prefix_read_short_circuit_accum, 0));
|
||||||
|
range_read_short_circuit_total.add(
|
||||||
|
std::exchange(range_read_short_circuit_accum, 0));
|
||||||
|
point_read_iterations_total.add(
|
||||||
|
std::exchange(point_read_iterations_accum, 0));
|
||||||
|
prefix_read_iterations_total.add(
|
||||||
|
std::exchange(prefix_read_iterations_accum, 0));
|
||||||
|
range_read_iterations_total.add(
|
||||||
|
std::exchange(range_read_iterations_accum, 0));
|
||||||
|
commits_total.add(commits_accum);
|
||||||
|
conflicts_total.add(conflicts_accum);
|
||||||
|
too_olds_total.add(too_olds_accum);
|
||||||
}
|
}
|
||||||
|
|
||||||
void addWrites(const WriteRange *writes, int count, int64_t writeVersion) {
|
void addWrites(const WriteRange *writes, int count, int64_t writeVersion) {
|
||||||
@@ -3202,16 +3232,6 @@ struct __attribute__((visibility("hidden"))) ConflictSet::Impl {
|
|||||||
|
|
||||||
allocators.~NodeAllocators();
|
allocators.~NodeAllocators();
|
||||||
new (&allocators) NodeAllocators();
|
new (&allocators) NodeAllocators();
|
||||||
allocators.node0.allocated_total = &node0_allocated_total;
|
|
||||||
allocators.node3.allocated_total = &node3_allocated_total;
|
|
||||||
allocators.node16.allocated_total = &node16_allocated_total;
|
|
||||||
allocators.node48.allocated_total = &node48_allocated_total;
|
|
||||||
allocators.node256.allocated_total = &node256_allocated_total;
|
|
||||||
allocators.node0.released_total = &node0_released_total;
|
|
||||||
allocators.node3.released_total = &node3_released_total;
|
|
||||||
allocators.node16.released_total = &node16_released_total;
|
|
||||||
allocators.node48.released_total = &node48_released_total;
|
|
||||||
allocators.node256.released_total = &node256_released_total;
|
|
||||||
|
|
||||||
removalKeyArena = Arena{};
|
removalKeyArena = Arena{};
|
||||||
removalKey = {};
|
removalKey = {};
|
||||||
@@ -3280,33 +3300,34 @@ struct __attribute__((visibility("hidden"))) ConflictSet::Impl {
|
|||||||
#define COUNTER(name, help) \
|
#define COUNTER(name, help) \
|
||||||
Counter name { this, #name, help }
|
Counter name { this, #name, help }
|
||||||
// ==================== METRICS DEFINITIONS ====================
|
// ==================== METRICS DEFINITIONS ====================
|
||||||
COUNTER(node0_allocated_total,
|
GAUGE(memory_bytes, "Total number of bytes in use");
|
||||||
"Total number of nodes of type \"Node0\" that have been allocated");
|
COUNTER(point_read_total, "Total number of point reads checked");
|
||||||
COUNTER(node3_allocated_total,
|
COUNTER(point_read_short_circuit_total,
|
||||||
"Total number of nodes of type \"Node3\" that have been allocated");
|
"Total number of point reads that did not require a full search to "
|
||||||
COUNTER(node16_allocated_total,
|
"check");
|
||||||
"Total number of nodes of type \"Node16\" that have been allocated");
|
COUNTER(point_read_iterations_total,
|
||||||
COUNTER(node48_allocated_total,
|
"Total number of iterations of the main loop for point read checks");
|
||||||
"Total number of nodes of type \"Node48\" that have been allocated");
|
COUNTER(prefix_read_total, "Total number of prefix reads checked");
|
||||||
COUNTER(node256_allocated_total,
|
COUNTER(prefix_read_short_circuit_total,
|
||||||
"Total number of nodes of type \"Node256\" that have been allocated");
|
"Total number of prefix reads that did not require a full search to "
|
||||||
COUNTER(node0_released_total,
|
"check");
|
||||||
"Total number of nodes of type \"Node0\" that have been released");
|
COUNTER(prefix_read_iterations_total,
|
||||||
COUNTER(node3_released_total,
|
"Total number of iterations of the main loop for prefix read checks");
|
||||||
"Total number of nodes of type \"Node3\" that have been released");
|
COUNTER(range_read_total, "Total number of range reads checked");
|
||||||
COUNTER(node16_released_total,
|
COUNTER(range_read_short_circuit_total,
|
||||||
"Total number of nodes of type \"Node16\" that have been released");
|
"Total number of range reads that did not require a full search to "
|
||||||
COUNTER(node48_released_total,
|
"check");
|
||||||
"Total number of nodes of type \"Node48\" that have been released");
|
COUNTER(range_read_iterations_total,
|
||||||
COUNTER(node256_released_total,
|
"Total number of iterations of the main loops for range read checks");
|
||||||
"Total number of nodes of type \"Node256\" that have been released");
|
COUNTER(range_read_node_scan_total,
|
||||||
GAUGE(memory_bytes, "Total number of bytes in use.");
|
"Total number of scans of individual nodes while "
|
||||||
COUNTER(read_point_total, "Total number of point reads checked.");
|
"checking a range read");
|
||||||
COUNTER(read_prefix_total, "Total number of prefix reads checked.");
|
COUNTER(commits_total,
|
||||||
COUNTER(read_range_total, "Total number of range reads checked.");
|
"Total number of checks where the result is \"commit\"");
|
||||||
COUNTER(read_range_node_total,
|
COUNTER(conflicts_total,
|
||||||
"Total number of range checks of individual nodes while "
|
"Total number of checks where the result is \"conflict\"");
|
||||||
"checking a range read.");
|
COUNTER(too_olds_total,
|
||||||
|
"Total number of checks where the result is \"too old\"");
|
||||||
// ==================== END METRICS DEFINITIONS ====================
|
// ==================== END METRICS DEFINITIONS ====================
|
||||||
#undef GAUGE
|
#undef GAUGE
|
||||||
#undef COUNTER
|
#undef COUNTER
|
||||||
|
Reference in New Issue
Block a user