From 8b1a0afc5870e2a6e2627627dfaeb64503cafad0 Mon Sep 17 00:00:00 2001 From: Andrew Noyes Date: Fri, 11 Oct 2024 15:45:25 -0700 Subject: [PATCH] First continuation for interleaved check range read --- ConflictSet.cpp | 51 ++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 42 insertions(+), 9 deletions(-) diff --git a/ConflictSet.cpp b/ConflictSet.cpp index 4c418c0..ac30fce 100644 --- a/ConflictSet.cpp +++ b/ConflictSet.cpp @@ -3240,10 +3240,11 @@ struct CheckJob { } void init(const ConflictSet::ReadRange *read, ConflictSet::Result *result, - Node *root, int64_t oldestVersionFullPrecision, ReadContext *tls); + Node *root, int64_t oldestVersionFullPrecision); Node *n; std::span begin; + std::span end; // range read only InternalVersionT readVersion; ConflictSet::Result *result; Continuation continuation; @@ -3290,7 +3291,7 @@ FLATTEN PRESERVE_NONE void complete(CheckJob *job, CheckContext *context) { } else { int temp = context->started++; job->init(context->queries + temp, context->results + temp, context->root, - context->oldestVersionFullPrecision, context->tls); + context->oldestVersionFullPrecision); } MUSTTAIL return keepGoing(job, context); } @@ -3428,9 +3429,40 @@ void down_left_spine(CheckJob *job, CheckContext *context) { } // namespace check_point_read_state_machine +namespace check_range_read_state_machine { +FLATTEN PRESERVE_NONE void begin(CheckJob *, CheckContext *); + +FLATTEN PRESERVE_NONE void begin(CheckJob *job, CheckContext *context) { + int lcp = longestCommonPrefix(job->begin.data(), job->end.data(), + std::min(job->begin.size(), job->end.size())); + if (lcp == int(job->begin.size()) && + job->end.size() == job->begin.size() + 1 && job->end.back() == 0) { + job->continuation = check_point_read_state_machine::begin; + // Call directly since we have nothing to prefetch + MUSTTAIL return job->continuation(job, context); + } + if (lcp == int(job->begin.size() - 1) && + job->end.size() == job->begin.size() && + int(job->begin.back()) + 1 == int(job->end.back())) { + *job->result = + checkPrefixRead(job->n, job->begin, job->readVersion, context->tls) + ? ConflictSet::Commit + : ConflictSet::Conflict; + return complete(job, context); + } + + *job->result = checkRangeRead(job->n, job->begin, job->end, job->readVersion, + context->tls) + ? ConflictSet::Commit + : ConflictSet::Conflict; + return complete(job, context); +} + +} // namespace check_range_read_state_machine + void CheckJob::init(const ConflictSet::ReadRange *read, ConflictSet::Result *result, Node *root, - int64_t oldestVersionFullPrecision, ReadContext *tls) { + int64_t oldestVersionFullPrecision) { auto begin = std::span(read->begin.p, read->begin.len); auto end = std::span(read->end.p, read->end.len); if (read->readVersion < oldestVersionFullPrecision) [[unlikely]] { @@ -3443,11 +3475,12 @@ void CheckJob::init(const ConflictSet::ReadRange *read, this->result = result; continuation = check_point_read_state_machine::begin; } else { - *result = checkRangeRead(root, begin, end, - InternalVersionT(read->readVersion), tls) - ? ConflictSet::Commit - : ConflictSet::Conflict; - continuation = complete; + this->begin = begin; + this->end = end; + this->n = root; + this->readVersion = InternalVersionT(read->readVersion); + this->result = result; + continuation = check_range_read_state_machine::begin; } } @@ -3477,7 +3510,7 @@ struct __attribute__((visibility("hidden"))) ConflictSet::Impl { context.started = started; for (int i = 0; i < started; i++) { inProgress[i].init(reads + i, result + i, root, - oldestVersionFullPrecision, &tls); + oldestVersionFullPrecision); } for (int i = 0; i < started - 1; i++) { inProgress[i].next = inProgress + i + 1;