#include "VersionedMap.h" #include #include #include #include #include #include #include #include #include #include #ifndef DEBUG_VERBOSE #define DEBUG_VERBOSE 0 #endif void *mmapSafe(void *addr, size_t len, int prot, int flags, int fd, off_t offset) { void *result = mmap(addr, len, prot, flags, fd, offset); if (result == MAP_FAILED) { int err = errno; // GCOVR_EXCL_LINE fprintf( // GCOVR_EXCL_LINE stderr, // GCOVR_EXCL_LINE "Error calling mmap(%p, %zu, %d, %d, %d, %jd): %d %s\n", // GCOVR_EXCL_LINE addr, len, prot, flags, fd, (intmax_t)offset, err, // GCOVR_EXCL_LINE strerror(err)); // GCOVR_EXCL_LINE fflush(stderr); // GCOVR_EXCL_LINE abort(); // GCOVR_EXCL_LINE } return result; } void mprotectSafe(void *p, size_t s, int prot) { if (mprotect(p, s, prot) != 0) { int err = errno; // GCOVR_EXCL_LINE fprintf(stderr, // GCOVR_EXCL_LINE "Error calling mprotect(%p, %zu, %d): %s\n", // GCOVR_EXCL_LINE p, // GCOVR_EXCL_LINE s, // GCOVR_EXCL_LINE prot, // GCOVR_EXCL_LINE strerror(err)); // GCOVR_EXCL_LINE fflush(stderr); // GCOVR_EXCL_LINE abort(); // GCOVR_EXCL_LINE } } void munmapSafe(void *ptr, size_t size) { if (munmap(ptr, size) != 0) { int err = errno; // GCOVR_EXCL_LINE fprintf(stderr, "Error calling munmap(%p, %zu): %s\n", // GCOVR_EXCL_LINE ptr, // GCOVR_EXCL_LINE size, // GCOVR_EXCL_LINE strerror(err)); // GCOVR_EXCL_LINE fflush(stderr); // GCOVR_EXCL_LINE abort(); // GCOVR_EXCL_LINE } } namespace weaselab { struct Entry { int64_t insertVersion; int keyLen; // Negative if this key is cleared int valLen; mutable int refCount; uint32_t priority; // True if mutations in (pred, this) are cleared. If false, (pred, this) // should be read through to the underlying data structure. bool clearTo; // There's an extra zero byte past the end of getKey, used for // reconstructing logical mutations without copies. const uint8_t *getKey() const { return (const uint8_t *)(this + 1); } const uint8_t *getVal() const { return (const uint8_t *)(this + 1) + 1 + keyLen; } Entry *addref() const { ++refCount; return (Entry *)this; } void delref() const { if (--refCount == 0) { free((void *)this); } } static Entry *make(int64_t insertVersion, const uint8_t *key, int keyLen, const uint8_t *val, int valLen, bool clearTo) { auto e = (Entry *)malloc(sizeof(Entry) + keyLen + 1 + std::max(valLen, 0)); e->insertVersion = insertVersion; e->keyLen = keyLen; e->valLen = valLen; e->refCount = 1; e->priority = XXH3_64bits(key, keyLen); e->clearTo = clearTo; memcpy((uint8_t *)e->getKey(), key, keyLen); ((uint8_t *)e->getKey())[keyLen] = 0; memcpy((uint8_t *)e->getVal(), val, std::max(valLen, 0)); return e; } }; struct Node { union { int64_t updateVersion; uint32_t nextFree; }; Entry *entry; uint32_t pointer[3]; bool replacedPointer; std::atomic updated; }; // Limit mmap to 32 GiB so valgrind doesn't complain. 
// Limit mmap to 32 GiB so valgrind doesn't complain.
// https://bugs.kde.org/show_bug.cgi?id=229500
constexpr size_t kMapSize = size_t(32) * (1 << 30);
const size_t kPageSize = sysconf(_SC_PAGESIZE);
const uint32_t kNodesPerPage = kPageSize / sizeof(Node);
const uint32_t kMinAddressable = kNodesPerPage;
constexpr uint32_t kUpsizeBytes = 1 << 20;
constexpr uint32_t kUpsizeNodes = kUpsizeBytes / sizeof(Node);
static_assert(kUpsizeNodes * sizeof(Node) == kUpsizeBytes);

struct BitSet {
  // calloc, not malloc: test() must see cleared bits for untouched indices
  explicit BitSet(uint32_t size) : words((uint64_t *)calloc(size / 8 + 8, 1)) {}

  bool test(uint32_t i) const {
    return words[i >> 6] & (uint64_t(1) << (i & 63));
  }

  // Returns former value
  bool set(uint32_t i) {
    const auto prev = words[i >> 6];
    const auto mask = uint64_t(1) << (i & 63);
    words[i >> 6] |= mask;
    max_ = std::max(i, max_);
    return prev & mask;
  }

  // Returns 0 if set is empty
  uint32_t max() const { return max_; }

  template <class F>
  void iterateAbsentApproxBackwards(F f, uint32_t begin, uint32_t end) const {
    // TODO can this be improved? We can do something with a word at a time
    // instead of a bit at a time. The first attempt at doing so benchmarked
    // as slower.
    assert(begin != 0);
    for (uint32_t i = end - 1; i >= begin; --i) {
      if (!test(i)) {
        f(i);
      }
    }
  }

  ~BitSet() { free(words); }

private:
  uint32_t max_ = 0;
  uint64_t *const words;
};

struct MemManager {
  MemManager()
      : base((Node *)mmapSafe(nullptr, kMapSize, PROT_NONE,
                              MAP_PRIVATE | MAP_ANONYMOUS, -1, 0)) {
    if (kPageSize % sizeof(Node) != 0) {
      fprintf(stderr,                                     // GCOVR_EXCL_LINE
              "kPageSize not a multiple of Node size\n"); // GCOVR_EXCL_LINE
      abort();                                            // GCOVR_EXCL_LINE
    }
    if (kUpsizeBytes % kPageSize != 0) {
      fprintf(stderr,                                        // GCOVR_EXCL_LINE
              "kUpsizeBytes not a multiple of kPageSize\n"); // GCOVR_EXCL_LINE
      abort();                                               // GCOVR_EXCL_LINE
    }
  }
  ~MemManager() {
    gc(nullptr, 0, 0);
    munmapSafe(base, kMapSize);
  }

  Node *const base;

  uint32_t allocate() {
    if (freeList != 0) {
      uint32_t result = freeList;
      freeList = base[result].nextFree;
      assert(base[result].entry == nullptr);
      return result;
    }
    if (next == firstUnaddressable) {
      mprotectSafe(base + firstUnaddressable, kUpsizeBytes,
                   PROT_READ | PROT_WRITE);
      firstUnaddressable += kUpsizeNodes;
      if (firstUnaddressable > kMapSize / sizeof(Node)) {
        fprintf(                                               // GCOVR_EXCL_LINE
            stderr,                                            // GCOVR_EXCL_LINE
            "Out of memory: firstUnaddressable > kMapSize / "  // GCOVR_EXCL_LINE
            "sizeof(Node)\n");                                 // GCOVR_EXCL_LINE
        abort();                                               // GCOVR_EXCL_LINE
      }
    }
    return next++;
  }

  void gc(const uint32_t *roots, int numRoots, int64_t oldestVersion) {
    // Calculate reachable set
    BitSet reachable{next};
    uint32_t stack[1000]; // Much more than bound imposed by max height of tree
    int stackIndex = 0;
    auto tryPush = [&](uint32_t p) {
      if (!reachable.set(p)) {
        assert(stackIndex < sizeof(stack) / sizeof(stack[0]));
        stack[stackIndex++] = p;
      }
    };
    for (int i = 0; i < numRoots; ++i) {
      if (roots[i] == 0) {
        continue;
      }
      tryPush(roots[i]);
      while (stackIndex > 0) {
        uint32_t p = stack[--stackIndex];
        auto &node = base[p];
        if (node.updated.load(std::memory_order_relaxed)) {
          if (node.pointer[!node.replacedPointer] != 0) {
            tryPush(node.pointer[!node.replacedPointer]);
          }
          if (oldestVersion < node.updateVersion) {
            // The replaced pointer is still reachable from versions before
            // the in-place update.
            if (node.pointer[node.replacedPointer] != 0) {
              tryPush(node.pointer[node.replacedPointer]);
            }
          }
          tryPush(node.pointer[2]);
        } else {
          if (node.pointer[0] != 0) {
            tryPush(node.pointer[0]);
          }
          if (node.pointer[1] != 0) {
            tryPush(node.pointer[1]);
          }
        }
      }
    }
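
    // Reclamation happens in two steps. First, whole pages above the highest
    // reachable node are returned to PROT_NONE, shrinking the arena at page
    // granularity from the right. Second, every unreachable node below that
    // watermark has its entry delref'd and is threaded onto the free list
    // for reuse by allocate().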
    // Reclaim memory on the right side
    uint32_t max = reachable.max();
    if (max == 0) {
      max = kMinAddressable - 1;
    }
    assert(max < next);
    uint32_t newFirstUnaddressable = (max / kNodesPerPage + 1) * kNodesPerPage;
    if (newFirstUnaddressable < firstUnaddressable) {
      for (uint32_t i = newFirstUnaddressable; i < firstUnaddressable; ++i) {
        if (base[i].entry != nullptr) {
#if DEBUG_VERBOSE
          printf("Collecting %u\n", i);
#endif
          base[i].entry->delref();
        }
      }
      mprotectSafe(base + newFirstUnaddressable,
                   (firstUnaddressable - newFirstUnaddressable) * sizeof(Node),
                   PROT_NONE);
      firstUnaddressable = newFirstUnaddressable;
    }
    next = max + 1;

    // Rebuild free list and delref entries
    freeList = 0;
    reachable.iterateAbsentApproxBackwards(
        [&](uint32_t i) {
          if (base[i].entry != nullptr) {
#if DEBUG_VERBOSE
            printf("Collecting %u\n", i);
#endif
            base[i].entry->delref();
            base[i].entry = nullptr;
          }
          base[i].nextFree = freeList;
          freeList = i;
        },
        kMinAddressable, next);
  }

private:
  uint32_t next = kMinAddressable;
  uint32_t firstUnaddressable = kMinAddressable;
  uint32_t freeList = 0;
};

struct RootSet {
  /// Register the root node for `version` after adding mutations
  void add(uint32_t node, int64_t version) {
    if (end == 0) {
      nodes[end] = node;
      versions[end] = version;
      ++end;
      return;
    }
    if (nodes[end - 1] == node) {
      return;
    }
    if (end == capacity) {
      capacity *= 2;
      nodes = (uint32_t *)realloc(nodes, capacity * sizeof(uint32_t));
      versions = (int64_t *)realloc(versions, capacity * sizeof(int64_t));
    }
    nodes[end] = node;
    versions[end] = version;
    ++end;
  }

  /// Inform that there will be no calls to rootForVersion with a version
  /// less than `oldestVersion`
  void setOldestVersion(int64_t oldestVersion) {
    const uint32_t firstToKeep = lastLeq(oldestVersion);
    if (firstToKeep != 0) {
      memmove(nodes, nodes + firstToKeep,
              (end - firstToKeep) * sizeof(uint32_t));
      memmove(versions, versions + firstToKeep,
              (end - firstToKeep) * sizeof(int64_t));
      end -= firstToKeep;
    }
    assert(end > 0);
    assert(versions[0] <= oldestVersion);
  }

  /// Get a root node that can correctly be used for `version`
  uint32_t rootForVersion(int64_t version) const {
    return nodes[lastLeq(version)];
  }

  const uint32_t *roots() const { return nodes; }
  int rootCount() const { return end; }

  RootSet() {
    nodes = (uint32_t *)malloc(kMinCapacity * sizeof(uint32_t));
    versions = (int64_t *)malloc(kMinCapacity * sizeof(int64_t));
    capacity = kMinCapacity;
    nodes[0] = 0;
    versions[0] = 0;
    end = 1;
  }
  ~RootSet() {
    free(versions);
    free(nodes);
  }

private:
  // Binary search for the index of the last version <= `version`
  uint32_t lastLeq(int64_t version) const {
    assert(end > 0);
    assert(versions[0] <= version);
    int left = 1;
    int right = end - 1;
    int result = 0;
    while (left <= right) {
      int mid = left + (right - left) / 2;
      if (versions[mid] <= version) {
        result = mid;
        left = mid + 1;
      } else {
        right = mid - 1;
      }
    }
    assert(result < end);
    return result;
  }

  uint32_t *nodes;
  // versions[i] is the version of nodes[i]
  int64_t *versions;
  constexpr static uint32_t kMinCapacity = 16;
  uint32_t capacity;
  uint32_t end;
};

struct VersionedMap::Impl {
  template <std::memory_order kOrder>
  uint32_t child(uint32_t node, bool which, int64_t at) {
    static_assert(kOrder == std::memory_order_acquire ||
                  kOrder == std::memory_order_relaxed);
    auto &n = mm.base[node];
    if (n.updated.load(kOrder) && n.updateVersion <= at &&
        which == n.replacedPointer) {
      return n.pointer[2];
    } else {
      return n.pointer[which];
    }
  }

  template <std::memory_order kOrder>
  uint32_t left(uint32_t node, int64_t at) {
    return child<kOrder>(node, false, at);
  }
  template <std::memory_order kOrder>
  uint32_t right(uint32_t node, int64_t at) {
    return child<kOrder>(node, true, at);
  }
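
  // update() implements path copying with at most one in-place edit per
  // node. Given a request to point `which` at `child` at `version`:
  //   - the child already matches: nothing to do.
  //   - the node was already edited at this same version: redo the edit in
  //     place, unless it touched the *other* child slot, in which case we
  //     must copy (replacedPointer cannot change without racing readers).
  //   - the node has never been edited: claim pointer[2]; the release store
  //     to `updated` publishes updateVersion/replacedPointer/pointer[2] to
  //     readers that load `updated` with acquire.
  //   - otherwise the in-place slot is already spent, so copy the node.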
  // Returns the node that results from setting `which` to `child` on `node`
  uint32_t update(uint32_t node, int64_t version, bool which, uint32_t child) {
    if (this->child<std::memory_order_relaxed>(node, which, version) == child) {
      return node;
    }
    auto &n = mm.base[node];
    const bool updated = n.updated.load(std::memory_order_relaxed);
    auto doCopy = [&]() {
      uint32_t copy = mm.allocate();
      auto &c = mm.base[copy];
      c.entry = n.entry->addref();
      c.pointer[which] = child;
      c.pointer[!which] = n.pointer[!which];
      c.updated.store(false, std::memory_order_relaxed);
      c.updateVersion = version;
      return copy;
    };
    if (n.updateVersion == version) {
      if (updated && n.replacedPointer != which) {
        // We can't update n.replacedPointer without introducing a data race
        // (unless we packed it into the atomic?) so we copy. pointer[2]
        // becomes unreachable, but we need to tell the garbage collector.
        n.pointer[2] = 0;
        return doCopy();
      } else if (updated) {
        n.pointer[2] = child;
      } else {
        n.pointer[which] = child;
      }
      return node;
    }
    if (updated) {
      // We already used this node's in-place update
      return doCopy();
    } else {
      n.updateVersion = version;
      n.pointer[2] = child;
      n.replacedPointer = which;
      n.updated.store(true, std::memory_order_release); // Must be last
      return node;
    }
  }

  void rotate(uint32_t &n, int64_t at, bool right) {
    auto l = child<std::memory_order_relaxed>(n, !right, at);
    n = update(l, at, right,
               update(n, at, !right,
                      child<std::memory_order_relaxed>(l, right, at)));
  }

  uint32_t newNode(int64_t version, const uint8_t *key, int keyLen,
                   const uint8_t *val, int valLen, bool clearTo) {
    auto result = mm.allocate();
    auto &node = mm.base[result];
    node.updateVersion = version;
    node.pointer[0] = 0;
    node.pointer[1] = 0;
    node.updated.store(false, std::memory_order_relaxed);
    node.entry = Entry::make(version, key, keyLen, val, valLen, clearTo);
    return result;
  }

  void setOldestVersion(int64_t oldestVersion) {
    roots.setOldestVersion(oldestVersion);
    mm.gc(roots.roots(), roots.rootCount(), oldestVersion);
  }

  void printInOrder(int64_t version) {
    printInOrderHelper(version, roots.rootForVersion(version));
  }

  void printInOrderHelper(int64_t version, uint32_t node) {
    if (node == 0) {
      return;
    }
    printInOrderHelper(version,
                       child<std::memory_order_relaxed>(node, false, version));
    printf("%.*s", mm.base[node].entry->keyLen, mm.base[node].entry->getKey());
    if (mm.base[node].entry->valLen >= 0) {
      printf(" -> '%.*s'", mm.base[node].entry->valLen,
             mm.base[node].entry->getVal());
    } else {
      printf(" <cleared>");
    }
    if (mm.base[node].entry->clearTo) {
      printf(" <clearTo>");
    }
    printf("\n");
    printInOrderHelper(version,
                       child<std::memory_order_relaxed>(node, true, version));
  }

  MemManager mm;
  RootSet roots;
};

} // namespace weaselab
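
// Minimal usage sketch (mirrors the demo in main() below; identifiers are the
// ones defined above, error handling omitted):
//
//   weaselab::VersionedMap::Impl impl;
//   uint32_t root = impl.newNode(/*version*/ 1, (const uint8_t *)"k", 1,
//                                (const uint8_t *)"v", 1, /*clearTo*/ false);
//   impl.roots.add(root, 1);   // root for all reads at version >= 1
//   impl.printInOrder(1);      // prints: k -> 'v'
//   impl.setOldestVersion(1);  // lets gc reclaim state for versions < 1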
#ifdef ENABLE_MAIN
#include <nanobench.h>

int main() {
  {
    weaselab::VersionedMap::Impl impl;
    impl.roots.add(impl.newNode(1, (const uint8_t *)"a", 1, nullptr, 0, true),
                   1);
    impl.roots.add(
        impl.newNode(2, (const uint8_t *)"b", 1, nullptr, -1, false), 2);
    impl.roots.add(
        impl.newNode(3, (const uint8_t *)"c", 1, nullptr, -1, false), 3);
    impl.printInOrder(0);
    impl.printInOrder(1);
    impl.printInOrder(2);
    impl.printInOrder(3);
    impl.setOldestVersion(3);
  }
  // Early exit: the benchmarks below are currently unreachable.
  return 0;

  ankerl::nanobench::Bench bench;
  bench.minEpochIterations(5000);
  weaselab::MemManager mm;
  bench.run("allocate", [&]() {
    auto x = mm.allocate();
    mm.base[x].pointer[0] = 0;
    mm.base[x].pointer[1] = 0;
    mm.base[x].updated.store(false, std::memory_order_relaxed);
  });
  mm.gc(nullptr, 0, 0);
  for (int i = 0; i < 10000; ++i) {
    auto x = mm.allocate();
    mm.base[x].pointer[0] = 0;
    mm.base[x].pointer[1] = 0;
    mm.base[x].updated.store(false, std::memory_order_relaxed);
  }
  auto root = mm.allocate();
  mm.base[root].entry = weaselab::Entry::make(0, nullptr, 0, nullptr, 0,
                                              weaselab::VersionedMap::Set);
  mm.base[root].pointer[0] = 0;
  mm.base[root].pointer[1] = 0;
  mm.base[root].updated.store(false, std::memory_order_relaxed);
  bench.run("gc", [&]() { mm.gc(&root, 1, 0); });

  {
    int i = 0;
    constexpr int kNumVersions = 1000;
    weaselab::RootSet roots;
    for (; i < kNumVersions; i += 2) {
      roots.add(i, i);
      roots.add(i, i + 1);
    }
    bench.run("roots - setOldestVersion", [&]() {
      roots.add(i, i);
      roots.setOldestVersion(i - kNumVersions);
      ++i;
    });
    bench.run("roots - rootForVersion", [&]() {
      bench.doNotOptimizeAway(roots.rootForVersion(i - kNumVersions / 2));
    });
  }
}
#endif
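
// Build sketch (an assumption, not taken from this repo's build files): the
// translation unit needs xxHash, and ENABLE_MAIN additionally needs the
// ankerl::nanobench single header, e.g.
//   g++ -std=c++17 -O2 -DENABLE_MAIN VersionedMap.cpp -lxxhash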