From b92f0ec227108cc1379370a08823f1c3e72c464b Mon Sep 17 00:00:00 2001 From: Andrew Noyes Date: Mon, 3 Jun 2024 22:28:01 -0700 Subject: [PATCH] Add efficient firstGeq overload for latestVersion --- Bench.cpp | 3 + FdbVersionedMap.cpp | 29 ++++++ VersionedMap.cpp | 203 +++++++++++++++++++++++++-------------- apple-symbol-exports.txt | 1 + include/VersionedMap.h | 5 + 5 files changed, 168 insertions(+), 73 deletions(-) diff --git a/Bench.cpp b/Bench.cpp index 0bc5681..d0afa78 100644 --- a/Bench.cpp +++ b/Bench.cpp @@ -97,6 +97,9 @@ void bulkFirstGeq() { bench.run("bulkFirstGeq", [&] { versionedMap.firstGeq(keys, versions, iterators, kNumQueries); }); + + bench.run("bulkFirstGeq (latest version)", + [&] { versionedMap.firstGeq(keys, iterators, kNumQueries); }); } int main() { diff --git a/FdbVersionedMap.cpp b/FdbVersionedMap.cpp index 9a224c1..bf2b8a6 100644 --- a/FdbVersionedMap.cpp +++ b/FdbVersionedMap.cpp @@ -1416,6 +1416,35 @@ void VersionedMap::firstGeq(const Key *key, const int64_t *version, } } +void VersionedMap::firstGeq(const Key *key, Iterator *iterator, + int count) const { + for (int i = 0; i < count; i++) { + if (iterator[i].impl != nullptr) { + iterator[i].impl->~Impl(); + new (iterator[i].impl) Iterator::Impl(); + } else { + // TODO re-use root if version matches + iterator[i].impl = + new (safe_malloc(sizeof(Iterator::Impl))) Iterator::Impl(); + } + + auto view = impl->versionedData.atLatest(); + iterator[i].impl->iter = view.lastLessOrEqual(key[i]); + // Increment if the mutation is < key[i], and doesn't intersect it + if (iterator[i].impl->iter) { + if (iterator[i].impl->iter->isValue() && + iterator[i].impl->iter.key() < key[i]) { + ++iterator[i].impl->iter; + } else if (iterator[i].impl->iter->isClearTo() && + iterator[i].impl->iter->getEndKey() <= key[i]) { + ++iterator[i].impl->iter; + } + } else { + iterator[i].impl->iter = view.begin(); + } + } +} + VersionedMap::Iterator VersionedMap::begin(int64_t version) const { Iterator result; 
result.impl = new (safe_malloc(sizeof(Iterator::Impl))) Iterator::Impl(); diff --git a/VersionedMap.cpp b/VersionedMap.cpp index 38fa53a..92862b9 100644 --- a/VersionedMap.cpp +++ b/VersionedMap.cpp @@ -949,9 +949,23 @@ struct __attribute__((__visibility__("hidden"))) VersionedMap::Impl { totalMallocBytes += mallocBytesDelta; } + struct StepwiseFirstGeq { + + const VersionedMap::Impl *map; + const weaselab::VersionedMap::Key *key; + int64_t version; + weaselab::VersionedMap::Iterator *iterator; + + void begin(uint32_t root); + bool step(); + void end(); + }; + void firstGeq(const Key *key, const int64_t *version, Iterator *iterator, int count) const; + void firstGeq(const Key *key, Iterator *iterator, int count) const; + // State used to resume scanning and removing old entries in `addMutations` Key continueKey; Arena continueArena; @@ -1222,6 +1236,67 @@ bool geq(const VersionedMap::Iterator::VersionedMutation &m, } } +void VersionedMap::Impl::StepwiseFirstGeq::begin(uint32_t root) { + if (iterator->impl != nullptr) { + iterator->impl->~Impl(); + new (iterator->impl) Iterator::Impl(); + } else { + iterator->impl = new (safe_malloc(sizeof(Iterator::Impl))) Iterator::Impl(); + } + Finger &finger = iterator->impl->finger; + finger.clear(); + bool ignored = false; + finger.push(root, ignored); +} + +bool VersionedMap::Impl::StepwiseFirstGeq::step() { + Finger &finger = iterator->impl->finger; + + auto n = finger.backNode(); + if (n == 0) { + return true; + } + auto c = *key <=> map->mm.base[n]; + if (c == 0) { + // No duplicates + return true; + } + finger.push(map->child(n, c > 0, version), c > 0); + return false; +} + +void VersionedMap::Impl::StepwiseFirstGeq::end() { + Finger &finger = iterator->impl->finger; + if (finger.searchPathSize() > 0 && finger.backNode() == 0) { + map->move(finger, version); + if (finger.searchPathSize() > 0) { + assert(finger.backNode() != 0); + } + } + + iterator->impl->version = version; + iterator->impl->map = map; + + const Entry 
*prev = nullptr; + for (;;) { + if (finger.searchPathSize() == 0) { + break; + } else { + materializeMutations(iterator->impl, prev, nullptr); + for (int j = 0; j < iterator->impl->mutationCount; ++j) { + if (geq(iterator->impl->mutations[j], *key)) { + iterator->impl->mutationIndex = j; + goto loopEnd; + } + } + } + prev = iterator->impl->map->mm.base[finger.backNode()].entry; + iterator->impl->map->move( + finger, iterator->impl->version); + } +loopEnd:; +} + void VersionedMap::Impl::firstGeq(const weaselab::VersionedMap::Key *key, const int64_t *version, Iterator *iterator, int count) const { @@ -1229,78 +1304,6 @@ void VersionedMap::Impl::firstGeq(const weaselab::VersionedMap::Key *key, return; } - struct StepwiseFirstGeq { - - const VersionedMap::Impl *map; - const weaselab::VersionedMap::Key *key; - int64_t version; - weaselab::VersionedMap::Iterator *iterator; - - void begin(RootSet::ThreadSafeHandle handle) { - if (iterator->impl != nullptr) { - iterator->impl->~Impl(); - new (iterator->impl) Iterator::Impl(); - } else { - iterator->impl = - new (safe_malloc(sizeof(Iterator::Impl))) Iterator::Impl(); - } - uint32_t root = handle.rootForVersion(version); - Finger &finger = iterator->impl->finger; - finger.clear(); - bool ignored = false; - finger.push(root, ignored); - } - - bool step() { - Finger &finger = iterator->impl->finger; - - auto n = finger.backNode(); - if (n == 0) { - return true; - } - auto c = *key <=> map->mm.base[n]; - if (c == 0) { - // No duplicates - return true; - } - finger.push(map->child(n, c > 0, version), - c > 0); - return false; - } - - void end() { - Finger &finger = iterator->impl->finger; - if (finger.searchPathSize() > 0 && finger.backNode() == 0) { - map->move(finger, version); - if (finger.searchPathSize() > 0) { - assert(finger.backNode() != 0); - } - } - - iterator->impl->version = version; - iterator->impl->map = map; - - const Entry *prev = nullptr; - for (;;) { - if (finger.searchPathSize() == 0) { - break; - } else { - 
materializeMutations(iterator->impl, prev, nullptr); - for (int j = 0; j < iterator->impl->mutationCount; ++j) { - if (geq(iterator->impl->mutations[j], *key)) { - iterator->impl->mutationIndex = j; - goto loopEnd; - } - } - } - prev = iterator->impl->map->mm.base[finger.backNode()].entry; - iterator->impl->map->move( - finger, iterator->impl->version); - } - loopEnd:; - } - }; - // Use stack allocation for small count Arena arena; constexpr int kStackAllocThreshold = 2; @@ -1322,7 +1325,56 @@ void VersionedMap::Impl::firstGeq(const weaselab::VersionedMap::Key *key, stepwise[i].key = &key[i]; stepwise[i].version = version[i]; stepwise[i].iterator = &iterator[i]; - stepwise[i].begin(handle); + stepwise[i].begin(handle.rootForVersion(version[i])); + nextJob[i] = i + 1; + } + nextJob[count - 1] = 0; + int prevJob = count - 1; + int job = 0; + + for (;;) { + if (stepwise[job].step()) { + stepwise[job].end(); + if (job == prevJob) { + break; + } + nextJob[prevJob] = nextJob[job]; + job = prevJob; + } + prevJob = job; + job = nextJob[job]; + } +} + +void VersionedMap::Impl::firstGeq(const weaselab::VersionedMap::Key *key, + Iterator *iterator, int count) const { + if (count == 0) { + return; + } + + // Use stack allocation for small count + Arena arena; + constexpr int kStackAllocThreshold = 2; + StepwiseFirstGeq stepwiseStackAlloc[kStackAllocThreshold]; + int nextJobStackAllocation[kStackAllocThreshold]; + StepwiseFirstGeq *stepwise; + int *nextJob; + if (count <= kStackAllocThreshold) { + stepwise = stepwiseStackAlloc; + nextJob = nextJobStackAllocation; + } else { + stepwise = new (arena) StepwiseFirstGeq[count]; + nextJob = new (arena) int[count]; + } + + const uint32_t root = roots.roots()[roots.rootCount() - 1]; + assert(root == roots.getThreadSafeHandle().rootForVersion(latestVersion)); + for (int i = 0; i < count; ++i) { + stepwise[i].map = this; + stepwise[i].key = &key[i]; + stepwise[i].version = latestVersion; + stepwise[i].iterator = &iterator[i]; + 
stepwise[i].begin(root); nextJob[i] = i + 1; } nextJob[count - 1] = 0; @@ -1355,6 +1407,11 @@ void VersionedMap::firstGeq(const Key *key, const int64_t *version, impl->firstGeq(key, version, iterator, count); } +void VersionedMap::firstGeq(const Key *key, Iterator *iterator, + int count) const { + impl->firstGeq(key, iterator, count); +} + VersionedMap::Iterator VersionedMap::begin(int64_t version) const { VersionedMap::Iterator result; result.impl = new (safe_malloc(sizeof(Iterator::Impl))) Iterator::Impl(); diff --git a/apple-symbol-exports.txt b/apple-symbol-exports.txt index 2d382a8..22bc5a3 100644 --- a/apple-symbol-exports.txt +++ b/apple-symbol-exports.txt @@ -27,4 +27,5 @@ __ZNK8weaselab12VersionedMap8IteratordeEv __ZNK8weaselab12VersionedMap8IteratoreqERKS1_ __ZNK8weaselab12VersionedMap8IteratorneERKS1_ __ZNK8weaselab12VersionedMap8firstGeqEPKNS0_3KeyEPKxPNS0_8IteratorEi +__ZNK8weaselab12VersionedMap8firstGeqEPKNS0_3KeyEPNS0_8IteratorEi __ZNK8weaselab12VersionedMap8getBytesEv \ No newline at end of file diff --git a/include/VersionedMap.h b/include/VersionedMap.h index bd4d236..0b46ab1 100644 --- a/include/VersionedMap.h +++ b/include/VersionedMap.h @@ -158,6 +158,11 @@ struct __attribute__((__visibility__("default"))) VersionedMap { void firstGeq(const Key *key, const int64_t *version, Iterator *iterator, int count) const; + /** Equivalent to `firstGeq(key, version, iterator, count)` where each element in + * `version` is the latest version, but more efficient. + */ + void firstGeq(const Key *key, Iterator *iterator, int count) const; + /** Returns an iterator to the first mutation visible in the view at * `version`, or `end()` if none exists. Thread-safe as long as `version` is * not concurrently invalidated by `setOldestVersion`. */