21 Commits

Author SHA1 Message Date
ed67486077 Reordering seems to improve codegen
Some checks failed
Tests / Clang total: 3244, passed: 3244
Clang |Total|New|Outstanding|Fixed|Trend |:-:|:-:|:-:|:-:|:-: |0|0|0|0|:clap:
Tests / 64 bit versions total: 3244, passed: 3244
Tests / Debug total: 3242, passed: 3242
Tests / SIMD fallback total: 3244, passed: 3244
Tests / Release [gcc] total: 3244, passed: 3244
GNU C Compiler (gcc) |Total|New|Outstanding|Fixed|Trend |:-:|:-:|:-:|:-:|:-: |0|0|0|0|:clap:
Tests / Release [gcc,aarch64] total: 2419, passed: 2419
Tests / Coverage total: 2437, passed: 2437
Code Coverage #### Project Overview No changes detected, that affect the code coverage. * Line Coverage: 98.98% (1938/1958) * Branch Coverage: 68.67% (1497/2180) * Complexity Density: 0.00 * Lines of Code: 1958 #### Quality Gates Summary Output truncated.
weaselab/conflict-set/pipeline/head There was a failure building this commit
2024-09-23 15:28:51 -07:00
b376f6fdd5 WIP
Some checks failed
Tests / Clang total: 3244, passed: 3244
Clang |Total|New|Outstanding|Fixed|Trend |:-:|:-:|:-:|:-:|:-: |0|0|0|0|:clap:
Tests / 64 bit versions total: 3244, passed: 3244
Tests / Debug total: 3242, passed: 3242
Tests / SIMD fallback total: 3244, passed: 3244
Tests / Release [gcc] total: 3244, passed: 3244
GNU C Compiler (gcc) |Total|New|Outstanding|Fixed|Trend |:-:|:-:|:-:|:-:|:-: |0|0|0|0|:clap:
Tests / Release [gcc,aarch64] total: 2419, passed: 2419
Tests / Coverage total: 2437, passed: 2437
Code Coverage #### Project Overview No changes detected, that affect the code coverage. * Line Coverage: 98.98% (1938/1958) * Branch Coverage: 68.67% (1497/2180) * Complexity Density: 0.00 * Lines of Code: 1958 #### Quality Gates Summary Output truncated.
weaselab/conflict-set/pipeline/head There was a failure building this commit
2024-09-23 15:11:48 -07:00
6de63dd3fe Use preserve_none and put continuation array in CheckAll 2024-09-23 14:53:16 -07:00
3e5f13bf54 WIP - tests pass 2024-09-23 13:32:56 -07:00
e7e1d1f7f5 Add tail-call based interleaving approach 2024-09-23 12:52:30 -07:00
442658e983 Target ~1GB memory usage in server bench 2024-09-21 14:28:15 -07:00
26f602215e Accentuate cache misses for point reads in server_bench 2024-09-14 22:42:40 -07:00
98236f81cb Add missing __builtin_prefetch 2024-09-14 22:41:58 -07:00
3593b72880 Disallow checking SIM_CACHE_MISSES=1 2024-09-10 22:23:37 -07:00
814aac4ea7 Experiment with causing cache misses 2024-09-10 22:06:00 -07:00
0550fa0016 Add "iter" state 2024-09-10 17:22:10 -07:00
fe5cfb0336 Remove redundant cast 2024-09-10 17:06:45 -07:00
82203515a0 check_point_read_state_machine::down_left_spine 2024-09-10 17:05:09 -07:00
465372c734 Scaffolding to prepare for interleaving checks 2024-09-10 16:10:57 -07:00
867136ff1b Return pointer to next function 2024-09-09 21:59:49 -07:00
4b8f7320d3 Call function pointer in job 2024-09-09 21:00:31 -07:00
6628092384 Tinker with interleaveBoundedCyclicList 2024-09-09 20:25:40 -07:00
a0a4f1afea Only compile nanobench once 2024-09-09 20:10:55 -07:00
ca479c03ce Induce a cache miss in interleaving test 2024-09-09 17:55:35 -07:00
0a2e133ab9 Add InterleavingTest to explore #5 2024-09-09 17:27:58 -07:00
b0b31419b0 Remove vestigial emscripten from cmake
All checks were successful
Tests / Clang total: 3244, passed: 3244
Clang |Total|New|Outstanding|Fixed|Trend |:-:|:-:|:-:|:-:|:-: |0|0|0|0|:clap:
Tests / 64 bit versions total: 3244, passed: 3244
Tests / Debug total: 3242, passed: 3242
Tests / SIMD fallback total: 3244, passed: 3244
Tests / Release [gcc] total: 3244, passed: 3244
GNU C Compiler (gcc) |Total|New|Outstanding|Fixed|Trend |:-:|:-:|:-:|:-:|:-: |0|0|0|0|:clap:
Tests / Release [gcc,aarch64] total: 2419, passed: 2419
Tests / Coverage total: 2437, passed: 2437
Code Coverage #### Project Overview No changes detected, that affect the code coverage. * Line Coverage: 99.23% (1803/1817) * Branch Coverage: 68.36% (1426/2086) * Complexity Density: 0.00 * Lines of Code: 1817 #### Quality Gates Summary Output truncated.
weaselab/conflict-set/pipeline/head This commit looks good
2024-09-06 20:29:59 -07:00
7 changed files with 627 additions and 90 deletions

