29 Commits

Author SHA1 Message Date
5a132799a4 Add cycles_total
Some checks failed
Tests / Clang total: 2840, passed: 2840
Clang |Total|New|Outstanding|Fixed|Trend |:-:|:-:|:-:|:-:|:-: |0|0|0|0|:clap:
Tests / Debug total: 2838, passed: 2838
weaselab/conflict-set/pipeline/head There was a failure building this commit
2024-08-15 15:13:00 -07:00
72469ebb6e Erase along left spine. Not faster 2024-08-15 15:07:44 -07:00
6c79847a42 Add instructions_total for linux 2024-08-15 15:06:53 -07:00
405a2ca161 Fix typo 2024-08-15 13:52:51 -07:00
f93466316a Pass in-tree reference to mergeWithChild 2024-08-15 13:52:06 -07:00
5626cd09d9 Add to corpus 2024-08-15 11:50:04 -07:00
41840220c3 Optimize version handling in mergeWithChild 2024-08-15 11:49:13 -07:00
7ff00e7846 Extract mergeWithChild to function 2024-08-15 11:40:52 -07:00
6242f40d48 Require that eraseBetween leave at least one child or entryPresent 2024-08-15 11:37:36 -07:00
403d70a1d3 Prefer not copying node in eraseBetween
If numChildren + entryPresent is enough, we don't have to copy even if
it would fit in a smaller node.

If we have to copy, we might as well use the smallest acceptable node
type.
2024-08-15 11:33:16 -07:00
9763452713 Separate beginIsPrefix path and simplify slightly 2024-08-15 11:29:15 -07:00
73d0593fca Remove separate prefix write codepath for now 2024-08-14 21:29:43 -07:00
23c2a3e1c6 SIMD for eraseBetween (Node16)
Some checks failed
Tests / Clang total: 2688, passed: 2688
Clang |Total|New|Outstanding|Fixed|Trend |:-:|:-:|:-:|:-:|:-: |0|0|0|0|:clap:
Tests / Debug total: 2686, passed: 2686
weaselab/conflict-set/pipeline/head There was a failure building this commit
2024-08-14 18:12:46 -07:00
a64e792964 Remove unused function 2024-08-14 17:40:04 -07:00
5e362d5330 Add to corpus 2024-08-14 17:37:18 -07:00
cc526cb6ba Call eraseBetween on useAsRoot in addWriteRange 2024-08-14 17:08:55 -07:00
7e49888bec More eraseBetween optimizations 2024-08-14 16:40:29 -07:00
e64ebabced eraseBetween optimizations 2024-08-14 16:13:37 -07:00
1e34951a77 Fix use-of-uninit in eraseBetween (Node256) 2024-08-14 15:25:10 -07:00
baf64520d6 Have eraseBetween take in-tree node by reference 2024-08-14 15:04:11 -07:00
3499626127 Fix potential strict aliasing issues 2024-08-14 15:01:34 -07:00
b7f9084694 destroyTree -> eraseTree. Use freelist 2024-08-14 14:47:22 -07:00
4b82502946 Accept node by ref for eraseBetween again 2024-08-14 14:43:19 -07:00
68bbacb69a Use getInTree in eraseBetween 2024-08-14 14:43:19 -07:00
3078845673 Fix nodes_released accounting 2024-08-14 14:43:19 -07:00
43f6126cc4 Add a missing assert, call to removeNode 2024-08-14 14:43:19 -07:00
b911d87d55 eraseBetween bug fixes 2024-08-14 14:43:19 -07:00
0c65a82b78 Separate codepath for prefix writes
Uses the newly-added eraseBetween
2024-08-14 14:43:19 -07:00
e024cb8291 Track entriesErased in destroyTree 2024-08-14 14:43:19 -07:00
306 changed files with 1145 additions and 967 deletions

View File

