Compare commits
3 Commits
cf-integri
...
fe5cfb0336
Author | SHA1 | Date | |
---|---|---|---|
fe5cfb0336 | |||
82203515a0 | |||
465372c734 |
195
ConflictSet.cpp
195
ConflictSet.cpp
@@ -3009,34 +3009,203 @@ Node *firstGeqPhysical(Node *n, const std::span<const uint8_t> key) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
struct CheckJob {
|
||||||
|
Node *n;
|
||||||
|
std::span<const uint8_t> begin;
|
||||||
|
InternalVersionT readVersion;
|
||||||
|
ReadContext *tls;
|
||||||
|
ConflictSet::Result *result;
|
||||||
|
|
||||||
|
void setResult(bool ok) {
|
||||||
|
*result = ok ? ConflictSet::Commit : ConflictSet::Conflict;
|
||||||
|
}
|
||||||
|
|
||||||
|
typedef void (*typeErasedContinuation)(void *);
|
||||||
|
|
||||||
|
// The type of a function that takes a CheckJob* and returns its own type
|
||||||
|
struct continuation {
|
||||||
|
typedef continuation (*functionPtrType)(CheckJob *);
|
||||||
|
functionPtrType func;
|
||||||
|
continuation operator()(CheckJob *job) { return func(job); }
|
||||||
|
/*implicit*/ continuation(functionPtrType func) : func(func) {}
|
||||||
|
continuation() = default;
|
||||||
|
operator bool() { return func != nullptr; }
|
||||||
|
};
|
||||||
|
|
||||||
|
continuation next;
|
||||||
|
void init(const ConflictSet::ReadRange *read, ConflictSet::Result *result,
|
||||||
|
Node *root, int64_t oldestVersionFullPrecision, ReadContext *tls);
|
||||||
|
};
|
||||||
|
|
||||||
|
namespace check_point_read_state_machine {
|
||||||
|
|
||||||
|
CheckJob::continuation down_left_spine(CheckJob *job);
|
||||||
|
|
||||||
|
// Logically this is the same as performing firstGeq and then checking against
|
||||||
|
// point or range version according to cmp, but this version short circuits as
|
||||||
|
// soon as it can prove that there's no conflict.
|
||||||
|
CheckJob::continuation begin(CheckJob *job) {
|
||||||
|
++job->tls->point_read_accum;
|
||||||
|
#if DEBUG_VERBOSE && !defined(NDEBUG)
|
||||||
|
fprintf(stderr, "Check point read: %s\n", printable(key).c_str());
|
||||||
|
#endif
|
||||||
|
for (;; ++job->tls->point_read_iterations_accum) {
|
||||||
|
if (job->begin.size() == 0) {
|
||||||
|
if (job->n->entryPresent) {
|
||||||
|
job->setResult(job->n->entry.pointVersion <= job->readVersion);
|
||||||
|
return nullptr; // Done
|
||||||
|
}
|
||||||
|
job->n = getFirstChildExists(job->n);
|
||||||
|
return down_left_spine;
|
||||||
|
}
|
||||||
|
|
||||||
|
auto [child, maxV] = getChildAndMaxVersion(job->n, job->begin[0]);
|
||||||
|
if (child == nullptr) {
|
||||||
|
auto c = getChildGeq(job->n, job->begin[0]);
|
||||||
|
if (c != nullptr) {
|
||||||
|
job->n = c;
|
||||||
|
return down_left_spine;
|
||||||
|
} else {
|
||||||
|
job->n = nextSibling(job->n);
|
||||||
|
if (job->n == nullptr) {
|
||||||
|
job->setResult(true);
|
||||||
|
return nullptr; // Done
|
||||||
|
}
|
||||||
|
return down_left_spine;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
job->n = child;
|
||||||
|
job->begin = job->begin.subspan(1, job->begin.size() - 1);
|
||||||
|
|
||||||
|
if (job->n->partialKeyLen > 0) {
|
||||||
|
int commonLen = std::min<int>(job->n->partialKeyLen, job->begin.size());
|
||||||
|
int i = longestCommonPrefix(job->n->partialKey(), job->begin.data(),
|
||||||
|
commonLen);
|
||||||
|
if (i < commonLen) {
|
||||||
|
auto c = job->n->partialKey()[i] <=> job->begin[i];
|
||||||
|
if (c > 0) {
|
||||||
|
return down_left_spine;
|
||||||
|
} else {
|
||||||
|
job->n = nextSibling(job->n);
|
||||||
|
if (job->n == nullptr) {
|
||||||
|
job->setResult(true);
|
||||||
|
return nullptr; // Done
|
||||||
|
}
|
||||||
|
return down_left_spine;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (commonLen == job->n->partialKeyLen) {
|
||||||
|
// partial key matches
|
||||||
|
job->begin =
|
||||||
|
job->begin.subspan(commonLen, job->begin.size() - commonLen);
|
||||||
|
} else if (job->n->partialKeyLen > int(job->begin.size())) {
|
||||||
|
// n is the first physical node greater than remaining, and there's no
|
||||||
|
// eq node
|
||||||
|
return down_left_spine;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (maxV <= job->readVersion) {
|
||||||
|
++job->tls->point_read_short_circuit_accum;
|
||||||
|
job->setResult(true);
|
||||||
|
return nullptr; // Done
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
CheckJob::continuation down_left_spine(CheckJob *job) {
|
||||||
|
if (job->n->entryPresent) {
|
||||||
|
job->setResult(job->n->entry.rangeVersion <= job->readVersion);
|
||||||
|
return nullptr; // Done
|
||||||
|
}
|
||||||
|
job->n = getFirstChildExists(job->n);
|
||||||
|
return down_left_spine;
|
||||||
|
}
|
||||||
|
|
||||||
|
} // namespace check_point_read_state_machine
|
||||||
|
|
||||||
|
void CheckJob::init(const ConflictSet::ReadRange *read,
|
||||||
|
ConflictSet::Result *result, Node *root,
|
||||||
|
int64_t oldestVersionFullPrecision, ReadContext *tls) {
|
||||||
|
auto begin = std::span<const uint8_t>(read->begin.p, read->begin.len);
|
||||||
|
auto end = std::span<const uint8_t>(read->end.p, read->end.len);
|
||||||
|
if (read->readVersion < oldestVersionFullPrecision) {
|
||||||
|
*result = ConflictSet::TooOld;
|
||||||
|
next = +[](CheckJob *) -> continuation { return nullptr; };
|
||||||
|
} else if (end.size() == 0) {
|
||||||
|
this->begin = begin;
|
||||||
|
this->n = root;
|
||||||
|
this->readVersion = InternalVersionT(read->readVersion);
|
||||||
|
this->result = result;
|
||||||
|
this->tls = tls;
|
||||||
|
this->next = check_point_read_state_machine::begin;
|
||||||
|
} else {
|
||||||
|
*result = checkRangeRead(root, begin, end,
|
||||||
|
InternalVersionT(read->readVersion), tls)
|
||||||
|
? ConflictSet::Commit
|
||||||
|
: ConflictSet::Conflict;
|
||||||
|
next = +[](CheckJob *) -> continuation { return nullptr; };
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
struct __attribute__((visibility("hidden"))) ConflictSet::Impl {
|
struct __attribute__((visibility("hidden"))) ConflictSet::Impl {
|
||||||
|
|
||||||
void check(const ReadRange *reads, Result *result, int count) {
|
void check(const ReadRange *reads, Result *result, int count) {
|
||||||
|
assert(oldestVersionFullPrecision >=
|
||||||
|
newestVersionFullPrecision - kNominalVersionWindow);
|
||||||
|
|
||||||
|
if (count == 0) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
ReadContext tls;
|
ReadContext tls;
|
||||||
tls.impl = this;
|
tls.impl = this;
|
||||||
int64_t check_byte_accum = 0;
|
int64_t check_byte_accum = 0;
|
||||||
|
|
||||||
|
constexpr int kConcurrent = 32;
|
||||||
|
CheckJob inProgress[kConcurrent];
|
||||||
|
int nextJob[kConcurrent];
|
||||||
|
|
||||||
|
int started = std::min(kConcurrent, count);
|
||||||
|
for (int i = 0; i < started; i++) {
|
||||||
|
inProgress[i].init(reads + i, result + i, root,
|
||||||
|
oldestVersionFullPrecision, &tls);
|
||||||
|
nextJob[i] = i + 1;
|
||||||
|
}
|
||||||
|
nextJob[started - 1] = 0;
|
||||||
|
|
||||||
|
int prevJob = started - 1;
|
||||||
|
int job = 0;
|
||||||
|
for (;;) {
|
||||||
|
auto next = inProgress[job].next(inProgress + job);
|
||||||
|
inProgress[job].next = next;
|
||||||
|
if (!next) {
|
||||||
|
if (started == count) {
|
||||||
|
if (prevJob == job)
|
||||||
|
break;
|
||||||
|
nextJob[prevJob] = nextJob[job];
|
||||||
|
job = prevJob;
|
||||||
|
} else {
|
||||||
|
int temp = started++;
|
||||||
|
inProgress[job].init(reads + temp, result + temp, root,
|
||||||
|
oldestVersionFullPrecision, &tls);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
prevJob = job;
|
||||||
|
job = nextJob[job];
|
||||||
|
}
|
||||||
|
|
||||||
for (int i = 0; i < count; ++i) {
|
for (int i = 0; i < count; ++i) {
|
||||||
assert(reads[i].readVersion >= 0);
|
assert(reads[i].readVersion >= 0);
|
||||||
assert(reads[i].readVersion <= newestVersionFullPrecision);
|
assert(reads[i].readVersion <= newestVersionFullPrecision);
|
||||||
const auto &r = reads[i];
|
const auto &r = reads[i];
|
||||||
check_byte_accum += r.begin.len + r.end.len;
|
check_byte_accum += r.begin.len + r.end.len;
|
||||||
auto begin = std::span<const uint8_t>(r.begin.p, r.begin.len);
|
|
||||||
auto end = std::span<const uint8_t>(r.end.p, r.end.len);
|
|
||||||
assert(oldestVersionFullPrecision >=
|
|
||||||
newestVersionFullPrecision - kNominalVersionWindow);
|
|
||||||
result[i] =
|
|
||||||
reads[i].readVersion < oldestVersionFullPrecision ? TooOld
|
|
||||||
: (end.size() > 0
|
|
||||||
? checkRangeRead(root, begin, end,
|
|
||||||
InternalVersionT(reads[i].readVersion), &tls)
|
|
||||||
: checkPointRead(root, begin,
|
|
||||||
InternalVersionT(reads[i].readVersion), &tls))
|
|
||||||
? Commit
|
|
||||||
: Conflict;
|
|
||||||
tls.commits_accum += result[i] == Commit;
|
tls.commits_accum += result[i] == Commit;
|
||||||
tls.conflicts_accum += result[i] == Conflict;
|
tls.conflicts_accum += result[i] == Conflict;
|
||||||
tls.too_olds_accum += result[i] == TooOld;
|
tls.too_olds_accum += result[i] == TooOld;
|
||||||
}
|
}
|
||||||
|
|
||||||
point_read_total.add(tls.point_read_accum);
|
point_read_total.add(tls.point_read_accum);
|
||||||
prefix_read_total.add(tls.prefix_read_accum);
|
prefix_read_total.add(tls.prefix_read_accum);
|
||||||
range_read_total.add(tls.range_read_accum);
|
range_read_total.add(tls.range_read_accum);
|
||||||
|
Reference in New Issue
Block a user