#include "VersionedMap.h"
#include "RootSet.h"

// NOTE(review): the original angle-bracket include targets were lost in
// extraction (bare "#include" directives). The list below is reconstructed
// from the names this translation unit uses (mmap/mprotect/munmap, sysconf,
// fprintf, malloc/free, memcpy/strerror, errno, fixed-width ints,
// std::atomic, std::max/std::min, std::optional, std::exchange,
// XXH3_64bits). TODO confirm against project history.
#include "xxhash.h" // XXH3_64bits in Entry::make — TODO confirm exact path
#include <algorithm>
#include <atomic>
#include <cerrno>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <optional>
#include <sys/mman.h>
#include <unistd.h>
#include <utility>

#ifndef DEBUG_VERBOSE
#define DEBUG_VERBOSE 0
#endif
#if DEBUG_VERBOSE
// Use to toggle debug verbose dynamically
bool debugVerboseEnabled = true;
#endif

// NOTE(review): the template argument of this concept check was lost in
// extraction — presumably the map's iterator type declared in
// VersionedMap.h, e.g.:
//   static_assert(std::bidirectional_iterator<VersionedMap::Iterator>);
// TODO restore once confirmed.

// mmap wrapper that aborts (with a diagnostic) on failure, so callers never
// see MAP_FAILED.
void *mmapSafe(void *addr, size_t len, int prot, int flags, int fd,
               off_t offset) {
  void *result = mmap(addr, len, prot, flags, fd, offset);
  if (result == MAP_FAILED) {
    int err = errno;                                          // GCOVR_EXCL_LINE
    fprintf(                                                  // GCOVR_EXCL_LINE
        stderr,                                               // GCOVR_EXCL_LINE
        "Error calling mmap(%p, %zu, %d, %d, %d, %jd): %d %s\n", // GCOVR_EXCL_LINE
        addr, len, prot, flags, fd, (intmax_t)offset, err,    // GCOVR_EXCL_LINE
        strerror(err));                                       // GCOVR_EXCL_LINE
    fflush(stderr);                                           // GCOVR_EXCL_LINE
    abort();                                                  // GCOVR_EXCL_LINE
  }
  return result;
}

// mprotect wrapper that aborts (with a diagnostic) on failure.
void mprotectSafe(void *p, size_t s, int prot) {
  if (mprotect(p, s, prot) != 0) {
    int err = errno;                                   // GCOVR_EXCL_LINE
    fprintf(stderr,                                    // GCOVR_EXCL_LINE
            "Error calling mprotect(%p, %zu, %d): %s\n", // GCOVR_EXCL_LINE
            p,                                         // GCOVR_EXCL_LINE
            s,                                         // GCOVR_EXCL_LINE
            prot,                                      // GCOVR_EXCL_LINE
            strerror(err));                            // GCOVR_EXCL_LINE
    fflush(stderr);                                    // GCOVR_EXCL_LINE
    abort();                                           // GCOVR_EXCL_LINE
  }
}

// munmap wrapper that aborts (with a diagnostic) on failure.
void munmapSafe(void *ptr, size_t size) {
  if (munmap(ptr, size) != 0) {
    int err = errno;                                   // GCOVR_EXCL_LINE
    fprintf(stderr, "Error calling munmap(%p, %zu): %s\n", // GCOVR_EXCL_LINE
            ptr,                                       // GCOVR_EXCL_LINE
            size,                                      // GCOVR_EXCL_LINE
            strerror(err));                            // GCOVR_EXCL_LINE
    fflush(stderr);                                    // GCOVR_EXCL_LINE
    abort();                                           // GCOVR_EXCL_LINE
  }
}

