#include "VersionedMap.h"
#include "Internal.h"
#include "KeyCompare.h"
#include "PrintMutation.h"
#include "RootSet.h"

// NOTE(review): the header names of the twelve #include directives below are
// missing, as are the template argument lists of the static_asserts that
// follow. Every `<...>` sequence beginning with a letter appears to have been
// stripped from this text (as an HTML sanitizer would), so as written it does
// not compile. Restore the exact originals from version control.
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include

static_assert(std::is_standard_layout_v);
static_assert(std::is_standard_layout_v);
static_assert(std::is_standard_layout_v);
static_assert(std::is_standard_layout_v);
static_assert(std::bidirectional_iterator);
static_assert(std::is_standard_layout_v<
              weaselab::VersionedMap::Iterator::VersionedMutation>);

// Calls mmap(2) with the given arguments and returns the new mapping. If mmap
// returns MAP_FAILED, prints the arguments and strerror(errno) to stderr and
// aborts, so callers never receive MAP_FAILED.
void *mmapSafe(void *addr, size_t len, int prot, int flags, int fd,
               off_t offset) {
  void *result = mmap(addr, len, prot, flags, fd, offset);
  if (result == MAP_FAILED) {
    int err = errno; // GCOVR_EXCL_LINE
    fprintf(         // GCOVR_EXCL_LINE
        stderr,      // GCOVR_EXCL_LINE
        "Error calling mmap(%p, %zu, %d, %d, %d, %jd): %d %s\n", // GCOVR_EXCL_LINE
        addr, len, prot, flags, fd, (intmax_t)offset, err, // GCOVR_EXCL_LINE
        strerror(err)); // GCOVR_EXCL_LINE
    fflush(stderr);     // GCOVR_EXCL_LINE
    abort();            // GCOVR_EXCL_LINE
  }
  return result;
}

// Calls mprotect(2) on [p, p + s) with protection `prot`. On a non-zero
// return, prints the arguments and strerror(errno) to stderr and aborts.
void mprotectSafe(void *p, size_t s, int prot) {
  if (mprotect(p, s, prot) != 0) {
    int err = errno;                                // GCOVR_EXCL_LINE
    fprintf(stderr,                                 // GCOVR_EXCL_LINE
            "Error calling mprotect(%p, %zu, %d): %s\n", // GCOVR_EXCL_LINE
            p,                                      // GCOVR_EXCL_LINE
            s,                                      // GCOVR_EXCL_LINE
            prot,                                   // GCOVR_EXCL_LINE
            strerror(err));                         // GCOVR_EXCL_LINE
    fflush(stderr);                                 // GCOVR_EXCL_LINE
    abort();                                        // GCOVR_EXCL_LINE
  }
}

// Calls munmap(2) on [ptr, ptr + size). On a non-zero return, prints the
// arguments and strerror(errno) to stderr and aborts.
void munmapSafe(void *ptr, size_t size) {
  if (munmap(ptr, size) != 0) {
    int err = errno;                                    // GCOVR_EXCL_LINE
    fprintf(stderr, "Error calling munmap(%p, %zu): %s\n", // GCOVR_EXCL_LINE
            ptr,                                        // GCOVR_EXCL_LINE
            size,                                       // GCOVR_EXCL_LINE
            strerror(err));                             // GCOVR_EXCL_LINE
    fflush(stderr);                                     // GCOVR_EXCL_LINE
    abort();                                            // GCOVR_EXCL_LINE
  }
}

namespace weaselab {

// 96 is enough for an entire search path in a tree with a size that
// overflows int. See
// https://en.wikipedia.org/wiki/Random_binary_tree#The_longest_path
// This is the fixed capacity of a Finger (search path); Finger::push does not
// check it.
constexpr int kPathLengthUpperBound = 96;

// A key, an optional value, and the versions of the mutations recorded for
// that key. Each Entry is a single allocation: the struct is followed
// immediately by keyLen key bytes, one zero byte, and max(valLen, 0) value
// bytes (see make/getKey/getVal). Lifetime is managed by the reference count
// in addref/delref.
struct Entry {
  // If there is a point mutation at key, then pointVersion is >= 0 and key has
  // not been modified since pointVersion. Otherwise it's negative.
  int64_t pointVersion;
  // If there is a range mutation ending at key, then rangeVersion is >= 0 and
  // the range has not been modified since rangeVersion. Otherwise it's
  // negative.
  int64_t rangeVersion;
  // Number of key bytes stored after the struct.
  int keyLen;
  // Negative if this key is cleared. Only meaningful if this is a point
  // mutation.
  int valLen;
  // Reference count; mutable so that addref/delref can be called on const
  // entries.
  mutable int refCount;
  // Random priority assigned when the key is first inserted; on insert, a node
  // is rotated above a parent whose entry has a lower priority.
  uint32_t priority;

  // True if the entry is a point mutation. If false, this entry's key should be
  // read through to the underlying data structure.
  bool pointMutation() const { return pointVersion >= 0; }
  // True for a point mutation that sets a value (valLen >= 0).
  bool pointSet() const { return pointVersion >= 0 && valLen >= 0; }
  // True for a point mutation that clears the key (valLen < 0).
  bool pointClear() const { return pointVersion >= 0 && valLen < 0; }

  // True if mutations in (pred, this) are cleared. If false, (pred, this)
  // should be read through to the underlying data structure.
  // NOTE(review): materializeMutations reports this clear as beginning at
  // pred's own key unless pred is a point set, so pred's key is covered too
  // in that case.
  bool clearTo() const { return rangeVersion >= 0; }

  // There's an extra zero byte past the end of getKey, used for
  // reconstructing logical mutations without copies.
  const uint8_t *getKey() const { return (const uint8_t *)(this + 1); }
  // Value bytes start right after the key and its trailing zero byte.
  const uint8_t *getVal() const {
    return (const uint8_t *)(this + 1) + 1 + keyLen;
  }

  // Increments the reference count and returns this entry as non-const.
  Entry *addref() const {
    ++refCount;
#if DEBUG_VERBOSE
    if (debugVerboseEnabled) {
      printf("addref %p to %d\n", this, refCount);
    }
#endif
    return (Entry *)this;
  }

  // Decrements the reference count and frees the allocation (same size as
  // computed in make) when it reaches zero.
  void delref() const {
#if DEBUG_VERBOSE
    if (debugVerboseEnabled) {
      printf("delref %p to %d\n", this, refCount - 1);
    }
#endif
    if (--refCount == 0) {
      safe_free((void *)this,
                sizeof(Entry) + keyLen + 1 + std::max(valLen, 0));
    }
  }