@@ -361,7 +361,21 @@ void benchWorstCaseForRadixRangeRead() {
void benchCreateAndDestroy() {
ankerl::nanobench::Bench bench;
bench.run("create and destroy", [&]() { ConflictSet cs{0}; });
bench.run("create and destroy", [&]() {
ConflictSet cs{0};
ConflictSet::WriteRange w;
uint8_t b[9];
b[8] = 0;
for (int64_t i = 0; i < 1000; i += 7) {
auto x = __builtin_bswap64(i);
memcpy(b, &x, 8);
w.begin.p = b;
w.begin.len = 8;
w.end.len = 0;
w.end.p = b;
cs.addWrites(&w, 1, 1);
}
});
}
int main(void) {

View File

@@ -1,7 +1,7 @@
cmake_minimum_required(VERSION 3.18)
project(
conflict-set
VERSION 0.0.14
VERSION 0.0.12
DESCRIPTION
"A data structure for optimistic concurrency control on ranges of bitwise-lexicographically-ordered keys."
HOMEPAGE_URL "https://git.weaselab.dev/weaselab/conflict-set"
@@ -276,15 +276,9 @@ if(CMAKE_SOURCE_DIR STREQUAL CMAKE_CURRENT_SOURCE_DIR AND BUILD_TESTING)
find_program(VALGRIND_EXE valgrind)
if(VALGRIND_EXE AND NOT CMAKE_CROSSCOMPILING)
list(LENGTH CORPUS_TESTS len)
math(EXPR last "${len} - 1")
set(partition_size 100)
foreach(i RANGE 0 ${last} ${partition_size})
list(SUBLIST CORPUS_TESTS ${i} ${partition_size} partition)
add_test(NAME conflict_set_blackbox_valgrind_${i}
COMMAND ${VALGRIND_EXE} --error-exitcode=99 --
$<TARGET_FILE:driver> ${partition})
endforeach()
add_test(NAME conflict_set_blackbox_valgrind
COMMAND ${VALGRIND_EXE} --error-exitcode=99 --
$<TARGET_FILE:driver> ${CORPUS_TESTS})
endif()
# api smoke tests

File diff suppressed because it is too large Load Diff

View File

@@ -20,6 +20,7 @@ using namespace weaselab;
#include <thread>
#include <unordered_set>
#include <utility>
#include <vector>
#include <callgrind.h>
@@ -747,10 +748,7 @@ struct TestDriver {
fprintf(stderr, "%p Set oldest version: %" PRId64 "\n", this,
oldestVersion);
#endif
CALLGRIND_START_INSTRUMENTATION;
cs.setOldestVersion(oldestVersion);
CALLGRIND_STOP_INSTRUMENTATION;
if constexpr (kEnableAssertions) {
refImpl.setOldestVersion(oldestVersion);
}

13
Jenkinsfile vendored
View File

@@ -48,17 +48,6 @@ pipeline {
recordIssues(tools: [clang()])
}
}
stage('64 bit versions') {
agent {
dockerfile {
args '-v /home/jenkins/ccache:/ccache'
reuseNode true
}
}
steps {
CleanBuildAndTest("-DCMAKE_CXX_FLAGS=-DUSE_64_BIT=1")
}
}
stage('Debug') {
agent {
dockerfile {
@@ -129,7 +118,7 @@ pipeline {
}
steps {
script {
filter_args = "-f ConflictSet.cpp -f LongestCommonPrefix.h -f Metrics.h"
filter_args = "-f ConflictSet.cpp -f LongestCommonPrefix.h"
}
CleanBuildAndTest("-DCMAKE_C_COMPILER=gcc -DCMAKE_CXX_COMPILER=g++ -DCMAKE_C_FLAGS=--coverage -DCMAKE_CXX_FLAGS=--coverage -DCMAKE_BUILD_TYPE=Debug -DDISABLE_TSAN=ON")
sh """

View File

@@ -1,64 +0,0 @@
#pragma once
#include "ConflictSet.h"
#include "Internal.h"
#include <assert.h>
#include <atomic>
#include <tuple>
struct Metric {
Metric *prev;
const char *name;
const char *help;
weaselab::ConflictSet::MetricsV1::Type type;
std::atomic<int64_t> value;
protected:
Metric(Metric *&metricList, int &metricsCount, const char *name,
const char *help, weaselab::ConflictSet::MetricsV1::Type type)
: prev(std::exchange(metricList, this)), name(name), help(help),
type(type), value(0) {
++metricsCount;
}
};
struct Gauge : private Metric {
Gauge(Metric *&metricList, int &metricsCount, const char *name,
const char *help)
: Metric(metricList, metricsCount, name, help,
weaselab::ConflictSet::MetricsV1::Gauge) {}
void set(int64_t value) {
this->value.store(value, std::memory_order_relaxed);
}
};
struct Counter : private Metric {
Counter(Metric *&metricList, int &metricsCount, const char *name,
const char *help)
: Metric(metricList, metricsCount, name, help,
weaselab::ConflictSet::MetricsV1::Counter) {}
// Expensive. Accumulate locally and then call add instead of repeatedly
// calling add.
void add(int64_t value) {
assert(value >= 0);
static_assert(std::atomic<int64_t>::is_always_lock_free);
this->value.fetch_add(value, std::memory_order_relaxed);
}
};
inline weaselab::ConflictSet::MetricsV1 *initMetrics(Metric *metricsList,
int metricsCount) {
weaselab::ConflictSet::MetricsV1 *metrics =
(weaselab::ConflictSet::MetricsV1 *)safe_malloc(metricsCount *
sizeof(metrics[0]));
for (auto [i, m] = std::make_tuple(metricsCount - 1, metricsList); i >= 0;
--i, m = m->prev) {
metrics[i].name = m->name;
metrics[i].help = m->help;
metrics[i].p = m;
metrics[i].type = m->type;
}
return metrics;
}

View File

@@ -24,15 +24,16 @@ Hardware for all benchmarks is an AMD Ryzen 9 7900 with (2x32GB) 5600MT/s CL28-3
| ns/op | op/s | err% | ins/op | cyc/op | IPC | bra/op | miss% | total | benchmark
|--------------------:|--------------------:|--------:|----------------:|----------------:|-------:|---------------:|--------:|----------:|:----------
| 12.88 | 77,653,350.77 | 0.5% | 185.37 | 64.45 | 2.876 | 41.51 | 0.4% | 0.01 | `point reads`
| 14.67 | 68,179,354.49 | 0.1% | 271.44 | 73.40 | 3.698 | 53.70 | 0.3% | 0.01 | `prefix reads`
| 34.84 | 28,701,444.36 | 0.3% | 715.74 | 175.27 | 4.084 | 127.30 | 0.2% | 0.01 | `range reads`
| 17.12 | 58,422,988.28 | 0.2% | 314.30 | 86.11 | 3.650 | 39.82 | 0.4% | 0.01 | `point writes`
| 31.42 | 31,830,804.65 | 0.1% | 591.06 | 158.07 | 3.739 | 82.67 | 0.2% | 0.01 | `prefix writes`
| 37.37 | 26,759,432.70 | 2.2% | 681.98 | 188.95 | 3.609 | 96.10 | 0.1% | 0.01 | `range writes`
| 76.72 | 13,035,140.63 | 2.3% | 1,421.28 | 387.17 | 3.671 | 257.76 | 0.1% | 0.01 | `monotonic increasing point writes`
| 297,452.00 | 3,361.89 | 0.9% | 3,508,083.00 | 1,500,834.67 | 2.337 | 727,525.33 | 0.1% | 0.01 | `worst case for radix tree`
| 87.70 | 11,402,490.60 | 1.0% | 1,795.00 | 442.09 | 4.060 | 297.00 | 0.0% | 0.01 | `create and destroy`
| 10.80 | 92,600,541.52 | 0.6% | 180.38 | 54.49 | 3.310 | 41.51 | 0.4% | 0.01 | `point reads`
| 15.00 | 66,687,691.68 | 0.4% | 278.44 | 76.44 | 3.642 | 55.56 | 0.3% | 0.01 | `prefix reads`
| 36.81 | 27,163,394.61 | 0.4% | 795.06 | 187.91 | 4.231 | 142.67 | 0.2% | 0.01 | `range reads`
| 18.14 | 55,137,674.01 | 1.2% | 338.19 | 92.86 | 3.642 | 42.81 | 0.4% | 0.01 | `point writes`
| 33.19 | 30,127,119.71 | 0.1% | 681.03 | 170.05 | 4.005 | 98.68 | 0.2% | 0.01 | `prefix writes`
| 37.37 | 26,759,432.70 | 1.9% | 779.70 | 195.45 | 3.989 | 114.21 | 0.0% | 0.01 | `range writes`
| 74.36 | 13,448,582.47 | 1.9% | 1,425.68 | 389.08 | 3.664 | 258.88 | 0.1% | 0.01 | `monotonic increasing point writes`
| 316,928.00 | 3,155.29 | 1.5% | 3,992,986.00 | 1,699,813.00 | 2.349 | 806,226.50 | 0.0% | 0.01 | `worst case for radix tree`
| 75.26 | 13,286,517.16 | 0.5% | 1,590.01 | 386.67 | 4.112 | 258.00 | 0.0% | 0.01 | `create and destroy`
# "Real data" test
@@ -47,7 +48,7 @@ Check: 4.47891 seconds, 364.05 MB/s, Add: 4.55599 seconds, 123.058 MB/s, Gc rati
## radix tree
```
Check: 0.953012 seconds, 1710.94 MB/s, Add: 1.30025 seconds, 431.188 MB/s, Gc ratio: 43.9816%, Peak idle memory: 2.28375e+06
Check: 0.910234 seconds, 1791.35 MB/s, Add: 1.25908 seconds, 445.287 MB/s, Gc ratio: 44.0415%
```
## hash table

View File

@@ -6,26 +6,22 @@
#include <string.h>
#include <string>
#include <string_view>
#include <sys/ioctl.h>
#include <sys/resource.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/uio.h>
#include <thread>
#include <unistd.h>
#include <utility>
#include <vector>
#include "ConflictSet.h"
#include "third_party/nadeau.h"
std::atomic<int64_t> transactions;
constexpr int kBaseSearchDepth = 115;
constexpr int kBaseSearchDepth = 32;
constexpr int kWindowSize = 10000000;
std::string numToKey(int64_t num) {
std::string result;
std::basic_string<uint8_t> numToKey(int64_t num) {
std::basic_string<uint8_t> result;
result.resize(kBaseSearchDepth + sizeof(int64_t));
memset(result.data(), 0, kBaseSearchDepth);
int64_t be = __builtin_bswap64(num);
@@ -45,13 +41,13 @@ void workload(weaselab::ConflictSet *cs) {
auto pointK = numToKey(pointRv);
weaselab::ConflictSet::ReadRange reads[] = {
{
{(const uint8_t *)pointK.data(), int(pointK.size())},
{pointK.data(), int(pointK.size())},
{nullptr, 0},
pointRv,
},
{
{(const uint8_t *)beginK.data(), int(beginK.size())},
{(const uint8_t *)endK.data(), int(endK.size())},
{beginK.data(), int(beginK.size())},
{endK.data(), int(endK.size())},
version - 2,
},
};
@@ -70,7 +66,7 @@ void workload(weaselab::ConflictSet *cs) {
{
weaselab::ConflictSet::WriteRange w;
auto k = numToKey(version);
w.begin.p = (const uint8_t *)k.data();
w.begin.p = k.data();
w.end.len = 0;
if (version % (kWindowSize / 2) == 0) {
for (int l = 0; l <= k.size(); ++l) {
@@ -83,9 +79,9 @@ void workload(weaselab::ConflictSet *cs) {
int64_t beginN = version - kWindowSize + rand() % kWindowSize;
auto b = numToKey(beginN);
auto e = numToKey(beginN + 1000);
w.begin.p = (const uint8_t *)b.data();
w.begin.p = b.data();
w.begin.len = b.size();
w.end.p = (const uint8_t *)e.data();
w.end.p = e.data();
w.end.len = e.size();
cs->addWrites(&w, 1, version);
}
@@ -168,29 +164,36 @@ double toSeconds(timeval t) {
return double(t.tv_sec) + double(t.tv_usec) * 1e-6;
}
#ifdef __linux__
#include <linux/perf_event.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/ioctl.h>
#include <sys/syscall.h>
#include <sys/types.h>
#include <unistd.h>
#ifdef __linux__
struct PerfCounter {
PerfCounter(int type, int config, const std::string &labels = {},
int groupLeaderFd = -1)
: labels(labels) {
explicit PerfCounter(int event) {
struct perf_event_attr pe;
memset(&pe, 0, sizeof(pe));
pe.type = type;
pe.type = PERF_TYPE_HARDWARE;
pe.size = sizeof(pe);
pe.config = config;
pe.config = event;
pe.inherit = 1;
pe.exclude_kernel = 1;
pe.exclude_hv = 1;
fd = perf_event_open(&pe, 0, -1, groupLeaderFd, 0);
if (fd < 0 && errno != ENOENT && errno != EINVAL) {
perror(labels.c_str());
fd = perf_event_open(&pe, 0, -1, -1, 0);
if (fd == -1) {
fprintf(stderr, "Error opening leader %llx\n", pe.config);
exit(EXIT_FAILURE);
}
}
int64_t total() const {
int64_t total() {
int64_t count;
if (read(fd, &count, sizeof(count)) != sizeof(count)) {
perror("read instructions from perf");
@@ -199,27 +202,10 @@ struct PerfCounter {
return count;
}
PerfCounter(PerfCounter &&other)
: fd(std::exchange(other.fd, -1)), labels(std::move(other.labels)) {}
PerfCounter &operator=(PerfCounter &&other) {
fd = std::exchange(other.fd, -1);
labels = std::move(other.labels);
return *this;
}
~PerfCounter() {
if (fd >= 0) {
close(fd);
}
}
bool ok() const { return fd >= 0; }
const std::string &getLabels() const { return labels; }
int getFd() const { return fd; }
~PerfCounter() { close(fd); }
private:
int fd;
std::string labels;
static long perf_event_open(struct perf_event_attr *hw_event, pid_t pid,
int cpu, int group_fd, unsigned long flags) {
int ret;
@@ -228,6 +214,11 @@ private:
return ret;
}
};
#else
struct PerfCounter {
explicit PerPerfCounter(int) {}
int64_t total() { return 0; }
};
#endif
int main(int argc, char **argv) {
@@ -242,50 +233,8 @@ int main(int argc, char **argv) {
int metricsCount;
cs.getMetricsV1(&metrics, &metricsCount);
#ifdef __linux__
PerfCounter instructions{PERF_TYPE_HARDWARE, PERF_COUNT_HW_INSTRUCTIONS};
PerfCounter cycles{PERF_TYPE_HARDWARE, PERF_COUNT_HW_CPU_CYCLES, "",
instructions.getFd()};
std::vector<PerfCounter> cacheCounters;
for (auto [id, idStr] : std::initializer_list<std::pair<int, std::string>>{
{PERF_COUNT_HW_CACHE_L1D, "l1d"},
{PERF_COUNT_HW_CACHE_L1I, "l1i"},
{PERF_COUNT_HW_CACHE_LL, "ll"},
{PERF_COUNT_HW_CACHE_DTLB, "dtlb"},
{PERF_COUNT_HW_CACHE_ITLB, "itlb"},
{PERF_COUNT_HW_CACHE_BPU, "bpu"},
{PERF_COUNT_HW_CACHE_NODE, "node"},
}) {
for (auto [op, opStr] :
std::initializer_list<std::pair<int, std::string>>{
{PERF_COUNT_HW_CACHE_OP_READ, "read"},
{PERF_COUNT_HW_CACHE_OP_WRITE, "write"},
{PERF_COUNT_HW_CACHE_OP_PREFETCH, "prefetch"},
}) {
int groupLeaderFd = -1;
for (auto [result, resultStr] :
std::initializer_list<std::pair<int, std::string>>{
{PERF_COUNT_HW_CACHE_RESULT_MISS, "miss"},
{PERF_COUNT_HW_CACHE_RESULT_ACCESS, "access"},
}) {
auto labels = "{id=\"" + idStr + "\", op=\"" + opStr +
"\", result=\"" + resultStr + "\"}";
cacheCounters.emplace_back(PERF_TYPE_HW_CACHE,
id | (op << 8) | (result << 16), labels,
groupLeaderFd);
if (!cacheCounters.back().ok()) {
cacheCounters.pop_back();
} else {
if (groupLeaderFd == -1) {
groupLeaderFd = cacheCounters.back().getFd();
}
}
}
}
}
#endif
PerfCounter instructions{PERF_COUNT_HW_INSTRUCTIONS};
PerfCounter cycles{PERF_COUNT_HW_CPU_CYCLES};
auto w = std::thread{workload, &cs};
for (;;) {
@@ -313,7 +262,6 @@ int main(int argc, char **argv) {
"transactions_total ";
body += std::to_string(transactions.load(std::memory_order_relaxed));
body += "\n";
#ifdef __linux__
body += "# HELP instructions_total Total number of instructions\n"
"# TYPE instructions_total counter\n"
"instructions_total ";
@@ -324,13 +272,6 @@ int main(int argc, char **argv) {
"cycles_total ";
body += std::to_string(cycles.total());
body += "\n";
body += "# HELP cache_event_total Total number of cache events\n"
"# TYPE cache_event_total counter\n";
for (const auto &counter : cacheCounters) {
body += "cache_event_total" + counter.getLabels() + " " +
std::to_string(counter.total()) + "\n";
}
#endif
for (int i = 0; i < metricsCount; ++i) {
body += "# HELP ";

View File

@@ -22,11 +22,9 @@
#include "ConflictSet.h"
#include "Internal.h"
#include "Metrics.h"
#include <algorithm>
#include <span>
#include <vector>
std::span<const uint8_t> keyAfter(Arena &arena, std::span<const uint8_t> key) {
auto result =
@@ -117,6 +115,15 @@ bool operator==(const KeyInfo &lhs, const KeyInfo &rhs) {
return !(lhs < rhs || rhs < lhs);
}
void swapSort(std::vector<KeyInfo> &points, int a, int b) {
if (points[b] < points[a]) {
KeyInfo temp;
temp = points[a];
points[a] = points[b];
points[b] = temp;
}
}
struct SortTask {
int begin;
int size;
@@ -176,6 +183,13 @@ void sortPoints(std::vector<KeyInfo> &points) {
}
}
static thread_local uint32_t g_seed = 0;
static inline int skfastrand() {
g_seed = g_seed * 1664525L + 1013904223L;
return g_seed;
}
static int compare(const StringRef &a, const StringRef &b) {
int c = memcmp(a.data(), b.data(), std::min(a.size(), b.size()));
if (c < 0)
@@ -201,24 +215,20 @@ struct ReadConflictRange {
}
};
static constexpr int MaxLevels = 26;
struct RandomLevel {
explicit RandomLevel(uint32_t seed) : seed(seed) {}
int next() {
int result = __builtin_clz(seed | (uint32_t(-1) >> (MaxLevels - 1)));
seed = seed * 1664525L + 1013904223L;
return result;
}
private:
uint32_t seed;
};
class SkipList {
private:
RandomLevel randomLevel{0};
static constexpr int MaxLevels = 26;
int randomLevel() const {
uint32_t i = uint32_t(skfastrand()) >> (32 - (MaxLevels - 1));
int level = 0;
while (i & 1) {
i >>= 1;
level++;
}
assert(level < MaxLevels);
return level;
}
// Represent a node in the SkipList. The node has multiple (i.e., level)
// pointers to other nodes, and keeps a record of the max versions for each
@@ -416,33 +426,27 @@ public:
}
void swap(SkipList &other) { std::swap(header, other.header); }
// Returns the change in the number of entries
int64_t addConflictRanges(const Finger *fingers, int rangeCount,
Version version) {
int64_t result = rangeCount;
void addConflictRanges(const Finger *fingers, int rangeCount,
Version version) {
for (int r = rangeCount - 1; r >= 0; r--) {
const Finger &startF = fingers[r * 2];
const Finger &endF = fingers[r * 2 + 1];
if (endF.found() == nullptr) {
++result;
if (endF.found() == nullptr)
insert(endF, endF.finger[0]->getMaxVersion(0));
}
result -= remove(startF, endF);
remove(startF, endF);
insert(startF, version);
}
return result;
}
// Return number of iterations of main loop
int detectConflicts(ReadConflictRange *ranges, int count,
ConflictSet::Result *transactionConflictStatus) const {
void detectConflicts(ReadConflictRange *ranges, int count,
ConflictSet::Result *transactionConflictStatus) const {
const int M = 16;
int nextJob[M];
CheckMax inProgress[M];
if (!count)
return 0;
return;
int started = std::min(M, count);
for (int i = 0; i < started; i++) {
@@ -453,9 +457,8 @@ public:
int prevJob = started - 1;
int job = 0;
int iters = 0;
// vtune: 340 parts
for (;; ++iters) {
while (true) {
if (inProgress[job].advance()) {
if (started == count) {
if (prevJob == job)
@@ -471,7 +474,6 @@ public:
prevJob = job;
job = nextJob[job];
}
return iters;
}
void find(const StringRef *values, Finger *results, int *temp, int count) {
@@ -565,10 +567,9 @@ public:
}
private:
// Returns the number of entries removed
int64_t remove(const Finger &start, const Finger &end) {
void remove(const Finger &start, const Finger &end) {
if (start.finger[0] == end.finger[0])
return 0;
return;
Node *x = start.finger[0]->getNext(0);
@@ -577,20 +578,17 @@ private:
if (start.finger[i] != end.finger[i])
start.finger[i]->setNext(i, end.finger[i]->getNext(i));
int64_t result = 0;
while (true) {
Node *next = x->getNext(0);
x->destroy();
++result;
if (x == end.finger[0])
break;
x = next;
}
return result;
}
void insert(const Finger &f, Version version) {
int level = randomLevel.next();
int level = randomLevel();
// std::cout << std::string((const char*)value,length) << " level: " <<
// level << std::endl;
Node *x = Node::create(f.value, level);
@@ -706,27 +704,17 @@ private:
};
};
struct ReadContext {
int64_t commits_accum = 0;
int64_t conflicts_accum = 0;
int64_t too_olds_accum = 0;
int64_t check_bytes_accum = 0;
};
struct SkipListConflictSet {};
struct __attribute__((visibility("hidden"))) ConflictSet::Impl {
Impl(int64_t oldestVersion)
: oldestVersion(oldestVersion), newestVersion(oldestVersion),
skipList(oldestVersion) {
metrics = initMetrics(metricsList, metricsCount);
}
~Impl() { safe_free(metrics, metricsCount * sizeof(metrics[0])); }
skipList(oldestVersion) {}
void check(const ConflictSet::ReadRange *reads, ConflictSet::Result *results,
int count) {
ReadContext tls;
int count) const {
Arena arena;
auto *ranges = new (arena) ReadConflictRange[count];
for (int i = 0; i < count; ++i) {
tls.check_bytes_accum += reads[i].begin.len + reads[i].end.len;
ranges[i].begin = {reads[i].begin.p, size_t(reads[i].begin.len)};
ranges[i].end = reads[i].end.len > 0
? StringRef{reads[i].end.p, size_t(reads[i].end.len)}
@@ -734,22 +722,13 @@ struct __attribute__((visibility("hidden"))) ConflictSet::Impl {
ranges[i].version = reads[i].readVersion;
results[i] = ConflictSet::Commit;
}
int iters = skipList.detectConflicts(ranges, count, results);
skipList.detectConflicts(ranges, count, results);
for (int i = 0; i < count; ++i) {
if (reads[i].readVersion < oldestVersion ||
reads[i].readVersion < newestVersion - 2e9) {
results[i] = TooOld;
}
tls.commits_accum += results[i] == Commit;
tls.conflicts_accum += results[i] == Conflict;
tls.too_olds_accum += results[i] == TooOld;
}
range_read_iterations_total.add(iters);
range_read_total.add(count);
commits_total.add(tls.commits_accum);
conflicts_total.add(tls.conflicts_accum);
too_olds_total.add(tls.too_olds_accum);
check_bytes_total.add(tls.check_bytes_accum);
}
void addWrites(const ConflictSet::WriteRange *writes, int count,
@@ -796,33 +775,27 @@ struct __attribute__((visibility("hidden"))) ConflictSet::Impl {
StringRef values[stripeSize];
int64_t writeVersions[stripeSize / 2];
int ss = stringCount - (stripes - 1) * stripeSize;
int64_t entryDelta = 0;
for (int s = stripes - 1; s >= 0; s--) {
for (int i = 0; i * 2 < ss; ++i) {
const auto &w = combinedWriteConflictRanges[s * stripeSize / 2 + i];
values[i * 2] = w.first;
values[i * 2 + 1] = w.second;
keyUpdates += 3;
}
skipList.find(values, fingers, temp, ss);
entryDelta += skipList.addConflictRanges(fingers, ss / 2, writeVersion);
skipList.addConflictRanges(fingers, ss / 2, writeVersion);
ss = stripeSize;
}
// Run gc at least 200% the rate we're inserting entries
keyUpdates += std::max<int64_t>(entryDelta, 0) * 2;
}
void setOldestVersion(int64_t oldestVersion) {
// This isn't 100% accurate. It overcounts if you hit the end
gc_iterations_total.add(keyUpdates);
assert(oldestVersion >= this->oldestVersion);
this->oldestVersion = oldestVersion;
SkipList::Finger finger;
int temp;
std::span<const uint8_t> key = removalKey;
skipList.find(&key, &finger, &temp, 1);
skipList.removeBefore(oldestVersion, finger, std::exchange(keyUpdates, 0));
skipList.removeBefore(oldestVersion, finger, std::exchange(keyUpdates, 10));
removalArena = Arena();
removalKey = copyToArena(
removalArena, {finger.getValue().data(), finger.getValue().size()});
@@ -830,56 +803,8 @@ struct __attribute__((visibility("hidden"))) ConflictSet::Impl {
int64_t totalBytes = 0;
MetricsV1 *metrics;
int metricsCount = 0;
Metric *metricsList = nullptr;
#define GAUGE(name, help) \
Gauge name { metricsList, metricsCount, #name, help }
#define COUNTER(name, help) \
Counter name { metricsList, metricsCount, #name, help }
// ==================== METRICS DEFINITIONS ====================
COUNTER(range_read_total, "Total number of range reads checked");
COUNTER(range_read_iterations_total,
"Total number of iterations of the main loops for range read checks");
COUNTER(commits_total,
"Total number of checks where the result is \"commit\"");
COUNTER(conflicts_total,
"Total number of checks where the result is \"conflict\"");
COUNTER(too_olds_total,
"Total number of checks where the result is \"too old\"");
COUNTER(check_bytes_total, "Total number of key bytes checked");
GAUGE(memory_bytes, "Total number of bytes in use");
COUNTER(nodes_allocated_total,
"The total number of physical tree nodes allocated");
COUNTER(nodes_released_total,
"The total number of physical tree nodes released");
COUNTER(insert_iterations_total,
"The total number of iterations of the main loop for insertion. "
"Includes searches where the entry already existed, and so insertion "
"did not take place");
COUNTER(entries_inserted_total,
"The total number of entries inserted in the tree");
COUNTER(entries_erased_total,
"The total number of entries erased from the tree");
COUNTER(
gc_iterations_total,
"The total number of iterations of the main loop for garbage collection");
COUNTER(write_bytes_total, "Total number of key bytes in calls to addWrites");
GAUGE(oldest_version,
"The lowest version that doesn't result in \"TooOld\" for checks");
GAUGE(newest_version, "The version of the most recent call to addWrites");
// ==================== END METRICS DEFINITIONS ====================
#undef GAUGE
#undef COUNTER
void getMetricsV1(MetricsV1 **metrics, int *count) {
*metrics = this->metrics;
*count = metricsCount;
}
private:
int64_t keyUpdates = 0;
int64_t keyUpdates = 10;
Arena removalArena;
std::span<const uint8_t> removalKey;
int64_t oldestVersion;
@@ -900,7 +825,6 @@ void internal_addWrites(ConflictSet::Impl *impl,
mallocBytesDelta = 0;
impl->addWrites(writes, count, writeVersion);
impl->totalBytes += mallocBytesDelta;
impl->memory_bytes.set(impl->totalBytes);
#if SHOW_MEMORY
if (impl->totalBytes != mallocBytes) {
abort();
@@ -912,7 +836,6 @@ void internal_setOldestVersion(ConflictSet::Impl *impl, int64_t oldestVersion) {
mallocBytesDelta = 0;
impl->setOldestVersion(oldestVersion);
impl->totalBytes += mallocBytesDelta;
impl->memory_bytes.set(impl->totalBytes);
#if SHOW_MEMORY
if (impl->totalBytes != mallocBytes) {
abort();
@@ -936,11 +859,12 @@ int64_t internal_getBytes(ConflictSet::Impl *impl) { return impl->totalBytes; }
void internal_getMetricsV1(ConflictSet::Impl *impl,
ConflictSet::MetricsV1 **metrics, int *count) {
return impl->getMetricsV1(metrics, count);
*metrics = nullptr;
*count = 0;
}
double internal_getMetricValue(const ConflictSet::MetricsV1 *metric) {
return ((Metric *)metric->p)->value.load(std::memory_order_relaxed);
return 0;
}
void ConflictSet::check(const ReadRange *reads, Result *results,

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Some files were not shown because too many files have changed in this diff Show More