First continuation for interleaved check range read
This commit is contained in:
@@ -3240,10 +3240,11 @@ struct CheckJob {
|
|||||||
}
|
}
|
||||||
|
|
||||||
void init(const ConflictSet::ReadRange *read, ConflictSet::Result *result,
|
void init(const ConflictSet::ReadRange *read, ConflictSet::Result *result,
|
||||||
Node *root, int64_t oldestVersionFullPrecision, ReadContext *tls);
|
Node *root, int64_t oldestVersionFullPrecision);
|
||||||
|
|
||||||
Node *n;
|
Node *n;
|
||||||
std::span<const uint8_t> begin;
|
std::span<const uint8_t> begin;
|
||||||
|
std::span<const uint8_t> end; // range read only
|
||||||
InternalVersionT readVersion;
|
InternalVersionT readVersion;
|
||||||
ConflictSet::Result *result;
|
ConflictSet::Result *result;
|
||||||
Continuation continuation;
|
Continuation continuation;
|
||||||
@@ -3290,7 +3291,7 @@ FLATTEN PRESERVE_NONE void complete(CheckJob *job, CheckContext *context) {
|
|||||||
} else {
|
} else {
|
||||||
int temp = context->started++;
|
int temp = context->started++;
|
||||||
job->init(context->queries + temp, context->results + temp, context->root,
|
job->init(context->queries + temp, context->results + temp, context->root,
|
||||||
context->oldestVersionFullPrecision, context->tls);
|
context->oldestVersionFullPrecision);
|
||||||
}
|
}
|
||||||
MUSTTAIL return keepGoing(job, context);
|
MUSTTAIL return keepGoing(job, context);
|
||||||
}
|
}
|
||||||
@@ -3428,9 +3429,40 @@ void down_left_spine(CheckJob *job, CheckContext *context) {
|
|||||||
|
|
||||||
} // namespace check_point_read_state_machine
|
} // namespace check_point_read_state_machine
|
||||||
|
|
||||||
|
namespace check_range_read_state_machine {
|
||||||
|
FLATTEN PRESERVE_NONE void begin(CheckJob *, CheckContext *);
|
||||||
|
|
||||||
|
FLATTEN PRESERVE_NONE void begin(CheckJob *job, CheckContext *context) {
|
||||||
|
int lcp = longestCommonPrefix(job->begin.data(), job->end.data(),
|
||||||
|
std::min(job->begin.size(), job->end.size()));
|
||||||
|
if (lcp == int(job->begin.size()) &&
|
||||||
|
job->end.size() == job->begin.size() + 1 && job->end.back() == 0) {
|
||||||
|
job->continuation = check_point_read_state_machine::begin;
|
||||||
|
// Call directly since we have nothing to prefetch
|
||||||
|
MUSTTAIL return job->continuation(job, context);
|
||||||
|
}
|
||||||
|
if (lcp == int(job->begin.size() - 1) &&
|
||||||
|
job->end.size() == job->begin.size() &&
|
||||||
|
int(job->begin.back()) + 1 == int(job->end.back())) {
|
||||||
|
*job->result =
|
||||||
|
checkPrefixRead(job->n, job->begin, job->readVersion, context->tls)
|
||||||
|
? ConflictSet::Commit
|
||||||
|
: ConflictSet::Conflict;
|
||||||
|
return complete(job, context);
|
||||||
|
}
|
||||||
|
|
||||||
|
*job->result = checkRangeRead(job->n, job->begin, job->end, job->readVersion,
|
||||||
|
context->tls)
|
||||||
|
? ConflictSet::Commit
|
||||||
|
: ConflictSet::Conflict;
|
||||||
|
return complete(job, context);
|
||||||
|
}
|
||||||
|
|
||||||
|
} // namespace check_range_read_state_machine
|
||||||
|
|
||||||
void CheckJob::init(const ConflictSet::ReadRange *read,
|
void CheckJob::init(const ConflictSet::ReadRange *read,
|
||||||
ConflictSet::Result *result, Node *root,
|
ConflictSet::Result *result, Node *root,
|
||||||
int64_t oldestVersionFullPrecision, ReadContext *tls) {
|
int64_t oldestVersionFullPrecision) {
|
||||||
auto begin = std::span<const uint8_t>(read->begin.p, read->begin.len);
|
auto begin = std::span<const uint8_t>(read->begin.p, read->begin.len);
|
||||||
auto end = std::span<const uint8_t>(read->end.p, read->end.len);
|
auto end = std::span<const uint8_t>(read->end.p, read->end.len);
|
||||||
if (read->readVersion < oldestVersionFullPrecision) [[unlikely]] {
|
if (read->readVersion < oldestVersionFullPrecision) [[unlikely]] {
|
||||||
@@ -3443,11 +3475,12 @@ void CheckJob::init(const ConflictSet::ReadRange *read,
|
|||||||
this->result = result;
|
this->result = result;
|
||||||
continuation = check_point_read_state_machine::begin;
|
continuation = check_point_read_state_machine::begin;
|
||||||
} else {
|
} else {
|
||||||
*result = checkRangeRead(root, begin, end,
|
this->begin = begin;
|
||||||
InternalVersionT(read->readVersion), tls)
|
this->end = end;
|
||||||
? ConflictSet::Commit
|
this->n = root;
|
||||||
: ConflictSet::Conflict;
|
this->readVersion = InternalVersionT(read->readVersion);
|
||||||
continuation = complete;
|
this->result = result;
|
||||||
|
continuation = check_range_read_state_machine::begin;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -3477,7 +3510,7 @@ struct __attribute__((visibility("hidden"))) ConflictSet::Impl {
|
|||||||
context.started = started;
|
context.started = started;
|
||||||
for (int i = 0; i < started; i++) {
|
for (int i = 0; i < started; i++) {
|
||||||
inProgress[i].init(reads + i, result + i, root,
|
inProgress[i].init(reads + i, result + i, root,
|
||||||
oldestVersionFullPrecision, &tls);
|
oldestVersionFullPrecision);
|
||||||
}
|
}
|
||||||
for (int i = 0; i < started - 1; i++) {
|
for (int i = 0; i < started - 1; i++) {
|
||||||
inProgress[i].next = inProgress + i + 1;
|
inProgress[i].next = inProgress + i + 1;
|
||||||
|
Reference in New Issue
Block a user