diff --git a/.clangd b/.clangd index 1b411b0..086e29b 100644 --- a/.clangd +++ b/.clangd @@ -1,2 +1,2 @@ CompileFlags: - Add: [-DENABLE_MAIN, -UNDEBUG, -DENABLE_FUZZ, -DTHREAD_TEST, -fexceptions, -DDEBUG_VERBOSE=1] + Add: [-DENABLE_MAIN, -UNDEBUG, -DENABLE_FUZZ, -DTHREAD_TEST, -fexceptions, -DDEBUG_VERBOSE=1, -DENABLE_ROOTSET_TESTS] diff --git a/CMakeLists.txt b/CMakeLists.txt index a53a131..2e3e125 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -60,14 +60,31 @@ set(CMAKE_CXX_IMPLICIT_LINK_LIBRARIES "") add_subdirectory(third_party) -add_executable(versioned_map_main VersionedMap.cpp) +add_executable(versioned_map_main VersionedMap.cpp RootSet.cpp) target_include_directories(versioned_map_main PRIVATE ${CMAKE_SOURCE_DIR}/include) target_link_libraries(versioned_map_main PRIVATE nanobench xxhash) target_compile_definitions(versioned_map_main PRIVATE ENABLE_MAIN) -add_library(versioned_map VersionedMap.cpp) +add_library(versioned_map VersionedMap.cpp RootSet.cpp) target_link_libraries(versioned_map PRIVATE xxhash) target_compile_options(versioned_map PRIVATE -fno-exceptions) target_include_directories(versioned_map PUBLIC ${CMAKE_SOURCE_DIR}/include) set_target_properties(versioned_map PROPERTIES LINKER_LANGUAGE C) + +include(CTest) + +if(BUILD_TESTING) + add_executable(rootset_test RootSet.cpp) + target_compile_definitions(rootset_test PRIVATE ENABLE_ROOTSET_TESTS) + target_compile_options(rootset_test PRIVATE -fsanitize=address,undefined + -UNDEBUG) + target_link_options(rootset_test PRIVATE -fsanitize=address,undefined) + add_test(NAME rootset_test COMMAND rootset_test) + + add_executable(rootset_test_tsan RootSet.cpp) + target_compile_definitions(rootset_test_tsan PRIVATE ENABLE_ROOTSET_TESTS) + target_compile_options(rootset_test_tsan PRIVATE -fsanitize=thread -UNDEBUG) + target_link_options(rootset_test_tsan PRIVATE -fsanitize=thread) + add_test(NAME rootset_test_tsan COMMAND rootset_test) +endif() diff --git a/RootSet.cpp b/RootSet.cpp new file mode 100644 index 0000000..1b656ac --- /dev/null +++ b/RootSet.cpp @@ -0,0 +1,209 @@ +#include "RootSet.h" + +#include +#include +#include +#include +#include +#include + +struct RootSet::ThreadSafeHandle::Impl { + + static Impl *create(int capacity) { + int size = + sizeof(Impl) + sizeof(int64_t) * capacity + sizeof(uint32_t) * capacity; + auto *result = (Impl *)malloc(size); + result->capacity = capacity; + return result; + } + + int64_t *versions() { return (int64_t *)(this + 1); } + uint32_t *roots() { return (uint32_t *)(versions() + capacity); } + + // Linked list of Impl's to free, ordered by version + Impl *next; + int capacity; + std::atomic end; + + // Find the index of the last version <= version, or 0 if no such version + // exists + uint32_t lastLeq(int64_t version) { + int left = 1; + int right = end.load(std::memory_order_acquire) - 1; + int result = 0; + while (left <= right) { + int mid = left + (right - left) / 2; + if (versions()[mid] <= version) { + result = mid; + left = mid + 1; + } else { + right = mid - 1; + } + } + return result; + } +}; + +struct RootSet::Impl { + + Impl() { + auto *h = ThreadSafeHandle::Impl::create(kMinCapacity); + h->roots()[0] = 0; + h->versions()[0] = 0; + h->end.store(1, std::memory_order_relaxed); + handle.store(h, std::memory_order_relaxed); + firstToFree = nullptr; + lastToFree = nullptr; + oldestVersion = 0; + } + + ~Impl() { + for (auto *i = firstToFree; i != nullptr;) { + auto *tmp = i; + i = i->next; + + free(tmp); + } + free(handle.load(std::memory_order_relaxed)); + } + + void add(uint32_t node, int64_t version) { + ThreadSafeHandle::Impl *h = handle.load(std::memory_order_relaxed); + + // Upsize if necessary + if (h->end.load(std::memory_order_relaxed) == h->capacity) { + h->next = nullptr; + auto begin = h->lastLeq(oldestVersion); + if (lastToFree != nullptr) { + lastToFree->next = h; + lastToFree = h; + } else { + firstToFree = h; + lastToFree = h; + } + auto *newH = ThreadSafeHandle::Impl::create((h->capacity - begin) * 2); + memcpy(newH->roots(), h->roots() + begin, + sizeof(h->roots()[0]) * (h->capacity - begin)); + memcpy(newH->versions(), h->versions() + begin, + sizeof(h->versions()[0]) * (h->capacity - begin)); + newH->end.store(h->capacity - begin, std::memory_order_relaxed); + handle.store(newH, std::memory_order_release); + h = newH; + } + + auto end = h->end.load(std::memory_order_relaxed); + + if (h->roots()[end - 1] != node) { + h->roots()[end] = node; + h->versions()[end] = version; + h->end.store(end + 1, std::memory_order_release); + } + } + + void setOldestVersion(int64_t oldestVersion) { + this->oldestVersion = oldestVersion; + while (firstToFree != nullptr && firstToFree->next != nullptr && + firstToFree->next->versions()[firstToFree->next->end.load( + std::memory_order_relaxed) - + 1] < oldestVersion) { + auto *tmp = firstToFree; + firstToFree = firstToFree->next; + free(tmp); + } + } + + const uint32_t *roots() const { + auto *h = handle.load(std::memory_order_relaxed); + return h->roots() + h->lastLeq(oldestVersion); + } + + int rootCount() const { + auto *h = handle.load(std::memory_order_relaxed); + return h->end.load(std::memory_order_relaxed) - h->lastLeq(oldestVersion); + } + + ThreadSafeHandle getThreadSafeHandle() const { + ThreadSafeHandle result; + auto *impl = handle.load(std::memory_order_acquire); + memcpy(&result, &impl, sizeof(result)); + return result; + } + + constexpr static uint32_t kMinCapacity = 16; + + std::atomic handle; + + int64_t oldestVersion; + ThreadSafeHandle::Impl *firstToFree; + ThreadSafeHandle::Impl *lastToFree; +}; + +void RootSet::add(uint32_t node, int64_t version) { impl->add(node, version); } + +void RootSet::setOldestVersion(int64_t oldestVersion) { + impl->setOldestVersion(oldestVersion); +} + +uint32_t RootSet::ThreadSafeHandle::rootForVersion(int64_t version) const { + auto result = impl->roots()[impl->lastLeq(version)]; + return result; +} + +RootSet::ThreadSafeHandle RootSet::getThreadSafeHandle() const { + return impl->getThreadSafeHandle(); +} + +const uint32_t *RootSet::roots() const { return impl->roots(); } +int RootSet::rootCount() const { return impl->rootCount(); } + +RootSet::RootSet() : impl(new(malloc(sizeof(Impl))) Impl()) {} + +RootSet::~RootSet() { + impl->~Impl(); + free(impl); +} + +#ifdef ENABLE_ROOTSET_TESTS +#include +#include + +int main() { + constexpr int kNumReaders = 3; + constexpr int kNumVersions = 2000000; + + RootSet rs; + std::latch ready{1 + kNumReaders}; + std::atomic version; + std::vector> doneVersions(kNumReaders); + std::thread writer([&]() { + ready.arrive_and_wait(); + for (int i = 0; i < kNumVersions; ++i) { + rs.add(i / 10, i); + version.store(i); + int min = std::numeric_limits::max(); + for (auto &v : doneVersions) { + min = std::min(min, v.load()); + } + rs.setOldestVersion(min); + } + }); + std::vector readers; + for (int i = 0; i < kNumReaders; ++i) { + readers.emplace_back([&, i]() { + ready.arrive_and_wait(); + for (;;) { + auto v = version.load(); + assert(rs.getThreadSafeHandle().rootForVersion(v) == v / 10); + doneVersions[i].store(v); + if (v == kNumVersions - 1) { + break; + } + } + }); + } + writer.join(); + for (auto &t : readers) { + t.join(); + } +} +#endif \ No newline at end of file diff --git a/RootSet.h b/RootSet.h new file mode 100644 index 0000000..9395532 --- /dev/null +++ b/RootSet.h @@ -0,0 +1,46 @@ +#pragma once + +#include +#include +#include + +struct RootSet { + + /// Register the root node for version after adding mutations + void add(uint32_t node, int64_t version); + + /// Inform that there will be no calls to rootForVersion with a version less + /// than `oldestVersion` + void setOldestVersion(int64_t oldestVersion); + + /// Foreign threads may freely interact with a `ThreadSafeHandle`, as long as + /// the latest version as of obtaining the handle is greater than + /// `oldestVersion`. + struct ThreadSafeHandle { + + /// Get a root node that can correctly be used for `version` + uint32_t rootForVersion(int64_t version) const; + + /// @private + struct Impl; + + private: + Impl *impl; // not owned + }; + + /// Safe to call from foreign threads + ThreadSafeHandle getThreadSafeHandle() const; + + const uint32_t *roots() const; + int rootCount() const; + + RootSet(); + + ~RootSet(); + + /// @private + struct Impl; + +private: + Impl *impl; +}; diff --git a/VersionedMap.cpp b/VersionedMap.cpp index 92add7b..69438ec 100644 --- a/VersionedMap.cpp +++ b/VersionedMap.cpp @@ -1,4 +1,5 @@ #include "VersionedMap.h" +#include "RootSet.h" #include #include @@ -315,98 +316,6 @@ private: uint32_t freeList = 0; }; -struct RootSet { - - /// Register the root node for version after adding mutations - void add(uint32_t node, int64_t version) { - if (end == 0) { - nodes[end] = node; - versions[end] = version; - ++end; - return; - } - if (nodes[end - 1] == node) { - return; - } - if (end == capacity) { - capacity *= 2; - nodes = (uint32_t *)realloc(nodes, capacity * sizeof(uint32_t)); - versions = (int64_t *)realloc(versions, capacity * sizeof(int64_t)); - } - - nodes[end] = node; - versions[end] = version; - ++end; - } - - /// Inform that there will be no calls to rootForVersion with a version less - /// than `oldestVersion` - void setOldestVersion(int64_t oldestVersion) { - const uint32_t firstToKeep = lastLeq(oldestVersion); - - if (firstToKeep != 0) { - memmove(nodes, nodes + firstToKeep, - (end - firstToKeep) * sizeof(uint32_t)); - memmove(versions, versions + firstToKeep, - (end - firstToKeep) * sizeof(int64_t)); - end -= firstToKeep; - } - assert(end > 0); - assert(versions[0] <= oldestVersion); - } - - /// Get a root node that can correctly be used for `version` - uint32_t rootForVersion(int64_t version) const { - return nodes[lastLeq(version)]; - } - - const uint32_t *roots() const { return nodes; } - int rootCount() const { return end; } - - RootSet() { - nodes = (uint32_t *)malloc(kMinCapacity * sizeof(uint32_t)); - versions = (int64_t *)malloc(kMinCapacity * sizeof(int64_t)); - capacity = kMinCapacity; - nodes[0] = 0; - versions[0] = 0; - end = 1; - } - - ~RootSet() { - free(versions); - free(nodes); - } - -private: - uint32_t lastLeq(int64_t version) const { - assert(end > 0); - assert(versions[0] <= version); - - // Find the last version <= oldestVersion - int left = 1; - int right = end - 1; - int result = 0; - while (left <= right) { - int mid = left + (right - left) / 2; - if (versions[mid] <= version) { - result = mid; - left = mid + 1; - } else { - right = mid - 1; - } - } - assert(result < end); - return result; - } - uint32_t *nodes; - // versions[i] is the version of nodes[i] - int64_t *versions; - - constexpr static uint32_t kMinCapacity = 16; - uint32_t capacity; - uint32_t end; -}; - auto operator<=>(const VersionedMap::Mutation &lhs, const Node &rhs) { int cl = std::min(lhs.param1Length, rhs.entry->keyLen); if (cl > 0) { @@ -593,7 +502,8 @@ struct VersionedMap::Impl { } void printInOrder(int64_t version) { - printInOrderHelper(version, roots.rootForVersion(version)); + printInOrderHelper(version, + roots.getThreadSafeHandle().rootForVersion(version)); } void printInOrderHelper(int64_t version, uint32_t node) { @@ -675,7 +585,7 @@ int main() { { int i = 0; constexpr int kNumVersions = 1000; - weaselab::RootSet roots; + RootSet roots; for (; i < kNumVersions; i += 2) { roots.add(i, i); roots.add(i, i + 1); @@ -686,7 +596,8 @@ int main() { ++i; }); bench.run("roots - rootForVersion", [&]() { - bench.doNotOptimizeAway(roots.rootForVersion(i - kNumVersions / 2)); + bench.doNotOptimizeAway( + roots.getThreadSafeHandle().rootForVersion(i - kNumVersions / 2)); }); }