7 Commits

Author SHA1 Message Date
4b82502946 Accept node by ref for eraseBetween again 2024-08-14 14:43:19 -07:00
68bbacb69a Use getInTree in eraseBetween 2024-08-14 14:43:19 -07:00
3078845673 Fix nodes_released accounting 2024-08-14 14:43:19 -07:00
43f6126cc4 Add a missing assert, call to removeNode 2024-08-14 14:43:19 -07:00
b911d87d55 eraseBetween bug fixes 2024-08-14 14:43:19 -07:00
0c65a82b78 Separate codepath for prefix writes
Uses the newly-added eraseBetween
2024-08-14 14:43:19 -07:00
e024cb8291 Track entriesErased in destroyTree 2024-08-14 14:43:19 -07:00
278 changed files with 1201 additions and 1572 deletions

View File

@@ -7,6 +7,7 @@
void showMemory(const ConflictSet &cs);
#endif
#define ANKERL_NANOBENCH_IMPLEMENT
#include "third_party/nanobench.h"
constexpr int kNumKeys = 1000000;
@@ -360,7 +361,21 @@ void benchWorstCaseForRadixRangeRead() {
void benchCreateAndDestroy() {
ankerl::nanobench::Bench bench;
bench.run("create and destroy", [&]() { ConflictSet cs{0}; });
bench.run("create and destroy", [&]() {
ConflictSet cs{0};
ConflictSet::WriteRange w;
uint8_t b[9];
b[8] = 0;
for (int64_t i = 0; i < 1000; i += 7) {
auto x = __builtin_bswap64(i);
memcpy(b, &x, 8);
w.begin.p = b;
w.begin.len = 8;
w.end.len = 0;
w.end.p = b;
cs.addWrites(&w, 1, 1);
}
});
}
int main(void) {

View File

@@ -1,7 +1,7 @@
cmake_minimum_required(VERSION 3.18)
project(
conflict-set
VERSION 0.0.14
VERSION 0.0.12
DESCRIPTION
"A data structure for optimistic concurrency control on ranges of bitwise-lexicographically-ordered keys."
HOMEPAGE_URL "https://git.weaselab.dev/weaselab/conflict-set"
@@ -72,6 +72,12 @@ else()
add_link_options(-Wl,--gc-sections)
endif()
if(EMSCRIPTEN)
# https://github.com/emscripten-core/emscripten/issues/15377#issuecomment-1285167486
add_link_options(-lnodefs.js -lnoderawfs.js)
add_link_options(-s ALLOW_MEMORY_GROWTH)
endif()
if(NOT USE_SIMD_FALLBACK)
cmake_push_check_state()
list(APPEND CMAKE_REQUIRED_FLAGS -mavx)
@@ -138,8 +144,6 @@ include(CTest)
# disable tests if this is being used through e.g. FetchContent
if(CMAKE_SOURCE_DIR STREQUAL CMAKE_CURRENT_SOURCE_DIR AND BUILD_TESTING)
add_library(nanobench ${CMAKE_CURRENT_SOURCE_DIR}/nanobench.cpp)
set(TEST_FLAGS -Wall -Wextra -Wunreachable-code -Wpedantic -UNDEBUG)
# corpus tests, which are tests curated by libfuzzer. The goal is to get broad
@@ -187,7 +191,6 @@ if(CMAKE_SOURCE_DIR STREQUAL CMAKE_CURRENT_SOURCE_DIR AND BUILD_TESTING)
target_include_directories(conflict_set_main
PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include)
target_compile_definitions(conflict_set_main PRIVATE ENABLE_MAIN)
target_link_libraries(conflict_set_main PRIVATE nanobench)
if(NOT APPLE)
# libfuzzer target, to generate/manage corpus
@@ -248,19 +251,6 @@ if(CMAKE_SOURCE_DIR STREQUAL CMAKE_CURRENT_SOURCE_DIR AND BUILD_TESTING)
add_test(NAME conflict_set_blackbox_${hash} COMMAND driver ${TEST})
endforeach()
find_program(VALGRIND_EXE valgrind)
if(VALGRIND_EXE AND NOT CMAKE_CROSSCOMPILING)
list(LENGTH CORPUS_TESTS len)
math(EXPR last "${len} - 1")
set(partition_size 100)
foreach(i RANGE 0 ${last} ${partition_size})
list(SUBLIST CORPUS_TESTS ${i} ${partition_size} partition)
add_test(NAME conflict_set_blackbox_valgrind_${i}
COMMAND ${VALGRIND_EXE} --error-exitcode=99 --
$<TARGET_FILE:driver> ${partition})
endforeach()
endif()
# scripted tests. Written manually to fill in anything libfuzzer couldn't
# find.
if(NOT CMAKE_CROSSCOMPILING)
@@ -281,17 +271,16 @@ if(CMAKE_SOURCE_DIR STREQUAL CMAKE_CURRENT_SOURCE_DIR AND BUILD_TESTING)
${Python3_EXECUTABLE}
${CMAKE_CURRENT_SOURCE_DIR}/test_conflict_set.py test ${TEST}
--build-dir ${CMAKE_CURRENT_BINARY_DIR})
if(VALGRIND_EXE AND NOT CMAKE_CROSSCOMPILING)
add_test(
NAME script_test_${TEST}_valgrind
COMMAND
${VALGRIND_EXE} ${Python3_EXECUTABLE}
${CMAKE_CURRENT_SOURCE_DIR}/test_conflict_set.py test ${TEST}
--build-dir ${CMAKE_CURRENT_BINARY_DIR})
endif()
endforeach()
endif()
find_program(VALGRIND_EXE valgrind)
if(VALGRIND_EXE AND NOT CMAKE_CROSSCOMPILING)
add_test(NAME conflict_set_blackbox_valgrind
COMMAND ${VALGRIND_EXE} --error-exitcode=99 --
$<TARGET_FILE:driver> ${CORPUS_TESTS})
endif()
# api smoke tests
# c90
@@ -341,7 +330,7 @@ if(CMAKE_SOURCE_DIR STREQUAL CMAKE_CURRENT_SOURCE_DIR AND BUILD_TESTING)
# bench
add_executable(conflict_set_bench Bench.cpp)
target_link_libraries(conflict_set_bench PRIVATE ${PROJECT_NAME} nanobench)
target_link_libraries(conflict_set_bench PRIVATE ${PROJECT_NAME})
set_target_properties(conflict_set_bench PROPERTIES SKIP_BUILD_RPATH ON)
add_executable(real_data_bench RealDataBench.cpp)
target_link_libraries(real_data_bench PRIVATE ${PROJECT_NAME})
@@ -356,14 +345,6 @@ if(CMAKE_SOURCE_DIR STREQUAL CMAKE_CURRENT_SOURCE_DIR AND BUILD_TESTING)
add_executable(server_bench ServerBench.cpp)
target_link_libraries(server_bench PRIVATE ${PROJECT_NAME})
set_target_properties(server_bench PROPERTIES SKIP_BUILD_RPATH ON)
add_executable(interleaving_test InterleavingTest.cpp)
# work around lack of musttail for gcc
if(CMAKE_CXX_COMPILER_ID STREQUAL "GNU" AND CMAKE_BUILD_TYPE STREQUAL "Debug")
target_compile_options(interleaving_test PRIVATE -Og
-foptimize-sibling-calls)
endif()
target_link_libraries(interleaving_test PRIVATE nanobench)
endif()
# packaging

File diff suppressed because it is too large Load Diff

View File

@@ -1,256 +0,0 @@
#include <alloca.h>
#include <cassert>
#ifdef __x86_64__
#include <immintrin.h>
#endif
#include "third_party/nanobench.h"
struct Job {
int *input;
// Returned void* is a function pointer to the next continuation. We have to
// use void* because otherwise the type would be recursive.
typedef void *(*continuation)(Job *);
continuation next;
};
void *stepJob(Job *j) {
auto done = --(*j->input) == 0;
#ifdef __x86_64__
_mm_clflush(j->input);
#endif
return done ? nullptr : (void *)stepJob;
}
void sequential(Job **jobs, int count) {
for (int i = 0; i < count; ++i) {
do {
jobs[i]->next = (Job::continuation)jobs[i]->next(jobs[i]);
} while (jobs[i]->next);
}
}
void sequentialNoFuncPtr(Job **jobs, int count) {
for (int i = 0; i < count; ++i) {
while (stepJob(jobs[i]))
;
}
}
void interleaveSwapping(Job **jobs, int remaining) {
int current = 0;
while (remaining > 0) {
auto next = (Job::continuation)jobs[current]->next(jobs[current]);
jobs[current]->next = next;
if (next == nullptr) {
jobs[current] = jobs[remaining - 1];
--remaining;
} else {
++current;
}
if (current == remaining) {
current = 0;
}
}
}
void interleaveBoundedCyclicList(Job **jobs, int count) {
if (count == 0) {
return;
}
constexpr int kConcurrent = 32;
Job *inProgress[kConcurrent];
int nextJob[kConcurrent];
int started = std::min(kConcurrent, count);
for (int i = 0; i < started; i++) {
inProgress[i] = jobs[i];
nextJob[i] = i + 1;
}
nextJob[started - 1] = 0;
int prevJob = started - 1;
int job = 0;
for (;;) {
auto next = (Job::continuation)inProgress[job]->next(inProgress[job]);
inProgress[job]->next = next;
if (next == nullptr) {
if (started == count) {
if (prevJob == job)
break;
nextJob[prevJob] = nextJob[job];
job = prevJob;
} else {
int temp = started++;
inProgress[job] = jobs[temp];
}
}
prevJob = job;
job = nextJob[job];
}
}
#ifndef __has_attribute
#define __has_attribute(x) 0
#endif
#if __has_attribute(musttail)
#define MUSTTAIL __attribute__((musttail))
#else
#define MUSTTAIL
#endif
struct Context {
constexpr static int kConcurrent = 32;
Job **jobs;
Job *inProgress[kConcurrent];
void (*continuation[kConcurrent])(Context *, int64_t prevJob, int64_t job,
int64_t started, int64_t count);
int nextJob[kConcurrent];
};
void keepGoing(Context *context, int64_t prevJob, int64_t job, int64_t started,
int64_t count) {
prevJob = job;
job = context->nextJob[job];
MUSTTAIL return context->continuation[job](context, prevJob, job, started,
count);
}
void stepJobTailCall(Context *context, int64_t prevJob, int64_t job,
int64_t started, int64_t count);
void complete(Context *context, int64_t prevJob, int64_t job, int64_t started,
int64_t count) {
if (started == count) {
if (prevJob == job) {
return;
}
context->nextJob[prevJob] = context->nextJob[job];
job = prevJob;
} else {
context->inProgress[job] = context->jobs[started++];
context->continuation[job] = stepJobTailCall;
}
prevJob = job;
job = context->nextJob[job];
MUSTTAIL return context->continuation[job](context, prevJob, job, started,
count);
}
void stepJobTailCall(Context *context, int64_t prevJob, int64_t job,
int64_t started, int64_t count) {
auto *j = context->inProgress[job];
auto done = --(*j->input) == 0;
#ifdef __x86_64__
_mm_clflush(j->input);
#endif
if (done) {
MUSTTAIL return complete(context, prevJob, job, started, count);
} else {
context->continuation[job] = stepJobTailCall;
MUSTTAIL return keepGoing(context, prevJob, job, started, count);
}
}
void useTailCalls(Job **jobs, int count) {
if (count == 0) {
return;
}
Context context;
context.jobs = jobs;
int64_t started = std::min(Context::kConcurrent, count);
for (int i = 0; i < started; i++) {
context.inProgress[i] = jobs[i];
context.nextJob[i] = i + 1;
context.continuation[i] = stepJobTailCall;
}
context.nextJob[started - 1] = 0;
int prevJob = started - 1;
int job = 0;
return context.continuation[job](&context, prevJob, job, started, count);
}
void interleaveCyclicList(Job **jobs, int count) {
auto *nextJob = (int *)alloca(sizeof(int) * count);
for (int i = 0; i < count - 1; ++i) {
nextJob[i] = i + 1;
}
nextJob[count - 1] = 0;
int prevJob = count - 1;
int job = 0;
for (;;) {
auto next = (Job::continuation)jobs[job]->next(jobs[job]);
jobs[job]->next = next;
if (next == nullptr) {
if (prevJob == job)
break;
nextJob[prevJob] = nextJob[job];
job = prevJob;
}
prevJob = job;
job = nextJob[job];
}
}
int main() {
ankerl::nanobench::Bench bench;
constexpr int kNumJobs = 10000;
bench.relative(true);
Job jobs[kNumJobs];
Job jobsCopy[kNumJobs];
int iters = 0;
int originalInput[kNumJobs];
for (int i = 0; i < kNumJobs; ++i) {
originalInput[i] = rand() % 5 + 3;
jobs[i].input = new int{originalInput[i]};
jobs[i].next = stepJob;
iters += *jobs[i].input;
}
bench.batch(iters);
for (auto [scheduler, name] :
{std::make_pair(sequentialNoFuncPtr, "sequentialNoFuncPtr"),
std::make_pair(sequential, "sequential"),
std::make_pair(useTailCalls, "useTailCalls"),
std::make_pair(interleaveSwapping, "interleavingSwapping"),
std::make_pair(interleaveBoundedCyclicList,
"interleaveBoundedCyclicList"),
std::make_pair(interleaveCyclicList, "interleaveCyclicList")}) {
for (int i = 0; i < kNumJobs; ++i) {
*jobs[i].input = originalInput[i];
}
memcpy(jobsCopy, jobs, sizeof(jobs));
Job *ps[kNumJobs];
for (int i = 0; i < kNumJobs; ++i) {
ps[i] = jobsCopy + i;
}
scheduler(ps, kNumJobs);
for (int i = 0; i < kNumJobs; ++i) {
if (*jobsCopy[i].input != 0) {
fprintf(stderr, "%s failed\n", name);
abort();
}
}
bench.run(name, [&]() {
for (int i = 0; i < kNumJobs; ++i) {
*jobs[i].input = originalInput[i];
}
memcpy(jobsCopy, jobs, sizeof(jobs));
Job *ps[kNumJobs];
for (int i = 0; i < kNumJobs; ++i) {
ps[i] = jobsCopy + i;
}
scheduler(ps, kNumJobs);
});
}
for (int i = 0; i < kNumJobs; ++i) {
delete jobs[i].input;
}
}

View File

@@ -20,6 +20,7 @@ using namespace weaselab;
#include <thread>
#include <unordered_set>
#include <utility>
#include <vector>
#include <callgrind.h>
@@ -747,10 +748,7 @@ struct TestDriver {
fprintf(stderr, "%p Set oldest version: %" PRId64 "\n", this,
oldestVersion);
#endif
CALLGRIND_START_INSTRUMENTATION;
cs.setOldestVersion(oldestVersion);
CALLGRIND_STOP_INSTRUMENTATION;
if constexpr (kEnableAssertions) {
refImpl.setOldestVersion(oldestVersion);
}

13
Jenkinsfile vendored
View File

@@ -48,17 +48,6 @@ pipeline {
recordIssues(tools: [clang()])
}
}
stage('64 bit versions') {
agent {
dockerfile {
args '-v /home/jenkins/ccache:/ccache'
reuseNode true
}
}
steps {
CleanBuildAndTest("-DCMAKE_CXX_FLAGS=-DUSE_64_BIT=1")
}
}
stage('Debug') {
agent {
dockerfile {
@@ -129,7 +118,7 @@ pipeline {
}
steps {
script {
filter_args = "-f ConflictSet.cpp -f LongestCommonPrefix.h -f Metrics.h"
filter_args = "-f ConflictSet.cpp -f LongestCommonPrefix.h"
}
CleanBuildAndTest("-DCMAKE_C_COMPILER=gcc -DCMAKE_CXX_COMPILER=g++ -DCMAKE_C_FLAGS=--coverage -DCMAKE_CXX_FLAGS=--coverage -DCMAKE_BUILD_TYPE=Debug -DDISABLE_TSAN=ON")
sh """

View File

@@ -1,64 +0,0 @@
#pragma once
#include "ConflictSet.h"
#include "Internal.h"
#include <assert.h>
#include <atomic>
#include <tuple>
struct Metric {
Metric *prev;
const char *name;
const char *help;
weaselab::ConflictSet::MetricsV1::Type type;
std::atomic<int64_t> value;
protected:
Metric(Metric *&metricList, int &metricsCount, const char *name,
const char *help, weaselab::ConflictSet::MetricsV1::Type type)
: prev(std::exchange(metricList, this)), name(name), help(help),
type(type), value(0) {
++metricsCount;
}
};
struct Gauge : private Metric {
Gauge(Metric *&metricList, int &metricsCount, const char *name,
const char *help)
: Metric(metricList, metricsCount, name, help,
weaselab::ConflictSet::MetricsV1::Gauge) {}
void set(int64_t value) {
this->value.store(value, std::memory_order_relaxed);
}
};
struct Counter : private Metric {
Counter(Metric *&metricList, int &metricsCount, const char *name,
const char *help)
: Metric(metricList, metricsCount, name, help,
weaselab::ConflictSet::MetricsV1::Counter) {}
// Expensive. Accumulate locally and then call add instead of repeatedly
// calling add.
void add(int64_t value) {
assert(value >= 0);
static_assert(std::atomic<int64_t>::is_always_lock_free);
this->value.fetch_add(value, std::memory_order_relaxed);
}
};
inline weaselab::ConflictSet::MetricsV1 *initMetrics(Metric *metricsList,
int metricsCount) {
weaselab::ConflictSet::MetricsV1 *metrics =
(weaselab::ConflictSet::MetricsV1 *)safe_malloc(metricsCount *
sizeof(metrics[0]));
for (auto [i, m] = std::make_tuple(metricsCount - 1, metricsList); i >= 0;
--i, m = m->prev) {
metrics[i].name = m->name;
metrics[i].help = m->help;
metrics[i].p = m;
metrics[i].type = m->type;
}
return metrics;
}

View File

@@ -24,15 +24,16 @@ Hardware for all benchmarks is an AMD Ryzen 9 7900 with (2x32GB) 5600MT/s CL28-3
| ns/op | op/s | err% | ins/op | cyc/op | IPC | bra/op | miss% | total | benchmark
|--------------------:|--------------------:|--------:|----------------:|----------------:|-------:|---------------:|--------:|----------:|:----------
| 12.88 | 77,653,350.77 | 0.5% | 185.37 | 64.45 | 2.876 | 41.51 | 0.4% | 0.01 | `point reads`
| 14.67 | 68,179,354.49 | 0.1% | 271.44 | 73.40 | 3.698 | 53.70 | 0.3% | 0.01 | `prefix reads`
| 34.84 | 28,701,444.36 | 0.3% | 715.74 | 175.27 | 4.084 | 127.30 | 0.2% | 0.01 | `range reads`
| 17.12 | 58,422,988.28 | 0.2% | 314.30 | 86.11 | 3.650 | 39.82 | 0.4% | 0.01 | `point writes`
| 31.42 | 31,830,804.65 | 0.1% | 591.06 | 158.07 | 3.739 | 82.67 | 0.2% | 0.01 | `prefix writes`
| 37.37 | 26,759,432.70 | 2.2% | 681.98 | 188.95 | 3.609 | 96.10 | 0.1% | 0.01 | `range writes`
| 76.72 | 13,035,140.63 | 2.3% | 1,421.28 | 387.17 | 3.671 | 257.76 | 0.1% | 0.01 | `monotonic increasing point writes`
| 297,452.00 | 3,361.89 | 0.9% | 3,508,083.00 | 1,500,834.67 | 2.337 | 727,525.33 | 0.1% | 0.01 | `worst case for radix tree`
| 87.70 | 11,402,490.60 | 1.0% | 1,795.00 | 442.09 | 4.060 | 297.00 | 0.0% | 0.01 | `create and destroy`
| 10.80 | 92,600,541.52 | 0.6% | 180.38 | 54.49 | 3.310 | 41.51 | 0.4% | 0.01 | `point reads`
| 15.00 | 66,687,691.68 | 0.4% | 278.44 | 76.44 | 3.642 | 55.56 | 0.3% | 0.01 | `prefix reads`
| 36.81 | 27,163,394.61 | 0.4% | 795.06 | 187.91 | 4.231 | 142.67 | 0.2% | 0.01 | `range reads`
| 18.14 | 55,137,674.01 | 1.2% | 338.19 | 92.86 | 3.642 | 42.81 | 0.4% | 0.01 | `point writes`
| 33.19 | 30,127,119.71 | 0.1% | 681.03 | 170.05 | 4.005 | 98.68 | 0.2% | 0.01 | `prefix writes`
| 37.37 | 26,759,432.70 | 1.9% | 779.70 | 195.45 | 3.989 | 114.21 | 0.0% | 0.01 | `range writes`
| 74.36 | 13,448,582.47 | 1.9% | 1,425.68 | 389.08 | 3.664 | 258.88 | 0.1% | 0.01 | `monotonic increasing point writes`
| 316,928.00 | 3,155.29 | 1.5% | 3,992,986.00 | 1,699,813.00 | 2.349 | 806,226.50 | 0.0% | 0.01 | `worst case for radix tree`
| 75.26 | 13,286,517.16 | 0.5% | 1,590.01 | 386.67 | 4.112 | 258.00 | 0.0% | 0.01 | `create and destroy`
# "Real data" test
@@ -47,7 +48,7 @@ Check: 4.47891 seconds, 364.05 MB/s, Add: 4.55599 seconds, 123.058 MB/s, Gc rati
## radix tree
```
Check: 0.953012 seconds, 1710.94 MB/s, Add: 1.30025 seconds, 431.188 MB/s, Gc ratio: 43.9816%, Peak idle memory: 2.28375e+06
Check: 0.910234 seconds, 1791.35 MB/s, Add: 1.25908 seconds, 445.287 MB/s, Gc ratio: 44.0415%
```
## hash table

View File

@@ -6,26 +6,22 @@
#include <string.h>
#include <string>
#include <string_view>
#include <sys/ioctl.h>
#include <sys/resource.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/uio.h>
#include <thread>
#include <unistd.h>
#include <utility>
#include <vector>
#include "ConflictSet.h"
#include "third_party/nadeau.h"
std::atomic<int64_t> transactions;
constexpr int kBaseSearchDepth = 115;
constexpr int kBaseSearchDepth = 32;
constexpr int kWindowSize = 10000000;
std::string numToKey(int64_t num) {
std::string result;
std::basic_string<uint8_t> numToKey(int64_t num) {
std::basic_string<uint8_t> result;
result.resize(kBaseSearchDepth + sizeof(int64_t));
memset(result.data(), 0, kBaseSearchDepth);
int64_t be = __builtin_bswap64(num);
@@ -45,13 +41,13 @@ void workload(weaselab::ConflictSet *cs) {
auto pointK = numToKey(pointRv);
weaselab::ConflictSet::ReadRange reads[] = {
{
{(const uint8_t *)pointK.data(), int(pointK.size())},
{pointK.data(), int(pointK.size())},
{nullptr, 0},
pointRv,
},
{
{(const uint8_t *)beginK.data(), int(beginK.size())},
{(const uint8_t *)endK.data(), int(endK.size())},
{beginK.data(), int(beginK.size())},
{endK.data(), int(endK.size())},
version - 2,
},
};
@@ -70,7 +66,7 @@ void workload(weaselab::ConflictSet *cs) {
{
weaselab::ConflictSet::WriteRange w;
auto k = numToKey(version);
w.begin.p = (const uint8_t *)k.data();
w.begin.p = k.data();
w.end.len = 0;
if (version % (kWindowSize / 2) == 0) {
for (int l = 0; l <= k.size(); ++l) {
@@ -83,9 +79,9 @@ void workload(weaselab::ConflictSet *cs) {
int64_t beginN = version - kWindowSize + rand() % kWindowSize;
auto b = numToKey(beginN);
auto e = numToKey(beginN + 1000);
w.begin.p = (const uint8_t *)b.data();
w.begin.p = b.data();
w.begin.len = b.size();
w.end.p = (const uint8_t *)e.data();
w.end.p = e.data();
w.end.len = e.size();
cs->addWrites(&w, 1, version);
}
@@ -168,68 +164,6 @@ double toSeconds(timeval t) {
return double(t.tv_sec) + double(t.tv_usec) * 1e-6;
}
#ifdef __linux__
#include <linux/perf_event.h>
struct PerfCounter {
PerfCounter(int type, int config, const std::string &labels = {},
int groupLeaderFd = -1)
: labels(labels) {
struct perf_event_attr pe;
memset(&pe, 0, sizeof(pe));
pe.type = type;
pe.size = sizeof(pe);
pe.config = config;
pe.inherit = 1;
pe.exclude_kernel = 1;
pe.exclude_hv = 1;
fd = perf_event_open(&pe, 0, -1, groupLeaderFd, 0);
if (fd < 0 && errno != ENOENT && errno != EINVAL) {
perror(labels.c_str());
}
}
int64_t total() const {
int64_t count;
if (read(fd, &count, sizeof(count)) != sizeof(count)) {
perror("read instructions from perf");
abort();
}
return count;
}
PerfCounter(PerfCounter &&other)
: fd(std::exchange(other.fd, -1)), labels(std::move(other.labels)) {}
PerfCounter &operator=(PerfCounter &&other) {
fd = std::exchange(other.fd, -1);
labels = std::move(other.labels);
return *this;
}
~PerfCounter() {
if (fd >= 0) {
close(fd);
}
}
bool ok() const { return fd >= 0; }
const std::string &getLabels() const { return labels; }
int getFd() const { return fd; }
private:
int fd;
std::string labels;
static long perf_event_open(struct perf_event_attr *hw_event, pid_t pid,
int cpu, int group_fd, unsigned long flags) {
int ret;
ret = syscall(SYS_perf_event_open, hw_event, pid, cpu, group_fd, flags);
return ret;
}
};
#endif
int main(int argc, char **argv) {
if (argc != 3) {
goto fail;
@@ -242,50 +176,6 @@ int main(int argc, char **argv) {
int metricsCount;
cs.getMetricsV1(&metrics, &metricsCount);
#ifdef __linux__
PerfCounter instructions{PERF_TYPE_HARDWARE, PERF_COUNT_HW_INSTRUCTIONS};
PerfCounter cycles{PERF_TYPE_HARDWARE, PERF_COUNT_HW_CPU_CYCLES, "",
instructions.getFd()};
std::vector<PerfCounter> cacheCounters;
for (auto [id, idStr] : std::initializer_list<std::pair<int, std::string>>{
{PERF_COUNT_HW_CACHE_L1D, "l1d"},
{PERF_COUNT_HW_CACHE_L1I, "l1i"},
{PERF_COUNT_HW_CACHE_LL, "ll"},
{PERF_COUNT_HW_CACHE_DTLB, "dtlb"},
{PERF_COUNT_HW_CACHE_ITLB, "itlb"},
{PERF_COUNT_HW_CACHE_BPU, "bpu"},
{PERF_COUNT_HW_CACHE_NODE, "node"},
}) {
for (auto [op, opStr] :
std::initializer_list<std::pair<int, std::string>>{
{PERF_COUNT_HW_CACHE_OP_READ, "read"},
{PERF_COUNT_HW_CACHE_OP_WRITE, "write"},
{PERF_COUNT_HW_CACHE_OP_PREFETCH, "prefetch"},
}) {
int groupLeaderFd = -1;
for (auto [result, resultStr] :
std::initializer_list<std::pair<int, std::string>>{
{PERF_COUNT_HW_CACHE_RESULT_MISS, "miss"},
{PERF_COUNT_HW_CACHE_RESULT_ACCESS, "access"},
}) {
auto labels = "{id=\"" + idStr + "\", op=\"" + opStr +
"\", result=\"" + resultStr + "\"}";
cacheCounters.emplace_back(PERF_TYPE_HW_CACHE,
id | (op << 8) | (result << 16), labels,
groupLeaderFd);
if (!cacheCounters.back().ok()) {
cacheCounters.pop_back();
} else {
if (groupLeaderFd == -1) {
groupLeaderFd = cacheCounters.back().getFd();
}
}
}
}
}
#endif
auto w = std::thread{workload, &cs};
for (;;) {
@@ -313,24 +203,6 @@ int main(int argc, char **argv) {
"transactions_total ";
body += std::to_string(transactions.load(std::memory_order_relaxed));
body += "\n";
#ifdef __linux__
body += "# HELP instructions_total Total number of instructions\n"
"# TYPE instructions_total counter\n"
"instructions_total ";
body += std::to_string(instructions.total());
body += "\n";
body += "# HELP cycles_total Total number of cycles\n"
"# TYPE cycles_total counter\n"
"cycles_total ";
body += std::to_string(cycles.total());
body += "\n";
body += "# HELP cache_event_total Total number of cache events\n"
"# TYPE cache_event_total counter\n";
for (const auto &counter : cacheCounters) {
body += "cache_event_total" + counter.getLabels() + " " +
std::to_string(counter.total()) + "\n";
}
#endif
for (int i = 0; i < metricsCount; ++i) {
body += "# HELP ";

View File

@@ -22,11 +22,9 @@
#include "ConflictSet.h"
#include "Internal.h"
#include "Metrics.h"
#include <algorithm>
#include <span>
#include <vector>
std::span<const uint8_t> keyAfter(Arena &arena, std::span<const uint8_t> key) {
auto result =
@@ -117,6 +115,15 @@ bool operator==(const KeyInfo &lhs, const KeyInfo &rhs) {
return !(lhs < rhs || rhs < lhs);
}
void swapSort(std::vector<KeyInfo> &points, int a, int b) {
if (points[b] < points[a]) {
KeyInfo temp;
temp = points[a];
points[a] = points[b];
points[b] = temp;
}
}
struct SortTask {
int begin;
int size;
@@ -176,6 +183,13 @@ void sortPoints(std::vector<KeyInfo> &points) {
}
}
static thread_local uint32_t g_seed = 0;
static inline int skfastrand() {
g_seed = g_seed * 1664525L + 1013904223L;
return g_seed;
}
static int compare(const StringRef &a, const StringRef &b) {
int c = memcmp(a.data(), b.data(), std::min(a.size(), b.size()));
if (c < 0)
@@ -201,24 +215,20 @@ struct ReadConflictRange {
}
};
static constexpr int MaxLevels = 26;
struct RandomLevel {
explicit RandomLevel(uint32_t seed) : seed(seed) {}
int next() {
int result = __builtin_clz(seed | (uint32_t(-1) >> (MaxLevels - 1)));
seed = seed * 1664525L + 1013904223L;
return result;
}
private:
uint32_t seed;
};
class SkipList {
private:
RandomLevel randomLevel{0};
static constexpr int MaxLevels = 26;
int randomLevel() const {
uint32_t i = uint32_t(skfastrand()) >> (32 - (MaxLevels - 1));
int level = 0;
while (i & 1) {
i >>= 1;
level++;
}
assert(level < MaxLevels);
return level;
}
// Represent a node in the SkipList. The node has multiple (i.e., level)
// pointers to other nodes, and keeps a record of the max versions for each
@@ -416,33 +426,27 @@ public:
}
void swap(SkipList &other) { std::swap(header, other.header); }
// Returns the change in the number of entries
int64_t addConflictRanges(const Finger *fingers, int rangeCount,
Version version) {
int64_t result = rangeCount;
void addConflictRanges(const Finger *fingers, int rangeCount,
Version version) {
for (int r = rangeCount - 1; r >= 0; r--) {
const Finger &startF = fingers[r * 2];
const Finger &endF = fingers[r * 2 + 1];
if (endF.found() == nullptr) {
++result;
if (endF.found() == nullptr)
insert(endF, endF.finger[0]->getMaxVersion(0));
}
result -= remove(startF, endF);
remove(startF, endF);
insert(startF, version);
}
return result;
}
// Return number of iterations of main loop
int detectConflicts(ReadConflictRange *ranges, int count,
ConflictSet::Result *transactionConflictStatus) const {
void detectConflicts(ReadConflictRange *ranges, int count,
ConflictSet::Result *transactionConflictStatus) const {
const int M = 16;
int nextJob[M];
CheckMax inProgress[M];
if (!count)
return 0;
return;
int started = std::min(M, count);
for (int i = 0; i < started; i++) {
@@ -453,9 +457,8 @@ public:
int prevJob = started - 1;
int job = 0;
int iters = 0;
// vtune: 340 parts
for (;; ++iters) {
while (true) {
if (inProgress[job].advance()) {
if (started == count) {
if (prevJob == job)
@@ -471,7 +474,6 @@ public:
prevJob = job;
job = nextJob[job];
}
return iters;
}
void find(const StringRef *values, Finger *results, int *temp, int count) {
@@ -565,10 +567,9 @@ public:
}
private:
// Returns the number of entries removed
int64_t remove(const Finger &start, const Finger &end) {
void remove(const Finger &start, const Finger &end) {
if (start.finger[0] == end.finger[0])
return 0;
return;
Node *x = start.finger[0]->getNext(0);
@@ -577,20 +578,17 @@ private:
if (start.finger[i] != end.finger[i])
start.finger[i]->setNext(i, end.finger[i]->getNext(i));
int64_t result = 0;
while (true) {
Node *next = x->getNext(0);
x->destroy();
++result;
if (x == end.finger[0])
break;
x = next;
}
return result;
}
void insert(const Finger &f, Version version) {
int level = randomLevel.next();
int level = randomLevel();
// std::cout << std::string((const char*)value,length) << " level: " <<
// level << std::endl;
Node *x = Node::create(f.value, level);
@@ -706,27 +704,17 @@ private:
};
};
struct ReadContext {
int64_t commits_accum = 0;
int64_t conflicts_accum = 0;
int64_t too_olds_accum = 0;
int64_t check_bytes_accum = 0;
};
struct SkipListConflictSet {};
struct __attribute__((visibility("hidden"))) ConflictSet::Impl {
Impl(int64_t oldestVersion)
: oldestVersion(oldestVersion), newestVersion(oldestVersion),
skipList(oldestVersion) {
metrics = initMetrics(metricsList, metricsCount);
}
~Impl() { safe_free(metrics, metricsCount * sizeof(metrics[0])); }
skipList(oldestVersion) {}
void check(const ConflictSet::ReadRange *reads, ConflictSet::Result *results,
int count) {
ReadContext tls;
int count) const {
Arena arena;
auto *ranges = new (arena) ReadConflictRange[count];
for (int i = 0; i < count; ++i) {
tls.check_bytes_accum += reads[i].begin.len + reads[i].end.len;
ranges[i].begin = {reads[i].begin.p, size_t(reads[i].begin.len)};
ranges[i].end = reads[i].end.len > 0
? StringRef{reads[i].end.p, size_t(reads[i].end.len)}
@@ -734,22 +722,13 @@ struct __attribute__((visibility("hidden"))) ConflictSet::Impl {
ranges[i].version = reads[i].readVersion;
results[i] = ConflictSet::Commit;
}
int iters = skipList.detectConflicts(ranges, count, results);
skipList.detectConflicts(ranges, count, results);
for (int i = 0; i < count; ++i) {
if (reads[i].readVersion < oldestVersion ||
reads[i].readVersion < newestVersion - 2e9) {
results[i] = TooOld;
}
tls.commits_accum += results[i] == Commit;
tls.conflicts_accum += results[i] == Conflict;
tls.too_olds_accum += results[i] == TooOld;
}
range_read_iterations_total.add(iters);
range_read_total.add(count);
commits_total.add(tls.commits_accum);
conflicts_total.add(tls.conflicts_accum);
too_olds_total.add(tls.too_olds_accum);
check_bytes_total.add(tls.check_bytes_accum);
}
void addWrites(const ConflictSet::WriteRange *writes, int count,
@@ -796,33 +775,27 @@ struct __attribute__((visibility("hidden"))) ConflictSet::Impl {
StringRef values[stripeSize];
int64_t writeVersions[stripeSize / 2];
int ss = stringCount - (stripes - 1) * stripeSize;
int64_t entryDelta = 0;
for (int s = stripes - 1; s >= 0; s--) {
for (int i = 0; i * 2 < ss; ++i) {
const auto &w = combinedWriteConflictRanges[s * stripeSize / 2 + i];
values[i * 2] = w.first;
values[i * 2 + 1] = w.second;
keyUpdates += 3;
}
skipList.find(values, fingers, temp, ss);
entryDelta += skipList.addConflictRanges(fingers, ss / 2, writeVersion);
skipList.addConflictRanges(fingers, ss / 2, writeVersion);
ss = stripeSize;
}
// Run gc at least 200% the rate we're inserting entries
keyUpdates += std::max<int64_t>(entryDelta, 0) * 2;
}
void setOldestVersion(int64_t oldestVersion) {
// This isn't 100% accurate. It overcounts if you hit the end
gc_iterations_total.add(keyUpdates);
assert(oldestVersion >= this->oldestVersion);
this->oldestVersion = oldestVersion;
SkipList::Finger finger;
int temp;
std::span<const uint8_t> key = removalKey;
skipList.find(&key, &finger, &temp, 1);
skipList.removeBefore(oldestVersion, finger, std::exchange(keyUpdates, 0));
skipList.removeBefore(oldestVersion, finger, std::exchange(keyUpdates, 10));
removalArena = Arena();
removalKey = copyToArena(
removalArena, {finger.getValue().data(), finger.getValue().size()});
@@ -830,56 +803,8 @@ struct __attribute__((visibility("hidden"))) ConflictSet::Impl {
int64_t totalBytes = 0;
MetricsV1 *metrics;
int metricsCount = 0;
Metric *metricsList = nullptr;
#define GAUGE(name, help) \
Gauge name { metricsList, metricsCount, #name, help }
#define COUNTER(name, help) \
Counter name { metricsList, metricsCount, #name, help }
// ==================== METRICS DEFINITIONS ====================
COUNTER(range_read_total, "Total number of range reads checked");
COUNTER(range_read_iterations_total,
"Total number of iterations of the main loops for range read checks");
COUNTER(commits_total,
"Total number of checks where the result is \"commit\"");
COUNTER(conflicts_total,
"Total number of checks where the result is \"conflict\"");
COUNTER(too_olds_total,
"Total number of checks where the result is \"too old\"");
COUNTER(check_bytes_total, "Total number of key bytes checked");
GAUGE(memory_bytes, "Total number of bytes in use");
COUNTER(nodes_allocated_total,
"The total number of physical tree nodes allocated");
COUNTER(nodes_released_total,
"The total number of physical tree nodes released");
COUNTER(insert_iterations_total,
"The total number of iterations of the main loop for insertion. "
"Includes searches where the entry already existed, and so insertion "
"did not take place");
COUNTER(entries_inserted_total,
"The total number of entries inserted in the tree");
COUNTER(entries_erased_total,
"The total number of entries erased from the tree");
COUNTER(
gc_iterations_total,
"The total number of iterations of the main loop for garbage collection");
COUNTER(write_bytes_total, "Total number of key bytes in calls to addWrites");
GAUGE(oldest_version,
"The lowest version that doesn't result in \"TooOld\" for checks");
GAUGE(newest_version, "The version of the most recent call to addWrites");
// ==================== END METRICS DEFINITIONS ====================
#undef GAUGE
#undef COUNTER
void getMetricsV1(MetricsV1 **metrics, int *count) {
*metrics = this->metrics;
*count = metricsCount;
}
private:
int64_t keyUpdates = 0;
int64_t keyUpdates = 10;
Arena removalArena;
std::span<const uint8_t> removalKey;
int64_t oldestVersion;
@@ -900,7 +825,6 @@ void internal_addWrites(ConflictSet::Impl *impl,
mallocBytesDelta = 0;
impl->addWrites(writes, count, writeVersion);
impl->totalBytes += mallocBytesDelta;
impl->memory_bytes.set(impl->totalBytes);
#if SHOW_MEMORY
if (impl->totalBytes != mallocBytes) {
abort();
@@ -912,7 +836,6 @@ void internal_setOldestVersion(ConflictSet::Impl *impl, int64_t oldestVersion) {
mallocBytesDelta = 0;
impl->setOldestVersion(oldestVersion);
impl->totalBytes += mallocBytesDelta;
impl->memory_bytes.set(impl->totalBytes);
#if SHOW_MEMORY
if (impl->totalBytes != mallocBytes) {
abort();
@@ -936,11 +859,12 @@ int64_t internal_getBytes(ConflictSet::Impl *impl) { return impl->totalBytes; }
void internal_getMetricsV1(ConflictSet::Impl *impl,
ConflictSet::MetricsV1 **metrics, int *count) {
return impl->getMetricsV1(metrics, count);
*metrics = nullptr;
*count = 0;
}
double internal_getMetricValue(const ConflictSet::MetricsV1 *metric) {
return ((Metric *)metric->p)->value.load(std::memory_order_relaxed);
return 0;
}
void ConflictSet::check(const ReadRange *reads, Result *results,

Some files were not shown because too many files have changed in this diff Show More