#include "VersionedMap.h"

#include "Internal.h"
#include "RootSet.h"

#include
#include
#include
#include
#include
#include
#include
#include
#include
#include

static_assert(std::is_standard_layout_v);
static_assert(std::is_standard_layout_v);
static_assert(std::is_standard_layout_v);
static_assert(std::is_standard_layout_v);
static_assert(std::bidirectional_iterator);
static_assert(std::is_standard_layout_v<
              weaselab::VersionedMap::Iterator::VersionedMutation>);

/// mmap(2) that never returns MAP_FAILED: on failure it prints the call's
/// arguments plus errno/strerror to stderr and aborts the process.
void *mmapSafe(void *addr, size_t len, int prot, int flags, int fd,
               off_t offset) {
  void *result = mmap(addr, len, prot, flags, fd, offset);
  if (result == MAP_FAILED) {
    int err = errno;                                       // GCOVR_EXCL_LINE
    fprintf(                                               // GCOVR_EXCL_LINE
        stderr,                                            // GCOVR_EXCL_LINE
        "Error calling mmap(%p, %zu, %d, %d, %d, %jd): %d %s\n", // GCOVR_EXCL_LINE
        addr, len, prot, flags, fd, (intmax_t)offset, err, // GCOVR_EXCL_LINE
        strerror(err));                                    // GCOVR_EXCL_LINE
    fflush(stderr);                                        // GCOVR_EXCL_LINE
    abort();                                               // GCOVR_EXCL_LINE
  }
  return result;
}

/// mprotect(2) that aborts (after printing a diagnostic to stderr) instead of
/// returning an error.
void mprotectSafe(void *p, size_t s, int prot) {
  if (mprotect(p, s, prot) != 0) {
    int err = errno;                                   // GCOVR_EXCL_LINE
    fprintf(stderr,                                    // GCOVR_EXCL_LINE
            "Error calling mprotect(%p, %zu, %d): %s\n", // GCOVR_EXCL_LINE
            p,                                         // GCOVR_EXCL_LINE
            s,                                         // GCOVR_EXCL_LINE
            prot,                                      // GCOVR_EXCL_LINE
            strerror(err));                            // GCOVR_EXCL_LINE
    fflush(stderr);                                    // GCOVR_EXCL_LINE
    abort();                                           // GCOVR_EXCL_LINE
  }
}

/// munmap(2) that aborts (after printing a diagnostic to stderr) instead of
/// returning an error.
void munmapSafe(void *ptr, size_t size) {
  if (munmap(ptr, size) != 0) {
    int err = errno;                                           // GCOVR_EXCL_LINE
    fprintf(stderr, "Error calling munmap(%p, %zu): %s\n", // GCOVR_EXCL_LINE
            ptr,                                               // GCOVR_EXCL_LINE
            size,                                              // GCOVR_EXCL_LINE
            strerror(err));                                    // GCOVR_EXCL_LINE
    fflush(stderr);                                            // GCOVR_EXCL_LINE
    abort();                                                   // GCOVR_EXCL_LINE
  }
}

struct Random {
  // *Really* minimal PCG32 code / (c) 2014 M.E. O'Neill / pcg-random.org
  // Licensed under Apache License 2.0 (NO WARRANTY, etc. see website)
  //
  // Modified - mostly c -> c++

  /// Deterministic generator starting from the reference PCG32_INITIALIZER
  /// state (see the member initializers below).
  ///
  /// This used to be `= default`, which left `inc == 0` and `state == 0`.
  /// That breaks the "inc must be odd" invariant, and such a generator
  /// returns 0 forever. Default construction is what the map uses when NDEBUG
  /// is not defined (`Random random = {}`), so every treap priority was
  /// identical there.
  Random() { next(); }

  /// Seeds from (initState, initSeq) as pcg32_srandom_r does, then primes the
  /// one-value lookahead held in `next_`.
  Random(uint64_t initState, uint64_t initSeq) {
    pcg32_srandom_r(initState, initSeq);
    next();
  }

  /// Draws from a uniform distribution of uint32_t's
  uint32_t next() {
    auto result = next_;
    next_ = pcg32_random_r();
    return result;
  }

  /// Draws from a uniform distribution of [0, s). From
  /// https://arxiv.org/pdf/1805.10941.pdf
  uint32_t bounded(uint32_t s) {
    assert(s != 0);
    uint32_t x = next();
    auto m = uint64_t(x) * uint64_t(s);
    auto l = uint32_t(m);
    if (l < s) {
      // Rejection threshold (2^32 mod s) removes the modulo bias.
      uint32_t t = -s % s;
      while (l < t) {
        x = next();
        m = uint64_t(x) * uint64_t(s);
        l = uint32_t(m);
      }
    }
    uint32_t result = m >> 32;
    return result;
  }

  /// Fill `bytes` with `size` random hex bytes
  void randomHex(uint8_t *bytes, int size);

private:
  uint32_t pcg32_random_r() {
    uint64_t oldState = state;
    // Advance internal state
    state = oldState * 6364136223846793005ULL + inc;
    // Calculate output function (XSH RR), uses old state for max ILP
    uint32_t xorShifted = ((oldState >> 18u) ^ oldState) >> 27u;
    uint32_t rot = oldState >> 59u;
    return (xorShifted >> rot) | (xorShifted << ((-rot) & 31));
  }

  // Seed the rng. Specified in two parts, state initializer and a
  // sequence selection constant (a.k.a. stream id)
  void pcg32_srandom_r(uint64_t initstate, uint64_t initSeq) {
    state = 0U;
    inc = (initSeq << 1u) | 1u;
    pcg32_random_r();
    state += initstate;
    pcg32_random_r();
  }

