diff --git a/ConflictSet.cpp b/ConflictSet.cpp
index b0def54..3241ede 100644
--- a/ConflictSet.cpp
+++ b/ConflictSet.cpp
@@ -259,6 +259,10 @@ struct Node256 : Node {
   uint8_t *partialKey() { return (uint8_t *)(this + 1); }
 };
 
+namespace {
+std::string getSearchPathPrintable(Node *n);
+}
+
 // Bound memory usage following the analysis in the ART paper
 constexpr int kBytesPerKey = 120;
 
@@ -839,6 +843,11 @@ void maybeDecreaseCapacity(Node *&self, NodeAllocators *allocators,
 // TODO fuse into erase child so we don't need to repeat branches on type
 void maybeDownsize(Node *self, NodeAllocators *allocators,
                    ConflictSet::Impl *impl, Node *&dontInvalidate) {
+
+#if DEBUG_VERBOSE && !defined(NDEBUG)
+  fprintf(stderr, "maybeDownsize: %s\n", getSearchPathPrintable(self).c_str());
+#endif
+
   switch (self->type) {
   case Type::Node0:
     __builtin_unreachable(); // GCOVR_EXCL_LINE
@@ -858,11 +867,11 @@ void maybeDownsize(Node *self, NodeAllocators *allocators,
 
     int minCapacity = self4->partialKeyLen + 1 + child->partialKeyLen;
     if (minCapacity > child->partialKeyCapacity) {
-      // TODO resize child? It seems to be quite challenging to implement,
-      // since callers would now have to account for erase invalidating
-      // not on the search path. We could lower kBytesPerKey by doing this
-      // though.
-      return;
+      const bool update = child == dontInvalidate;
+      makeCapacityAtLeast(child, minCapacity, allocators, impl);
+      if (update) {
+        dontInvalidate = child;
+      }
     }
 
     // Merge partial key with child
@@ -964,16 +973,27 @@ void maybeDownsize(Node *self, NodeAllocators *allocators,
 
 // Precondition: self is not the root. May invalidate nodes along the search
 // path to self. May invalidate children of self->parent. Returns a pointer to
-// the node after self.
-Node *erase(Node *self, NodeAllocators *allocators, ConflictSet::Impl *impl) {
+// the node after self. If erase invalidates the pointee of `dontInvalidate`, it
+// will update it to its new pointee as well.
+Node *erase(Node *self, NodeAllocators *allocators, ConflictSet::Impl *impl,
+            Node *&dontInvalidate) {
   assert(self->parent != nullptr);
 
+#if DEBUG_VERBOSE && !defined(NDEBUG)
+  fprintf(stderr, "Erase: %s\n", getSearchPathPrintable(self).c_str());
+#endif
+
   Node *parent = self->parent;
   uint8_t parentsIndex = self->parentsIndex;
   auto *result = nextLogical(self);
   self->entryPresent = false;
 
   if (self->numChildren != 0) {
+    const bool update = result == dontInvalidate;
+    maybeDownsize(self, allocators, impl, result);
+    if (update) {
+      dontInvalidate = result;
+    }
     return result;
   }
 
@@ -1027,9 +1047,13 @@ Node *erase(Node *self, NodeAllocators *allocators, ConflictSet::Impl *impl) {
   --parent->numChildren;
   if (parent->numChildren == 0 && !parent->entryPresent &&
       parent->parent != nullptr) {
-    erase(parent, allocators, impl);
+    return erase(parent, allocators, impl, dontInvalidate);
   } else {
+    const bool update = result == dontInvalidate;
     maybeDownsize(parent, allocators, impl, result);
+    if (update) {
+      dontInvalidate = result;
+    }
   }
   return result;
 }
@@ -1233,10 +1257,6 @@ struct SearchStepWise {
   }
 };
 
-namespace {
-std::string getSearchPathPrintable(Node *n);
-}
-
 // Logically this is the same as performing firstGeq and then checking against
 // point or range version according to cmp, but this version short circuits as
 // soon as it can prove that there's no conflict.
@@ -2024,14 +2044,8 @@ void addWriteRange(Node *&root, int64_t oldestVersion,
     assert(beginNode->entryPresent);
   }
 
-  for (beginNode = nextLogical(beginNode); beginNode != endNode;) {
-    auto *next = nextLogical(beginNode);
-    bool done = next == endNode;
-    erase(beginNode, allocators, impl);
-    if (done) {
-      break;
-    }
-    beginNode = next;
+  for (beginNode = nextLogical(beginNode); beginNode != endNode;
+       beginNode = erase(beginNode, allocators, impl, endNode)) {
   }
 }
 
@@ -2157,7 +2171,8 @@ struct __attribute__((visibility("hidden"))) ConflictSet::Impl {
         // There's no way to insert a range such that range version of the right
         // node is greater than the point version of the left node
         assert(n->entry.rangeVersion <= oldestVersion);
-        n = erase(n, &allocators, this);
+        Node *dummy = nullptr;
+        n = erase(n, &allocators, this, dummy);
       } else {
         maybeDecreaseCapacity(n, &allocators, this);
         n = nextLogical(n);
@@ -2511,9 +2526,10 @@ Iterator firstGeq(Node *n, std::string_view key) {
   }
   if (node->numChildren + int(node->entryPresent) < minNumChildren) {
     fprintf(stderr,
-            "%s has %d children, which is less than the minimum required %d\n",
+            "%s has %d children + %d entries, which is less than the minimum "
+            "required %d\n",
             getSearchPathPrintable(node).c_str(), node->numChildren,
-            minNumChildren);
+            int(node->entryPresent), minNumChildren);
     success = false;
   }
   // TODO check that the max capacity property eventually holds
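The recurring `const bool update = x == dontInvalidate; ... if (update) dontInvalidate = x;` sequence in the hunks above is the whole mechanism: before a call that may reallocate a node, remember whether the caller's out-parameter aliases that node, and re-seat the out-parameter afterwards. This is what lets addWriteRange keep `endNode` valid while erasing up to it. A minimal self-contained sketch of that pattern follows; the names (growChild, reallocateWithCapacity) are hypothetical stand-ins, not the real ConflictSet.cpp helpers.

#include <cassert>

// Hypothetical stand-in for the tree's node type; only the field needed to
// show the re-seating pattern.
struct Node {
  int partialKeyCapacity;
};

// Stand-in for makeCapacityAtLeast: replaces the node with a larger
// allocation and frees the old one, invalidating every other pointer that
// aliased it.
void reallocateWithCapacity(Node *&node, int capacity) {
  Node *fresh = new Node{capacity};
  delete node;
  node = fresh;
}

// Before an operation that may reallocate `child`, remember whether the
// caller's out-parameter aliases it, and point the out-parameter at the new
// allocation afterwards.
void growChild(Node *&child, int minCapacity, Node *&dontInvalidate) {
  if (minCapacity > child->partialKeyCapacity) {
    const bool update = child == dontInvalidate;
    reallocateWithCapacity(child, minCapacity);
    if (update) {
      dontInvalidate = child;
    }
  }
}

int main() {
  Node *child = new Node{4};
  Node *held = child; // plays the role of `endNode` in addWriteRange
  growChild(child, 16, held);
  assert(held == child); // still refers to the (relocated) logical node
  delete child;
}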