diff --git a/ConflictSet.cpp b/ConflictSet.cpp index 16fa1c0..f81681a 100644 --- a/ConflictSet.cpp +++ b/ConflictSet.cpp @@ -21,6 +21,33 @@ // ==================== BEGIN IMPLEMENTATION ==================== +int compare(std::span lhs, std::span rhs) { + int cl = std::min(lhs.size(), rhs.size()); + int c = cl == 0 ? 0 : memcmp(lhs.data(), rhs.data(), cl); + if (c != 0) { + return c; + } + return int(rhs.size()) - int(lhs.size()); +} + +std::span strincMutate(std::span str, bool &ok) { + int index; + for (index = str.size() - 1; index >= 0; index--) + if (str[index] != 255) + break; + + // Must not be called with a string that consists only of zero or more '\xff' + // bytes. + if (index < 0) { + ok = false; + return {}; + } + ok = true; + str = str.subspan(0, index + 1); + ++str.back(); + return str; +} + struct Entry { int64_t pointVersion; int64_t rangeVersion; @@ -485,6 +512,26 @@ Node *nextPhysical(Node *node) { } } +Node *nextPhysical(Node *node, Vector &searchPath) { + int index = -1; + for (;;) { + auto nextChild = getChildGeq(node, index + 1); + if (nextChild >= 0) { + auto *result = getChildExists(node, nextChild); + searchPath.push_back(nextChild); + searchPath.insert(searchPath.end(), result->partialKey, + result->partialKey + result->partialKeyLen); + return result; + } + if (node->parent == nullptr) { + return nullptr; + } + searchPath.resize(int(searchPath.size()) - 1 - node->partialKeyLen); + index = node->parentsIndex; + node = node->parent; + } +} + Node *nextLogical(Node *node) { for (node = nextPhysical(node); node != nullptr && !node->entryPresent; node = nextPhysical(node)) @@ -511,6 +558,27 @@ Node *nextSibling(Node *node) { } } +Node *nextSibling(Node *node, Vector &searchPath) { + for (;;) { + if (node->parent == nullptr) { + assert(searchPath.size() == 0); + return nullptr; + } + auto next = getChildGeq(node->parent, node->parentsIndex + 1); + if (next < 0) { + node = node->parent; + searchPath.resize(int(searchPath.size()) - 1 - node->partialKeyLen); + } else { + searchPath.resize(int(searchPath.size()) - 1 - node->partialKeyLen); + auto *result = getChildExists(node->parent, next); + searchPath.push_back(next); + searchPath.insert(searchPath.end(), result->partialKey, + result->partialKey + result->partialKeyLen); + return result; + } + } +} + struct FirstGeqStepwise { Node *n; std::span remaining; @@ -691,6 +759,23 @@ namespace { std::string getSearchPathPrintable(Node *n); } +Vector getSearchPath(Arena &arena, Node *n) { + assert(n != nullptr); + auto result = vector(arena); + for (;;) { + for (int i = n->partialKeyLen - 1; i >= 0; --i) { + result.push_back(n->partialKey[i]); + } + if (n->parent == nullptr) { + break; + } + result.push_back(n->parentsIndex); + n = n->parent; + } + std::reverse(result.begin(), result.end()); + return result; +} + bool checkRangeRead(Node *n, const std::span begin, const std::span end, int64_t readVersion) { auto left = FirstGeqStepwise{n, begin}; @@ -725,26 +810,6 @@ bool checkRangeRead(Node *n, const std::span begin, ; } - Arena arena{64 << 10}; - // auto leftBorder = hashSet(arena); - // auto rightBorder = hashSet(arena); - auto leftBorder = vector(arena); - auto rightBorder = vector(arena); - { - auto *l = left.n; - auto *r = right.n; - for (; l != nullptr && r != nullptr; l = l->parent, r = r->parent) { - leftBorder.push_back(l); - rightBorder.push_back(r); - } - for (; l != nullptr; l = l->parent) { - leftBorder.push_back(l); - } - for (; r != nullptr; r = r->parent) { - rightBorder.push_back(r); - } - } - #if DEBUG_VERBOSE && !defined(NDEBUG) fprintf(stderr, "firstGeq for `%s' got `%s'\n", printable(begin).c_str(), getSearchPathPrintable(left.n).c_str()); @@ -764,31 +829,30 @@ bool checkRangeRead(Node *n, const std::span begin, return false; } - // TODO better data structure for pointer set. Also collect during first - // traversal? - for (auto *iter = nextPhysical(left.n); iter != right.n;) { - // const bool onLeftPath = leftBorder.find(iter) != leftBorder.end(); - // const bool onRightPath = rightBorder.find(iter) != rightBorder.end(); - const bool onLeftPath = std::find(leftBorder.begin(), leftBorder.end(), - iter) != leftBorder.end(); - const bool onRightPath = std::find(rightBorder.begin(), rightBorder.end(), - iter) != rightBorder.end(); - if (!onLeftPath && !onRightPath) { + Arena arena; + auto searchPath = getSearchPath(arena, left.n); + + for (auto *iter = nextPhysical(left.n, searchPath); iter != right.n;) { + assert(searchPath == getSearchPath(arena, iter)); + bool ok = true; + auto rangeEnd = strincMutate(searchPath, ok); + if (!ok) { + return iter->maxVersion <= readVersion; + } + int c = compare(rangeEnd, end); + --rangeEnd.back(); + if (c == 0) { + return iter->maxVersion <= readVersion; + } else if (c < 0) { if (iter->maxVersion > readVersion) { return false; } - // Skip subtree - iter = nextSibling(iter); + iter = nextSibling(iter, searchPath); } else { - if (onRightPath && iter->maxVersion <= readVersion) { + if (iter->maxVersion <= readVersion) { return true; } - if (iter->entryPresent && - std::max(iter->entry.pointVersion, iter->entry.rangeVersion) > - readVersion) { - return false; - } - iter = nextPhysical(iter); + iter = nextPhysical(iter, searchPath); } } return true; @@ -1113,18 +1177,7 @@ std::string strinc(std::string_view str, bool &ok) { std::string getSearchPath(Node *n) { assert(n != nullptr); Arena arena; - auto result = vector(arena); - for (;;) { - for (int i = n->partialKeyLen - 1; i >= 0; --i) { - result.push_back(n->partialKey[i]); - } - if (n->parent == nullptr) { - break; - } - result.push_back(n->parentsIndex); - n = n->parent; - } - std::reverse(result.begin(), result.end()); + auto result = getSearchPath(arena, n); return std::string((const char *)result.data(), result.size()); }