View File

@@ -24,6 +24,14 @@ repos:
entry: "^#define SHOW_MEMORY 1$"
language: pygrep
types: [c++]
- repo: local
hooks:
- id: sim cache misses check
name: disallow checking in SIM_CACHE_MISSES=1
description: disallow checking in SIM_CACHE_MISSES=1
entry: "^#define SIM_CACHE_MISSES 1$"
language: pygrep
types: [c++]
- repo: https://github.com/shellcheck-py/shellcheck-py
rev: a23f6b85d0fdd5bb9d564e2579e678033debbdff # frozen: v0.10.0.1
hooks:

View File

@@ -7,7 +7,6 @@
void showMemory(const ConflictSet &cs);
#endif
#define ANKERL_NANOBENCH_IMPLEMENT
#include "third_party/nanobench.h"
constexpr int kNumKeys = 1000000;

View File

@@ -72,12 +72,6 @@ else()
add_link_options(-Wl,--gc-sections)
endif()
if(EMSCRIPTEN)
# https://github.com/emscripten-core/emscripten/issues/15377#issuecomment-1285167486
add_link_options(-lnodefs.js -lnoderawfs.js)
add_link_options(-s ALLOW_MEMORY_GROWTH)
endif()
if(NOT USE_SIMD_FALLBACK)
cmake_push_check_state()
list(APPEND CMAKE_REQUIRED_FLAGS -mavx)
@@ -144,6 +138,8 @@ include(CTest)
# disable tests if this is being used through e.g. FetchContent
if(CMAKE_SOURCE_DIR STREQUAL CMAKE_CURRENT_SOURCE_DIR AND BUILD_TESTING)
add_library(nanobench ${CMAKE_CURRENT_SOURCE_DIR}/nanobench.cpp)
set(TEST_FLAGS -Wall -Wextra -Wunreachable-code -Wpedantic -UNDEBUG)
# corpus tests, which are tests curated by libfuzzer. The goal is to get broad
@@ -191,6 +187,7 @@ if(CMAKE_SOURCE_DIR STREQUAL CMAKE_CURRENT_SOURCE_DIR AND BUILD_TESTING)
target_include_directories(conflict_set_main
PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include)
target_compile_definitions(conflict_set_main PRIVATE ENABLE_MAIN)
target_link_libraries(conflict_set_main PRIVATE nanobench)
if(NOT APPLE)
# libfuzzer target, to generate/manage corpus
@@ -336,7 +333,7 @@ if(CMAKE_SOURCE_DIR STREQUAL CMAKE_CURRENT_SOURCE_DIR AND BUILD_TESTING)
# bench
add_executable(conflict_set_bench Bench.cpp)
target_link_libraries(conflict_set_bench PRIVATE ${PROJECT_NAME})
target_link_libraries(conflict_set_bench PRIVATE ${PROJECT_NAME} nanobench)
set_target_properties(conflict_set_bench PROPERTIES SKIP_BUILD_RPATH ON)
add_executable(real_data_bench RealDataBench.cpp)
target_link_libraries(real_data_bench PRIVATE ${PROJECT_NAME})
@@ -351,6 +348,14 @@ if(CMAKE_SOURCE_DIR STREQUAL CMAKE_CURRENT_SOURCE_DIR AND BUILD_TESTING)
add_executable(server_bench ServerBench.cpp)
target_link_libraries(server_bench PRIVATE ${PROJECT_NAME})
set_target_properties(server_bench PROPERTIES SKIP_BUILD_RPATH ON)
add_executable(interleaving_test InterleavingTest.cpp)
# work around lack of musttail for gcc
if(CMAKE_CXX_COMPILER_ID STREQUAL "GNU" AND CMAKE_BUILD_TYPE STREQUAL "Debug")
target_compile_options(interleaving_test PRIVATE -Og
-foptimize-sibling-calls)
endif()
target_link_libraries(interleaving_test PRIVATE nanobench)
endif()
# packaging

View File

