diff --git a/ConflictSet.cpp b/ConflictSet.cpp index 7ca8296..87464a0 100644 --- a/ConflictSet.cpp +++ b/ConflictSet.cpp @@ -680,73 +680,101 @@ std::string getSearchPath(Node *n) { } } // namespace -Iterator firstGeq(Node *n, const std::span key) { - auto remaining = key; +struct FirstGeqStepwise { + Node *n; + std::span remaining; Node *nextSib = nullptr; - for (;;) { - if (n->partialKeyLen > 0) { - int commonLen = std::min(n->partialKeyLen, remaining.size()); - for (int i = 0; i < commonLen; ++i) { - auto c = n->partialKey[i] <=> remaining[i]; - if (c == 0) { - continue; + int cmp; + + enum Phase { Search, DownLeftSpine }; + Phase phase; + + FirstGeqStepwise(Node *n, std::span remaining) + : n(n), remaining(remaining), phase(Search) {} + + bool step() { + switch (phase) { + case Search: + if (n->partialKeyLen > 0) { + int commonLen = std::min(n->partialKeyLen, remaining.size()); + for (int i = 0; i < commonLen; ++i) { + auto c = n->partialKey[i] <=> remaining[i]; + if (c == 0) { + continue; + } + if (c > 0) { + return downLeftSpine(); + } else { + n = nextSib; + return downLeftSpine(); + } } - if (c > 0) { - goto downLeftSpine; + if (commonLen == n->partialKeyLen) { + // partial key matches + remaining = + remaining.subspan(commonLen, remaining.size() - commonLen); + } else if (n->partialKeyLen > int(remaining.size())) { + // n is the first physical node greater than remaining, and there's no + // eq node + return downLeftSpine(); + } + } + if (remaining.size() == 0) { + if (n->entryPresent) { + cmp = 0; + return true; + } + int c = getChildGeq(n, 0); + assert(c >= 0); + n = getChildExists(n, c); + return downLeftSpine(); + } else { + int c = getChildGeq(n, remaining[0]); + int c2 = getChildGeq(n, int(remaining[0]) + 1); + if (c2 >= 0) { + nextSib = getChildExists(n, c2); + } + if (c == remaining[0]) { + n = getChildExists(n, c); + remaining = remaining.subspan(1, remaining.size() - 1); } else { - n = nextSib; - goto downLeftSpine; + if (c >= 0) { + n = getChildExists(n, c); + return downLeftSpine(); + } else { + n = nextSib; + return downLeftSpine(); + } } } - if (commonLen == n->partialKeyLen) { - // partial key matches - remaining = remaining.subspan(commonLen, remaining.size() - commonLen); - } else if (n->partialKeyLen > int(remaining.size())) { - // n is the first physical node greater than remaining, and there's no - // eq node - goto downLeftSpine; - } - } - if (remaining.size() == 0) { + return false; + case DownLeftSpine: if (n->entryPresent) { - return {n, 0}; + cmp = 1; + return true; } int c = getChildGeq(n, 0); assert(c >= 0); n = getChildExists(n, c); - goto downLeftSpine; - } else { - int c = getChildGeq(n, remaining[0]); - int c2 = getChildGeq(n, int(remaining[0]) + 1); - if (c2 >= 0) { - nextSib = getChildExists(n, c2); - } - if (c == remaining[0]) { - n = getChildExists(n, c); - remaining = remaining.subspan(1, remaining.size() - 1); - } else { - if (c >= 0) { - n = getChildExists(n, c); - goto downLeftSpine; - } else { - n = nextSib; - goto downLeftSpine; - } - } + return false; } } -downLeftSpine: - if (n == nullptr) { - return {nullptr, 1}; - } - for (;;) { - if (n->entryPresent) { - return {n, 1}; + + bool downLeftSpine() { + if (n == nullptr) { + cmp = 1; + return true; } - int c = getChildGeq(n, 0); - assert(c >= 0); - n = getChildExists(n, c); + phase = DownLeftSpine; + return step(); } +}; + +Iterator firstGeq(Node *n, const std::span key) { + FirstGeqStepwise stepwise{n, key}; + while (!stepwise.step()) + ; + return {stepwise.n, stepwise.cmp}; } Iterator firstGeq(Node *n, std::string_view key) { @@ -832,84 +860,82 @@ downLeftSpine: bool checkRangeRead(Node *n, const std::span begin, const std::span end, int64_t readVersion) { - auto left = firstGeq(n, begin); - auto right = firstGeq(n, end); - - Arena arena; - auto leftPath = vector(arena); - auto rightPath = vector(arena); - for (auto *iter = left.n; iter != nullptr; iter = iter->parent) { - leftPath.push_back(iter); - } - for (auto *iter = right.n; iter != nullptr; iter = iter->parent) { - rightPath.push_back(iter); - } - Node *lca = n; - for (int i = 0; int(leftPath.size()) - 1 - i >= 0 && - int(rightPath.size()) - 1 - i >= 0 && - leftPath[int(leftPath.size()) - 1 - i] == - rightPath[(rightPath.size()) - 1 - i]; - ++i) { - lca = leftPath[int(leftPath.size()) - 1 - i]; + auto left = FirstGeqStepwise{n, begin}; + auto right = FirstGeqStepwise{n, end}; + bool leftDone; + bool rightDone; + for (;;) { + if (left.phase == FirstGeqStepwise::Search && + right.phase == FirstGeqStepwise::Search && + left.n->maxVersion <= readVersion) { + return true; + } + leftDone = left.step(); + rightDone = right.step(); + if (leftDone || rightDone) { + break; + } + if (left.n != right.n) { + break; + } } -#if DEBUG_VERBOSE && !defined(NDEBUG) - fprintf(stderr, "firstGeq for `%s' got `%s'\n", printable(begin).c_str(), - getSearchPathPrintable(left.n).c_str()); - fprintf(stderr, "firstGeq for `%s' got `%s'\n", printable(end).c_str(), - getSearchPathPrintable(right.n).c_str()); - fprintf(stderr, "lca `%s'\n", getSearchPathPrintable(lca).c_str()); -#endif - if (left.n != nullptr && left.cmp != 0 && - left.n->entry.rangeVersion > readVersion) { - return false; + if (!leftDone && !rightDone) { + assert(left.n->parent == right.n->parent); + for (int c = left.n->parentsIndex; c < right.n->parentsIndex; + c = getChildGeq(left.n->parent, c + 1)) { + assert(c >= 0); + if (getChildExists(left.n->parent, c)->maxVersion > readVersion) { + return false; + } + } + // We've checked everything from begin to the search path of right.n + // Now check from the search path of right.n to end + for (;;) { + if (right.phase == FirstGeqStepwise::Search && + right.n->maxVersion <= readVersion) { + return true; + } + rightDone = right.step(); + if (rightDone) { + return right.n->entry.rangeVersion <= readVersion; + } + } } - if (left.n == right.n) { + + if (leftDone) { + if (left.n == nullptr) { + return true; + } + if (right.phase == FirstGeqStepwise::DownLeftSpine) { + if (left.n->maxVersion > readVersion) { + return false; + } + } + + // We've checked everything from begin to the search path of right.n + // Now check from the search path of right.n to end + for (;;) { + if (right.phase == FirstGeqStepwise::Search && + right.n->maxVersion <= readVersion) { + return true; + } + rightDone = right.step(); + if (rightDone) { + return right.n->entry.rangeVersion <= readVersion; + } + } + } + + if (rightDone) { + // This would mean that left is overtaking right + __builtin_unreachable(); + } + + { + assert(left.n == right.n); return true; } - assert(left.n != nullptr); - auto boundaryVersion = left.n->entry.pointVersion; - if (left.cmp != 0) { - boundaryVersion = std::max(boundaryVersion, left.n->entry.rangeVersion); - } - if (right.n != nullptr) { - boundaryVersion = std::max(boundaryVersion, right.n->entry.rangeVersion); - } - if (boundaryVersion > readVersion) { - return false; - } - - if (left.n != lca) { - while (left.n->parent != lca) { - for (int c = getChildGeq(left.n->parent, int(left.n->parentsIndex) + 1); - c >= 0; c = getChildGeq(left.n->parent, c + 1)) { - if (getChildExists(left.n->parent, c)->maxVersion > readVersion) { - return false; - } - } - left.n = left.n->parent; - } - } - if (right.n != nullptr && right.n != lca) { - while (right.n->parent != lca) { - for (int c = getChildLeq(right.n->parent, int(right.n->parentsIndex) - 1); - c >= 0; c = getChildLeq(right.n->parent, c - 1)) { - if (getChildExists(right.n->parent, c)->maxVersion > readVersion) { - return false; - } - } - right.n = right.n->parent; - } - } - for (int c = left.n != lca ? getChildGeq(lca, int(left.n->parentsIndex) + 1) - : getChildGeq(lca, 0); - c >= 0 && (right.n == nullptr || c < right.n->parentsIndex); - c = getChildGeq(lca, c + 1)) { - if (getChildExists(lca, c)->maxVersion > readVersion) { - return false; - } - } - return true; } // Returns a pointer to the newly inserted node. caller is reponsible for