Scaffolding to prepare for interleaving checks

This commit is contained in:
2024-09-10 16:10:57 -07:00
parent 867136ff1b
commit 465372c734

View File

@@ -3009,34 +3009,86 @@ Node *firstGeqPhysical(Node *n, const std::span<const uint8_t> key) {
}
}
struct CheckJob {
// Returned void* is a function pointer to the next continuation. We have to
// use void* because otherwise the type would be recursive.
typedef void *(*continuation)(CheckJob *);
continuation next;
void init(const ConflictSet::ReadRange *read, ConflictSet::Result *result,
Node *root, int64_t oldestVersionFullPrecision, ReadContext *tls) {
auto begin = std::span<const uint8_t>(read->begin.p, read->begin.len);
auto end = std::span<const uint8_t>(read->end.p, read->end.len);
*result = read->readVersion < oldestVersionFullPrecision
? ConflictSet::TooOld
: (end.size() > 0
? checkRangeRead(root, begin, end,
InternalVersionT(read->readVersion), tls)
: checkPointRead(root, begin,
InternalVersionT(read->readVersion), tls))
? ConflictSet::Commit
: ConflictSet::Conflict;
next = +[](CheckJob *) -> void * { return nullptr; };
}
};
struct __attribute__((visibility("hidden"))) ConflictSet::Impl {
void check(const ReadRange *reads, Result *result, int count) {
assert(oldestVersionFullPrecision >=
newestVersionFullPrecision - kNominalVersionWindow);
if (count == 0) {
return;
}
ReadContext tls;
tls.impl = this;
int64_t check_byte_accum = 0;
constexpr int kConcurrent = 32;
CheckJob inProgress[kConcurrent];
int nextJob[kConcurrent];
int started = std::min(kConcurrent, count);
for (int i = 0; i < started; i++) {
inProgress[i].init(reads + i, result + i, root,
oldestVersionFullPrecision, &tls);
nextJob[i] = i + 1;
}
nextJob[started - 1] = 0;
int prevJob = started - 1;
int job = 0;
for (;;) {
auto next =
(CheckJob::continuation)inProgress[job].next(inProgress + job);
inProgress[job].next = next;
if (next == nullptr) {
if (started == count) {
if (prevJob == job)
break;
nextJob[prevJob] = nextJob[job];
job = prevJob;
} else {
int temp = started++;
inProgress[job].init(reads + temp, result + temp, root,
oldestVersionFullPrecision, &tls);
}
}
prevJob = job;
job = nextJob[job];
}
for (int i = 0; i < count; ++i) {
assert(reads[i].readVersion >= 0);
assert(reads[i].readVersion <= newestVersionFullPrecision);
const auto &r = reads[i];
check_byte_accum += r.begin.len + r.end.len;
auto begin = std::span<const uint8_t>(r.begin.p, r.begin.len);
auto end = std::span<const uint8_t>(r.end.p, r.end.len);
assert(oldestVersionFullPrecision >=
newestVersionFullPrecision - kNominalVersionWindow);
result[i] =
reads[i].readVersion < oldestVersionFullPrecision ? TooOld
: (end.size() > 0
? checkRangeRead(root, begin, end,
InternalVersionT(reads[i].readVersion), &tls)
: checkPointRead(root, begin,
InternalVersionT(reads[i].readVersion), &tls))
? Commit
: Conflict;
tls.commits_accum += result[i] == Commit;
tls.conflicts_accum += result[i] == Conflict;
tls.too_olds_accum += result[i] == TooOld;
}
point_read_total.add(tls.point_read_accum);
prefix_read_total.add(tls.prefix_read_accum);
range_read_total.add(tls.range_read_accum);