From dcc5275ec9d4cea784f7c5aa337d86b6c57c291a Mon Sep 17 00:00:00 2001 From: Andrew Noyes Date: Wed, 13 Nov 2024 22:46:11 -0800 Subject: [PATCH] Isolate conflict-set on one thread in server_bench --- CMakeLists.txt | 3 +- ServerBench.cpp | 158 +++++++++++++++++++++++++++++++++++++++++++----- 2 files changed, 143 insertions(+), 18 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 7397195..0ea0b71 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -343,8 +343,7 @@ if(CMAKE_SOURCE_DIR STREQUAL CMAKE_CURRENT_SOURCE_DIR AND BUILD_TESTING) # c++98 add_executable(conflict_set_cxx_api_test conflict_set_cxx_api_test.cpp) target_compile_options(conflict_set_cxx_api_test PRIVATE ${TEST_FLAGS}) - target_link_libraries(conflict_set_cxx_api_test - PRIVATE ${PROJECT_NAME}-static) + target_link_libraries(conflict_set_cxx_api_test PRIVATE ${PROJECT_NAME}) set_target_properties(conflict_set_cxx_api_test PROPERTIES CXX_STANDARD 98) set_target_properties(conflict_set_cxx_api_test PROPERTIES CXX_STANDARD_REQUIRED ON) diff --git a/ServerBench.cpp b/ServerBench.cpp index 37bbf3c..b49a372 100644 --- a/ServerBench.cpp +++ b/ServerBench.cpp @@ -23,6 +23,97 @@ #include "Internal.h" #include "third_party/nadeau.h" +constexpr int kCacheLine = 64; // TODO mac m1 is 128 + +template struct TxQueue { + + explicit TxQueue(int lgSlotCount) + : slotCount(1 << lgSlotCount), slotCountMask(slotCount - 1), + slots(new T[slotCount]) { + // Otherwise we can't tell the difference between full and empty. + assert(!(slotCountMask & 0x80000000)); + } + + /// Call from producer thread, after ensuring consumer is no longer accessing + /// it somehow + ~TxQueue() { delete[] slots; } + + /// Must be called from the producer thread + void push(T t) { + if (wouldBlock()) { + // Wait for pops to change and try again + consumer.pops.wait(producer.lastPopRead, std::memory_order_relaxed); + producer.lastPopRead = consumer.pops.load(std::memory_order_acquire); + } + slots[producer.pushesNonAtomic++ & slotCountMask] = std::move(t); + // seq_cst so that the notify can't be ordered before the store + producer.pushes.store(producer.pushesNonAtomic, std::memory_order_seq_cst); + // We have to notify every time, since we don't know if this is the last + // push ever + producer.pushes.notify_one(); + } + + /// Must be called from the producer thread + uint32_t outstanding() { + return producer.pushesNonAtomic - + consumer.pops.load(std::memory_order_relaxed); + } + + /// Returns true if a call to push might block. Must be called from the + /// producer thread. + bool wouldBlock() { + // See if we can determine that overflow won't happen entirely from state + // local to the producer + if (producer.pushesNonAtomic - producer.lastPopRead == slotCount - 1) { + // Re-read pops with memory order + producer.lastPopRead = consumer.pops.load(std::memory_order_acquire); + return producer.pushesNonAtomic - producer.lastPopRead == slotCount - 1; + } + return false; + } + + /// Valid until the next pop, or until this queue is destroyed. + T *pop() { + // See if we can determine that there's an entry we can pop entirely from + // state local to the consumer + if (consumer.lastPushRead - consumer.popsNonAtomic == 0) { + // Re-read pushes with memory order and try again + consumer.lastPushRead = producer.pushes.load(std::memory_order_acquire); + if (consumer.lastPushRead - consumer.popsNonAtomic == 0) { + // Wait for pushes to change and try again + producer.pushes.wait(consumer.lastPushRead, std::memory_order_relaxed); + consumer.lastPushRead = producer.pushes.load(std::memory_order_acquire); + } + } + auto result = &slots[consumer.popsNonAtomic++ & slotCountMask]; + // We only have to write pops with memory order if we've run out of items. + // We know that we'll eventually run out. + if (consumer.lastPushRead - consumer.popsNonAtomic == 0) { + // seq_cst so that the notify can't be ordered before the store + consumer.pops.store(consumer.popsNonAtomic, std::memory_order_seq_cst); + consumer.pops.notify_one(); + } + return result; + } + +private: + const uint32_t slotCount; + const uint32_t slotCountMask; + T *slots; + struct alignas(kCacheLine) ProducerState { + std::atomic pushes{0}; + uint32_t pushesNonAtomic{0}; + uint32_t lastPopRead{0}; + }; + struct alignas(kCacheLine) ConsumerState { + std::atomic pops{0}; + uint32_t popsNonAtomic{0}; + uint32_t lastPushRead{0}; + }; + ProducerState producer; + ConsumerState consumer; +}; + std::atomic transactions; int64_t safeUnaryMinus(int64_t x) { @@ -69,15 +160,20 @@ constexpr int kWindowSize = 1'000'000; constexpr int kNumReadKeysPerTx = 10; constexpr int kNumWriteKeysPerTx = 5; -void workload(weaselab::ConflictSet *cs) { - int64_t version = kWindowSize; - for (;; transactions.fetch_add(1, std::memory_order_relaxed)) { +struct Transaction { + std::vector keys; + std::vector reads; + std::vector writes; + int64_t version; + int64_t oldestVersion; + Transaction() = default; + explicit Transaction(int64_t version) + : version(version), oldestVersion(version - kWindowSize) { std::vector keyIndices; for (int i = 0; i < std::max(kNumReadKeysPerTx, kNumWriteKeysPerTx); ++i) { keyIndices.push_back(rand() % kTotalKeyRange); } std::sort(keyIndices.begin(), keyIndices.end()); - std::vector keys; constexpr std::string_view fullString = "this is a string, where a prefix of it is used as an element of the " "tuple forming the key"; @@ -87,10 +183,6 @@ void workload(weaselab::ConflictSet *cs) { fullString.substr(0, keyIndices[i] % fullString.size()))); // printf("%s\n", printable(keys.back()).c_str()); } - - std::vector reads; - std::vector writes; - std::vector results; for (int i = 0; i < kNumWriteKeysPerTx; ++i) { writes.push_back({{(const uint8_t *)keys[i].data(), int(keys[i].size())}, {nullptr, 0}}); @@ -104,13 +196,30 @@ void workload(weaselab::ConflictSet *cs) { {nullptr, 0}, version - kWindowSize}); } - results.resize(reads.size()); - cs->check(reads.data(), results.data(), reads.size()); - cs->addWrites(writes.data(), writes.size(), version); - cs->setOldestVersion(version - kWindowSize); - ++version; } -} + + Transaction(Transaction &&) = default; + Transaction &operator=(Transaction &&) = default; + Transaction(Transaction const &) = delete; + Transaction const &operator=(Transaction const &) = delete; +}; + +struct Resolver { + + void resolve(const weaselab::ConflictSet::ReadRange *reads, int readCount, + const weaselab::ConflictSet::WriteRange *writes, int writeCount, + int64_t newVersion, int64_t newOldestVersion) { + results.resize(readCount); + cs.check(reads, results.data(), readCount); + cs.addWrites(writes, writeCount, newVersion); + cs.setOldestVersion(newOldestVersion); + } + + ConflictSet cs{0}; + +private: + std::vector results; +}; // Adapted from getaddrinfo man page int getListenFd(const char *node, const char *service) { @@ -253,7 +362,8 @@ int main(int argc, char **argv) { { int listenFd = getListenFd(argv[1], argv[2]); - weaselab::ConflictSet cs{0}; + Resolver resolver; + auto &cs = resolver.cs; weaselab::ConflictSet::MetricsV1 *metrics; int metricsCount; cs.getMetricsV1(&metrics, &metricsCount); @@ -302,7 +412,23 @@ int main(int argc, char **argv) { } #endif - auto w = std::thread{workload, &cs}; + TxQueue> queue{10}; + + auto workloadThread = std::thread{[&]() { + for (int64_t version = kWindowSize;; + ++version, transactions.fetch_add(1, std::memory_order_relaxed)) { + auto tx = std::make_unique(version); + queue.push(std::move(tx)); + } + }}; + + auto resolverThread = std::thread{[&]() { + for (;;) { + auto tx = queue.pop()->get(); + resolver.resolve(tx->reads.data(), tx->reads.size(), tx->writes.data(), + tx->writes.size(), tx->version, tx->oldestVersion); + } + }}; for (;;) { struct sockaddr_storage peer_addr = {};