  // The value the next call to next() will return.
  uint32_t next_{};
  // RNG state. All values are possible. Defaults to the state half of the
  // reference implementation's PCG32_INITIALIZER.
  uint64_t state{0x853c49e6748fea9bULL};
  // Controls which RNG sequence (stream) is selected. Must *always* be odd.
  // Defaults to the (odd) increment half of PCG32_INITIALIZER.
  uint64_t inc{0xda3e39cb94b95bdbULL};
};

/// Writes `size` characters from "0123456789abcdef" into `bytes`, using the
/// nibbles of successive next() draws. Each full draw yields 8 characters.
void Random::randomHex(uint8_t *bytes, int size) {
  int i = 0;
  while (i + 8 < size) {
    uint32_t r = next();
    bytes[i++] = "0123456789abcdef"[r & 0b1111];
    r >>= 4;
    bytes[i++] = "0123456789abcdef"[r & 0b1111];
    r >>= 4;
    bytes[i++] = "0123456789abcdef"[r & 0b1111];
    r >>= 4;
    bytes[i++] = "0123456789abcdef"[r & 0b1111];
    r >>= 4;
    bytes[i++] = "0123456789abcdef"[r & 0b1111];
    r >>= 4;
    bytes[i++] = "0123456789abcdef"[r & 0b1111];
    r >>= 4;
    bytes[i++] = "0123456789abcdef"[r & 0b1111];
    r >>= 4;
    bytes[i++] = "0123456789abcdef"[r & 0b1111];
  }
  // Tail: at most 8 characters remain, so one draw covers them.
  uint32_t r = next();
  while (i < size) {
    bytes[i++] = "0123456789abcdef"[r & 0b1111];
    r >>= 4;
  }
}

/// Returns a Random seeded with two uint64_t values read from /dev/urandom.
/// Aborts if the device cannot be opened or fully read.
Random seededRandom() {
  FILE *f = fopen("/dev/urandom", "r");
  if (f == nullptr) {
    fprintf(stderr, "Failed to open /dev/urandom\n");
    abort();
  }
  uint64_t seed[2];
  if (fread(seed, sizeof(seed[0]), sizeof(seed) / sizeof(seed[0]), f) !=
      sizeof(seed) / sizeof(seed[0])) {
    fprintf(stderr, "Failed to read from /dev/urandom\n");
    abort();
  }
  fclose(f);
  return Random{seed[0], seed[1]};
}

namespace weaselab {

// 96 is enough for an entire search path in a tree with a size that
// overflows int. See
// https://en.wikipedia.org/wiki/Random_binary_tree#The_longest_path
constexpr int kPathLengthUpperBound = 96;

// Reference-counted, variable-length record. The header is followed in the
// same allocation by keyLen key bytes, one zero byte, then max(valLen, 0)
// value bytes (see make(), getKey(), getVal()).
struct Entry {
  // If there is a point mutation at key, then pointVersion is its version.
  // Otherwise it's negative.
  int64_t pointVersion;
  // If there is a range mutation ending at key, then rangeVersion is its
  // version. Otherwise it's negative.
  int64_t rangeVersion;
  int keyLen;
  // Negative if this key is cleared. Only meaningful if this is a point
  // mutation.
  int valLen;
  // Mutable so that shared (const) entries can still be addref'd/delref'd.
  mutable int refCount;
  uint32_t priority;

  // True if the entry is a point mutation. If false, this entry's key should be
  // read through to the underlying data structure.
  bool pointMutation() const { return pointVersion >= 0; }

  // True if mutations in (pred, this) are cleared. If false, (pred, this)
  // should be read through to the underlying data structure.
  bool clearTo() const { return rangeVersion >= 0; }

  // There's an extra zero byte past the end of getKey, used for
  // reconstructing logical mutations without copies.
  const uint8_t *getKey() const { return (const uint8_t *)(this + 1); }

  // Value bytes start right after the key and its trailing zero byte.
  const uint8_t *getVal() const {
    return (const uint8_t *)(this + 1) + 1 + keyLen;
  }

  // Increments the reference count and returns this entry as non-const.
  Entry *addref() const {
    ++refCount;
#if DEBUG_VERBOSE
    if (debugVerboseEnabled) {
      printf("addref %p to %d\n", this, refCount);
    }
#endif
    return (Entry *)this;
  }

  // Decrements the reference count and frees the allocation (same size
  // formula as make()) when it reaches zero.
  void delref() const {
#if DEBUG_VERBOSE
    if (debugVerboseEnabled) {
      printf("delref %p to %d\n", this, refCount - 1);
    }
#endif
    if (--refCount == 0) {
      safe_free((void *)this, sizeof(Entry) + keyLen + 1 + std::max(valLen, 0));
    }
  }

