diff --git a/ConflictSet.cpp b/ConflictSet.cpp
index 495da63..b6d4469 100644
--- a/ConflictSet.cpp
+++ b/ConflictSet.cpp
@@ -3009,34 +3009,92 @@ Node *firstGeqPhysical(Node *n, const std::span key) {
   }
 }
 
+// One in-flight read check. The driver calls `next` repeatedly until it yields
+// nullptr; init() currently does the whole check and installs such a `next`.
+struct CheckJob {
+  // Returned void* is a function pointer to the next continuation. We have to
+  // use void* because otherwise the type would be recursive.
+  typedef void *(*continuation)(CheckJob *);
+  continuation next;
+  void init(const ConflictSet::ReadRange *read, ConflictSet::Result *result,
+            Node *root, int64_t oldestVersionFullPrecision, ReadContext *tls) {
+    auto begin = std::span(read->begin.p, read->begin.len);
+    auto end = std::span(read->end.p, read->end.len);
+    *result = read->readVersion < oldestVersionFullPrecision
+                  ? ConflictSet::TooOld
+              : (end.size() > 0
+                     ? checkRangeRead(root, begin, end,
+                                      InternalVersionT(read->readVersion), tls)
+                     : checkPointRead(root, begin,
+                                      InternalVersionT(read->readVersion), tls))
+                  ? ConflictSet::Commit
+                  : ConflictSet::Conflict;
+    next = +[](CheckJob *) -> void * { return nullptr; };
+  }
+};
+
 struct __attribute__((visibility("hidden"))) ConflictSet::Impl {
   void check(const ReadRange *reads, Result *result, int count) {
+    assert(oldestVersionFullPrecision >=
+           newestVersionFullPrecision - kNominalVersionWindow);
+
+    // Also rejects negative counts: with started <= 0 the ring setup below
+    // would write nextJob[started - 1] out of bounds.
+    if (count <= 0) {
+      return;
+    }
+
+
     ReadContext tls;
     tls.impl = this;
     int64_t check_byte_accum = 0;
+
+    constexpr int kConcurrent = 32;
+    CheckJob inProgress[kConcurrent];
+    int nextJob[kConcurrent];
+
+    // Jobs form a circular singly linked list through nextJob. A finished job
+    // is refilled with the next unstarted read, or unlinked once none remain.
+    int started = std::min(kConcurrent, count);
+    for (int i = 0; i < started; i++) {
+      inProgress[i].init(reads + i, result + i, root,
+                         oldestVersionFullPrecision, &tls);
+      nextJob[i] = i + 1;
+    }
+    nextJob[started - 1] = 0;
+
+    int prevJob = started - 1;
+    int job = 0;
+    for (;;) {
+      auto next =
+          (CheckJob::continuation)inProgress[job].next(inProgress + job);
+      inProgress[job].next = next;
+      if (next == nullptr) {
+        if (started == count) {
+          if (prevJob == job)
+            break;
+          nextJob[prevJob] = nextJob[job];
+          job = prevJob;
+        } else {
+          int temp = started++;
+          inProgress[job].init(reads + temp, result + temp, root,
+                               oldestVersionFullPrecision, &tls);
+        }
+      }
+      prevJob = job;
+      job = nextJob[job];
+    }
+
     for (int i = 0; i < count; ++i) {
       assert(reads[i].readVersion >= 0);
       assert(reads[i].readVersion <= newestVersionFullPrecision);
       const auto &r = reads[i];
       check_byte_accum += r.begin.len + r.end.len;
-      auto begin = std::span(r.begin.p, r.begin.len);
-      auto end = std::span(r.end.p, r.end.len);
-      assert(oldestVersionFullPrecision >=
-             newestVersionFullPrecision - kNominalVersionWindow);
-      result[i] =
-          reads[i].readVersion < oldestVersionFullPrecision ? TooOld
-          : (end.size() > 0
-                 ? checkRangeRead(root, begin, end,
-                                  InternalVersionT(reads[i].readVersion), &tls)
-                 : checkPointRead(root, begin,
-                                  InternalVersionT(reads[i].readVersion), &tls))
-              ? Commit
-              : Conflict;
       tls.commits_accum += result[i] == Commit;
       tls.conflicts_accum += result[i] == Conflict;
       tls.too_olds_accum += result[i] == TooOld;
     }
+
     point_read_total.add(tls.point_read_accum);
     prefix_read_total.add(tls.prefix_read_accum);
     range_read_total.add(tls.range_read_accum);