@@ -48,6 +48,17 @@ limitations under the License.
#endif
#endif
#define SIM_CACHE_MISSES 0
#if SIM_CACHE_MISSES
constexpr void simCacheMiss(void *x) {
if (x) {
_mm_clflush(x);
}
}
#else
constexpr void simCacheMiss(void *) {}
#endif
#include <memcheck.h>
using namespace weaselab;
@@ -836,21 +847,29 @@ int getNodeIndex(Node16 *self, uint8_t index) {
// Precondition - an entry for index must exist in the node
Node *&getChildExists(Node3 *self, uint8_t index) {
return self->children[getNodeIndex(self, index)];
auto &result = self->children[getNodeIndex(self, index)];
simCacheMiss(result);
return result;
}
// Precondition - an entry for index must exist in the node
Node *&getChildExists(Node16 *self, uint8_t index) {
return self->children[getNodeIndex(self, index)];
auto &result = self->children[getNodeIndex(self, index)];
simCacheMiss(result);
return result;
}
// Precondition - an entry for index must exist in the node
Node *&getChildExists(Node48 *self, uint8_t index) {
assert(self->bitSet.test(index));
return self->children[self->index[index]];
auto &result = self->children[self->index[index]];
simCacheMiss(result);
return result;
}
// Precondition - an entry for index must exist in the node
Node *&getChildExists(Node256 *self, uint8_t index) {
assert(self->bitSet.test(index));
return self->children[index];
auto &result = self->children[index];
simCacheMiss(result);
return result;
}
// Precondition - an entry for index must exist in the node
@@ -1030,6 +1049,7 @@ ChildAndMaxVersion getChildAndMaxVersion(Node3 *self, uint8_t index) {
if (i < 0) {
return {};
}
simCacheMiss(self->children[i]);
return {self->children[i], self->childMaxVersion[i]};
}
ChildAndMaxVersion getChildAndMaxVersion(Node16 *self, uint8_t index) {
@@ -1037,6 +1057,7 @@ ChildAndMaxVersion getChildAndMaxVersion(Node16 *self, uint8_t index) {
if (i < 0) {
return {};
}
simCacheMiss(self->children[i]);
return {self->children[i], self->childMaxVersion[i]};
}
ChildAndMaxVersion getChildAndMaxVersion(Node48 *self, uint8_t index) {
@@ -1044,9 +1065,11 @@ ChildAndMaxVersion getChildAndMaxVersion(Node48 *self, uint8_t index) {
if (i < 0) {
return {};
}
simCacheMiss(self->children[i]);
return {self->children[i], self->childMaxVersion[i]};
}
ChildAndMaxVersion getChildAndMaxVersion(Node256 *self, uint8_t index) {
simCacheMiss(self->children[index]);
return {self->children[index], self->childMaxVersion[index]};
}
@@ -1072,6 +1095,7 @@ Node *getChildGeq(Node0 *, int) { return nullptr; }
Node *getChildGeq(Node3 *n, int child) {
for (int i = 0; i < n->numChildren; ++i) {
if (n->index[i] >= child) {
simCacheMiss(n->children[i]);
return n->children[i];
}
}
@@ -1090,7 +1114,10 @@ Node *getChildGeq(Node16 *self, int child) {
__m128i results = _mm_cmpeq_epi8(key_vec, _mm_min_epu8(key_vec, indices));
int mask = (1 << self->numChildren) - 1;
uint32_t bitfield = _mm_movemask_epi8(results) & mask;
return bitfield == 0 ? nullptr : self->children[std::countr_zero(bitfield)];
auto *result =
bitfield == 0 ? nullptr : self->children[std::countr_zero(bitfield)];
simCacheMiss(result);
return result;
#elif defined(HAS_ARM_NEON)
uint8x16_t indices;
memcpy(&indices, self->index, sizeof(self->index));
@@ -1126,13 +1153,16 @@ Node *getChildGeq(Node48 *self, int child) {
if (c < 0) {
return nullptr;
}
return self->children[self->index[c]];
auto *result = self->children[self->index[c]];
simCacheMiss(result);
return result;
}
Node *getChildGeq(Node256 *self, int child) {
int c = self->bitSet.firstSetGeq(child);
if (c < 0) {
return nullptr;
}
simCacheMiss(self->children[c]);
return self->children[c];
}
@@ -1156,20 +1186,26 @@ Node *getChildGeq(Node *self, int child) {
// Precondition: self has a child
Node *getFirstChildExists(Node3 *self) {
assert(self->numChildren > 0);
simCacheMiss(self->children[0]);
return self->children[0];
}
// Precondition: self has a child
Node *getFirstChildExists(Node16 *self) {
assert(self->numChildren > 0);
simCacheMiss(self->children[0]);
return self->children[0];
}
// Precondition: self has a child
Node *getFirstChildExists(Node48 *self) {
return self->children[self->index[self->bitSet.firstSetGeq(0)]];
auto *result = self->children[self->index[self->bitSet.firstSetGeq(0)]];
simCacheMiss(result);
return result;
}
// Precondition: self has a child
Node *getFirstChildExists(Node256 *self) {
return self->children[self->bitSet.firstSetGeq(0)];
auto *result = self->children[self->bitSet.firstSetGeq(0)];
simCacheMiss(result);
return result;
}
// Precondition: self has a child
@@ -3009,34 +3045,288 @@ Node *firstGeqPhysical(Node *n, const std::span<const uint8_t> key) {
}
}
#ifndef __has_attribute
#define __has_attribute(x) 0
#endif
#if __has_attribute(musttail)
#define MUSTTAIL __attribute__((musttail))
#else
#define MUSTTAIL
#endif
#if __has_attribute(preserve_none)
#define CONTINUATION_CALLING_CONVENTION __attribute__((preserve_none))
#else
#define CONTINUATION_CALLING_CONVENTION
#endif
typedef CONTINUATION_CALLING_CONVENTION void (*continuation)(struct CheckAll *,
int64_t prevJob,
int64_t job,
int64_t started,
int64_t count);
// State relevant to a particular query
struct CheckJob {
void setResult(bool ok) {
*result = ok ? ConflictSet::Commit : ConflictSet::Conflict;
}
[[nodiscard]] continuation init(const ConflictSet::ReadRange *read,
ConflictSet::Result *result, Node *root,
int64_t oldestVersionFullPrecision,
ReadContext *tls);
Node *n;
ChildAndMaxVersion childAndVersion;
std::span<const uint8_t> begin;
InternalVersionT readVersion;
ConflictSet::Result *result;
};
// State relevant to all queries
struct CheckAll {
constexpr static int kConcurrent = 32;
CheckJob inProgress[kConcurrent];
continuation next[kConcurrent];
int nextJob[kConcurrent];
Node *root;
int64_t oldestVersionFullPrecision;
ReadContext *tls;
const ConflictSet::ReadRange *queries;
ConflictSet::Result *results;
};
CONTINUATION_CALLING_CONVENTION void keepGoing(CheckAll *context,
int64_t prevJob, int64_t job,
int64_t started, int64_t count) {
prevJob = job;
job = context->nextJob[job];
MUSTTAIL return context->next[job](context, prevJob, job, started, count);
}
CONTINUATION_CALLING_CONVENTION void complete(CheckAll *context,
int64_t prevJob, int64_t job,
int64_t started, int64_t count) {
if (started == count) {
if (prevJob == job) {
return;
}
context->nextJob[prevJob] = context->nextJob[job];
job = prevJob;
} else {
int temp = started++;
context->next[job] = context->inProgress[job].init(
context->queries + temp, context->results + temp, context->root,
context->oldestVersionFullPrecision, context->tls);
}
MUSTTAIL return keepGoing(context, prevJob, job, started, count);
}
namespace check_point_read_state_machine {
CONTINUATION_CALLING_CONVENTION void
down_left_spine(struct CheckAll *, int64_t prevJob, int64_t job,
int64_t started, int64_t count);
CONTINUATION_CALLING_CONVENTION void iter(struct CheckAll *, int64_t prevJob,
int64_t job, int64_t started,
int64_t count);
CONTINUATION_CALLING_CONVENTION void begin(struct CheckAll *, int64_t prevJob,
int64_t job, int64_t started,
int64_t count);
void begin(struct CheckAll *context, int64_t prevJob, int64_t job,
int64_t started, int64_t count) {
++context->tls->point_read_accum;
#if DEBUG_VERBOSE && !defined(NDEBUG)
fprintf(stderr, "Check point read: %s\n", printable(key).c_str());
#endif
auto *j = context->inProgress + job;
if (j->begin.size() == 0) {
if (j->n->entryPresent) {
j->setResult(j->n->entry.pointVersion <= j->readVersion);
MUSTTAIL return complete(context, prevJob, job, started, count);
}
j->n = getFirstChildExists(j->n);
context->next[job] = down_left_spine;
__builtin_prefetch(j->n);
MUSTTAIL return keepGoing(context, prevJob, job, started, count);
}
j->childAndVersion = getChildAndMaxVersion(j->n, j->begin[0]);
context->next[job] = iter;
__builtin_prefetch(j->childAndVersion.child);
MUSTTAIL return keepGoing(context, prevJob, job, started, count);
}
void iter(struct CheckAll *context, int64_t prevJob, int64_t job,
int64_t started, int64_t count) {
auto *j = context->inProgress + job;
if (j->childAndVersion.child == nullptr) {
auto c = getChildGeq(j->n, j->begin[0]);
if (c != nullptr) {
j->n = c;
context->next[job] = down_left_spine;
__builtin_prefetch(j->n);
MUSTTAIL return keepGoing(context, prevJob, job, started, count);
} else {
j->n = nextSibling(j->n);
if (j->n == nullptr) {
j->setResult(true);
MUSTTAIL return complete(context, prevJob, job, started, count);
}
context->next[job] = down_left_spine;
__builtin_prefetch(j->n);
MUSTTAIL return keepGoing(context, prevJob, job, started, count);
}
}
j->n = j->childAndVersion.child;
j->begin = j->begin.subspan(1, j->begin.size() - 1);
if (j->n->partialKeyLen > 0) {
int commonLen = std::min<int>(j->n->partialKeyLen, j->begin.size());
int i = longestCommonPrefix(j->n->partialKey(), j->begin.data(), commonLen);
if (i < commonLen) {
auto c = j->n->partialKey()[i] <=> j->begin[i];
if (c > 0) {
context->next[job] = down_left_spine;
MUSTTAIL return down_left_spine(context, prevJob, job, started, count);
} else {
j->n = nextSibling(j->n);
if (j->n == nullptr) {
j->setResult(true);
MUSTTAIL return complete(context, prevJob, job, started, count);
}
context->next[job] = down_left_spine;
__builtin_prefetch(j->n);
MUSTTAIL return keepGoing(context, prevJob, job, started, count);
}
}
if (commonLen == j->n->partialKeyLen) {
// partial key matches
j->begin = j->begin.subspan(commonLen, j->begin.size() - commonLen);
} else if (j->n->partialKeyLen > int(j->begin.size())) {
// n is the first physical node greater than remaining, and there's no
// eq node
context->next[job] = down_left_spine;
MUSTTAIL return down_left_spine(context, prevJob, job, started, count);
}
}
if (j->childAndVersion.maxVersion <= j->readVersion) {
++context->tls->point_read_short_circuit_accum;
j->setResult(true);
MUSTTAIL return complete(context, prevJob, job, started, count);
}
++context->tls->point_read_iterations_accum;
if (j->begin.size() == 0) {
if (j->n->entryPresent) {
j->setResult(j->n->entry.pointVersion <= j->readVersion);
MUSTTAIL return complete(context, prevJob, job, started, count);
}
j->n = getFirstChildExists(j->n);
context->next[job] = down_left_spine;
__builtin_prefetch(j->n);
MUSTTAIL return keepGoing(context, prevJob, job, started, count);
}
j->childAndVersion = getChildAndMaxVersion(j->n, j->begin[0]);
__builtin_prefetch(j->childAndVersion.child);
// j->next is already iter
MUSTTAIL return keepGoing(context, prevJob, job, started, count);
}
void down_left_spine(struct CheckAll *context, int64_t prevJob, int64_t job,
int64_t started, int64_t count) {
auto *j = context->inProgress + job;
if (j->n->entryPresent) {
j->setResult(j->n->entry.rangeVersion <= j->readVersion);
MUSTTAIL return complete(context, prevJob, job, started, count);
}
j->n = getFirstChildExists(j->n);
__builtin_prefetch(j->n);
// j->next is already down_left_spine
MUSTTAIL return keepGoing(context, prevJob, job, started, count);
}
} // namespace check_point_read_state_machine
continuation CheckJob::init(const ConflictSet::ReadRange *read,
ConflictSet::Result *result, Node *root,
int64_t oldestVersionFullPrecision,
ReadContext *tls) {
auto begin = std::span<const uint8_t>(read->begin.p, read->begin.len);
auto end = std::span<const uint8_t>(read->end.p, read->end.len);
if (read->readVersion < oldestVersionFullPrecision) {
*result = ConflictSet::TooOld;
return complete;
} else if (end.size() == 0) {
this->begin = begin;
this->n = root;
this->readVersion = InternalVersionT(read->readVersion);
this->result = result;
return check_point_read_state_machine::begin;
// *result =
// checkPointRead(root, begin, InternalVersionT(read->readVersion), tls)
// ? ConflictSet::Commit
// : ConflictSet::Conflict;
// return complete;
} else {
*result = checkRangeRead(root, begin, end,
InternalVersionT(read->readVersion), tls)
? ConflictSet::Commit
: ConflictSet::Conflict;
return complete;
}
}
struct __attribute__((visibility("hidden"))) ConflictSet::Impl {
void check(const ReadRange *reads, Result *result, int count) {
assert(oldestVersionFullPrecision >=
newestVersionFullPrecision - kNominalVersionWindow);
if (count == 0) {
return;
}
ReadContext tls;
tls.impl = this;
int64_t check_byte_accum = 0;
CheckAll context;
context.oldestVersionFullPrecision = oldestVersionFullPrecision;
context.queries = reads;
context.results = result;
context.root = root;
context.tls = &tls;
int64_t started = std::min(context.kConcurrent, count);
for (int i = 0; i < started; i++) {
context.next[i] = context.inProgress[i].init(
reads + i, result + i, root, oldestVersionFullPrecision, &tls);
context.nextJob[i] = i + 1;
}
context.nextJob[started - 1] = 0;
int prevJob = started - 1;
int job = 0;
context.next[job](&context, prevJob, job, started, count);
for (int i = 0; i < count; ++i) {
assert(reads[i].readVersion >= 0);
assert(reads[i].readVersion <= newestVersionFullPrecision);
const auto &r = reads[i];
check_byte_accum += r.begin.len + r.end.len;
auto begin = std::span<const uint8_t>(r.begin.p, r.begin.len);
auto end = std::span<const uint8_t>(r.end.p, r.end.len);
assert(oldestVersionFullPrecision >=
newestVersionFullPrecision - kNominalVersionWindow);
result[i] =
reads[i].readVersion < oldestVersionFullPrecision ? TooOld
: (end.size() > 0
? checkRangeRead(root, begin, end,
InternalVersionT(reads[i].readVersion), &tls)
: checkPointRead(root, begin,
InternalVersionT(reads[i].readVersion), &tls))
? Commit
: Conflict;
tls.commits_accum += result[i] == Commit;
tls.conflicts_accum += result[i] == Conflict;
tls.too_olds_accum += result[i] == TooOld;
}
point_read_total.add(tls.point_read_accum);
prefix_read_total.add(tls.prefix_read_accum);
range_read_total.add(tls.range_read_accum);
@@ -3923,7 +4213,6 @@ struct __attribute__((visibility("default"))) PeakPrinter {
#ifdef ENABLE_MAIN
#define ANKERL_NANOBENCH_IMPLEMENT
#include "third_party/nanobench.h"
template <int kN> void benchRezero() {

256
InterleavingTest.cpp Normal file
View File

@@ -0,0 +1,256 @@
#include <alloca.h>
#include <cassert>
#ifdef __x86_64__
#include <immintrin.h>
#endif
#include "third_party/nanobench.h"
struct Job {
int *input;
// Returned void* is a function pointer to the next continuation. We have to
// use void* because otherwise the type would be recursive.
typedef void *(*continuation)(Job *);
continuation next;
};
void *stepJob(Job *j) {
auto done = --(*j->input) == 0;
#ifdef __x86_64__
_mm_clflush(j->input);
#endif
return done ? nullptr : (void *)stepJob;
}
void sequential(Job **jobs, int count) {
for (int i = 0; i < count; ++i) {
do {
jobs[i]->next = (Job::continuation)jobs[i]->next(jobs[i]);
} while (jobs[i]->next);
}
}
void sequentialNoFuncPtr(Job **jobs, int count) {
for (int i = 0; i < count; ++i) {
while (stepJob(jobs[i]))
;
}
}
void interleaveSwapping(Job **jobs, int remaining) {
int current = 0;
while (remaining > 0) {
auto next = (Job::continuation)jobs[current]->next(jobs[current]);
jobs[current]->next = next;
if (next == nullptr) {
jobs[current] = jobs[remaining - 1];
--remaining;
} else {
++current;
}
if (current == remaining) {
current = 0;
}
}
}
void interleaveBoundedCyclicList(Job **jobs, int count) {
if (count == 0) {
return;
}
constexpr int kConcurrent = 32;
Job *inProgress[kConcurrent];
int nextJob[kConcurrent];
int started = std::min(kConcurrent, count);
for (int i = 0; i < started; i++) {
inProgress[i] = jobs[i];
nextJob[i] = i + 1;
}
nextJob[started - 1] = 0;
int prevJob = started - 1;
int job = 0;
for (;;) {
auto next = (Job::continuation)inProgress[job]->next(inProgress[job]);
inProgress[job]->next = next;
if (next == nullptr) {
if (started == count) {
if (prevJob == job)
break;
nextJob[prevJob] = nextJob[job];
job = prevJob;
} else {
int temp = started++;
inProgress[job] = jobs[temp];
}
}
prevJob = job;
job = nextJob[job];
}
}
#ifndef __has_attribute
#define __has_attribute(x) 0
#endif
#if __has_attribute(musttail)
#define MUSTTAIL __attribute__((musttail))
#else
#define MUSTTAIL
#endif
struct Context {
constexpr static int kConcurrent = 32;
Job **jobs;
Job *inProgress[kConcurrent];
void (*continuation[kConcurrent])(Context *, int64_t prevJob, int64_t job,
int64_t started, int64_t count);
int nextJob[kConcurrent];
};
void keepGoing(Context *context, int64_t prevJob, int64_t job, int64_t started,
int64_t count) {
prevJob = job;
job = context->nextJob[job];
MUSTTAIL return context->continuation[job](context, prevJob, job, started,
count);
}
void stepJobTailCall(Context *context, int64_t prevJob, int64_t job,
int64_t started, int64_t count);
void complete(Context *context, int64_t prevJob, int64_t job, int64_t started,
int64_t count) {
if (started == count) {
if (prevJob == job) {
return;
}
context->nextJob[prevJob] = context->nextJob[job];
job = prevJob;
} else {
context->inProgress[job] = context->jobs[started++];
context->continuation[job] = stepJobTailCall;
}
prevJob = job;
job = context->nextJob[job];
MUSTTAIL return context->continuation[job](context, prevJob, job, started,
count);
}
void stepJobTailCall(Context *context, int64_t prevJob, int64_t job,
int64_t started, int64_t count) {
auto *j = context->inProgress[job];
auto done = --(*j->input) == 0;
#ifdef __x86_64__
_mm_clflush(j->input);
#endif
if (done) {
MUSTTAIL return complete(context, prevJob, job, started, count);
} else {
context->continuation[job] = stepJobTailCall;
MUSTTAIL return keepGoing(context, prevJob, job, started, count);
}
}
void useTailCalls(Job **jobs, int count) {
if (count == 0) {
return;
}
Context context;
context.jobs = jobs;
int64_t started = std::min(Context::kConcurrent, count);
for (int i = 0; i < started; i++) {
context.inProgress[i] = jobs[i];
context.nextJob[i] = i + 1;
context.continuation[i] = stepJobTailCall;
}
context.nextJob[started - 1] = 0;
int prevJob = started - 1;
int job = 0;
return context.continuation[job](&context, prevJob, job, started, count);
}
void interleaveCyclicList(Job **jobs, int count) {
auto *nextJob = (int *)alloca(sizeof(int) * count);
for (int i = 0; i < count - 1; ++i) {
nextJob[i] = i + 1;
}
nextJob[count - 1] = 0;
int prevJob = count - 1;
int job = 0;
for (;;) {
auto next = (Job::continuation)jobs[job]->next(jobs[job]);
jobs[job]->next = next;
if (next == nullptr) {
if (prevJob == job)
break;
nextJob[prevJob] = nextJob[job];
job = prevJob;
}
prevJob = job;
job = nextJob[job];
}
}
int main() {
ankerl::nanobench::Bench bench;
constexpr int kNumJobs = 10000;
bench.relative(true);
Job jobs[kNumJobs];
Job jobsCopy[kNumJobs];
int iters = 0;
int originalInput[kNumJobs];
for (int i = 0; i < kNumJobs; ++i) {
originalInput[i] = rand() % 5 + 3;
jobs[i].input = new int{originalInput[i]};
jobs[i].next = stepJob;
iters += *jobs[i].input;
}
bench.batch(iters);
for (auto [scheduler, name] :
{std::make_pair(sequentialNoFuncPtr, "sequentialNoFuncPtr"),
std::make_pair(sequential, "sequential"),
std::make_pair(useTailCalls, "useTailCalls"),
std::make_pair(interleaveSwapping, "interleavingSwapping"),
std::make_pair(interleaveBoundedCyclicList,
"interleaveBoundedCyclicList"),
std::make_pair(interleaveCyclicList, "interleaveCyclicList")}) {
for (int i = 0; i < kNumJobs; ++i) {
*jobs[i].input = originalInput[i];
}
memcpy(jobsCopy, jobs, sizeof(jobs));
Job *ps[kNumJobs];
for (int i = 0; i < kNumJobs; ++i) {
ps[i] = jobsCopy + i;
}
scheduler(ps, kNumJobs);
for (int i = 0; i < kNumJobs; ++i) {
if (*jobsCopy[i].input != 0) {
fprintf(stderr, "%s failed\n", name);
abort();
}
}
bench.run(name, [&]() {
for (int i = 0; i < kNumJobs; ++i) {
*jobs[i].input = originalInput[i];
}
memcpy(jobsCopy, jobs, sizeof(jobs));
Job *ps[kNumJobs];
for (int i = 0; i < kNumJobs; ++i) {
ps[i] = jobsCopy + i;
}
scheduler(ps, kNumJobs);
});
}
for (int i = 0; i < kNumJobs; ++i) {
delete jobs[i].input;
}
}

View File

@@ -1,4 +1,5 @@
#include <atomic>
#include <cstdint>
#include <errno.h>
#include <netdb.h>
#include <stdio.h>
@@ -21,78 +22,55 @@
std::atomic<int64_t> transactions;
constexpr int kBaseSearchDepth = 115;
constexpr int kWindowSize = 10000000;
std::string numToKey(int64_t num) {
constexpr int kNumPrefixes = 250000;
std::string makeKey(int64_t num, int suffixLen) {
std::string result;
result.resize(kBaseSearchDepth + sizeof(int64_t));
memset(result.data(), 0, kBaseSearchDepth);
result.resize(sizeof(int64_t) + suffixLen);
int64_t be = __builtin_bswap64(num);
memcpy(result.data() + kBaseSearchDepth, &be, sizeof(int64_t));
memcpy(result.data(), &be, sizeof(int64_t));
memset(result.data() + sizeof(int64_t), 0, suffixLen);
return result;
}
void workload(weaselab::ConflictSet *cs) {
int64_t version = kWindowSize;
cs->addWrites(nullptr, 0, version);
for (int i = 0; i < kNumPrefixes; ++i) {
for (int j = 0; j < 50; ++j) {
weaselab::ConflictSet::WriteRange wr;
auto k = makeKey(i, j);
wr.begin.p = (const uint8_t *)k.data();
wr.begin.len = k.size();
wr.end.len = 0;
cs->addWrites(&wr, 1, version);
}
}
++version;
for (int i = 0; i < kNumPrefixes; ++i) {
weaselab::ConflictSet::WriteRange wr;
auto k = makeKey(i, 50);
wr.begin.p = (const uint8_t *)k.data();
wr.begin.len = k.size();
wr.end.len = 0;
cs->addWrites(&wr, 1, version);
}
std::vector<weaselab::ConflictSet::Result> results(10);
for (;; transactions.fetch_add(1, std::memory_order_relaxed)) {
// Reads
{
auto beginK = numToKey(version - kWindowSize);
auto endK = numToKey(version - 1);
auto pointRv = version - kWindowSize + rand() % kWindowSize + 1;
auto pointK = numToKey(pointRv);
weaselab::ConflictSet::ReadRange reads[] = {
{
{(const uint8_t *)pointK.data(), int(pointK.size())},
{nullptr, 0},
pointRv,
},
{
{(const uint8_t *)beginK.data(), int(beginK.size())},
{(const uint8_t *)endK.data(), int(endK.size())},
version - 2,
},
};
weaselab::ConflictSet::Result result[sizeof(reads) / sizeof(reads[0])];
cs->check(reads, result, sizeof(reads) / sizeof(reads[0]));
// for (int i = 0; i < sizeof(reads) / sizeof(reads[0]); ++i) {
// if (result[i] != weaselab::ConflictSet::Commit) {
// fprintf(stderr, "Unexpected conflict: [%s, %s) @ %" PRId64 "\n",
// printable(reads[i].begin).c_str(),
// printable(reads[i].end).c_str(), reads[i].readVersion);
// abort();
// }
// }
std::vector<std::string> keys(10);
for (auto &k : keys) {
k = makeKey(rand() % kNumPrefixes, 49);
}
// Writes
{
weaselab::ConflictSet::WriteRange w;
auto k = numToKey(version);
w.begin.p = (const uint8_t *)k.data();
w.end.len = 0;
if (version % (kWindowSize / 2) == 0) {
for (int l = 0; l <= k.size(); ++l) {
w.begin.len = l;
cs->addWrites(&w, 1, version);
}
} else {
w.begin.len = k.size();
cs->addWrites(&w, 1, version);
int64_t beginN = version - kWindowSize + rand() % kWindowSize;
auto b = numToKey(beginN);
auto e = numToKey(beginN + 1000);
w.begin.p = (const uint8_t *)b.data();
w.begin.len = b.size();
w.end.p = (const uint8_t *)e.data();
w.end.len = e.size();
cs->addWrites(&w, 1, version);
}
std::vector<weaselab::ConflictSet::ReadRange> reads(10);
for (int i = 0; i < reads.size(); ++i) {
reads[i].begin.p = (const uint8_t *)(keys[i].data());
reads[i].begin.len = keys[i].size();
reads[i].end.len = 0;
reads[i].readVersion = version - 1;
}
// GC
cs->setOldestVersion(version - kWindowSize);
++version;
cs->check(reads.data(), results.data(), 10);
}
}

2
nanobench.cpp Normal file
View File

@@ -0,0 +1,2 @@
#define ANKERL_NANOBENCH_IMPLEMENT
#include "third_party/nanobench.h"