  // Allocates an entry with refCount 1 and copies in the key (plus a trailing
  // zero byte) and, if valLen > 0, the value.
  static Entry *make(int64_t pointVersion, int64_t rangeVersion,
                     const uint8_t *key, int keyLen, const uint8_t *val,
                     int valLen, uint32_t priority) {
    auto e =
        (Entry *)safe_malloc(sizeof(Entry) + keyLen + 1 + std::max(valLen, 0));
    e->pointVersion = pointVersion;
    e->rangeVersion = rangeVersion;
    e->keyLen = keyLen;
    e->valLen = valLen;
    e->refCount = 1;
    e->priority = priority;
    if (keyLen > 0) {
      memcpy((uint8_t *)e->getKey(), key, keyLen);
    }
    ((uint8_t *)e->getKey())[keyLen] = 0;
    if (valLen > 0) {
      memcpy((uint8_t *)e->getVal(), val, valLen);
    }
    return e;
  }
};

// A tree node, addressed by uint32_t index into MemManager's node array
// (index 0 acts as null). pointer[0]/pointer[1] are the children. Once
// `updated` is set, pointer[2] replaces pointer[replacedPointer] for readers
// at versions >= updateVersion.
struct Node {
  union {
    int64_t updateVersion;
    // Free-list link, only meaningful while the node is on the free list.
    uint32_t nextFree;
  };
  Entry *entry;
  uint32_t pointer[3];
  bool replacedPointer;
  // Published with a release store after pointer[2]/replacedPointer are set.
  std::atomic updated;
};

// Limit mmap to 32 GiB so valgrind doesn't complain.
// https://bugs.kde.org/show_bug.cgi?id=229500 constexpr size_t kMapSize = size_t(32) * (1 << 30); const size_t kPageSize = sysconf(_SC_PAGESIZE); const uint32_t kNodesPerPage = kPageSize / sizeof(Node); const uint32_t kMinAddressable = kNodesPerPage; constexpr uint32_t kUpsizeBytes = 1 << 20; constexpr uint32_t kUpsizeNodes = kUpsizeBytes / sizeof(Node); static_assert(kUpsizeNodes * sizeof(Node) == kUpsizeBytes); struct BitSet { explicit BitSet(uint32_t size) : words((uint64_t *)safe_calloc(size / 64 + 1, 8)), size(size) {} bool test(uint32_t i) const { return words[i >> 6] & (uint64_t(1) << (i & 63)); } // Returns former value bool set(uint32_t i) { const auto prev = words[i >> 6]; const auto mask = uint64_t(1) << (i & 63); words[i >> 6] |= mask; max_ = std::max(i, max_); return prev & mask; } // Returns 0 if set is empty uint32_t max() const { return max_; } template void iterateAbsentApproxBackwards(F f, uint32_t begin, uint32_t end) const { // TODO can this be improved? We can do something with a word at a time // instead of a bit at a time. The first attempt at doing so benchmarked as // slower. 
assert(begin != 0); for (uint32_t i = end - 1; i >= begin; --i) { if (!test(i)) { f(i); } } } ~BitSet() { safe_free(words, (size / 64 + 1) * 8); } private: uint32_t max_ = 0; uint64_t *const words; const uint32_t size; }; int64_t mmapBytes = 0; int64_t peakMmapBytes = 0; struct MemManager { MemManager() : base((Node *)mmapSafe(nullptr, kMapSize, PROT_NONE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0)) { if (kPageSize % sizeof(Node) != 0) { fprintf(stderr, // GCOVR_EXCL_LINE "kPageSize not a multiple of Node size\n"); // GCOVR_EXCL_LINE abort(); // GCOVR_EXCL_LINE } if (kUpsizeBytes % kPageSize != 0) { fprintf(stderr, // GCOVR_EXCL_LINE "kUpsizeBytes not a multiple of kPageSize\n"); // GCOVR_EXCL_LINE abort(); // GCOVR_EXCL_LINE } } ~MemManager() { gc(nullptr, 0, 0); munmapSafe(base, kMapSize); } Node *const base; uint32_t allocate() { if (freeList != 0) { uint32_t result = freeList; freeList = base[result].nextFree; assert(base[result].entry == nullptr); return result; } if (next == firstUnaddressable) { mprotectSafe(base + firstUnaddressable, kUpsizeBytes, PROT_READ | PROT_WRITE); firstUnaddressable += kUpsizeNodes; #if SHOW_MEMORY mmapBytes = getBytes(); peakMmapBytes = std::max(peakMmapBytes, mmapBytes); #endif if (firstUnaddressable > kMapSize / sizeof(Node)) { fprintf( // GCOVR_EXCL_LINE stderr, // GCOVR_EXCL_LINE "Out of memory: firstUnaddressable > kMapSize / " // GCOVR_EXCL_LINE "sizeof(Node)\n"); // GCOVR_EXCL_LINE abort(); // GCOVR_EXCL_LINE } } return next++; } void gc(const uint32_t *roots, int numRoots, int64_t oldestVersion) { #if DEBUG_VERBOSE if (debugVerboseEnabled) { printf("GC roots:\n"); for (int i = 0; i < numRoots; ++i) { printf(" %u\n", roots[i]); } } #endif // Calculate reachable set BitSet reachable{next}; // Each node has at most 3 children and nodes along the search path aren't // in the stack, so we need 2 * kPathLengthUpperBound uint32_t stack[2 * kPathLengthUpperBound]; int stackIndex = 0; auto tryPush = [&](uint32_t p) { #if DEBUG_VERBOSE if 
(debugVerboseEnabled) { printf(" GC: visit: %u\n", p); } #endif if (!reachable.set(p)) { #if DEBUG_VERBOSE if (debugVerboseEnabled) { printf(" GC: push on to stack: %u\n", p); } #endif assert(stackIndex < sizeof(stack) / sizeof(stack[0])); stack[stackIndex++] = p; } }; for (int i = 0; i < numRoots; ++i) { if (roots[i] == 0) { continue; } tryPush(roots[i]); while (stackIndex > 0) { uint32_t p = stack[--stackIndex]; auto &node = base[p]; if (node.updated.load(std::memory_order_relaxed)) { if (node.pointer[!node.replacedPointer] != 0) { tryPush(node.pointer[!node.replacedPointer]); } if (oldestVersion < node.updateVersion) { if (node.pointer[node.replacedPointer] != 0) { tryPush(node.pointer[node.replacedPointer]); } } if (node.pointer[2] != 0) { tryPush(node.pointer[2]); } } else { if (node.pointer[0] != 0) { tryPush(node.pointer[0]); } if (node.pointer[1] != 0) { tryPush(node.pointer[1]); } } } } // Reclaim memory on the right side uint32_t max = reachable.max(); if (max == 0) { max = kMinAddressable - 1; } assert(max < next); uint32_t newFirstUnaddressable = (max / kNodesPerPage + 1) * kNodesPerPage; if (newFirstUnaddressable < firstUnaddressable) { for (int i = newFirstUnaddressable; i < firstUnaddressable; ++i) { if (base[i].entry != nullptr) { #if DEBUG_VERBOSE if (debugVerboseEnabled) { printf("Collecting %u while shrinking right\n", i); } #endif base[i].entry->delref(); } } mprotectSafe(base + newFirstUnaddressable, (firstUnaddressable - newFirstUnaddressable) * sizeof(Node), PROT_NONE); firstUnaddressable = newFirstUnaddressable; #if SHOW_MEMORY mmapBytes = getBytes(); #endif } next = max + 1; // Rebuild free list and delref entries freeList = 0; reachable.iterateAbsentApproxBackwards( [&](uint32_t i) { if (base[i].entry != nullptr) { #if DEBUG_VERBOSE if (debugVerboseEnabled) { printf("Collecting %u while building free list\n", i); } #endif base[i].entry->delref(); base[i].entry = nullptr; } base[i].nextFree = freeList; freeList = i; }, kMinAddressable, 
next); } int64_t getBytes() const { return (firstUnaddressable - kMinAddressable) * sizeof(Node); } private: uint32_t next = kMinAddressable; uint32_t firstUnaddressable = kMinAddressable; uint32_t freeList = 0; }; auto operator<=>(const VersionedMap::Key &lhs, const Node &rhs) { int cl = std::min(lhs.len, rhs.entry->keyLen); if (cl > 0) { int c = memcmp(lhs.p, rhs.entry->getKey(), cl); if (c != 0) { return c <=> 0; } } return lhs.len <=> rhs.entry->keyLen; } constexpr int orderToInt(std::strong_ordering o) { return o == std::strong_ordering::less ? -1 : o == std::strong_ordering::equal ? 0 : 1; } struct Finger { void push(uint32_t node, bool dir) { searchPath[searchPathSize_] = node; direction[searchPathSize_] = dir; ++searchPathSize_; } void pop() { assert(searchPathSize_ > 0); --searchPathSize_; } uint32_t root() const { assert(searchPathSize_ > 0); return searchPath[0]; } uint32_t backNode() const { assert(searchPathSize_ > 0); return searchPath[searchPathSize_ - 1]; } uint32_t &backNodeRef() { assert(searchPathSize_ > 0); return searchPath[searchPathSize_ - 1]; } bool backDirection() const { assert(searchPathSize_ > 0); return direction[searchPathSize_ - 1]; } uint32_t searchPathSize() const { return searchPathSize_; } void setSearchPathSizeUnsafe(int size) { searchPathSize_ = size; } Finger() { clear(); } void clear() { #ifndef NDEBUG memset(searchPath, 0, sizeof(searchPath)); memset(direction, 0, sizeof(direction)); #endif searchPathSize_ = 0; } void copyTo(Finger &result) { #ifndef NDEBUG memset(result.searchPath, 0, sizeof(searchPath)); memset(result.direction, 0, sizeof(direction)); #endif memcpy(result.searchPath, searchPath, searchPathSize_ * sizeof(searchPath[0])); memcpy(result.direction, direction, searchPathSize_ * sizeof(direction[0])); result.searchPathSize_ = searchPathSize_; } Finger(const Finger &) = delete; Finger &operator=(const Finger &) = delete; Finger(Finger &&) = delete; Finger &operator=(Finger &&) = delete; private: uint32_t 
searchPath[kPathLengthUpperBound]; bool direction[kPathLengthUpperBound]; int searchPathSize_; }; struct __attribute__((__visibility__("hidden"))) VersionedMap::Impl { // The last node is allowed to be 0, in which case this is the search path of // where an entry would exist template void move(Finger &finger, int64_t at, bool direction) const { uint32_t c; if (finger.backNode() != 0 && (c = child(finger.backNode(), direction, at)) != 0) { finger.push(c, direction); while ((c = child(finger.backNode(), !direction, at)) != 0) { finger.push(c, !direction); } } else { while (finger.searchPathSize() > 1 && finger.backDirection() == direction) { finger.pop(); } finger.pop(); } } template uint32_t child(uint32_t node, bool which, int64_t at) const { assert(node != 0); static_assert(kOrder == std::memory_order_acquire || kOrder == std::memory_order_relaxed); auto &n = mm.base[node]; uint32_t result; if (n.updated.load(kOrder) && n.updateVersion <= at && which == n.replacedPointer) { result = n.pointer[2]; } else { result = n.pointer[which]; } assert(result == 0 || result >= kMinAddressable); return result; } template uint32_t left(uint32_t node, bool which, int64_t at) { return child(node, false, at); } template uint32_t right(uint32_t node, bool which, int64_t at) { return child(node, true, at); } // Returns the node that results from setting `which` to `child` on `node` uint32_t update(uint32_t node, bool which, uint32_t child, int64_t version) { assert(node == 0 || node >= kMinAddressable); assert(child == 0 || child >= kMinAddressable); if (this->child(node, which, version) == child) { return node; } auto &n = mm.base[node]; const bool updated = n.updated.load(std::memory_order_relaxed); auto doCopy = [&]() { uint32_t copy = mm.allocate(); #if DEBUG_VERBOSE if (debugVerboseEnabled) { printf("Copy %u to %u\n", node, copy); } #endif auto &c = mm.base[copy]; c.entry = n.entry->addref(); c.pointer[which] = child; c.pointer[!which] = this->child(node, !which, 
latestVersion); c.updated.store(false, std::memory_order_relaxed); c.updateVersion = version; assert(copy == 0 || copy >= kMinAddressable); return copy; }; if (n.updateVersion == version) { // The reason these aren't data races is that concurrent readers are // reading < `version` if (updated && n.replacedPointer != which) { auto result = doCopy(); // We can't update n.replacedPointer without introducing a data race // (unless we packed it into the atomic?) so we copy. pointer[2] becomes // unreachable, but need to tell the garbage collector. n.pointer[2] = 0; return result; } else if (updated) { n.pointer[2] = child; } else { n.pointer[which] = child; } assert(node == 0 || node >= kMinAddressable); return node; } if (updated) { // We already used this node's in-place update return doCopy(); } else { n.updateVersion = version; n.pointer[2] = child; n.replacedPointer = which; n.updated.store(true, std::memory_order_release); // Must be last assert(node == 0 || node >= kMinAddressable); return node; } } void rotate(uint32_t &n, int64_t at, bool right) { auto l = child(n, !right, at); n = update( l, right, update(n, !right, child(l, right, at), at), at); } struct Val { const uint8_t *p; int len; }; template void search(Key key, T root, int64_t version, Finger &finger) const { // Prevent integer promotion etc static_assert(std::is_same_v); finger.clear(); if (root == 0) { return; } bool ignored; finger.push(root, ignored); // Initialize finger to the search path of `key` for (;;) { auto n = finger.backNode(); if (n == 0) { break; } auto c = key <=> mm.base[n]; if (c == 0) { // No duplicates break; } finger.push(child(n, c > 0, version), c > 0); } } // If `val` is set, then this is a point mutation at `latestVersion`. // Otherwise it's the end of a range mutation at `latestVersion`. 
void insert(Key key, std::optional val) { Finger finger; bool ignored; finger.push(latestRoot, ignored); bool inserted; // Initialize finger to the search path of `key` for (;;) { auto n = finger.backNode(); if (n == 0) { inserted = true; break; } auto c = key <=> mm.base[n]; if (c == 0) { // No duplicates inserted = false; break; } finger.push(child(n, c > 0, latestVersion), c > 0); } int64_t pointVersion, rangeVersion; if (val.has_value()) { pointVersion = latestVersion; if (inserted) { Finger copy; finger.copyTo(copy); move(copy, latestVersion, true); if (copy.searchPathSize() == 0) { rangeVersion = -1; // Sentinel for "no mutation ending here" } else { rangeVersion = mm.base[copy.backNode()].entry->rangeVersion; } } else { auto *entry = mm.base[finger.backNode()].entry; rangeVersion = entry->rangeVersion; } } else { rangeVersion = latestVersion; if (inserted) { val = {nullptr, -1}; // Sentinel for "no point mutation here" pointVersion = -1; // Sentinel for "no point mutation here" } else { auto *entry = mm.base[finger.backNode()].entry; val = {entry->getVal(), entry->valLen}; pointVersion = entry->pointVersion; } } // Prepare new node uint32_t node = newNode( pointVersion, rangeVersion, key.p, key.len, val->p, val->len, inserted ? 
random.next() : mm.base[finger.backNode()].entry->priority); if (!inserted) { auto &n = mm.base[node]; n.pointer[0] = child(finger.backNode(), false, latestVersion); n.pointer[1] = child(finger.backNode(), true, latestVersion); } // Rotate and propagate up the search path for (;;) { if (finger.searchPathSize() == 1) { // Made it to the root latestRoot = node; break; } const bool direction = finger.backDirection(); finger.pop(); auto parent = finger.backNode(); parent = update(parent, direction, node, latestVersion); if (inserted && mm.base[node].entry->priority > mm.base[parent].entry->priority) { rotate(parent, latestVersion, !direction); } else { if (parent == finger.backNode()) { break; } } node = parent; } } // Removes `finger` from the tree, and leaves `finger` pointing to the next // entry. void remove(Finger &finger) { #ifndef NDEBUG Entry *expected; { Finger copy; finger.copyTo(copy); move(copy, latestVersion, true); expected = copy.searchPathSize() > 0 ? mm.base[copy.backNode()].entry : nullptr; } #endif // True if finger is pointing to an entry > than the entry we're removing // after we rotate it down // Rotate down until we can remove the entry for (;;) { auto &node = finger.backNodeRef(); const auto l = child(node, false, latestVersion); const auto r = child(node, true, latestVersion); if (l == 0 && r == 0) { // TODO we can avoid some rotations if we stop when l or r == 0 node = 0; break; } else { const bool direction = (l == 0 ? 0 : mm.base[l].entry->priority) > (r == 0 ? 
0 : mm.base[r].entry->priority); rotate(node, latestVersion, direction); assert(node != 0); finger.push( child(node, direction, latestVersion), direction); } } // propagate up the search path, all the way to the root since we may have // more rotations to do even if an update doesn't change a node pointer auto node = finger.backNode(); assert(node == 0); const auto oldSize = finger.searchPathSize(); for (;;) { if (finger.searchPathSize() == 1) { // Made it to the root latestRoot = node; break; } const bool direction = finger.backDirection(); finger.pop(); auto &parent = finger.backNodeRef(); auto old = parent; parent = update(parent, direction, node, latestVersion); node = parent; } finger.setSearchPathSizeUnsafe(oldSize); // finger now points to the insertion point of the node we're removing move(finger, latestVersion, true); #ifndef NDEBUG if (finger.searchPathSize() > 0) { assert(mm.base[finger.backNode()].entry == expected); } else { assert(expected == nullptr); } #endif } uint32_t newNode(int64_t version, int64_t rangeVersion, const uint8_t *key, int keyLen, const uint8_t *val, int valLen, uint32_t priority) { auto result = mm.allocate(); auto &node = mm.base[result]; node.updateVersion = version; node.pointer[0] = 0; node.pointer[1] = 0; node.updated.store(false, std::memory_order_relaxed); node.entry = Entry::make(version, rangeVersion, key, keyLen, val, valLen, priority); return result; } void setOldestVersion(int64_t oldestVersion) { mallocBytesDelta = 0; this->oldestVersion = oldestVersion; roots.setOldestVersion(oldestVersion); mm.gc(roots.roots(), roots.rootCount(), oldestVersion); totalMallocBytes += mallocBytesDelta; } int64_t getBytes() const { return totalMallocBytes + mm.getBytes(); } void printInOrder(int64_t version); void printInOrderHelper(int64_t version, uint32_t node, int depth); void addMutations(const Mutation *mutations, int numMutations, int64_t version) { mallocBytesDelta = 0; // TODO scan to remove mutations older than oldestVersion 
assert(latestVersion < version); latestVersion = version; latestRoot = roots.roots()[roots.rootCount() - 1]; // TODO Improve ILP? for (int i = 0; i < numMutations; ++i) { const auto &m = mutations[i]; switch (m.type) { case Set: { insert({m.param1, m.param1Len}, {{m.param2, m.param2Len}}); } break; case Clear: { insert({m.param1, m.param1Len}, {{nullptr, -1}}); if (m.param2Len > 0) { Finger iter; search({m.param1, m.param1Len}, latestRoot, latestVersion, iter); move(iter, latestVersion, true); while (iter.searchPathSize() > 0 && mm.base[iter.backNode()] < Key{m.param2, m.param2Len}) { remove(iter); } insert({m.param2, m.param2Len}, {}); } } break; default: // GCOVR_EXCL_LINE assert(false); // GCOVR_EXCL_LINE __builtin_unreachable(); // GCOVR_EXCL_LINE } } roots.add(latestRoot, latestVersion); totalMallocBytes += mallocBytesDelta; } void firstGeq(const Key *key, const int64_t *version, Iterator *iterator, int count) const; Random random = #ifndef NDEBUG {}; #else seededRandom(); #endif MemManager mm; RootSet roots; // Only meaningful within the callstack of `addMutations` uint32_t latestRoot; int64_t oldestVersion = 0; int64_t latestVersion = 0; int64_t totalMallocBytes = sizeof(Impl); }; VersionedMap::Impl *internal_makeImpl(int64_t version) { mallocBytesDelta = 0; auto *result = new (safe_malloc(sizeof(VersionedMap::Impl))) VersionedMap::Impl(); result->totalMallocBytes = mallocBytesDelta; result->latestVersion = version; return result; } VersionedMap::VersionedMap(int64_t version) : impl(internal_makeImpl(version)) {} VersionedMap::~VersionedMap() { if (impl != nullptr) { impl->~Impl(); safe_free(impl, sizeof(*impl)); } } VersionedMap::VersionedMap(VersionedMap &&other) noexcept { impl = std::exchange(other.impl, nullptr); } VersionedMap &VersionedMap::operator=(VersionedMap &&other) noexcept { impl = std::exchange(other.impl, nullptr); return *this; } void VersionedMap::addMutations(const Mutation *mutations, int numMutations, int64_t version) { 
impl->addMutations(mutations, numMutations, version); } struct VersionedMap::Iterator::Impl { Finger finger; int64_t version; const VersionedMap::Impl *map; int cmp; // State for materializing mutations associated with the entry at `finger`. // Cases: // - If finger is a set and the end of a clear, then mutation[0] is the clear // and mutation[1] is the set. // - If finger is a set and not the end of a clear, then mutation[0] is the // set // - If finger is a clear and not a set, then mutation[0] is the clear int mutationCount; int mutationIndex; VersionedMutation mutations[2]; void copyTo(Impl &result) { result.cmp = cmp; result.map = map; result.version = version; result.mutationCount = mutationCount; result.mutationIndex = mutationIndex; result.mutations[0] = mutations[0]; result.mutations[1] = mutations[1]; finger.copyTo(result.finger); } bool equals(const Impl &other) const { assert(map == other.map); assert(version == other.version); if (finger.searchPathSize() == 0 || other.finger.searchPathSize() == 0) { return finger.searchPathSize() == other.finger.searchPathSize(); } return finger.backNode() == other.finger.backNode() && mutationIndex == other.mutationIndex; } }; VersionedMap::Iterator::~Iterator() { if (impl != nullptr) { impl->~Impl(); safe_free(impl, sizeof(*impl)); } } VersionedMap::Iterator::Iterator(const Iterator &other) : impl(new(safe_malloc(sizeof(Impl))) Impl()) { other.impl->copyTo(*impl); } VersionedMap::Iterator & VersionedMap::Iterator::operator=(const Iterator &other) { if (impl != nullptr) { impl->~Impl(); safe_free(impl, sizeof(*impl)); } impl = new (safe_malloc(sizeof(Impl))) Impl(); other.impl->copyTo(*impl); return *this; } VersionedMap::Iterator::Iterator(Iterator &&other) noexcept : impl(std::exchange(other.impl, nullptr)) {} VersionedMap::Iterator & VersionedMap::Iterator::operator=(Iterator &&other) noexcept { if (impl != nullptr) { impl->~Impl(); safe_free(impl, sizeof(*impl)); } impl = std::exchange(other.impl, nullptr); return 
*this; } VersionedMap::Iterator::VersionedMutation VersionedMap::Iterator::operator*() const { #if DEBUG_VERBOSE if (debugVerboseEnabled) { printf("Dereference %u\n", impl->finger.backNode()); } #endif assert(impl->finger.searchPathSize() != 0); assert(impl->mutationIndex < impl->mutationCount); assert(impl->mutationIndex >= 0); return impl->mutations[impl->mutationIndex]; } void materializeMutations(VersionedMap::Iterator::Impl *impl, const Entry *prev, const Entry *next) { if (prev == nullptr) { Finger copy; impl->finger.copyTo(copy); impl->map->move(copy, impl->version, false); if (copy.searchPathSize() > 0) { prev = impl->map->mm.base[copy.backNode()].entry; } else { assert(!impl->map->mm.base[impl->finger.backNode()].entry->clearTo()); } } if (next == nullptr) { Finger copy; impl->finger.copyTo(copy); impl->map->move(copy, impl->version, true); if (copy.searchPathSize() > 0) { next = impl->map->mm.base[copy.backNode()].entry; } } const auto &entry = *impl->map->mm.base[impl->finger.backNode()].entry; impl->mutationCount = 0; if (entry.clearTo()) { impl->mutations[impl->mutationCount++] = { prev->getKey(), entry.getKey(), prev->pointMutation() && prev->valLen < 0 && prev->pointVersion == entry.rangeVersion ? 
prev->keyLen : prev->keyLen + 1, entry.keyLen, VersionedMap::Clear, entry.rangeVersion}; } if (entry.pointMutation()) { if (entry.valLen < 0) { if (next == nullptr || !(next->clearTo() && next->rangeVersion == entry.pointVersion)) { impl->mutations[impl->mutationCount++] = { entry.getKey(), nullptr, entry.keyLen, 0, VersionedMap::Clear, entry.pointVersion}; } } else { impl->mutations[impl->mutationCount++] = { entry.getKey(), entry.getVal(), entry.keyLen, entry.valLen, VersionedMap::Set, entry.pointVersion}; } } } VersionedMap::Iterator &VersionedMap::Iterator::operator++() { if (impl->mutationIndex < impl->mutationCount - 1) { ++impl->mutationIndex; return *this; } do { const auto &entry = *impl->map->mm.base[impl->finger.backNode()].entry; impl->map->move(impl->finger, impl->version, true); if (impl->finger.searchPathSize() > 0) { materializeMutations(impl, &entry, nullptr); } } while (impl->mutationCount == 0); impl->mutationIndex = 0; return *this; } VersionedMap::Iterator VersionedMap::Iterator::operator++(int) { auto result = *this; ++*this; return result; } VersionedMap::Iterator &VersionedMap::Iterator::operator--() { if (impl->mutationIndex > 0) { --impl->mutationIndex; return *this; } // TODO support decrementing end do { const auto &entry = *impl->map->mm.base[impl->finger.backNode()].entry; impl->map->move(impl->finger, impl->version, false); if (impl->finger.searchPathSize() > 0) { materializeMutations(impl, nullptr, &entry); } } while (impl->mutationCount == 0); impl->mutationIndex = impl->mutationCount - 1; return *this; } VersionedMap::Iterator VersionedMap::Iterator::operator--(int) { auto result = *this; --*this; return result; } bool VersionedMap::Iterator::operator==(const Iterator &other) const { if (impl == nullptr || other.impl == nullptr) { return impl == other.impl; } return impl->equals(*other.impl); } void VersionedMap::Impl::firstGeq(const Key *key, const int64_t *version, Iterator *iterator, int count) const { // TODO ILP! 
auto handle = roots.getThreadSafeHandle(); for (int i = 0; i < count; ++i) { uint32_t root; if (iterator[i].impl != nullptr) { root = iterator[i].impl->version == version[i] ? iterator[i].impl->finger.root() : handle.rootForVersion(version[i]); iterator[i].impl->~Impl(); new (iterator[i].impl) Iterator::Impl(); } else { root = handle.rootForVersion(version[i]); iterator[i].impl = new (safe_malloc(sizeof(Iterator::Impl))) Iterator::Impl(); } Finger &finger = iterator[i].impl->finger; search(key[i], root, version[i], finger); if (finger.searchPathSize() == 0) { iterator[i].impl->cmp = 1; } else if (finger.backNode() == 0) { iterator[i].impl->cmp = 1; move(finger, version[i], true); if (finger.searchPathSize() > 0) { assert(finger.backNode() != 0); } } else { iterator[i].impl->cmp = 0; } iterator[i].impl->version = version[i]; iterator[i].impl->map = this; const Entry *prev = nullptr; for (;;) { if (finger.searchPathSize() > 0) { materializeMutations(iterator[i].impl, prev, nullptr); if (iterator[i].impl->mutationCount > 0) { break; } } else { break; } prev = iterator[i].impl->map->mm.base[finger.backNode()].entry; iterator[i].impl->map->move( finger, iterator[i].impl->version, true); } if (iterator[i].impl->cmp == 0) { iterator[i].impl->mutationIndex = iterator[i].impl->mutationCount - 1; } else { iterator[i].impl->mutationIndex = 0; } } } bool VersionedMap::Iterator::operator!=(const Iterator &other) const { if (impl == nullptr || other.impl == nullptr) { return impl != other.impl; } return !impl->equals(*other.impl); } int VersionedMap::Iterator::cmp() const { return impl->cmp; } void VersionedMap::firstGeq(const Key *key, const int64_t *version, Iterator *iterator, int count) const { impl->firstGeq(key, version, iterator, count); } VersionedMap::Iterator VersionedMap::begin(int64_t version) const { VersionedMap::Iterator result; result.impl = new (safe_malloc(sizeof(Iterator::Impl))) Iterator::Impl(); result.impl->cmp = 1; bool ignored; result.impl->finger.push( 
impl->roots.getThreadSafeHandle().rootForVersion(version), ignored); if (result.impl->finger.backNode() == 0) { result.impl->finger.pop(); } else { uint32_t c; while ((c = impl->child( result.impl->finger.backNode(), false, version)) != 0) { result.impl->finger.push(c, false); } } result.impl->map = impl; const Entry *prev = nullptr; for (;;) { if (result.impl->finger.searchPathSize() > 0) { materializeMutations(result.impl, prev, nullptr); if (result.impl->mutationCount > 0) { break; } } else { break; } prev = result.impl->map->mm.base[result.impl->finger.backNode()].entry; result.impl->map->move( result.impl->finger, result.impl->version, true); } result.impl->mutationIndex = 0; result.impl->version = version; return result; } VersionedMap::Iterator VersionedMap::end(int64_t version) const { VersionedMap::Iterator result; result.impl = new (safe_malloc(sizeof(Iterator::Impl))) Iterator::Impl(); result.impl->cmp = 1; result.impl->map = impl; result.impl->mutationIndex = 0; result.impl->version = version; return result; } int64_t VersionedMap::getVersion() const { return impl->latestVersion; } int64_t VersionedMap::getOldestVersion() const { return impl->oldestVersion; } void VersionedMap::setOldestVersion(int64_t oldestVersion) { impl->setOldestVersion(oldestVersion); } int64_t VersionedMap::getBytes() const { return impl->getBytes(); } // ==================== END IMPLEMENTATION ==================== // GCOVR_EXCL_START void VersionedMap::Impl::printInOrder(int64_t version) { printInOrderHelper(version, roots.getThreadSafeHandle().rootForVersion(version), 0); } void VersionedMap::Impl::printInOrderHelper(int64_t version, uint32_t node, int depth) { if (node == 0) { return; } printInOrderHelper(version, child(node, true, version), depth + 1); for (int i = 0; i < depth; ++i) { printf(" "); } printf("node %u: ", node); printf("%.*s", mm.base[node].entry->keyLen, mm.base[node].entry->getKey()); if (mm.base[node].entry->valLen >= 0) { printf(" -> '%.*s' @ %" PRId64, 
mm.base[node].entry->valLen, mm.base[node].entry->getVal(), mm.base[node].entry->pointVersion); } else { printf(" ", mm.base[node].entry->pointVersion); } if (mm.base[node].entry->clearTo()) { printf(" ", mm.base[node].entry->rangeVersion); } printf("\n"); VersionedMap::Impl::printInOrderHelper( version, child(node, false, version), depth + 1); } VersionedMap::Impl *cast(const VersionedMap &m) { VersionedMap::Impl *result; memcpy(&result, &m, sizeof(void *)); return result; } #if SHOW_MEMORY struct __attribute__((visibility("default"))) PeakPrinter { ~PeakPrinter() { printf("--- versioned_map ---\n"); printf("malloc bytes: %g\n", double(mallocBytes)); printf("Peak malloc bytes: %g\n", double(peakMallocBytes)); printf("mmap bytes: %g\n", double(mmapBytes)); printf("Peak mmap bytes: %g\n", double(peakMmapBytes)); } } peakPrinter; #endif } // namespace weaselab #ifdef ENABLE_MAIN #include #include "PrintMutation.h" void breakpoint_me() {} int main() { { weaselab::VersionedMap versionedMap{0}; printf("Bytes: %" PRId64 "\n", versionedMap.getBytes()); { weaselab::VersionedMap::Mutation m[] = { {(const uint8_t *)"a", nullptr, 1, 0, weaselab::VersionedMap::Set}, {(const uint8_t *)"b", nullptr, 1, 0, weaselab::VersionedMap::Set}, {(const uint8_t *)"c", nullptr, 1, 0, weaselab::VersionedMap::Set}, {(const uint8_t *)"d", nullptr, 1, 0, weaselab::VersionedMap::Set}, {(const uint8_t *)"e", nullptr, 1, 0, weaselab::VersionedMap::Set}, {(const uint8_t *)"f", nullptr, 1, 0, weaselab::VersionedMap::Set}, }; versionedMap.addMutations(m, sizeof(m) / sizeof(m[0]), 1); } printf("Bytes: %" PRId64 "\n", versionedMap.getBytes()); { weaselab::VersionedMap::Mutation m[] = { {(const uint8_t *)"a", (const uint8_t *)"d", 1, 1, weaselab::VersionedMap::Clear}, }; versionedMap.addMutations(m, sizeof(m) / sizeof(m[0]), 2); } { weaselab::VersionedMap::Mutation m[] = { {(const uint8_t *)"b", (const uint8_t *)"", 1, 0, weaselab::VersionedMap::Clear}, }; versionedMap.addMutations(m, sizeof(m) / 
sizeof(m[0]), 3); } const int64_t v = 3; cast(versionedMap)->printInOrder(v); weaselab::VersionedMap::Key k = {(const uint8_t *)"a", 2}; weaselab::VersionedMap::Iterator iter; versionedMap.firstGeq(&k, &v, &iter, 1); printf("Bytes: %" PRId64 "\n", versionedMap.getBytes()); versionedMap.setOldestVersion(2); printf("Bytes: %" PRId64 "\n", versionedMap.getBytes()); breakpoint_me(); for (auto end = versionedMap.end(v); iter != end; ++iter) { printMutation(*iter); } } return 0; } #endif // GCOVR_EXCL_STOP