  // Allocates an Entry with refCount 1, copying keyLen bytes of `key` and
  // (when valLen > 0) valLen bytes of `val` into the trailing storage. A
  // negative valLen stores no value bytes.
  static Entry *make(int64_t pointVersion, int64_t rangeVersion,
                     const uint8_t *key, int keyLen, const uint8_t *val,
                     int valLen, uint32_t priority) {
    auto e = (Entry *)safe_malloc(sizeof(Entry) + keyLen + 1 +
                                  std::max(valLen, 0));
    e->pointVersion = pointVersion;
    e->rangeVersion = rangeVersion;
    e->keyLen = keyLen;
    e->valLen = valLen;
    e->refCount = 1;
    e->priority = priority;
    if (keyLen > 0) {
      memcpy((uint8_t *)e->getKey(), key, keyLen);
    }
    ((uint8_t *)e->getKey())[keyLen] = 0;
    if (valLen > 0) {
      memcpy((uint8_t *)e->getVal(), val, valLen);
    }
    return e;
  }
};

// Version at which a node's single in-place child-pointer update took effect.
// kVersionIfNotUpdated (INT64_MAX) means the node has not been updated.
struct UpdateInfo {
  UpdateInfo() : version(kVersionIfNotUpdated) {}
  int64_t version;
  constexpr static int64_t kVersionIfNotUpdated = 0x7fffffffffffffff;
  // True if the in-place update slot has been used at all.
  bool updated() const { return version != UpdateInfo::kVersionIfNotUpdated; }
  // True if the in-place update is visible to a reader at version `at`.
  bool updated(int64_t at) const { return version <= at; }
};

// NOTE(review): the template argument of std::atomic was stripped here and in
// Node below; presumably std::atomic<UpdateInfo> -- confirm.
static_assert(std::atomic::is_always_lock_free);

// A tree node, addressed by its uint32_t index into MemManager::base (index 0
// means "no node").
struct Node {
  union {
    // Published atomically so readers can tell which child pointers to use.
    std::atomic updateInfo;
    // Free-list link; only meaningful while the node is unallocated.
    uint32_t nextFree;
  };
  Entry *entry;
  // [left/right, older/newer]. Logically this is only 1 aux pointer since we
  // only store one updateInfo, but this encoding lets us write a branch-free
  // `child` function, which really helps with the effective ILP of the bulk
  // firstGeq function.
  uint32_t pointer[2][2];
};

// Limit mmap to 32 GiB so valgrind doesn't complain.
// https://bugs.kde.org/show_bug.cgi?id=229500 constexpr size_t kMapSize = size_t(32) * (1 << 30); const size_t kPageSize = sysconf(_SC_PAGESIZE); const uint32_t kNodesPerPage = kPageSize / sizeof(Node); const uint32_t kMinAddressable = kNodesPerPage; constexpr uint32_t kUpsizeBytes = 1 << 20; constexpr uint32_t kUpsizeNodes = kUpsizeBytes / sizeof(Node); static_assert(kUpsizeNodes * sizeof(Node) == kUpsizeBytes); struct BitSet { explicit BitSet(uint32_t size) : words((uint64_t *)safe_calloc(size / 64 + 1, 8)), size(size) {} bool test(uint32_t i) const { return words[i >> 6] & (uint64_t(1) << (i & 63)); } // Returns former value bool set(uint32_t i) { const auto prev = words[i >> 6]; const auto mask = uint64_t(1) << (i & 63); words[i >> 6] |= mask; max_ = std::max(i, max_); return prev & mask; } // Returns 0 if set is empty uint32_t max() const { return max_; } template void iterateAbsentApproxBackwards(F f, uint32_t begin, uint32_t end) const { // TODO can this be improved? We can do something with a word at a time // instead of a bit at a time. The first attempt at doing so benchmarked as // slower. 
assert(begin != 0); for (uint32_t i = end - 1; i >= begin; --i) { if (!test(i)) { f(i); } } } ~BitSet() { safe_free(words, (size / 64 + 1) * 8); } private: uint32_t max_ = 0; uint64_t *const words; const uint32_t size; }; int64_t mmapBytes = 0; int64_t peakMmapBytes = 0; struct MemManager { MemManager() : base((Node *)mmapSafe(nullptr, kMapSize, PROT_NONE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0)) { if (kPageSize % sizeof(Node) != 0) { fprintf(stderr, // GCOVR_EXCL_LINE "kPageSize not a multiple of Node size\n"); // GCOVR_EXCL_LINE abort(); // GCOVR_EXCL_LINE } if (kUpsizeBytes % kPageSize != 0) { fprintf(stderr, // GCOVR_EXCL_LINE "kUpsizeBytes not a multiple of kPageSize\n"); // GCOVR_EXCL_LINE abort(); // GCOVR_EXCL_LINE } } ~MemManager() { gc(nullptr, 0, 0); munmapSafe(base, kMapSize); } Node *const base; uint32_t allocate() { if (freeList != 0) { uint32_t result = freeList; freeList = base[result].nextFree; assert(base[result].entry == nullptr); return result; } if (next == firstUnaddressable) { mprotectSafe(base + firstUnaddressable, kUpsizeBytes, PROT_READ | PROT_WRITE); VALGRIND_MAKE_MEM_NOACCESS(base + firstUnaddressable, kUpsizeBytes); firstUnaddressable += kUpsizeNodes; #if SHOW_MEMORY mmapBytes = getBytes(); peakMmapBytes = std::max(peakMmapBytes, mmapBytes); #endif if (firstUnaddressable > kMapSize / sizeof(Node)) { fprintf( // GCOVR_EXCL_LINE stderr, // GCOVR_EXCL_LINE "Out of memory: firstUnaddressable > kMapSize / " // GCOVR_EXCL_LINE "sizeof(Node)\n"); // GCOVR_EXCL_LINE abort(); // GCOVR_EXCL_LINE } } VALGRIND_MAKE_MEM_UNDEFINED(base + next, sizeof(Node)); return next++; } void gc(const uint32_t *roots, int numRoots, int64_t oldestVersion) { // Calculate reachable set BitSet reachable{next}; Arena arena; constexpr int kInitialStackCapacity = 128; int64_t stackCapacity = kInitialStackCapacity; uint32_t stackStack[kInitialStackCapacity]; uint32_t *stack = stackStack; int stackIndex = 0; auto tryPush = [&]([[maybe_unused]] uint32_t parent, uint32_t child) 
{ if (!reachable.set(child)) { #if DEBUG_VERBOSE if (debugVerboseEnabled) { printf(" GC: reach: %u (parent %u)\n", child, parent); } #endif if (stackIndex == stackCapacity) [[unlikely]] { auto *old = stack; stackCapacity *= 2; stack = new (arena) uint32_t[stackCapacity]; memcpy(stack, old, stackIndex * sizeof(stack[0])); } stack[stackIndex++] = child; } }; for (int i = 0; i < numRoots; ++i) { if (roots[i] == 0) { continue; } tryPush(0, roots[i]); while (stackIndex > 0) { uint32_t p = stack[--stackIndex]; auto &node = base[p]; auto updateInfo = node.updateInfo.load(std::memory_order_relaxed); if (updateInfo.updated()) { if (node.pointer[0][1] != 0) { tryPush(p, node.pointer[0][1]); } if (node.pointer[1][1] != 0) { tryPush(p, node.pointer[1][1]); } } if (!updateInfo.updated(oldestVersion)) { if (node.pointer[0][0] != 0) { tryPush(p, node.pointer[0][0]); } if (node.pointer[1][0] != 0) { tryPush(p, node.pointer[1][0]); } } } } uint32_t max = reachable.max(); if (max == 0) { max = kMinAddressable - 1; } assert(max < next); // Rebuild free list to prefer leftward nodes freeList = 0; reachable.iterateAbsentApproxBackwards( [&](uint32_t i) { if (base[i].entry != nullptr) { #if DEBUG_VERBOSE if (debugVerboseEnabled) { printf("Collecting %u while building free list\n", i); } #endif base[i].entry->delref(); base[i].entry = nullptr; } base[i].nextFree = freeList; freeList = i; }, kMinAddressable, max + 1); // Entries to the right of max don't need to be in the freelist. They're // allocated by pointer bumping. 
for (uint32_t i = max + 1; i < next; ++i) { if (base[i].entry != nullptr) { #if DEBUG_VERBOSE if (debugVerboseEnabled) { printf("Collecting %u while shrinking right\n", i); } #endif base[i].entry->delref(); base[i].entry = nullptr; } } uint32_t newFirstUnaddressable = (max / kNodesPerPage + 1) * kNodesPerPage; if (newFirstUnaddressable < firstUnaddressable) { mprotectSafe(base + newFirstUnaddressable, (firstUnaddressable - newFirstUnaddressable) * sizeof(Node), PROT_NONE); firstUnaddressable = newFirstUnaddressable; #if SHOW_MEMORY mmapBytes = getBytes(); #endif } next = max + 1; assert(firstUnaddressable >= next); VALGRIND_MAKE_MEM_NOACCESS(base + next, (firstUnaddressable - next) * sizeof(Node)); } int64_t getBytes() const { return (firstUnaddressable - kMinAddressable) * sizeof(Node); } private: uint32_t next = kMinAddressable; uint32_t firstUnaddressable = kMinAddressable; uint32_t freeList = 0; }; auto operator<=>(const VersionedMap::Key &lhs, const Node &rhs) { int cl = std::min(lhs.len, rhs.entry->keyLen); if (cl > 0) { int c = memcmp(lhs.p, rhs.entry->getKey(), cl); if (c != 0) { return c <=> 0; } } return lhs.len <=> rhs.entry->keyLen; } constexpr int orderToInt(std::strong_ordering o) { return o == std::strong_ordering::less ? -1 : o == std::strong_ordering::equal ? 
0 : 1; } struct Finger { void push(uint32_t node, bool dir) { searchPath[searchPathSize_] = node; direction[searchPathSize_] = dir; ++searchPathSize_; } void pop() { assert(searchPathSize_ > 0); --searchPathSize_; } uint32_t backNode() const { assert(searchPathSize_ > 0); return searchPath[searchPathSize_ - 1]; } uint32_t &backNodeRef() { assert(searchPathSize_ > 0); return searchPath[searchPathSize_ - 1]; } bool backDirection() const { assert(searchPathSize_ > 0); return direction[searchPathSize_ - 1]; } uint32_t searchPathSize() const { return searchPathSize_; } void setSearchPathSizeUnsafe(int size) { searchPathSize_ = size; } Finger() { clear(); } void clear() { #ifndef NDEBUG memset(searchPath, 0, sizeof(searchPath)); memset(direction, 0, sizeof(direction)); #endif searchPathSize_ = 0; } void copyTo(Finger &result) { #ifndef NDEBUG memset(result.searchPath, 0, sizeof(searchPath)); memset(result.direction, 0, sizeof(direction)); #endif memcpy(result.searchPath, searchPath, searchPathSize_ * sizeof(searchPath[0])); memcpy(result.direction, direction, searchPathSize_ * sizeof(direction[0])); result.searchPathSize_ = searchPathSize_; } Finger(const Finger &) = delete; Finger &operator=(const Finger &) = delete; Finger(Finger &&) = delete; Finger &operator=(Finger &&) = delete; bool operator==(const Finger &other) const { bool result = searchPathSize_ == other.searchPathSize_ && (searchPathSize_ == 0 || backNode() == other.backNode()); #ifndef NDEBUG auto expected = searchPathSize_ == other.searchPathSize_ && memcmp(searchPath, other.searchPath, searchPathSize_ * sizeof(searchPath[0])) == 0 && (searchPathSize_ == 0 || memcmp(direction + 1, other.direction + 1, (searchPathSize_ - 1) * sizeof(direction[0])) == 0); assert(result == expected); #endif return result; } private: uint32_t searchPath[kPathLengthUpperBound]; bool direction[kPathLengthUpperBound]; int searchPathSize_; }; VersionedMap::Key keyAfter(VersionedMap::Key k, Arena &arena) { uint8_t *result = new 
(arena) uint8_t[k.len + 1]; memcpy(result, k.p, k.len); result[k.len] = 0; return {result, k.len + 1}; } struct __attribute__((__visibility__("hidden"))) VersionedMap::Impl { // The last node is allowed to be 0, in which case this is the search path of // where an entry would exist template void move(Finger &finger, int64_t at) const { uint32_t c; if (finger.backNode() != 0 && (c = child(finger.backNode(), kDirection, at)) != 0) { finger.push(c, kDirection); while ((c = child(finger.backNode(), !kDirection, at)) != 0) { finger.push(c, !kDirection); } } else { while (finger.searchPathSize() > 1 && finger.backDirection() == kDirection) { finger.pop(); } finger.pop(); } } template uint32_t child(uint32_t node, bool which, int64_t at) const { assert(node != 0); static_assert(kOrder == std::memory_order_acquire || kOrder == std::memory_order_relaxed); auto &n = mm.base[node]; uint32_t result; assert(at < UpdateInfo::kVersionIfNotUpdated); auto updateInfo = n.updateInfo.load(kOrder); result = n.pointer[which][updateInfo.updated(at)]; assert(result == 0 || result >= kMinAddressable); #ifndef NDEBUG if (result != 0) { assert(mm.base[result].entry != nullptr); } #endif return result; } // Returns the node that results from setting `which` to `child` on `node` uint32_t update(uint32_t node, bool which, uint32_t child, int64_t version) { assert(node == 0 || node >= kMinAddressable); assert(child == 0 || child >= kMinAddressable); if (this->child(node, which, version) == child) { return node; } auto &n = mm.base[node]; auto updateInfo = n.updateInfo.load(std::memory_order_relaxed); const bool updated = updateInfo.updated(); auto doCopy = [&]() { uint32_t copy = mm.allocate(); #if DEBUG_VERBOSE if (debugVerboseEnabled) { printf("Copy %u to %u\n", node, copy); } #endif auto &c = mm.base[copy]; c.entry = n.entry->addref(); c.pointer[which][0] = child; c.pointer[!which][0] = this->child(node, !which, latestVersion); c.updateInfo.store(UpdateInfo{}, std::memory_order_relaxed); 
assert(copy == 0 || copy >= kMinAddressable); return copy; }; if (n.entry->pointVersion == version || n.entry->rangeVersion == version) { // This node is not yet published to concurrent readers n.pointer[which][0] = child; assert(node == 0 || node >= kMinAddressable); return node; } if (updateInfo.version == version) { // Not a data race since concurrent readers are reading at a version < // `updateInfo.version` n.pointer[which][1] = child; assert(node == 0 || node >= kMinAddressable); return node; } if (updated) { // We already used this node's in-place update return doCopy(); } else { n.pointer[which][1] = child; n.pointer[!which][1] = n.pointer[!which][0]; updateInfo.version = version; n.updateInfo.store(updateInfo, std::memory_order_release); // Must be last assert(node == 0 || node >= kMinAddressable); return node; } } void rotate(uint32_t &n, int64_t at, bool right) { auto l = child(n, !right, at); n = update( l, right, update(n, !right, child(l, right, at), at), at); } struct Val { const uint8_t *p; int len; }; // Initialize finger to the insertion path of `key`. If `key` is not present, // then the finger ends on a null entry. template void search(Key key, T root, int64_t version, Finger &finger) const { // Prevent integer promotion etc static_assert(std::is_same_v); finger.clear(); bool ignored = false; finger.push(root, ignored); for (;;) { auto n = finger.backNode(); if (n == 0) { break; } auto c = key <=> mm.base[n]; if (c == 0) { // No duplicates break; } finger.push(child(n, c > 0, version), c > 0); } } // If `val` is true, then this is a point set at `latestVersion`. // If `endRange` is true, then this is a range end marker at `latestVersion`. // Otherwise it's the beginning of a range at `latestVersion`. 
// `finger` is a valid finger to the insertion path of `key` in the latest // version (which can be obtained with `search`) void insert(Key key, std::optional val, bool endRange, Finger &finger) { const bool inserted = finger.backNode() == 0; int64_t pointVersion, rangeVersion; if (val.has_value()) { // Point set pointVersion = latestVersion; if (inserted) { Finger copy; finger.copyTo(copy); move(copy, latestVersion); if (copy.searchPathSize() == 0) { rangeVersion = -1; // Sentinel for "no mutation ending here" } else { rangeVersion = mm.base[copy.backNode()].entry->rangeVersion; } } else { auto *entry = mm.base[finger.backNode()].entry; rangeVersion = entry->rangeVersion; } } else if (endRange) { rangeVersion = latestVersion; if (inserted) { val = {nullptr, -1}; pointVersion = -1; // Sentinel for "no point mutation here" #ifndef NDEBUG // If we inserted this, there would be adjacent clears and so the // range would not be canonical Finger copy; finger.copyTo(copy); move(copy, latestVersion); assert(copy.searchPathSize() == 0 || mm.base[copy.backNode()].entry->rangeVersion < 0); #endif } else { auto *entry = mm.base[finger.backNode()].entry; val = {entry->getVal(), entry->valLen}; pointVersion = entry->pointVersion; } } else { // Beginning of a clear range pointVersion = -1; // Sentinel for "no point mutation here" if (inserted) { // If there were a clear range here, it wouldn't be canonical rangeVersion = -1; // Sentinel for "no mutation ending here" } else { auto *entry = mm.base[finger.backNode()].entry; rangeVersion = entry->rangeVersion; } val = {nullptr, -1}; } // TODO check for noop? // Prepare new node const uint32_t node = newNode( pointVersion, rangeVersion, key.p, key.len, val->p, val->len, inserted ? 
gRandom.next() : mm.base[finger.backNode()].entry->priority); if (!inserted) { auto &n = mm.base[node]; n.pointer[0][0] = child(finger.backNode(), false, latestVersion); n.pointer[1][0] = child(finger.backNode(), true, latestVersion); } finger.backNodeRef() = node; uint32_t oldSize = finger.searchPathSize(); if (inserted) { // Rotate for (;;) { const uint32_t node = finger.backNode(); oldSize = finger.searchPathSize(); if (finger.searchPathSize() == 1) { // Made it to the root latestRoot = node; break; } const bool direction = finger.backDirection(); finger.pop(); auto &parent = finger.backNodeRef(); parent = update(parent, direction, node, latestVersion); if (mm.base[node].entry->priority > mm.base[parent].entry->priority) { rotate(parent, latestVersion, !direction); } else { break; } } } // Propagate for (;;) { const uint32_t node = finger.backNode(); if (finger.searchPathSize() == 1) { // Made it to the root latestRoot = node; break; } const bool direction = finger.backDirection(); finger.pop(); const auto old = finger.backNode(); auto &parent = finger.backNodeRef(); parent = update(parent, direction, node, latestVersion); if (parent == old) { break; } } finger.setSearchPathSizeUnsafe(oldSize); #ifndef NDEBUG { Finger expected; search(key, latestRoot, latestVersion, expected); assert(finger == expected); } #endif } // Removes `finger` from the tree, and leaves `finger` pointing to insertion // point of its former entry. void remove(Finger &finger) { // True if finger is pointing to an entry > than the entry we're removing // after we rotate it down // Rotate down until we can remove the entry for (;;) { auto &node = finger.backNodeRef(); const auto l = child(node, false, latestVersion); const auto r = child(node, true, latestVersion); if (l == 0 && r == 0) { // TODO we can avoid some rotations if we stop when l or r == 0 node = 0; break; } else { const bool direction = (l == 0 ? 0 : mm.base[l].entry->priority) > (r == 0 ? 
0 : mm.base[r].entry->priority); rotate(node, latestVersion, direction); assert(node != 0); finger.push( child(node, direction, latestVersion), direction); } } // propagate up the search path, all the way to the root since we may have // more rotations to do even if an update doesn't change a node pointer auto node = finger.backNode(); assert(node == 0); const auto oldSize = finger.searchPathSize(); for (;;) { if (finger.searchPathSize() == 1) { // Made it to the root latestRoot = node; break; } const bool direction = finger.backDirection(); finger.pop(); auto &parent = finger.backNodeRef(); [[maybe_unused]] auto old = parent; parent = update(parent, direction, node, latestVersion); node = parent; } finger.setSearchPathSizeUnsafe(oldSize); } uint32_t newNode(int64_t version, int64_t rangeVersion, const uint8_t *key, int keyLen, const uint8_t *val, int valLen, uint32_t priority) { auto result = mm.allocate(); auto &node = mm.base[result]; node.pointer[0][0] = 0; node.pointer[1][0] = 0; node.updateInfo.store(UpdateInfo{}, std::memory_order_relaxed); node.entry = Entry::make(version, rangeVersion, key, keyLen, val, valLen, priority); return result; } void setOldestVersion(int64_t oldestVersion) { mallocBytesDelta = 0; this->oldestVersion = oldestVersion; roots.setOldestVersion(oldestVersion); mm.gc(roots.roots(), roots.rootCount(), oldestVersion); totalMallocBytes += mallocBytesDelta; } int64_t getBytes() const { return totalMallocBytes + mm.getBytes(); } void printInOrder(int64_t version) const; void printInOrderHelper(int64_t version, uint32_t node, int depth) const; int accumulatedFuel = 0; void scanAndRemoveOldEntries(int fuel) { accumulatedFuel += fuel; #ifdef NDEBUG // This is here for performance reasons, since we want to amortize the cost // of searching for continueKey. In tests, we want to exercise the rest of // the code often. 
if (accumulatedFuel < 500) { return; } #endif // Get a finger to the first entry > continueKey, or the last entry if // continueKey is the empty string if (latestRoot == 0) { // Tree is empty return; } Finger finger; if (continueKey.len == 0) { // Set finger to last entry in tree bool ignored = false; finger.push(latestRoot, ignored); uint32_t c; while ((c = child(finger.backNode(), true, latestVersion)) != 0) { finger.push(c, true); } } else { search(continueKey, latestRoot, latestVersion, finger); move(finger, latestVersion); if (finger.searchPathSize() == 0) { continueKey = {nullptr, 0}; return; } } assert(finger.backNode() != 0); int64_t rangeVersion = mm.base[finger.backNode()].entry->rangeVersion; move(finger, latestVersion); if (finger.searchPathSize() == 0) { continueKey = {nullptr, 0}; return; } // Phew. Ok now we have a finger to the next entry to consider removing, and // the range version terminated at this entry. for (; accumulatedFuel > 0; --accumulatedFuel) { const auto &n = mm.base[finger.backNode()]; if (rangeVersion < 0 && std::max(n.entry->pointVersion, n.entry->rangeVersion) < oldestVersion) { remove(finger); if (latestRoot == 0) { return; } } else { rangeVersion = n.entry->rangeVersion; } move(finger, latestVersion); if (finger.searchPathSize() == 0) { continueKey = {nullptr, 0}; return; } } continueArena = Arena(); const auto &n = mm.base[finger.backNode()]; uint8_t *data = new (continueArena) uint8_t[n.entry->keyLen]; memcpy(data, n.entry->getKey(), n.entry->keyLen); continueKey = {data, n.entry->keyLen}; } void addMutations(const Mutation *mutations, int numMutations, int64_t version) { mallocBytesDelta = 0; assert(latestVersion < version); latestVersion = version; latestRoot = roots.roots()[roots.rootCount() - 1]; // TODO tune? scanAndRemoveOldEntries(2 * numMutations + 10); Arena arena; // TODO Improve ILP? 
for (int i = 0; i < numMutations; ++i) { const auto &m = mutations[i]; switch (m.type) { case Set: { Finger iter; search({m.param1, m.param1Len}, latestRoot, latestVersion, iter); insert({m.param1, m.param1Len}, {{m.param2, m.param2Len}}, /*endRange*/ false, iter); } break; case Clear: { // TODO we can avoid some insertions here. Complexity is getting out of // hand though. if (m.param2Len == 0) { Finger iter; search({m.param1, m.param1Len}, latestRoot, latestVersion, iter); const bool found = iter.searchPathSize() > 0 && iter.backNode() != 0; bool engulfLeft = found && mm.base[iter.backNode()].entry->clearTo(); bool engulfRight = false; const Entry *next; Finger copy; if (iter.searchPathSize() > 0) { iter.copyTo(copy); move(copy, latestVersion); next = copy.searchPathSize() > 0 ? mm.base[copy.backNode()].entry : nullptr; if (next && next->clearTo()) { engulfRight = true; if (!found) { engulfLeft = true; } } } if (engulfLeft && engulfRight) { insert({next->getKey(), next->keyLen}, {}, /*endRange*/ true, copy); if (found) { move(copy, latestVersion); // Point to the same entry as iter pointed to, but it's not // invalidated remove(copy); } } else if (engulfLeft) { assert(found); remove(iter); insert(keyAfter({m.param1, m.param1Len}, arena), {}, /*endRange*/ true, iter); } else if (engulfRight) { insert({m.param1, m.param1Len}, {}, /*endRange*/ false, iter); move(iter, latestVersion); assert(iter.searchPathSize() > 0 && mm.base[iter.backNode()].entry == next); insert({next->getKey(), next->keyLen}, {}, /*endRange*/ true, iter); } else { insert({m.param1, m.param1Len}, {{nullptr, -1}}, /*endRange*/ false, iter); } } else { // TODO ILP these Finger begin; search({m.param1, m.param1Len}, latestRoot, latestVersion, begin); const bool foundBegin = begin.searchPathSize() > 0 && begin.backNode() != 0; Finger end; search({m.param2, m.param2Len}, latestRoot, latestVersion, end); const bool foundEnd = end.searchPathSize() > 0 && end.backNode() != 0; // Check if we can engulf 
on the left bool engulfLeft; Finger copy; begin.copyTo(copy); move(copy, latestVersion); if (foundBegin) { engulfLeft = begin.searchPathSize() > 0 && mm.base[begin.backNode()].entry->clearTo(); } else { engulfLeft = copy.searchPathSize() > 0 && mm.base[copy.backNode()].entry->clearTo(); } // Check if we can engulf on the right bool engulfRight = false; if (!foundEnd) { end.copyTo(copy); move(copy, latestVersion); const auto *next = copy.searchPathSize() > 0 ? mm.base[copy.backNode()].entry : nullptr; engulfRight = next && next->clearTo(); } if (engulfLeft && foundBegin) { remove(begin); } else if (!engulfLeft) { insert({m.param1, m.param1Len}, {}, /*rangeEntry*/ false, begin); } move(begin, latestVersion); while (begin.searchPathSize() > 0 && mm.base[begin.backNode()] < Key{m.param2, m.param2Len}) { remove(begin); move(begin, latestVersion); } #ifndef NDEBUG if (foundEnd) { [[maybe_unused]] bool beginEqEnd = mm.base[begin.backNode()] <=> Key{m.param2, m.param2Len} == 0; assert(beginEqEnd); } #endif if (engulfRight) { if (foundEnd) { remove(begin); move(begin, latestVersion); } assert(begin.searchPathSize() > 0 && begin.backNode() != 0); insert({mm.base[begin.backNode()].entry->getKey(), mm.base[begin.backNode()].entry->keyLen}, {}, /*rangeEntry*/ true, begin); } else { if (!foundEnd) { // TODO remove this search search( {m.param2, m.param2Len}, latestRoot, latestVersion, begin); } insert({m.param2, m.param2Len}, {}, /*rangeEntry*/ true, begin); } } } break; default: // GCOVR_EXCL_LINE assert(false); // GCOVR_EXCL_LINE } } roots.add(latestRoot, latestVersion); totalMallocBytes += mallocBytesDelta; // Check the "latestRoot is only meaningful in the callstack of // addMutations" property VALGRIND_MAKE_MEM_UNDEFINED(&latestRoot, sizeof(latestRoot)); } struct StepwiseFirstGeq { const VersionedMap::Impl *map; const weaselab::VersionedMap::Key *key; int64_t version; weaselab::VersionedMap::Iterator *iterator; void begin(uint32_t root); bool step(); void end(); }; void 
firstGeq(const Key *key, const int64_t *version, Iterator *iterator, int count) const; void firstGeq(const Key *key, Iterator *iterator, int count) const; // State used to resume scanning and removing old entries in `addMutations` Key continueKey; Arena continueArena; MemManager mm; RootSet roots; // Only meaningful within the callstack of `addMutations` uint32_t latestRoot; int64_t oldestVersion = 0; int64_t latestVersion = 0; int64_t totalMallocBytes = sizeof(Impl); }; VersionedMap::Impl *internal_makeImpl(int64_t version) { mallocBytesDelta = 0; auto *result = new (safe_malloc(sizeof(VersionedMap::Impl))) VersionedMap::Impl(); result->totalMallocBytes = mallocBytesDelta; result->latestVersion = version; return result; } VersionedMap::VersionedMap(int64_t version) : impl(internal_makeImpl(version)) {} VersionedMap::~VersionedMap() { if (impl != nullptr) { impl->~Impl(); safe_free(impl, sizeof(*impl)); } } VersionedMap::VersionedMap(VersionedMap &&other) noexcept { impl = std::exchange(other.impl, nullptr); } VersionedMap &VersionedMap::operator=(VersionedMap &&other) noexcept { impl = std::exchange(other.impl, nullptr); return *this; } void VersionedMap::addMutations(const Mutation *mutations, int numMutations, int64_t version) { impl->addMutations(mutations, numMutations, version); } struct VersionedMap::Iterator::Impl { Finger finger; int64_t version; const VersionedMap::Impl *map; // State for materializing mutations associated with the entry at `finger`. // Cases: // - If finger is a set and the end of a clear, then mutation[0] is the clear // and mutation[1] is the set. 
// - If finger is a set and not the end of a clear, then mutation[0] is the // set // - If finger is a clear and not a set, then mutation[0] is the clear int mutationCount; int mutationIndex; VersionedMutation mutations[2]; void copyTo(Impl &result) { result.map = map; result.version = version; result.mutationCount = mutationCount; result.mutationIndex = mutationIndex; result.mutations[0] = mutations[0]; result.mutations[1] = mutations[1]; finger.copyTo(result.finger); } bool equals(const Impl &other) const { assert(map == other.map); assert(version == other.version); return finger == other.finger && mutationIndex == other.mutationIndex; } }; VersionedMap::Iterator::~Iterator() { if (impl != nullptr) { impl->~Impl(); safe_free(impl, sizeof(*impl)); } } VersionedMap::Iterator::Iterator(const Iterator &other) : impl(new(safe_malloc(sizeof(Impl))) Impl()) { other.impl->copyTo(*impl); } VersionedMap::Iterator & VersionedMap::Iterator::operator=(const Iterator &other) { if (impl != nullptr) { impl->~Impl(); safe_free(impl, sizeof(*impl)); } impl = new (safe_malloc(sizeof(Impl))) Impl(); other.impl->copyTo(*impl); return *this; } VersionedMap::Iterator::Iterator(Iterator &&other) noexcept : impl(std::exchange(other.impl, nullptr)) {} VersionedMap::Iterator & VersionedMap::Iterator::operator=(Iterator &&other) noexcept { if (impl != nullptr) { impl->~Impl(); safe_free(impl, sizeof(*impl)); } impl = std::exchange(other.impl, nullptr); return *this; } VersionedMap::Iterator::VersionedMutation VersionedMap::Iterator::operator*() const { #if DEBUG_VERBOSE if (debugVerboseEnabled) { printf("Dereference %u\n", impl->finger.backNode()); } #endif assert(impl->finger.searchPathSize() != 0); assert(impl->mutationIndex < impl->mutationCount); assert(impl->mutationIndex >= 0); return impl->mutations[impl->mutationIndex]; } void materializeMutations(VersionedMap::Iterator::Impl *impl, const Entry *prev) { if (prev == nullptr) { Finger copy; impl->finger.copyTo(copy); 
impl->map->move(copy, impl->version); if (copy.searchPathSize() > 0) { prev = impl->map->mm.base[copy.backNode()].entry; } else { assert(!impl->map->mm.base[impl->finger.backNode()].entry->clearTo()); } } const auto &entry = *impl->map->mm.base[impl->finger.backNode()].entry; impl->mutationCount = 0; if (entry.clearTo()) { impl->mutations[impl->mutationCount++] = { prev->getKey(), entry.getKey(), prev->pointSet() ? prev->keyLen + 1 : prev->keyLen, entry.keyLen, VersionedMap::Clear, entry.rangeVersion}; } if (entry.pointMutation()) { if (entry.valLen < 0 /* pointClear */) { impl->mutations[impl->mutationCount++] = { entry.getKey(), nullptr, entry.keyLen, 0, VersionedMap::Clear, entry.pointVersion}; } else { impl->mutations[impl->mutationCount++] = { entry.getKey(), entry.getVal(), entry.keyLen, entry.valLen, VersionedMap::Set, entry.pointVersion}; } } } VersionedMap::Iterator &VersionedMap::Iterator::operator++() { if (impl->mutationIndex < impl->mutationCount - 1) { ++impl->mutationIndex; return *this; } do { const auto &entry = *impl->map->mm.base[impl->finger.backNode()].entry; impl->map->move(impl->finger, impl->version); if (impl->finger.searchPathSize() == 0) { break; } materializeMutations(impl, &entry); } while (impl->mutationCount == 0); impl->mutationIndex = 0; return *this; } VersionedMap::Iterator VersionedMap::Iterator::operator++(int) { auto result = *this; // TODO Interposable call ++*this; return result; } VersionedMap::Iterator &VersionedMap::Iterator::operator--() { if (impl->mutationIndex > 0) { --impl->mutationIndex; return *this; } // Handle decrementing end if (impl->finger.searchPathSize() == 0) { bool ignored = false; impl->finger.push( impl->map->roots.getThreadSafeHandle().rootForVersion(impl->version), ignored); assert(impl->finger.backNode() != 0); uint32_t c; while ((c = impl->map->child( impl->finger.backNode(), true, impl->version)) != 0) { impl->finger.push(c, true); } for (;;) { materializeMutations(impl, nullptr); if 
(impl->mutationCount > 0) { break; } impl->map->move(impl->finger, impl->version); } impl->mutationIndex = impl->mutationCount - 1; return *this; } do { impl->map->move(impl->finger, impl->version); if (impl->finger.searchPathSize() == 0) { break; } materializeMutations(impl, nullptr); } while (impl->mutationCount == 0); impl->mutationIndex = impl->mutationCount - 1; return *this; } VersionedMap::Iterator VersionedMap::Iterator::operator--(int) { auto result = *this; // TODO Interposable call --*this; return result; } bool VersionedMap::Iterator::operator==(const Iterator &other) const { if (impl == nullptr || other.impl == nullptr) { return impl == other.impl; } return impl->equals(*other.impl); } bool geq(const VersionedMap::Iterator::VersionedMutation &m, const VersionedMap::Key &k) { if (m.type == VersionedMap::Set || m.param2Len == 0) { return VersionedMap::Key{m.param1, m.param1Len} >= k; } else { return VersionedMap::Key{m.param2, m.param2Len} > k; } } void VersionedMap::Impl::StepwiseFirstGeq::begin(uint32_t root) { if (iterator->impl != nullptr) { iterator->impl->~Impl(); new (iterator->impl) Iterator::Impl(); } else { iterator->impl = new (safe_malloc(sizeof(Iterator::Impl))) Iterator::Impl(); } Finger &finger = iterator->impl->finger; finger.clear(); bool ignored = false; finger.push(root, ignored); } bool VersionedMap::Impl::StepwiseFirstGeq::step() { Finger &finger = iterator->impl->finger; auto n = finger.backNode(); if (n == 0) { return true; } auto c = *key <=> map->mm.base[n]; if (c == 0) { // No duplicates return true; } finger.push(map->child(n, c > 0, version), c > 0); return false; } void VersionedMap::Impl::StepwiseFirstGeq::end() { Finger &finger = iterator->impl->finger; if (finger.searchPathSize() > 0 && finger.backNode() == 0) { map->move(finger, version); if (finger.searchPathSize() > 0) { assert(finger.backNode() != 0); } } iterator->impl->version = version; iterator->impl->map = map; const Entry *prev = nullptr; for (;;) { if 
(finger.searchPathSize() == 0) { break; } else { materializeMutations(iterator->impl, prev); for (int j = 0; j < iterator->impl->mutationCount; ++j) { if (geq(iterator->impl->mutations[j], *key)) { iterator->impl->mutationIndex = j; goto loopEnd; } } } prev = iterator->impl->map->mm.base[finger.backNode()].entry; iterator->impl->map->move( finger, iterator->impl->version); } loopEnd:; } constexpr int kStackAllocThreshold = 2; void VersionedMap::Impl::firstGeq(const weaselab::VersionedMap::Key *key, const int64_t *version, Iterator *iterator, int count) const { if (count == 0) { return; } // Use stack allocation for small count Arena arena; StepwiseFirstGeq stepwiseStackAlloc[kStackAllocThreshold]; int nextJobStackAllocation[kStackAllocThreshold]; StepwiseFirstGeq *stepwise; int *nextJob; if (count <= kStackAllocThreshold) { stepwise = stepwiseStackAlloc; nextJob = nextJobStackAllocation; } else { stepwise = new (arena) StepwiseFirstGeq[count]; nextJob = new (arena) int[count]; } auto handle = roots.getThreadSafeHandle(); for (int i = 0; i < count; ++i) { stepwise[i].map = this; stepwise[i].key = &key[i]; stepwise[i].version = version[i]; stepwise[i].iterator = &iterator[i]; stepwise[i].begin(handle.rootForVersion(version[i])); nextJob[i] = i + 1; } nextJob[count - 1] = 0; int prevJob = count - 1; int job = 0; for (;;) { if (stepwise[job].step()) { stepwise[job].end(); if (job == prevJob) { break; } nextJob[prevJob] = nextJob[job]; job = prevJob; } prevJob = job; job = nextJob[job]; } } void VersionedMap::Impl::firstGeq(const weaselab::VersionedMap::Key *key, Iterator *iterator, int count) const { if (count == 0) { return; } // Use stack allocation for small count Arena arena; StepwiseFirstGeq stepwiseStackAlloc[kStackAllocThreshold]; int nextJobStackAllocation[kStackAllocThreshold]; StepwiseFirstGeq *stepwise; int *nextJob; if (count <= kStackAllocThreshold) { stepwise = stepwiseStackAlloc; nextJob = nextJobStackAllocation; } else { stepwise = new (arena) 
StepwiseFirstGeq[count]; nextJob = new (arena) int[count]; } const uint32_t root = roots.roots()[roots.rootCount() - 1]; assert(root == roots.getThreadSafeHandle().rootForVersion(latestVersion)); for (int i = 0; i < count; ++i) { stepwise[i].map = this; stepwise[i].key = &key[i]; stepwise[i].version = latestVersion; stepwise[i].iterator = &iterator[i]; stepwise[i].begin(root); nextJob[i] = i + 1; } nextJob[count - 1] = 0; int prevJob = count - 1; int job = 0; for (;;) { if (stepwise[job].step()) { stepwise[job].end(); if (job == prevJob) { break; } nextJob[prevJob] = nextJob[job]; job = prevJob; } prevJob = job; job = nextJob[job]; } } bool VersionedMap::Iterator::operator!=(const Iterator &other) const { if (impl == nullptr || other.impl == nullptr) { return impl != other.impl; } return !impl->equals(*other.impl); } void VersionedMap::firstGeq(const Key *key, const int64_t *version, Iterator *iterator, int count) const { impl->firstGeq(key, version, iterator, count); } void VersionedMap::firstGeq(const Key *key, Iterator *iterator, int count) const { impl->firstGeq(key, iterator, count); } VersionedMap::Iterator VersionedMap::begin(int64_t version) const { VersionedMap::Iterator result; result.impl = new (safe_malloc(sizeof(Iterator::Impl))) Iterator::Impl(); result.impl->version = version; bool ignored = false; result.impl->finger.push( impl->roots.getThreadSafeHandle().rootForVersion(version), ignored); if (result.impl->finger.backNode() == 0) { result.impl->finger.pop(); } else { uint32_t c; while ((c = impl->child( result.impl->finger.backNode(), false, version)) != 0) { result.impl->finger.push(c, false); } } result.impl->map = impl; const Entry *prev = nullptr; for (;;) { if (result.impl->finger.searchPathSize() > 0) { materializeMutations(result.impl, prev); if (result.impl->mutationCount > 0) { break; } } else { break; } prev = result.impl->map->mm.base[result.impl->finger.backNode()].entry; result.impl->map->move( result.impl->finger, 
result.impl->version); } result.impl->mutationIndex = 0; return result; } VersionedMap::Iterator VersionedMap::end(int64_t version) const { VersionedMap::Iterator result; result.impl = new (safe_malloc(sizeof(Iterator::Impl))) Iterator::Impl(); result.impl->map = impl; result.impl->mutationIndex = 0; result.impl->version = version; return result; } int64_t VersionedMap::getVersion() const { return impl->latestVersion; } int64_t VersionedMap::getOldestVersion() const { return impl->oldestVersion; } void VersionedMap::setOldestVersion(int64_t oldestVersion) { impl->setOldestVersion(oldestVersion); } int64_t VersionedMap::getBytes() const { return impl->getBytes(); } // ==================== END IMPLEMENTATION ==================== // GCOVR_EXCL_START #ifdef NDEBUG inline #endif void VersionedMap::Impl::printInOrder(int64_t version) const { printInOrderHelper(version, roots.getThreadSafeHandle().rootForVersion(version), 0); } #ifdef NDEBUG inline #endif void VersionedMap::Impl::printInOrderHelper(int64_t version, uint32_t node, int depth) const { if (node == 0) { return; } printInOrderHelper(version, child(node, true, version), depth + 1); for (int i = 0; i < depth; ++i) { printf(" "); } printf("node %u: ", node); printBinary({mm.base[node].entry->getKey(), mm.base[node].entry->keyLen}); if (mm.base[node].entry->pointSet()) { printf(" -> '"); printBinary({mm.base[node].entry->getVal(), mm.base[node].entry->valLen}); printf("' @ %" PRId64, mm.base[node].entry->pointVersion); } if (mm.base[node].entry->pointClear()) { printf(" ", mm.base[node].entry->pointVersion); } if (mm.base[node].entry->clearTo()) { printf(" ", mm.base[node].entry->rangeVersion); } if (mm.base[node].entry->pointVersion < 0 && mm.base[node].entry->rangeVersion < 0) { printf(" "); } printf("\n"); VersionedMap::Impl::printInOrderHelper( version, child(node, false, version), depth + 1); } VersionedMap::Impl *cast(const VersionedMap &m) { VersionedMap::Impl *result; memcpy(&result, &m, sizeof(void *)); 
return result; } #if SHOW_MEMORY struct __attribute__((visibility("default"))) PeakPrinter { ~PeakPrinter() { printf("--- versioned_map ---\n"); printf("mmap bytes: %g\n", double(mmapBytes)); printf("Peak mmap bytes: %g\n", double(peakMmapBytes)); } } peakPrinter2; #endif } // namespace weaselab #ifdef ENABLE_MAIN #include void breakpoint_me() {} int main() { { weaselab::VersionedMap versionedMap{0}; { weaselab::VersionedMap::Mutation m[] = { {(const uint8_t *)"a", 1, (const uint8_t *)"b", 1, weaselab::VersionedMap::Clear}, }; versionedMap.addMutations(m, sizeof(m) / sizeof(m[0]), 1); } { weaselab::VersionedMap::Mutation m[] = { {(const uint8_t *)"b", 1, nullptr, 0, weaselab::VersionedMap::Clear}, }; versionedMap.addMutations(m, sizeof(m) / sizeof(m[0]), 2); } const int64_t v = versionedMap.getVersion(); cast(versionedMap)->printInOrder(v); weaselab::VersionedMap::Key k = {(const uint8_t *)"a", 2}; weaselab::VersionedMap::Iterator iter; versionedMap.firstGeq(&k, &v, &iter, 1); versionedMap.setOldestVersion(2); breakpoint_me(); for (auto end = versionedMap.end(v); iter != end; ++iter) { printMutation(*iter); } } return 0; } #endif // GCOVR_EXCL_STOP