Compare commits
10 Commits
8ff7a112b7
...
f27ca6d6af
| Author | SHA1 | Date | |
|---|---|---|---|
| f27ca6d6af | |||
| c0bb175b7e | |||
| 6a6fe5738a | |||
| dc16eccf06 | |||
| 3f15db7e82 | |||
| e8a8b5aef1 | |||
| b8fefff3ba | |||
| 2706b2f65e | |||
| f1292efe41 | |||
| a2d3d269ec |
472
ConflictSet.cpp
472
ConflictSet.cpp
@@ -264,7 +264,7 @@ struct Node {
|
|||||||
/* begin section that's copied to the next node */
|
/* begin section that's copied to the next node */
|
||||||
union {
|
union {
|
||||||
Entry entry;
|
Entry entry;
|
||||||
/* Set to the forwarding point for this node if deferredReleased is set */
|
/* Set to the forwarding point for this node if releaseDeferred is set */
|
||||||
Node *forwardTo;
|
Node *forwardTo;
|
||||||
};
|
};
|
||||||
Node *parent;
|
Node *parent;
|
||||||
@@ -278,15 +278,12 @@ struct Node {
|
|||||||
|
|
||||||
/* If set, this node has been replaced and the next node in the forwarding
|
/* If set, this node has been replaced and the next node in the forwarding
|
||||||
* chain is `forwardTo`*/
|
* chain is `forwardTo`*/
|
||||||
bool deferredReleased;
|
bool releaseDeferred;
|
||||||
|
|
||||||
uint8_t *partialKey();
|
uint8_t *partialKey();
|
||||||
Type getType() const {
|
Type getType() const { return type; }
|
||||||
assert(!deferredReleased);
|
|
||||||
return type;
|
|
||||||
}
|
|
||||||
int32_t getCapacity() const {
|
int32_t getCapacity() const {
|
||||||
assert(!deferredReleased);
|
assert(!releaseDeferred);
|
||||||
return partialKeyCapacity;
|
return partialKeyCapacity;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -320,7 +317,7 @@ struct Node0 : Node {
|
|||||||
constexpr static auto kType = Type_Node0;
|
constexpr static auto kType = Type_Node0;
|
||||||
|
|
||||||
uint8_t *partialKey() {
|
uint8_t *partialKey() {
|
||||||
assert(!deferredReleased);
|
assert(!releaseDeferred);
|
||||||
return (uint8_t *)(this + 1);
|
return (uint8_t *)(this + 1);
|
||||||
}
|
}
|
||||||
void copyChildrenAndKeyFrom(const Node0 &other);
|
void copyChildrenAndKeyFrom(const Node0 &other);
|
||||||
@@ -338,7 +335,7 @@ struct Node3 : Node {
|
|||||||
uint8_t index[kMaxNodes];
|
uint8_t index[kMaxNodes];
|
||||||
|
|
||||||
uint8_t *partialKey() {
|
uint8_t *partialKey() {
|
||||||
assert(!deferredReleased);
|
assert(!releaseDeferred);
|
||||||
return (uint8_t *)(this + 1);
|
return (uint8_t *)(this + 1);
|
||||||
}
|
}
|
||||||
void copyChildrenAndKeyFrom(const Node0 &other);
|
void copyChildrenAndKeyFrom(const Node0 &other);
|
||||||
@@ -357,7 +354,7 @@ struct Node16 : Node {
|
|||||||
uint8_t index[kMaxNodes];
|
uint8_t index[kMaxNodes];
|
||||||
|
|
||||||
uint8_t *partialKey() {
|
uint8_t *partialKey() {
|
||||||
assert(!deferredReleased);
|
assert(!releaseDeferred);
|
||||||
return (uint8_t *)(this + 1);
|
return (uint8_t *)(this + 1);
|
||||||
}
|
}
|
||||||
void copyChildrenAndKeyFrom(const Node3 &other);
|
void copyChildrenAndKeyFrom(const Node3 &other);
|
||||||
@@ -382,7 +379,7 @@ struct Node48 : Node {
|
|||||||
int8_t index[256];
|
int8_t index[256];
|
||||||
|
|
||||||
uint8_t *partialKey() {
|
uint8_t *partialKey() {
|
||||||
assert(!deferredReleased);
|
assert(!releaseDeferred);
|
||||||
return (uint8_t *)(this + 1);
|
return (uint8_t *)(this + 1);
|
||||||
}
|
}
|
||||||
void copyChildrenAndKeyFrom(const Node16 &other);
|
void copyChildrenAndKeyFrom(const Node16 &other);
|
||||||
@@ -405,7 +402,7 @@ struct Node256 : Node {
|
|||||||
InternalVersionT maxOfMax[kMaxOfMaxTotalPages];
|
InternalVersionT maxOfMax[kMaxOfMaxTotalPages];
|
||||||
|
|
||||||
uint8_t *partialKey() {
|
uint8_t *partialKey() {
|
||||||
assert(!deferredReleased);
|
assert(!releaseDeferred);
|
||||||
return (uint8_t *)(this + 1);
|
return (uint8_t *)(this + 1);
|
||||||
}
|
}
|
||||||
void copyChildrenAndKeyFrom(const Node48 &other);
|
void copyChildrenAndKeyFrom(const Node48 &other);
|
||||||
@@ -708,7 +705,7 @@ template <class T> struct BoundedFreeListAllocator {
|
|||||||
T *allocate(int partialKeyCapacity) {
|
T *allocate(int partialKeyCapacity) {
|
||||||
T *result = allocate_helper(partialKeyCapacity);
|
T *result = allocate_helper(partialKeyCapacity);
|
||||||
result->endOfRange = false;
|
result->endOfRange = false;
|
||||||
result->deferredReleased = false;
|
result->releaseDeferred = false;
|
||||||
if constexpr (!std::is_same_v<T, Node0>) {
|
if constexpr (!std::is_same_v<T, Node0>) {
|
||||||
memset(result->children, 0, sizeof(result->children));
|
memset(result->children, 0, sizeof(result->children));
|
||||||
const auto z = InternalVersionT::zero;
|
const auto z = InternalVersionT::zero;
|
||||||
@@ -852,10 +849,13 @@ struct WriteContext {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Place in a list to be released in the next call to releaseDeferred.
|
// Place in a list to be released in the next call to releaseDeferred.
|
||||||
void deferRelease(Node *n) {
|
void deferRelease(Node *n, Node *forwardTo) {
|
||||||
|
n->releaseDeferred = true;
|
||||||
|
n->forwardTo = forwardTo;
|
||||||
n->parent = deferredList;
|
n->parent = deferredList;
|
||||||
deferredList = n;
|
deferredList = n;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Release all nodes passed to deferRelease since the last call to
|
// Release all nodes passed to deferRelease since the last call to
|
||||||
// releaseDeferred.
|
// releaseDeferred.
|
||||||
void releaseDeferred() {
|
void releaseDeferred() {
|
||||||
@@ -1204,6 +1204,136 @@ TaggedNodePointer getChild(Node *self, uint8_t index) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TaggedNodePointer *getChildUpdatingMaxVersion(Node0 *,
|
||||||
|
std::span<const uint8_t> &,
|
||||||
|
InternalVersionT) {
|
||||||
|
return nullptr;
|
||||||
|
}
|
||||||
|
TaggedNodePointer *
|
||||||
|
getChildUpdatingMaxVersion(Node3 *self, std::span<const uint8_t> &remaining,
|
||||||
|
InternalVersionT maxVersion) {
|
||||||
|
assert(remaining.size() > 0);
|
||||||
|
int index = remaining.front();
|
||||||
|
auto key = remaining.subspan(1, remaining.size() - 1);
|
||||||
|
int i = getNodeIndex(self, index);
|
||||||
|
if (i < 0) {
|
||||||
|
return nullptr;
|
||||||
|
}
|
||||||
|
Node *c = self->children[i];
|
||||||
|
if (c->partialKeyLen > 0) {
|
||||||
|
int commonLen = std::min<int>(c->partialKeyLen, key.size());
|
||||||
|
int partialKeyIndex =
|
||||||
|
longestCommonPrefix(c->partialKey(), key.data(), commonLen);
|
||||||
|
if (partialKeyIndex < c->partialKeyLen) {
|
||||||
|
return nullptr;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
remaining = key.subspan(c->partialKeyLen, key.size() - c->partialKeyLen);
|
||||||
|
self->childMaxVersion[i] = maxVersion;
|
||||||
|
return &self->children[i];
|
||||||
|
}
|
||||||
|
TaggedNodePointer *
|
||||||
|
getChildUpdatingMaxVersion(Node16 *self, std::span<const uint8_t> &remaining,
|
||||||
|
InternalVersionT maxVersion) {
|
||||||
|
assert(remaining.size() > 0);
|
||||||
|
int index = remaining.front();
|
||||||
|
auto key = remaining.subspan(1, remaining.size() - 1);
|
||||||
|
int i = getNodeIndex(self, index);
|
||||||
|
if (i < 0) {
|
||||||
|
return nullptr;
|
||||||
|
}
|
||||||
|
Node *c = self->children[i];
|
||||||
|
if (c->partialKeyLen > 0) {
|
||||||
|
int commonLen = std::min<int>(c->partialKeyLen, key.size());
|
||||||
|
int partialKeyIndex =
|
||||||
|
longestCommonPrefix(c->partialKey(), key.data(), commonLen);
|
||||||
|
if (partialKeyIndex < c->partialKeyLen) {
|
||||||
|
return nullptr;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
remaining = key.subspan(c->partialKeyLen, key.size() - c->partialKeyLen);
|
||||||
|
self->childMaxVersion[i] = maxVersion;
|
||||||
|
return &self->children[i];
|
||||||
|
}
|
||||||
|
TaggedNodePointer *
|
||||||
|
getChildUpdatingMaxVersion(Node48 *self, std::span<const uint8_t> &remaining,
|
||||||
|
InternalVersionT maxVersion) {
|
||||||
|
assert(remaining.size() > 0);
|
||||||
|
int index = remaining.front();
|
||||||
|
auto key = remaining.subspan(1, remaining.size() - 1);
|
||||||
|
int i = self->index[index];
|
||||||
|
if (i < 0) {
|
||||||
|
return nullptr;
|
||||||
|
}
|
||||||
|
Node *c = self->children[i];
|
||||||
|
if (c->partialKeyLen > 0) {
|
||||||
|
int commonLen = std::min<int>(c->partialKeyLen, key.size());
|
||||||
|
int partialKeyIndex =
|
||||||
|
longestCommonPrefix(c->partialKey(), key.data(), commonLen);
|
||||||
|
if (partialKeyIndex < c->partialKeyLen) {
|
||||||
|
return nullptr;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
remaining = key.subspan(c->partialKeyLen, key.size() - c->partialKeyLen);
|
||||||
|
self->childMaxVersion[i] = maxVersion;
|
||||||
|
self->maxOfMax[i >> Node48::kMaxOfMaxShift] =
|
||||||
|
std::max(self->maxOfMax[i >> Node48::kMaxOfMaxShift], maxVersion);
|
||||||
|
return &self->children[i];
|
||||||
|
}
|
||||||
|
TaggedNodePointer *
|
||||||
|
getChildUpdatingMaxVersion(Node256 *self, std::span<const uint8_t> &remaining,
|
||||||
|
InternalVersionT maxVersion) {
|
||||||
|
assert(remaining.size() > 0);
|
||||||
|
int index = remaining.front();
|
||||||
|
auto key = remaining.subspan(1, remaining.size() - 1);
|
||||||
|
auto &n = self->children[index];
|
||||||
|
if (n == nullptr) {
|
||||||
|
return nullptr;
|
||||||
|
}
|
||||||
|
Node *c = n;
|
||||||
|
if (c->partialKeyLen > 0) {
|
||||||
|
int commonLen = std::min<int>(c->partialKeyLen, key.size());
|
||||||
|
int partialKeyIndex =
|
||||||
|
longestCommonPrefix(c->partialKey(), key.data(), commonLen);
|
||||||
|
if (partialKeyIndex < c->partialKeyLen) {
|
||||||
|
return nullptr;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
remaining = key.subspan(c->partialKeyLen, key.size() - c->partialKeyLen);
|
||||||
|
self->childMaxVersion[index] = maxVersion;
|
||||||
|
self->maxOfMax[index >> Node256::kMaxOfMaxShift] =
|
||||||
|
std::max(self->maxOfMax[index >> Node256::kMaxOfMaxShift], maxVersion);
|
||||||
|
return &n;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Precondition: remaining.size() > 0
|
||||||
|
// If a child of self lies along the search path of remaining, return a pointer
|
||||||
|
// to that child, update max version, and consume the matching prefix bytes from
|
||||||
|
// remaining. Otherwise return nullptr without changing the tree at all.
|
||||||
|
TaggedNodePointer *
|
||||||
|
getChildUpdatingMaxVersion(Node *self, std::span<const uint8_t> &remaining,
|
||||||
|
InternalVersionT maxVersion) {
|
||||||
|
switch (self->getType()) {
|
||||||
|
case Type_Node0:
|
||||||
|
return getChildUpdatingMaxVersion(static_cast<Node0 *>(self), remaining,
|
||||||
|
maxVersion);
|
||||||
|
case Type_Node3:
|
||||||
|
return getChildUpdatingMaxVersion(static_cast<Node3 *>(self), remaining,
|
||||||
|
maxVersion);
|
||||||
|
case Type_Node16:
|
||||||
|
return getChildUpdatingMaxVersion(static_cast<Node16 *>(self), remaining,
|
||||||
|
maxVersion);
|
||||||
|
case Type_Node48:
|
||||||
|
return getChildUpdatingMaxVersion(static_cast<Node48 *>(self), remaining,
|
||||||
|
maxVersion);
|
||||||
|
case Type_Node256:
|
||||||
|
return getChildUpdatingMaxVersion(static_cast<Node256 *>(self), remaining,
|
||||||
|
maxVersion);
|
||||||
|
default: // GCOVR_EXCL_LINE
|
||||||
|
__builtin_unreachable(); // GCOVR_EXCL_LINE
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
struct ChildAndMaxVersion {
|
struct ChildAndMaxVersion {
|
||||||
TaggedNodePointer child;
|
TaggedNodePointer child;
|
||||||
InternalVersionT maxVersion;
|
InternalVersionT maxVersion;
|
||||||
@@ -1550,7 +1680,7 @@ TaggedNodePointer &getOrCreateChild(TaggedNodePointer &self,
|
|||||||
|
|
||||||
auto *newSelf = writeContext->allocate<Node3>(self->partialKeyLen);
|
auto *newSelf = writeContext->allocate<Node3>(self->partialKeyLen);
|
||||||
newSelf->copyChildrenAndKeyFrom(*self0);
|
newSelf->copyChildrenAndKeyFrom(*self0);
|
||||||
writeContext->release(self0);
|
writeContext->deferRelease(self0, newSelf);
|
||||||
self = newSelf;
|
self = newSelf;
|
||||||
|
|
||||||
goto insert3;
|
goto insert3;
|
||||||
@@ -1560,7 +1690,7 @@ TaggedNodePointer &getOrCreateChild(TaggedNodePointer &self,
|
|||||||
auto *self3 = static_cast<Node3 *>(self);
|
auto *self3 = static_cast<Node3 *>(self);
|
||||||
auto *newSelf = writeContext->allocate<Node16>(self->partialKeyLen);
|
auto *newSelf = writeContext->allocate<Node16>(self->partialKeyLen);
|
||||||
newSelf->copyChildrenAndKeyFrom(*self3);
|
newSelf->copyChildrenAndKeyFrom(*self3);
|
||||||
writeContext->release(self3);
|
writeContext->deferRelease(self3, newSelf);
|
||||||
self = newSelf;
|
self = newSelf;
|
||||||
goto insert16;
|
goto insert16;
|
||||||
}
|
}
|
||||||
@@ -1589,7 +1719,7 @@ TaggedNodePointer &getOrCreateChild(TaggedNodePointer &self,
|
|||||||
auto *self16 = static_cast<Node16 *>(self);
|
auto *self16 = static_cast<Node16 *>(self);
|
||||||
auto *newSelf = writeContext->allocate<Node48>(self->partialKeyLen);
|
auto *newSelf = writeContext->allocate<Node48>(self->partialKeyLen);
|
||||||
newSelf->copyChildrenAndKeyFrom(*self16);
|
newSelf->copyChildrenAndKeyFrom(*self16);
|
||||||
writeContext->release(self16);
|
writeContext->deferRelease(self16, newSelf);
|
||||||
self = newSelf;
|
self = newSelf;
|
||||||
goto insert48;
|
goto insert48;
|
||||||
}
|
}
|
||||||
@@ -1620,7 +1750,7 @@ TaggedNodePointer &getOrCreateChild(TaggedNodePointer &self,
|
|||||||
auto *self48 = static_cast<Node48 *>(self);
|
auto *self48 = static_cast<Node48 *>(self);
|
||||||
auto *newSelf = writeContext->allocate<Node256>(self->partialKeyLen);
|
auto *newSelf = writeContext->allocate<Node256>(self->partialKeyLen);
|
||||||
newSelf->copyChildrenAndKeyFrom(*self48);
|
newSelf->copyChildrenAndKeyFrom(*self48);
|
||||||
writeContext->release(self48);
|
writeContext->deferRelease(self48, newSelf);
|
||||||
self = newSelf;
|
self = newSelf;
|
||||||
goto insert256;
|
goto insert256;
|
||||||
}
|
}
|
||||||
@@ -3103,11 +3233,12 @@ Node *firstGeqPhysical(Node *n, const std::span<const uint8_t> key) {
|
|||||||
#define PRESERVE_NONE
|
#define PRESERVE_NONE
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
typedef PRESERVE_NONE void (*Continuation)(struct CheckJob *,
|
namespace check {
|
||||||
struct CheckContext *);
|
|
||||||
|
typedef PRESERVE_NONE void (*Continuation)(struct Job *, struct Context *);
|
||||||
|
|
||||||
// State relevant to an individual query
|
// State relevant to an individual query
|
||||||
struct CheckJob {
|
struct Job {
|
||||||
void setResult(bool ok) {
|
void setResult(bool ok) {
|
||||||
*result = ok ? ConflictSet::Commit : ConflictSet::Conflict;
|
*result = ok ? ConflictSet::Commit : ConflictSet::Conflict;
|
||||||
}
|
}
|
||||||
@@ -3126,12 +3257,12 @@ struct CheckJob {
|
|||||||
InternalVersionT readVersion;
|
InternalVersionT readVersion;
|
||||||
ConflictSet::Result *result;
|
ConflictSet::Result *result;
|
||||||
Continuation continuation;
|
Continuation continuation;
|
||||||
CheckJob *prev;
|
Job *prev;
|
||||||
CheckJob *next;
|
Job *next;
|
||||||
};
|
};
|
||||||
|
|
||||||
// State relevant to every query
|
// State relevant to every query
|
||||||
struct CheckContext {
|
struct Context {
|
||||||
int count;
|
int count;
|
||||||
int64_t oldestVersionFullPrecision;
|
int64_t oldestVersionFullPrecision;
|
||||||
Node *root;
|
Node *root;
|
||||||
@@ -3141,36 +3272,37 @@ struct CheckContext {
|
|||||||
ReadContext readContext;
|
ReadContext readContext;
|
||||||
};
|
};
|
||||||
|
|
||||||
PRESERVE_NONE void keepGoing(CheckJob *job, CheckContext *context) {
|
PRESERVE_NONE void keepGoing(Job *job, Context *context) {
|
||||||
job = job->next;
|
job = job->next;
|
||||||
MUSTTAIL return job->continuation(job, context);
|
MUSTTAIL return job->continuation(job, context);
|
||||||
}
|
}
|
||||||
|
|
||||||
PRESERVE_NONE void complete(CheckJob *job, CheckContext *context) {
|
PRESERVE_NONE void complete(Job *job, Context *context) {
|
||||||
if (context->started == context->count) {
|
if (context->started == context->count) {
|
||||||
if (job->prev == job) {
|
if (job->prev == job) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
job->prev->next = job->next;
|
job->prev->next = job->next;
|
||||||
job->next->prev = job->prev;
|
job->next->prev = job->prev;
|
||||||
job = job->prev;
|
job = job->next;
|
||||||
|
MUSTTAIL return job->continuation(job, context);
|
||||||
} else {
|
} else {
|
||||||
int temp = context->started++;
|
int temp = context->started++;
|
||||||
job->init(context->queries + temp, context->results + temp, context->root,
|
job->init(context->queries + temp, context->results + temp, context->root,
|
||||||
context->oldestVersionFullPrecision);
|
context->oldestVersionFullPrecision);
|
||||||
|
MUSTTAIL return job->continuation(job, context);
|
||||||
}
|
}
|
||||||
MUSTTAIL return keepGoing(job, context);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
template <class NodeT>
|
template <class NodeT>
|
||||||
PRESERVE_NONE void down_left_spine(CheckJob *job, CheckContext *context);
|
PRESERVE_NONE void down_left_spine(Job *job, Context *context);
|
||||||
|
|
||||||
static Continuation downLeftSpineTable[] = {
|
static Continuation downLeftSpineTable[] = {
|
||||||
down_left_spine<Node0>, down_left_spine<Node3>, down_left_spine<Node16>,
|
down_left_spine<Node0>, down_left_spine<Node3>, down_left_spine<Node16>,
|
||||||
down_left_spine<Node48>, down_left_spine<Node256>};
|
down_left_spine<Node48>, down_left_spine<Node256>};
|
||||||
|
|
||||||
template <class NodeT>
|
template <class NodeT>
|
||||||
PRESERVE_NONE void down_left_spine(CheckJob *job, CheckContext *context) {
|
PRESERVE_NONE void down_left_spine(Job *job, Context *context) {
|
||||||
assert(job->n->getType() == NodeT::kType);
|
assert(job->n->getType() == NodeT::kType);
|
||||||
NodeT *n = static_cast<NodeT *>(job->n);
|
NodeT *n = static_cast<NodeT *>(job->n);
|
||||||
if (n->entryPresent) {
|
if (n->entryPresent) {
|
||||||
@@ -3184,16 +3316,16 @@ PRESERVE_NONE void down_left_spine(CheckJob *job, CheckContext *context) {
|
|||||||
MUSTTAIL return keepGoing(job, context);
|
MUSTTAIL return keepGoing(job, context);
|
||||||
}
|
}
|
||||||
|
|
||||||
namespace check_point_read_state_machine {
|
namespace point_read_state_machine {
|
||||||
|
|
||||||
PRESERVE_NONE void begin(CheckJob *, CheckContext *);
|
PRESERVE_NONE void begin(Job *, Context *);
|
||||||
|
|
||||||
template <class NodeT> PRESERVE_NONE void iter(CheckJob *, CheckContext *);
|
template <class NodeT> PRESERVE_NONE void iter(Job *, Context *);
|
||||||
|
|
||||||
static Continuation iterTable[] = {iter<Node0>, iter<Node3>, iter<Node16>,
|
static Continuation iterTable[] = {iter<Node0>, iter<Node3>, iter<Node16>,
|
||||||
iter<Node48>, iter<Node256>};
|
iter<Node48>, iter<Node256>};
|
||||||
|
|
||||||
void begin(CheckJob *job, CheckContext *context) {
|
void begin(Job *job, Context *context) {
|
||||||
++context->readContext.point_read_accum;
|
++context->readContext.point_read_accum;
|
||||||
#if DEBUG_VERBOSE && !defined(NDEBUG)
|
#if DEBUG_VERBOSE && !defined(NDEBUG)
|
||||||
fprintf(stderr, "Check point read: %s\n", printable(key).c_str());
|
fprintf(stderr, "Check point read: %s\n", printable(key).c_str());
|
||||||
@@ -3228,7 +3360,7 @@ void begin(CheckJob *job, CheckContext *context) {
|
|||||||
MUSTTAIL return keepGoing(job, context);
|
MUSTTAIL return keepGoing(job, context);
|
||||||
}
|
}
|
||||||
|
|
||||||
template <class NodeT> void iter(CheckJob *job, CheckContext *context) {
|
template <class NodeT> void iter(Job *job, Context *context) {
|
||||||
|
|
||||||
assert(NodeT::kType == job->n->getType());
|
assert(NodeT::kType == job->n->getType());
|
||||||
NodeT *n = static_cast<NodeT *>(job->n);
|
NodeT *n = static_cast<NodeT *>(job->n);
|
||||||
@@ -3311,18 +3443,18 @@ template <class NodeT> void iter(CheckJob *job, CheckContext *context) {
|
|||||||
MUSTTAIL return keepGoing(job, context);
|
MUSTTAIL return keepGoing(job, context);
|
||||||
}
|
}
|
||||||
|
|
||||||
} // namespace check_point_read_state_machine
|
} // namespace point_read_state_machine
|
||||||
|
|
||||||
namespace check_prefix_read_state_machine {
|
namespace prefix_read_state_machine {
|
||||||
|
|
||||||
PRESERVE_NONE void begin(CheckJob *, CheckContext *);
|
PRESERVE_NONE void begin(Job *, Context *);
|
||||||
|
|
||||||
template <class NodeT> PRESERVE_NONE void iter(CheckJob *, CheckContext *);
|
template <class NodeT> PRESERVE_NONE void iter(Job *, Context *);
|
||||||
|
|
||||||
static Continuation iterTable[] = {iter<Node0>, iter<Node3>, iter<Node16>,
|
static Continuation iterTable[] = {iter<Node0>, iter<Node3>, iter<Node16>,
|
||||||
iter<Node48>, iter<Node256>};
|
iter<Node48>, iter<Node256>};
|
||||||
|
|
||||||
void begin(CheckJob *job, CheckContext *context) {
|
void begin(Job *job, Context *context) {
|
||||||
++context->readContext.prefix_read_accum;
|
++context->readContext.prefix_read_accum;
|
||||||
#if DEBUG_VERBOSE && !defined(NDEBUG)
|
#if DEBUG_VERBOSE && !defined(NDEBUG)
|
||||||
fprintf(stderr, "Check prefix read: %s\n", printable(key).c_str());
|
fprintf(stderr, "Check prefix read: %s\n", printable(key).c_str());
|
||||||
@@ -3353,7 +3485,7 @@ void begin(CheckJob *job, CheckContext *context) {
|
|||||||
MUSTTAIL return keepGoing(job, context);
|
MUSTTAIL return keepGoing(job, context);
|
||||||
}
|
}
|
||||||
|
|
||||||
template <class NodeT> void iter(CheckJob *job, CheckContext *context) {
|
template <class NodeT> void iter(Job *job, Context *context) {
|
||||||
|
|
||||||
assert(NodeT::kType == job->n->getType());
|
assert(NodeT::kType == job->n->getType());
|
||||||
NodeT *n = static_cast<NodeT *>(job->n);
|
NodeT *n = static_cast<NodeT *>(job->n);
|
||||||
@@ -3434,16 +3566,15 @@ template <class NodeT> void iter(CheckJob *job, CheckContext *context) {
|
|||||||
MUSTTAIL return keepGoing(job, context);
|
MUSTTAIL return keepGoing(job, context);
|
||||||
}
|
}
|
||||||
|
|
||||||
} // namespace check_prefix_read_state_machine
|
} // namespace prefix_read_state_machine
|
||||||
|
|
||||||
namespace check_range_read_state_machine {
|
namespace range_read_state_machine {
|
||||||
PRESERVE_NONE void begin(CheckJob *, CheckContext *);
|
PRESERVE_NONE void begin(Job *, Context *);
|
||||||
|
|
||||||
|
template <class NodeT> PRESERVE_NONE void common_prefix_iter(Job *, Context *);
|
||||||
|
|
||||||
template <class NodeT>
|
template <class NodeT>
|
||||||
PRESERVE_NONE void common_prefix_iter(CheckJob *, CheckContext *);
|
PRESERVE_NONE void done_common_prefix_iter(Job *, Context *);
|
||||||
|
|
||||||
template <class NodeT>
|
|
||||||
PRESERVE_NONE void done_common_prefix_iter(CheckJob *, CheckContext *);
|
|
||||||
|
|
||||||
static Continuation commonPrefixIterTable[] = {
|
static Continuation commonPrefixIterTable[] = {
|
||||||
common_prefix_iter<Node0>, common_prefix_iter<Node3>,
|
common_prefix_iter<Node0>, common_prefix_iter<Node3>,
|
||||||
@@ -3455,43 +3586,41 @@ static Continuation doneCommonPrefixIterTable[] = {
|
|||||||
done_common_prefix_iter<Node16>, done_common_prefix_iter<Node48>,
|
done_common_prefix_iter<Node16>, done_common_prefix_iter<Node48>,
|
||||||
done_common_prefix_iter<Node256>};
|
done_common_prefix_iter<Node256>};
|
||||||
|
|
||||||
template <class NodeT>
|
template <class NodeT> PRESERVE_NONE void left_side_iter(Job *, Context *);
|
||||||
PRESERVE_NONE void left_side_iter(CheckJob *, CheckContext *);
|
|
||||||
|
|
||||||
template <class NodeT>
|
template <class NodeT>
|
||||||
PRESERVE_NONE void left_side_down_left_spine(CheckJob *, CheckContext *);
|
PRESERVE_NONE void left_side_down_left_spine(Job *, Context *);
|
||||||
|
|
||||||
static Continuation leftSideDownLeftSpineTable[] = {
|
static Continuation leftSideDownLeftSpineTable[] = {
|
||||||
left_side_down_left_spine<Node0>, left_side_down_left_spine<Node3>,
|
left_side_down_left_spine<Node0>, left_side_down_left_spine<Node3>,
|
||||||
left_side_down_left_spine<Node16>, left_side_down_left_spine<Node48>,
|
left_side_down_left_spine<Node16>, left_side_down_left_spine<Node48>,
|
||||||
left_side_down_left_spine<Node256>};
|
left_side_down_left_spine<Node256>};
|
||||||
|
|
||||||
PRESERVE_NONE void done_left_side_iter(CheckJob *, CheckContext *);
|
PRESERVE_NONE void done_left_side_iter(Job *, Context *);
|
||||||
|
|
||||||
static Continuation leftSideIterTable[] = {
|
static Continuation leftSideIterTable[] = {
|
||||||
left_side_iter<Node0>, left_side_iter<Node3>, left_side_iter<Node16>,
|
left_side_iter<Node0>, left_side_iter<Node3>, left_side_iter<Node16>,
|
||||||
left_side_iter<Node48>, left_side_iter<Node256>};
|
left_side_iter<Node48>, left_side_iter<Node256>};
|
||||||
|
|
||||||
template <class NodeT>
|
template <class NodeT> PRESERVE_NONE void right_side_iter(Job *, Context *);
|
||||||
PRESERVE_NONE void right_side_iter(CheckJob *, CheckContext *);
|
|
||||||
|
|
||||||
static Continuation rightSideIterTable[] = {
|
static Continuation rightSideIterTable[] = {
|
||||||
right_side_iter<Node0>, right_side_iter<Node3>, right_side_iter<Node16>,
|
right_side_iter<Node0>, right_side_iter<Node3>, right_side_iter<Node16>,
|
||||||
right_side_iter<Node48>, right_side_iter<Node256>};
|
right_side_iter<Node48>, right_side_iter<Node256>};
|
||||||
|
|
||||||
PRESERVE_NONE void begin(CheckJob *job, CheckContext *context) {
|
PRESERVE_NONE void begin(Job *job, Context *context) {
|
||||||
job->lcp = longestCommonPrefix(job->begin.data(), job->end.data(),
|
job->lcp = longestCommonPrefix(job->begin.data(), job->end.data(),
|
||||||
std::min(job->begin.size(), job->end.size()));
|
std::min(job->begin.size(), job->end.size()));
|
||||||
if (job->lcp == int(job->begin.size()) &&
|
if (job->lcp == int(job->begin.size()) &&
|
||||||
job->end.size() == job->begin.size() + 1 && job->end.back() == 0) {
|
job->end.size() == job->begin.size() + 1 && job->end.back() == 0) {
|
||||||
// Call directly since we have nothing to prefetch
|
// Call directly since we have nothing to prefetch
|
||||||
MUSTTAIL return check_point_read_state_machine::begin(job, context);
|
MUSTTAIL return check::point_read_state_machine::begin(job, context);
|
||||||
}
|
}
|
||||||
if (job->lcp == int(job->begin.size() - 1) &&
|
if (job->lcp == int(job->begin.size() - 1) &&
|
||||||
job->end.size() == job->begin.size() &&
|
job->end.size() == job->begin.size() &&
|
||||||
int(job->begin.back()) + 1 == int(job->end.back())) {
|
int(job->begin.back()) + 1 == int(job->end.back())) {
|
||||||
// Call directly since we have nothing to prefetch
|
// Call directly since we have nothing to prefetch
|
||||||
MUSTTAIL return check_prefix_read_state_machine::begin(job, context);
|
MUSTTAIL return check::prefix_read_state_machine::begin(job, context);
|
||||||
}
|
}
|
||||||
|
|
||||||
++context->readContext.range_read_accum;
|
++context->readContext.range_read_accum;
|
||||||
@@ -3514,8 +3643,7 @@ PRESERVE_NONE void begin(CheckJob *job, CheckContext *context) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Advance down common prefix, but stay on a physical path in the tree
|
// Advance down common prefix, but stay on a physical path in the tree
|
||||||
template <class NodeT>
|
template <class NodeT> void common_prefix_iter(Job *job, Context *context) {
|
||||||
void common_prefix_iter(CheckJob *job, CheckContext *context) {
|
|
||||||
|
|
||||||
assert(NodeT::kType == job->child->getType());
|
assert(NodeT::kType == job->child->getType());
|
||||||
NodeT *child = static_cast<NodeT *>(job->child);
|
NodeT *child = static_cast<NodeT *>(job->child);
|
||||||
@@ -3559,8 +3687,7 @@ void common_prefix_iter(CheckJob *job, CheckContext *context) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
template <class NodeT>
|
template <class NodeT>
|
||||||
PRESERVE_NONE void done_common_prefix_iter(CheckJob *job,
|
PRESERVE_NONE void done_common_prefix_iter(Job *job, Context *context) {
|
||||||
CheckContext *context) {
|
|
||||||
assert(NodeT::kType == job->n->getType());
|
assert(NodeT::kType == job->n->getType());
|
||||||
NodeT *n = static_cast<NodeT *>(job->n);
|
NodeT *n = static_cast<NodeT *>(job->n);
|
||||||
|
|
||||||
@@ -3666,7 +3793,7 @@ PRESERVE_NONE void done_common_prefix_iter(CheckJob *job,
|
|||||||
// Return true if the max version among all keys that start with key[:prefixLen]
|
// Return true if the max version among all keys that start with key[:prefixLen]
|
||||||
// that are >= key is <= readVersion
|
// that are >= key is <= readVersion
|
||||||
template <class NodeT>
|
template <class NodeT>
|
||||||
PRESERVE_NONE void left_side_iter(CheckJob *job, CheckContext *context) {
|
PRESERVE_NONE void left_side_iter(Job *job, Context *context) {
|
||||||
assert(NodeT::kType == job->n->getType());
|
assert(NodeT::kType == job->n->getType());
|
||||||
NodeT *n = static_cast<NodeT *>(job->n);
|
NodeT *n = static_cast<NodeT *>(job->n);
|
||||||
|
|
||||||
@@ -3765,7 +3892,7 @@ PRESERVE_NONE void left_side_iter(CheckJob *job, CheckContext *context) {
|
|||||||
MUSTTAIL return keepGoing(job, context);
|
MUSTTAIL return keepGoing(job, context);
|
||||||
}
|
}
|
||||||
|
|
||||||
PRESERVE_NONE void done_left_side_iter(CheckJob *job, CheckContext *context) {
|
PRESERVE_NONE void done_left_side_iter(Job *job, Context *context) {
|
||||||
|
|
||||||
job->n = job->commonPrefixNode;
|
job->n = job->commonPrefixNode;
|
||||||
job->remaining = job->end;
|
job->remaining = job->end;
|
||||||
@@ -3797,7 +3924,7 @@ PRESERVE_NONE void done_left_side_iter(CheckJob *job, CheckContext *context) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
template <class NodeT>
|
template <class NodeT>
|
||||||
void left_side_down_left_spine(CheckJob *job, CheckContext *context) {
|
void left_side_down_left_spine(Job *job, Context *context) {
|
||||||
assert(job->n->getType() == NodeT::kType);
|
assert(job->n->getType() == NodeT::kType);
|
||||||
NodeT *n = static_cast<NodeT *>(job->n);
|
NodeT *n = static_cast<NodeT *>(job->n);
|
||||||
|
|
||||||
@@ -3818,7 +3945,7 @@ void left_side_down_left_spine(CheckJob *job, CheckContext *context) {
|
|||||||
// Return true if the max version among all keys that start with key[:prefixLen]
|
// Return true if the max version among all keys that start with key[:prefixLen]
|
||||||
// that are < key is <= readVersion
|
// that are < key is <= readVersion
|
||||||
template <class NodeT>
|
template <class NodeT>
|
||||||
PRESERVE_NONE void right_side_iter(CheckJob *job, CheckContext *context) {
|
PRESERVE_NONE void right_side_iter(Job *job, Context *context) {
|
||||||
assert(NodeT::kType == job->n->getType());
|
assert(NodeT::kType == job->n->getType());
|
||||||
NodeT *n = static_cast<NodeT *>(job->n);
|
NodeT *n = static_cast<NodeT *>(job->n);
|
||||||
|
|
||||||
@@ -3913,11 +4040,10 @@ PRESERVE_NONE void right_side_iter(CheckJob *job, CheckContext *context) {
|
|||||||
MUSTTAIL return keepGoing(job, context);
|
MUSTTAIL return keepGoing(job, context);
|
||||||
}
|
}
|
||||||
|
|
||||||
} // namespace check_range_read_state_machine
|
} // namespace range_read_state_machine
|
||||||
|
|
||||||
void CheckJob::init(const ConflictSet::ReadRange *read,
|
void Job::init(const ConflictSet::ReadRange *read, ConflictSet::Result *result,
|
||||||
ConflictSet::Result *result, Node *root,
|
Node *root, int64_t oldestVersionFullPrecision) {
|
||||||
int64_t oldestVersionFullPrecision) {
|
|
||||||
auto begin = std::span<const uint8_t>(read->begin.p, read->begin.len);
|
auto begin = std::span<const uint8_t>(read->begin.p, read->begin.len);
|
||||||
auto end = std::span<const uint8_t>(read->end.p, read->end.len);
|
auto end = std::span<const uint8_t>(read->end.p, read->end.len);
|
||||||
if (read->readVersion < oldestVersionFullPrecision) [[unlikely]] {
|
if (read->readVersion < oldestVersionFullPrecision) [[unlikely]] {
|
||||||
@@ -3928,16 +4054,114 @@ void CheckJob::init(const ConflictSet::ReadRange *read,
|
|||||||
this->n = root;
|
this->n = root;
|
||||||
this->readVersion = InternalVersionT(read->readVersion);
|
this->readVersion = InternalVersionT(read->readVersion);
|
||||||
this->result = result;
|
this->result = result;
|
||||||
continuation = check_point_read_state_machine::begin;
|
continuation = check::point_read_state_machine::begin;
|
||||||
} else {
|
} else {
|
||||||
this->begin = begin;
|
this->begin = begin;
|
||||||
this->end = end;
|
this->end = end;
|
||||||
this->n = root;
|
this->n = root;
|
||||||
this->readVersion = InternalVersionT(read->readVersion);
|
this->readVersion = InternalVersionT(read->readVersion);
|
||||||
this->result = result;
|
this->result = result;
|
||||||
continuation = check_range_read_state_machine::begin;
|
continuation = check::range_read_state_machine::begin;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
} // namespace check
|
||||||
|
|
||||||
|
namespace interleaved_insert {
|
||||||
|
|
||||||
|
typedef PRESERVE_NONE void (*Continuation)(struct Job *, struct Context *);
|
||||||
|
|
||||||
|
// State relevant to an individual insertion
|
||||||
|
struct Job {
|
||||||
|
std::span<const uint8_t> remaining;
|
||||||
|
Node *n;
|
||||||
|
int index;
|
||||||
|
|
||||||
|
// State for context switching machinery - not application specific
|
||||||
|
Continuation continuation;
|
||||||
|
Job *prev;
|
||||||
|
Job *next;
|
||||||
|
void init(Context *, int index);
|
||||||
|
};
|
||||||
|
|
||||||
|
// Result of an insertion. The search path of insertionPoint + remaining == the
|
||||||
|
// original key, and there is existing node in the tree further along the search
|
||||||
|
// path of the original key
|
||||||
|
struct Result {
|
||||||
|
Node *insertionPoint;
|
||||||
|
std::span<const uint8_t> remaining;
|
||||||
|
};
|
||||||
|
|
||||||
|
// State relevant to every insertion
|
||||||
|
struct Context {
|
||||||
|
int count;
|
||||||
|
int64_t started;
|
||||||
|
const ConflictSet::WriteRange *writes;
|
||||||
|
Node *root;
|
||||||
|
InternalVersionT writeVersion;
|
||||||
|
Result *results;
|
||||||
|
};
|
||||||
|
|
||||||
|
PRESERVE_NONE void keepGoing(Job *job, Context *context) {
|
||||||
|
job = job->next;
|
||||||
|
MUSTTAIL return job->continuation(job, context);
|
||||||
|
}
|
||||||
|
|
||||||
|
PRESERVE_NONE void complete(Job *job, Context *context) {
|
||||||
|
if (context->started == context->count) {
|
||||||
|
if (job->prev == job) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
job->prev->next = job->next;
|
||||||
|
job->next->prev = job->prev;
|
||||||
|
job = job->next;
|
||||||
|
MUSTTAIL return job->continuation(job, context);
|
||||||
|
} else {
|
||||||
|
int temp = context->started++;
|
||||||
|
job->init(context, temp);
|
||||||
|
MUSTTAIL return job->continuation(job, context);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
template <class NodeT> PRESERVE_NONE void iter(Job *, Context *);
|
||||||
|
|
||||||
|
static Continuation iterTable[] = {iter<Node0>, iter<Node3>, iter<Node16>,
|
||||||
|
iter<Node48>, iter<Node256>};
|
||||||
|
|
||||||
|
template <class NodeT> void iter(Job *job, Context *context) {
|
||||||
|
assert(NodeT::kType == job->n->getType());
|
||||||
|
NodeT *n = static_cast<NodeT *>(job->n);
|
||||||
|
|
||||||
|
TaggedNodePointer *child =
|
||||||
|
getChildUpdatingMaxVersion(n, job->remaining, context->writeVersion);
|
||||||
|
if (child == nullptr) [[unlikely]] {
|
||||||
|
context->results[job->index] = {job->n, job->remaining};
|
||||||
|
MUSTTAIL return complete(job, context);
|
||||||
|
}
|
||||||
|
job->n = *child;
|
||||||
|
if (job->remaining.size() == 0) [[unlikely]] {
|
||||||
|
context->results[job->index] = {job->n, job->remaining};
|
||||||
|
MUSTTAIL return complete(job, context);
|
||||||
|
}
|
||||||
|
job->continuation = iterTable[child->getType()];
|
||||||
|
__builtin_prefetch(job->n);
|
||||||
|
MUSTTAIL return keepGoing(job, context);
|
||||||
|
}
|
||||||
|
|
||||||
|
void Job::init(Context *context, int index) {
|
||||||
|
this->index = index;
|
||||||
|
remaining = std::span<const uint8_t>(context->writes[index].begin.p,
|
||||||
|
context->writes[index].begin.len);
|
||||||
|
n = context->root;
|
||||||
|
|
||||||
|
if (remaining.size() == 0) [[unlikely]] {
|
||||||
|
context->results[index] = {n, remaining};
|
||||||
|
continuation = interleaved_insert::complete;
|
||||||
|
} else {
|
||||||
|
continuation = iterTable[n->getType()];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
} // namespace interleaved_insert
|
||||||
|
|
||||||
// Sequential implementations
|
// Sequential implementations
|
||||||
namespace {
|
namespace {
|
||||||
@@ -4352,7 +4576,7 @@ struct __attribute__((visibility("hidden"))) ConflictSet::Impl {
|
|||||||
// We still have the sequential implementation for compilers that don't
|
// We still have the sequential implementation for compilers that don't
|
||||||
// support preserve_none and musttail
|
// support preserve_none and musttail
|
||||||
void useSequential(const ReadRange *reads, Result *result, int count,
|
void useSequential(const ReadRange *reads, Result *result, int count,
|
||||||
CheckContext &context) {
|
check::Context &context) {
|
||||||
for (int i = 0; i < count; ++i) {
|
for (int i = 0; i < count; ++i) {
|
||||||
if (reads[i].readVersion < oldestVersionFullPrecision) [[unlikely]] {
|
if (reads[i].readVersion < oldestVersionFullPrecision) [[unlikely]] {
|
||||||
result[i] = TooOld;
|
result[i] = TooOld;
|
||||||
@@ -4384,7 +4608,7 @@ struct __attribute__((visibility("hidden"))) ConflictSet::Impl {
|
|||||||
}
|
}
|
||||||
|
|
||||||
int64_t check_byte_accum = 0;
|
int64_t check_byte_accum = 0;
|
||||||
CheckContext context;
|
check::Context context;
|
||||||
context.readContext.impl = this;
|
context.readContext.impl = this;
|
||||||
|
|
||||||
#if __has_attribute(preserve_none) && __has_attribute(musttail)
|
#if __has_attribute(preserve_none) && __has_attribute(musttail)
|
||||||
@@ -4392,7 +4616,7 @@ struct __attribute__((visibility("hidden"))) ConflictSet::Impl {
|
|||||||
useSequential(reads, result, count, context);
|
useSequential(reads, result, count, context);
|
||||||
} else {
|
} else {
|
||||||
constexpr int kConcurrent = 16;
|
constexpr int kConcurrent = 16;
|
||||||
CheckJob inProgress[kConcurrent];
|
check::Job inProgress[kConcurrent];
|
||||||
context.count = count;
|
context.count = count;
|
||||||
context.oldestVersionFullPrecision = oldestVersionFullPrecision;
|
context.oldestVersionFullPrecision = oldestVersionFullPrecision;
|
||||||
context.root = root;
|
context.root = root;
|
||||||
@@ -4420,7 +4644,7 @@ struct __attribute__((visibility("hidden"))) ConflictSet::Impl {
|
|||||||
#ifndef NDEBUG
|
#ifndef NDEBUG
|
||||||
Arena arena;
|
Arena arena;
|
||||||
auto *results2 = new (arena) Result[count];
|
auto *results2 = new (arena) Result[count];
|
||||||
CheckContext context2;
|
check::Context context2;
|
||||||
context2.readContext.impl = this;
|
context2.readContext.impl = this;
|
||||||
useSequential(reads, results2, count, context2);
|
useSequential(reads, results2, count, context2);
|
||||||
assert(memcmp(result, results2, count) == 0);
|
assert(memcmp(result, results2, count) == 0);
|
||||||
@@ -4465,6 +4689,67 @@ struct __attribute__((visibility("hidden"))) ConflictSet::Impl {
|
|||||||
check_bytes_total.add(check_byte_accum);
|
check_bytes_total.add(check_byte_accum);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void interleavedPointWrites(const WriteRange *writes, int count,
|
||||||
|
InternalVersionT writeVersion) {
|
||||||
|
// Phase 1: Search for insertion points concurrently, without modifying the
|
||||||
|
// structure of the tree.
|
||||||
|
|
||||||
|
if (count == 0) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
interleaved_insert::Result stackResults[100];
|
||||||
|
|
||||||
|
constexpr int kConcurrent = 16;
|
||||||
|
interleaved_insert::Job inProgress[kConcurrent];
|
||||||
|
interleaved_insert::Context context;
|
||||||
|
context.writeVersion = writeVersion;
|
||||||
|
context.count = count;
|
||||||
|
context.root = root;
|
||||||
|
context.writes = writes;
|
||||||
|
context.results = stackResults;
|
||||||
|
if (count > int(sizeof(stackResults) / sizeof(stackResults[0])))
|
||||||
|
[[unlikely]] {
|
||||||
|
context.results = (interleaved_insert::Result *)safe_malloc(
|
||||||
|
count * sizeof(interleaved_insert::Result));
|
||||||
|
}
|
||||||
|
int64_t started = std::min(kConcurrent, count);
|
||||||
|
context.started = started;
|
||||||
|
for (int i = 0; i < started; i++) {
|
||||||
|
inProgress[i].init(&context, i);
|
||||||
|
}
|
||||||
|
for (int i = 0; i < started - 1; i++) {
|
||||||
|
inProgress[i].next = inProgress + i + 1;
|
||||||
|
}
|
||||||
|
for (int i = 1; i < started; i++) {
|
||||||
|
inProgress[i].prev = inProgress + i - 1;
|
||||||
|
}
|
||||||
|
inProgress[0].prev = inProgress + started - 1;
|
||||||
|
inProgress[started - 1].next = inProgress;
|
||||||
|
|
||||||
|
// Kick off the sequence of tail calls that finally returns once all jobs
|
||||||
|
// are done
|
||||||
|
inProgress->continuation(inProgress, &context);
|
||||||
|
|
||||||
|
// Phase 2: Perform insertions. Nodes may be upsized during this phase, but
|
||||||
|
// old nodes get forwarding pointers installed and are released after
|
||||||
|
// phase 2.
|
||||||
|
|
||||||
|
for (int i = 0; i < count; ++i) {
|
||||||
|
while (context.results[i].insertionPoint->releaseDeferred) {
|
||||||
|
context.results[i].insertionPoint =
|
||||||
|
context.results[i].insertionPoint->forwardTo;
|
||||||
|
}
|
||||||
|
addPointWrite(getInTree(context.results[i].insertionPoint, this),
|
||||||
|
context.results[i].remaining, writeVersion, &writeContext);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (count > int(sizeof(stackResults) / sizeof(stackResults[0])))
|
||||||
|
[[unlikely]] {
|
||||||
|
safe_free(context.results, count * sizeof(interleaved_insert::Result));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void addWrites(const WriteRange *writes, int count, int64_t writeVersion) {
|
void addWrites(const WriteRange *writes, int count, int64_t writeVersion) {
|
||||||
#if !USE_64_BIT
|
#if !USE_64_BIT
|
||||||
// There could be other conflict sets in the same thread. We need
|
// There could be other conflict sets in the same thread. We need
|
||||||
@@ -4506,19 +4791,36 @@ struct __attribute__((visibility("hidden"))) ConflictSet::Impl {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if __has_attribute(preserve_none) && __has_attribute(musttail)
|
||||||
|
bool allPointWrites = true;
|
||||||
for (int i = 0; i < count; ++i) {
|
for (int i = 0; i < count; ++i) {
|
||||||
const auto &w = writes[i];
|
if (writes[i].end.len > 0) {
|
||||||
writeContext.accum.write_bytes += w.begin.len + w.end.len;
|
allPointWrites = false;
|
||||||
auto begin = std::span<const uint8_t>(w.begin.p, w.begin.len);
|
break;
|
||||||
auto end = std::span<const uint8_t>(w.end.p, w.end.len);
|
|
||||||
if (w.end.len > 0) {
|
|
||||||
addWriteRange(root, begin, end, InternalVersionT(writeVersion),
|
|
||||||
&writeContext, this);
|
|
||||||
} else {
|
|
||||||
addPointWrite(root, begin, InternalVersionT(writeVersion),
|
|
||||||
&writeContext);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
#else
|
||||||
|
bool allPointWrites = false;
|
||||||
|
#endif
|
||||||
|
if (allPointWrites && count > 1) {
|
||||||
|
interleavedPointWrites(writes, count, InternalVersionT(writeVersion));
|
||||||
|
} else {
|
||||||
|
for (int i = 0; i < count; ++i) {
|
||||||
|
const auto &w = writes[i];
|
||||||
|
writeContext.accum.write_bytes += w.begin.len + w.end.len;
|
||||||
|
auto begin = std::span<const uint8_t>(w.begin.p, w.begin.len);
|
||||||
|
auto end = std::span<const uint8_t>(w.end.p, w.end.len);
|
||||||
|
if (w.end.len > 0) {
|
||||||
|
addWriteRange(root, begin, end, InternalVersionT(writeVersion),
|
||||||
|
&writeContext, this);
|
||||||
|
} else {
|
||||||
|
addPointWrite(root, begin, InternalVersionT(writeVersion),
|
||||||
|
&writeContext);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
writeContext.releaseDeferred();
|
||||||
|
|
||||||
// Run gc at least 200% the rate we're inserting entries
|
// Run gc at least 200% the rate we're inserting entries
|
||||||
keyUpdates += std::max<int64_t>(writeContext.accum.entries_inserted -
|
keyUpdates += std::max<int64_t>(writeContext.accum.entries_inserted -
|
||||||
|
|||||||
Reference in New Issue
Block a user