namespace weaselab {

// 96 is enough for an entire search path in a tree with a size that
// overflows int. See
// https://en.wikipedia.org/wiki/Random_binary_tree#The_longest_path
constexpr int kPathLengthUpperBound = 96;

// Refcounted, immutable key/value payload shared between tree nodes.
// Layout is a single allocation: [Entry][key bytes][0][value bytes].
// NOTE(review): refCount is a plain (mutable) int, not atomic — this assumes
// addref/delref only happen on the writer/GC thread. TODO confirm.
struct Entry {
  int64_t insertVersion;
  int keyLen;
  // Negative if this key is cleared
  int valLen;
  mutable int refCount;
  uint32_t priority;
  // True if mutations in (pred, this) are cleared. If false, (pred, this)
  // should be read through to the underlying data structure.
  bool clearTo;
  // There's an extra zero byte past the end of getKey, used for
  // reconstructing logical mutations without copies.
  const uint8_t *getKey() const { return (const uint8_t *)(this + 1); }
  const uint8_t *getVal() const {
    return (const uint8_t *)(this + 1) + 1 + keyLen;
  }
  Entry *addref() const {
    ++refCount;
    return (Entry *)this;
  }
  void delref() const {
    if (--refCount == 0) {
      free((void *)this);
    }
  }
  // Allocates and fills an Entry. valLen < 0 means "cleared" (no value
  // bytes are reserved: std::max(valLen, 0)).
  static Entry *make(int64_t insertVersion, const uint8_t *key, int keyLen,
                     const uint8_t *val, int valLen, bool clearTo) {
    auto e = (Entry *)malloc(sizeof(Entry) + keyLen + 1 + std::max(valLen, 0));
    e->insertVersion = insertVersion;
    e->keyLen = keyLen;
    e->valLen = valLen;
    e->refCount = 1;
    // Treap priority derived from the key hash. The 64-bit hash is
    // deliberately truncated to 32 bits here.
    e->priority = XXH3_64bits(key, keyLen);
    e->clearTo = clearTo;
    if (keyLen > 0) {
      memcpy((uint8_t *)e->getKey(), key, keyLen);
    }
    // Terminating zero byte documented above.
    ((uint8_t *)e->getKey())[keyLen] = 0;
    if (valLen > 0) {
      memcpy((uint8_t *)e->getVal(), val, valLen);
    }
    return e;
  }
};

// One tree node in the arena. Indices (uint32_t) are used instead of
// pointers; index 0 means "null".
struct Node {
  union {
    int64_t updateVersion;
    uint32_t nextFree; // Only meaningful while the node is on the free list
  };
  Entry *entry;
  // pointer[0]/pointer[1] are left/right children; pointer[2] is the
  // one-shot in-place replacement child (see Impl::update).
  uint32_t pointer[3];
  bool replacedPointer;
  // NOTE(review): extraction dropped the template argument; restored as
  // std::atomic<bool> — the code only ever stores/loads true/false.
  std::atomic<bool> updated;
};

// Limit mmap to 32 GiB so valgrind doesn't complain.
// https://bugs.kde.org/show_bug.cgi?id=229500
constexpr size_t kMapSize = size_t(32) * (1 << 30);
// Runtime page size; the node/page bookkeeping below derives from it.
const size_t kPageSize = sysconf(_SC_PAGESIZE);
const uint32_t kNodesPerPage = kPageSize / sizeof(Node);
// Index 0 is the "null" node and the whole first page is never mapped
// readable, so valid node indices start at one full page's worth.
const uint32_t kMinAddressable = kNodesPerPage;
// Grow the addressable region 1 MiB at a time.
constexpr uint32_t kUpsizeBytes = 1 << 20;
constexpr uint32_t kUpsizeNodes = kUpsizeBytes / sizeof(Node);
static_assert(kUpsizeNodes * sizeof(Node) == kUpsizeBytes);

// Fixed-capacity bit set used by MemManager::gc to mark reachable nodes.
// NOTE(review): the backing store is malloc'd and never zeroed, so test()
// and set()'s "former value" read uninitialized memory for bits that were
// never touched — presumably this should be calloc/memset. TODO confirm.
struct BitSet {
  explicit BitSet(uint32_t size) : words((uint64_t *)malloc(size / 8 + 8)) {}
  bool test(uint32_t i) const {
    return words[i >> 6] & (uint64_t(1) << (i & 63));
  }
  // Returns former value
  bool set(uint32_t i) {
    const auto prev = words[i >> 6];
    const auto mask = uint64_t(1) << (i & 63);
    words[i >> 6] |= mask;
    max_ = std::max(i, max_);
    return prev & mask;
  }
  // Returns 0 if set is empty
  uint32_t max() const { return max_; }
  // Calls f(i) for every unset bit in [begin, end), scanning high to low.
  // NOTE(review): extraction dropped the template parameter list —
  // presumably `template <class F>`.
  template
  void iterateAbsentApproxBackwards(F f, uint32_t begin, uint32_t end) const {
    // TODO can this be improved? We can do something with a word at a time
    // instead of a bit at a time. The first attempt at doing so benchmarked
    // as slower.
    // begin must be nonzero: `i >= begin` with unsigned i would never
    // terminate at begin == 0.
    assert(begin != 0);
    for (uint32_t i = end - 1; i >= begin; --i) {
      if (!test(i)) {
        f(i);
      }
    }
  }
  ~BitSet() { free(words); }

private:
  uint32_t max_ = 0;
  uint64_t *const words;
};

// Arena of Nodes backed by a single large PROT_NONE reservation. Pages are
// made addressable on demand by allocate() and handed back to PROT_NONE by
// gc() when the high-water mark shrinks.
struct MemManager {
  MemManager()
      : base((Node *)mmapSafe(nullptr, kMapSize, PROT_NONE,
                              MAP_PRIVATE | MAP_ANONYMOUS, -1, 0)) {
    if (kPageSize % sizeof(Node) != 0) {
      fprintf(stderr,                                     // GCOVR_EXCL_LINE
              "kPageSize not a multiple of Node size\n"); // GCOVR_EXCL_LINE
      abort();                                            // GCOVR_EXCL_LINE
    }
    if (kUpsizeBytes % kPageSize != 0) {
      fprintf(stderr,                                      // GCOVR_EXCL_LINE
              "kUpsizeBytes not a multiple of kPageSize\n"); // GCOVR_EXCL_LINE
      abort();                                             // GCOVR_EXCL_LINE
    }
  }
  ~MemManager() {
    // gc with no roots releases every remaining entry reference before the
    // arena is unmapped.
    gc(nullptr, 0, 0);
    munmapSafe(base, kMapSize);
  }
  Node *const base;
  // Returns a usable node index: free list first, otherwise bump `next`,
  // mapping another kUpsizeBytes when the addressable region is exhausted.
  uint32_t allocate() {
    if (freeList != 0) {
      uint32_t result = freeList;
      freeList = base[result].nextFree;
      assert(base[result].entry == nullptr);
      return result;
    }
    if (next == firstUnaddressable) {
      mprotectSafe(base + firstUnaddressable, kUpsizeBytes,
                   PROT_READ | PROT_WRITE);
      firstUnaddressable += kUpsizeNodes;
      if (firstUnaddressable > kMapSize / sizeof(Node)) {
        fprintf(                                              // GCOVR_EXCL_LINE
            stderr,                                           // GCOVR_EXCL_LINE
            "Out of memory: firstUnaddressable > kMapSize / " // GCOVR_EXCL_LINE
            "sizeof(Node)\n");                                // GCOVR_EXCL_LINE
        abort();                                              // GCOVR_EXCL_LINE
      }
    }
    return next++;
  }
  // Mark-and-sweep over the arena: marks everything reachable from `roots`
  // (respecting which pointers are visible at >= oldestVersion), returns
  // whole trailing pages to PROT_NONE, and rebuilds the free list from the
  // unreachable slots (delref'ing their entries).
  void gc(const uint32_t *roots, int numRoots, int64_t oldestVersion) {
    // Calculate reachable set
    BitSet reachable{next};
    // Each node has at most 3 children and nodes along the search path aren't
    // in the stack, so we need 2 * kPathLengthUpperBound
    uint32_t stack[2 * kPathLengthUpperBound];
    int stackIndex = 0;
    auto tryPush = [&](uint32_t p) {
      // Push only on the first time a node is marked.
      if (!reachable.set(p)) {
        assert(stackIndex < sizeof(stack) / sizeof(stack[0]));
        stack[stackIndex++] = p;
      }
    };
    for (int i = 0; i < numRoots; ++i) {
      if (roots[i] == 0) {
        continue;
      }
      tryPush(roots[i]);
      while (stackIndex > 0) {
        uint32_t p = stack[--stackIndex];
        auto &node = base[p];
        if (node.updated.load(std::memory_order_relaxed)) {
          // The unreplaced side is live for all versions.
          if (node.pointer[!node.replacedPointer] != 0) {
            tryPush(node.pointer[!node.replacedPointer]);
          }
          // The replaced pointer is only visible to versions older than the
          // in-place update, so it is only live while such a reader exists.
          if (oldestVersion < node.updateVersion) {
            if (node.pointer[node.replacedPointer] != 0) {
              tryPush(node.pointer[node.replacedPointer]);
            }
          }
          // NOTE(review): unlike the other children, this push is not
          // guarded by != 0, and Impl::update() can zero pointer[2] —
          // tryPush(0) would mark/visit index 0. TODO confirm this cannot
          // happen here.
          tryPush(node.pointer[2]);
        } else {
          if (node.pointer[0] != 0) {
            tryPush(node.pointer[0]);
          }
          if (node.pointer[1] != 0) {
            tryPush(node.pointer[1]);
          }
        }
      }
    }
    // Reclaim memory on the right side
    uint32_t max = reachable.max();
    if (max == 0) {
      // Nothing reachable: keep exactly the first addressable page's worth.
      max = kMinAddressable - 1;
    }
    assert(max < next);
    uint32_t newFirstUnaddressable = (max / kNodesPerPage + 1) * kNodesPerPage;
    if (newFirstUnaddressable < firstUnaddressable) {
      // Entries in the about-to-be-unmapped tail still hold a reference.
      // (NOTE(review): `int i` vs uint32_t bounds — signed/unsigned mix.)
      for (int i = newFirstUnaddressable; i < firstUnaddressable; ++i) {
        if (base[i].entry != nullptr) {
#if DEBUG_VERBOSE
          if (debugVerboseEnabled) {
            printf("Collecting %u while shrinking right\n", i);
          }
#endif
          base[i].entry->delref();
        }
      }
      mprotectSafe(base + newFirstUnaddressable,
                   (firstUnaddressable - newFirstUnaddressable) * sizeof(Node),
                   PROT_NONE);
      firstUnaddressable = newFirstUnaddressable;
    }
    next = max + 1;
    // Rebuild free list and delref entries
    freeList = 0;
    reachable.iterateAbsentApproxBackwards(
        [&](uint32_t i) {
          if (base[i].entry != nullptr) {
#if DEBUG_VERBOSE
            if (debugVerboseEnabled) {
              printf("Collecting %u while building free list\n", i);
            }
#endif
            base[i].entry->delref();
            base[i].entry = nullptr;
          }
          base[i].nextFree = freeList;
          freeList = i;
        },
        kMinAddressable, next);
  }

private:
  uint32_t next = kMinAddressable;
  uint32_t firstUnaddressable = kMinAddressable;
  uint32_t freeList = 0;
};

// Three-way comparison of a search key against the key stored in a node.
auto operator<=>(const VersionedMap::Key &lhs, const Node &rhs) {
  int cl = std::min(lhs.len, rhs.entry->keyLen);
  if (cl > 0) {
    int c = memcmp(lhs.p, rhs.entry->getKey(), cl);
    if (c != 0) {
      return c <=> 0;
    }
  }
  // Equal shared prefix: the shorter key orders first.
  return lhs.len <=> rhs.entry->keyLen;
}

// Bounded stack recording the search path (node index plus the direction
// taken at each step) from the root down to the current position.
struct Finger {
  void push(uint32_t node, bool dir) {
    searchPath[searchPathSize_] = node;
    direction[searchPathSize_] = dir;
    ++searchPathSize_;
  }
  void pop() { --searchPathSize_; }
  uint32_t backNode() const {
    assert(searchPathSize_ > 0);
    return searchPath[searchPathSize_ - 1];
  }
  bool backDirection() const {
    assert(searchPathSize_ > 0);
    return direction[searchPathSize_ - 1];
  }
  uint32_t searchPathSize() const { return searchPathSize_; }
  Finger() : searchPathSize_(0) {}
  // Copies only the live prefix of the arrays; the debug-only memset keeps
  // the unused tail deterministic (e.g. for memory checkers).
  Finger(const Finger &other) {
#ifndef NDEBUG
    memset(searchPath, 0, sizeof(searchPath));
    memset(direction, 0, sizeof(direction));
#endif
    memcpy(searchPath, other.searchPath,
           other.searchPathSize_ * sizeof(searchPath[0]));
    memcpy(direction, other.direction,
           other.searchPathSize_ * sizeof(direction[0]));
    searchPathSize_ = other.searchPathSize_;
  }
  // NOTE(review): self-assignment memsets our arrays (debug builds) before
  // copying from `other` — if &other == this that copies back zeroes.
  // TODO confirm callers never self-assign, or add a guard.
  Finger &operator=(const Finger &other) {
#ifndef NDEBUG
    memset(searchPath, 0, sizeof(searchPath));
    memset(direction, 0, sizeof(direction));
#endif
    memcpy(searchPath, other.searchPath,
           other.searchPathSize_ * sizeof(searchPath[0]));
    memcpy(direction, other.direction,
           other.searchPathSize_ * sizeof(direction[0]));
    searchPathSize_ = other.searchPathSize_;
    return *this;
  }

private:
  uint32_t searchPath[kPathLengthUpperBound];
  bool direction[kPathLengthUpperBound];
  int searchPathSize_;
};

// The versioned map proper: a persistent treap over refcounted Entry
// payloads. Each node gets one "in-place" pointer replacement (pointer[2])
// before it must be path-copied, which lets a single writer mutate while
// concurrent readers traverse older versions.
// NOTE(review): every `template` below lost its parameter list in
// extraction — judging by the static_assert in child(), it is presumably
// `template <std::memory_order kOrder>`, and the bare child(...) calls were
// originally child<kOrder>(...) / child<std::memory_order_relaxed>(...).
struct VersionedMap::Impl {
  // Advance `finger` one step in `direction` order as of version `at`:
  // descend one step in `direction` then all the way in !direction, or pop
  // up to the first ancestor reached from the !direction side.
  template void move(Finger &finger, int64_t at, bool direction) {
    uint32_t c;
    if (finger.backNode() != 0 &&
        (c = child(finger.backNode(), direction, at)) != 0) {
      finger.push(c, direction);
      // NOTE(review): `auto c = child(...) != 0` parses as
      // `auto c = (child(...) != 0)`, so `c` is a bool and push receives 1
      // instead of the child index (and shadows the outer `c`) — presumably
      // meant `while ((c = child(...)) != 0)`. TODO fix upstream.
      while (auto c = child(finger.backNode(), !direction, at) != 0) {
        finger.push(c, !direction);
      }
    } else {
      // NOTE(review): compares against literal `true` rather than
      // `direction`; correct for forward iteration (the only use in this
      // file) — confirm before using with direction == false.
      while (finger.searchPathSize() > 1 && finger.backDirection() == true) {
        finger.pop();
      }
      finger.pop();
    }
  }
  // Returns the `which` child of `node` as visible at version `at`: the
  // in-place replacement pointer[2] when it applies (published, versioned
  // at or before `at`, and on the replaced side), else the plain pointer.
  template uint32_t child(uint32_t node, bool which, int64_t at) {
    static_assert(kOrder == std::memory_order_acquire ||
                  kOrder == std::memory_order_relaxed);
    auto &n = mm.base[node];
    uint32_t result;
    if (n.updated.load(kOrder) && n.updateVersion <= at &&
        which == n.replacedPointer) {
      result = n.pointer[2];
    } else {
      result = n.pointer[which];
    }
    assert(result == 0 || result >= kMinAddressable);
    return result;
  }
  // NOTE(review): `which` is unused in left/right; the side is hard-coded.
  template uint32_t left(uint32_t node, bool which, int64_t at) {
    return child(node, false, at);
  }
  template uint32_t right(uint32_t node, bool which, int64_t at) {
    return child(node, true, at);
  }
  // Returns the node that results from setting `which` to `child` on `node`
  uint32_t update(uint32_t node, bool which, uint32_t child, int64_t version) {
    assert(node == 0 || node >= kMinAddressable);
    assert(child == 0 || child >= kMinAddressable);
    // Already points there (possibly via pointer[2]): nothing to do.
    if (this->child(node, which, version) == child) {
      return node;
    }
    auto &n = mm.base[node];
    const bool updated = n.updated.load(std::memory_order_relaxed);
    // Path-copy: fresh node sharing the entry, with `which` replaced.
    // NOTE(review): this copies the raw n.pointer[!which]; in the
    // replacedPointer != which branch below, the current value of the
    // replaced side lived in pointer[2], which is zeroed just before the
    // copy — verify that value is not being lost. TODO confirm.
    auto doCopy = [&]() {
      uint32_t copy = mm.allocate();
      auto &c = mm.base[copy];
      c.entry = n.entry->addref();
      c.pointer[which] = child;
      c.pointer[!which] = n.pointer[!which];
      c.updated.store(false, std::memory_order_relaxed);
      c.updateVersion = version;
      assert(copy == 0 || copy >= kMinAddressable);
      return copy;
    };
    if (n.updateVersion == version) {
      // The reason these aren't data races is that concurrent readers are
      // reading < `version`
      if (updated && n.replacedPointer != which) {
        // We can't update n.replacedPointer without introducing a data race
        // (unless we packed it into the atomic?) so we copy. pointer[2]
        // becomes unreachable, but need to tell the garbage collector.
        n.pointer[2] = 0;
        return doCopy();
      } else if (updated) {
        n.pointer[2] = child;
      } else {
        n.pointer[which] = child;
      }
      assert(node == 0 || node >= kMinAddressable);
      return node;
    }
    if (updated) {
      // We already used this node's in-place update
      return doCopy();
    } else {
      n.updateVersion = version;
      n.pointer[2] = child;
      n.replacedPointer = which;
      n.updated.store(true, std::memory_order_release); // Must be last
      assert(node == 0 || node >= kMinAddressable);
      return node;
    }
  }
  // Treap rotation about n as of version `at`; n is replaced by the child
  // on the !right side, preserving the in-order sequence.
  void rotate(uint32_t &n, int64_t at, bool right) {
    auto l = child(n, !right, at);
    n = update(l, right, update(n, !right, child(l, right, at), at), at);
  }
  struct Val {
    const uint8_t *p;
    int len;
  };
  // Infers `val` and `clearTo` if not set
  // NOTE(review): extraction dropped the template arguments — presumably
  // std::optional<Val> val, std::optional<bool> clearTo.
  void insert(Key key, std::optional val, std::optional clearTo) {
    Finger finger;
    // NOTE(review): `ignored` is read uninitialized when pushed — the value
    // never matters (the root has no parent direction), but it is UB-ish.
    bool ignored;
    finger.push(latestRoot, ignored);
    bool inserted;
    // Initialize finger to the search path of `m`
    for (;;) {
      auto n = finger.backNode();
      if (n == 0) {
        inserted = true;
        break;
      }
      auto c = key <=> mm.base[n];
      if (c == 0) {
        // No duplicates
        inserted = false;
        break;
      }
      finger.push(child(n, c > 0, latestVersion), c > 0);
    }
    // Infer `val` if not set
    if (!val.has_value()) {
      if (inserted) {
        // New key with no explicit value: record it as cleared.
        val = {nullptr, -1};
      } else {
        // Keep the existing value.
        auto *entry = mm.base[finger.backNode()].entry;
        val = {entry->getVal(), entry->valLen};
      }
    }
    // Infer `clearTo` if not set
    if (!clearTo.has_value()) {
      if (inserted) {
        // A new key splits its successor's (pred, succ) gap, so it inherits
        // the successor's clearTo flag.
        auto copy = finger;
        move(copy, latestVersion, true);
        if (copy.searchPathSize() == 0) {
          clearTo = false;
        } else {
          clearTo = mm.base[copy.backNode()].entry->clearTo;
        }
      } else {
        clearTo = false;
      }
    }
    // Prepare new node
    uint32_t node =
        newNode(latestVersion, key.p, key.len, val->p, val->len, *clearTo);
    if (!inserted) {
      // Replacing an existing key: adopt its children.
      auto &n = mm.base[node];
      n.pointer[0] = child(finger.backNode(), false, latestVersion);
      n.pointer[1] = child(finger.backNode(), true, latestVersion);
    }
    // Rotate and propagate up the search path
    for (;;) {
      if (finger.searchPathSize() == 1) {
        // Made it to the root
        latestRoot = node;
        break;
      }
      const bool direction = finger.backDirection();
      finger.pop();
      auto parent = finger.backNode();
      parent = update(parent, direction, node, latestVersion);
      if (inserted &&
          mm.base[node].entry->priority > mm.base[parent].entry->priority) {
        // Restore the treap heap property on priorities.
        rotate(parent, latestVersion, !direction);
      } else {
        if (parent == finger.backNode()) {
          // update() applied in place, so ancestors already reach the new
          // state — propagation can stop.
          break;
        }
      }
      node = parent;
    }
  }
  // Allocates a node with a fresh Entry and no children.
  uint32_t newNode(int64_t version, const uint8_t *key, int keyLen,
                   const uint8_t *val, int valLen, bool clearTo) {
    auto result = mm.allocate();
    auto &node = mm.base[result];
    node.updateVersion = version;
    node.pointer[0] = 0;
    node.pointer[1] = 0;
    node.updated.store(false, std::memory_order_relaxed);
    node.entry = Entry::make(version, key, keyLen, val, valLen, clearTo);
    return result;
  }
  // Drops roots older than `oldestVersion`, then collects nodes no longer
  // reachable from the remaining roots.
  void setOldestVersion(int64_t oldestVersion) {
    roots.setOldestVersion(oldestVersion);
    mm.gc(roots.roots(), roots.rootCount(), oldestVersion);
  }
  void printInOrder(int64_t version);
  void printInOrderHelper(int64_t version, uint32_t node);
  // Applies a batch of mutations and publishes the result as `version`
  // (which must be strictly newer than anything published before).
  void addMutations(const Mutation *mutations, int numMutations,
                    int64_t version) {
    assert(latestVersion < version);
    latestVersion = version;
    latestRoot = roots.roots()[roots.rootCount() - 1];
    // TODO Improve ILP?
    for (int i = 0; i < numMutations; ++i) {
      const auto &m = mutations[i];
      switch (m.type) {
      case Set: {
        insert({m.param1, m.param1Len}, {{m.param2, m.param2Len}}, {});
      } break;
      case Clear: {
        // Point-clear the begin key, then mark the end key's gap as cleared.
        insert({m.param1, m.param1Len}, {{nullptr, -1}}, {});
        // TODO erase (param1, param2)
        insert({m.param2, m.param2Len}, {}, true);
      } break;
      default:                   // GCOVR_EXCL_LINE
        __builtin_unreachable(); // GCOVR_EXCL_LINE
      }
    }
    roots.add(latestRoot, latestVersion);
  }
  MemManager mm;
  RootSet roots;
  // Only meaningful within the callstack of `addMutations`
  uint32_t latestRoot;
  int64_t latestVersion = 0;
};

VersionedMap::VersionedMap(int64_t version)
    : impl(new (malloc(sizeof(Impl))) Impl()) {
  impl->latestVersion = version;
}
VersionedMap::~VersionedMap() {
  if (impl != nullptr) {
    impl->~Impl();
    free(impl);
  }
}
VersionedMap::VersionedMap(VersionedMap &&other) noexcept {
  impl = std::exchange(other.impl, nullptr);
}
// NOTE(review): move-assignment overwrites `impl` without destroying the
// one currently owned (and has no self-move guard) — leaks when assigning
// onto a live map. TODO confirm intended usage.
VersionedMap &VersionedMap::operator=(VersionedMap &&other) noexcept {
  impl = std::exchange(other.impl, nullptr);
  return *this;
}
void VersionedMap::addMutations(const Mutation *mutations, int numMutations,
                                int64_t version) {
  impl->addMutations(mutations, numMutations, version);
}

// ==================== END IMPLEMENTATION ====================

// GCOVR_EXCL_START
// Debug helper: dumps the version-`version` view of the tree in key order.
void VersionedMap::Impl::printInOrder(int64_t version) {
  printInOrderHelper(version,
                     roots.getThreadSafeHandle().rootForVersion(version));
}
void VersionedMap::Impl::printInOrderHelper(int64_t version, uint32_t node) {
  if (node == 0) {
    return;
  }
  printInOrderHelper(version, child(node, false, version));
  printf("%.*s", mm.base[node].entry->keyLen, mm.base[node].entry->getKey());
  if (mm.base[node].entry->valLen >= 0) {
    printf(" -> '%.*s'", mm.base[node].entry->valLen,
           mm.base[node].entry->getVal());
  } else {
    printf(" ");
  }
  if (mm.base[node].entry->clearTo) {
    printf(" ");
  }
  printf("\n");
  VersionedMap::Impl::printInOrderHelper(version, child(node, true, version));
}
} // namespace weaselab

#ifdef ENABLE_MAIN
// NOTE(review): the include target was lost in extraction — presumably the
// nanobench header (ankerl::nanobench is used below). TODO restore.
#include
// Smoke test plus micro-benchmarks for the allocator, GC, and RootSet.
int main() {
  {
    weaselab::VersionedMap::Impl impl;
    weaselab::VersionedMap::Mutation m[] = {
        {(const uint8_t *)"a", nullptr, 1, 0, weaselab::VersionedMap::Set},
        {(const uint8_t *)"b", nullptr, 1, 0, weaselab::VersionedMap::Set},
        {(const uint8_t *)"c", nullptr, 1, 0, weaselab::VersionedMap::Set},
    };
    impl.addMutations(m, sizeof(m) / sizeof(m[0]), 1);
    impl.printInOrder(1);
  }
  ankerl::nanobench::Bench bench;
  bench.minEpochIterations(10000);
  weaselab::MemManager mm;
  bench.run("allocate", [&]() {
    auto x = mm.allocate();
    mm.base[x].pointer[0] = 0;
    mm.base[x].pointer[1] = 0;
    mm.base[x].updated.store(false, std::memory_order_relaxed);
  });
  mm.gc(nullptr, 0, 0);
  for (int i = 0; i < 10000; ++i) {
    auto x = mm.allocate();
    mm.base[x].pointer[0] = 0;
    mm.base[x].pointer[1] = 0;
    mm.base[x].updated.store(false, std::memory_order_relaxed);
  }
  auto root = mm.allocate();
  // NOTE(review): Entry::make's last parameter is `bool clearTo`; passing
  // the enum weaselab::VersionedMap::Set here looks accidental (it converts
  // to a bool). TODO confirm.
  mm.base[root].entry =
      weaselab::Entry::make(0, nullptr, 0, nullptr, 0,
                            weaselab::VersionedMap::Set);
  mm.base[root].pointer[0] = 0;
  mm.base[root].pointer[1] = 0;
  mm.base[root].updated.store(false, std::memory_order_relaxed);
  bench.run("gc", [&]() { mm.gc(&root, 1, 0); });
  {
    int i = 0;
    constexpr int kNumVersions = 1000;
    RootSet roots;
    for (; i < kNumVersions; i += 2) {
      roots.add(i, i);
      roots.add(i, i + 1);
    }
    bench.run("roots - setOldestVersion", [&]() {
      roots.add(i, i);
      roots.setOldestVersion(i - kNumVersions);
      ++i;
    });
    bench.run("roots - rootForVersion", [&]() {
      bench.doNotOptimizeAway(
          roots.getThreadSafeHandle().rootForVersion(i - kNumVersions / 2));
    });
  }
}
#endif
// GCOVR_EXCL_STOP