From 6ddba37e6070f95bf788e8085bb98eb849979ca0 Mon Sep 17 00:00:00 2001
From: Andrew Noyes
Date: Tue, 26 Aug 2025 12:10:26 -0400
Subject: [PATCH] Add different wait strategies to pipeline

---
 src/thread_pipeline.hpp | 68 ++++++++++++++++++++++++++++++++++-------
 1 file changed, 57 insertions(+), 11 deletions(-)

diff --git a/src/thread_pipeline.hpp b/src/thread_pipeline.hpp
index c492f62..53fe2a3 100644
--- a/src/thread_pipeline.hpp
+++ b/src/thread_pipeline.hpp
@@ -9,6 +9,31 @@
 #include <thread>
 #include <vector>
 
+// Wait strategies for controlling thread blocking behavior when no work is
+// available.
+enum class WaitStrategy {
+  // Never block - threads busy-wait (spin) when no work is available.
+  // Stage threads will always use 100% CPU even when idle.
+  // Requires dedicated CPU cores to avoid scheduler thrashing.
+  // Use when: latency is critical and you have spare cores.
+  Never,
+
+  // Block only when all upstream stages are idle (no new work entering the
+  // pipeline).
+  // Downstream threads busy-wait if upstream has work, but none yet for their
+  // stage. Eliminates futex notifications between stages and drops to 0% CPU
+  // when the whole pipeline is idle. Requires dedicated cores to avoid
+  // priority inversion when the pipeline has work.
+  // Use when: high throughput with spare cores and sustained workloads.
+  WaitIfUpstreamIdle,
+
+  // Block when individual stages are empty (original behavior).
+  // Each stage waits independently on its input sources.
+  // Safe for shared CPU environments; works well with variable workloads.
+  // Use when: general purpose, shared cores, or unpredictable workloads.
+  WaitIfStageEmpty,
+};
+
 // Multi-stage lock-free pipeline for high-throughput inter-thread
 // communication.
 //
@@ -44,7 +69,8 @@
 // ordering
 // - Uses C++20 atomic wait/notify for efficient blocking when no work available
 // - RAII guards ensure proper cleanup even with exceptions
-template <class T> struct ThreadPipeline {
+template <class T, WaitStrategy wait_strategy>
+struct ThreadPipeline {
   // Constructor
   // lgSlotCount: log2 of ring buffer size (e.g., 10 -> 1024 slots)
   // threadsPerStage: number of threads for each stage (e.g., {1, 4, 2} = 1
@@ -57,6 +83,7 @@ template <class T> struct ThreadPipeline {
     for (size_t i = 0; i < threadsPerStage.size(); ++i) {
       threadState[i] = std::vector<ThreadState>(threadsPerStage[i]);
       for (auto &t : threadState[i]) {
+        t.last_stage = i == threadsPerStage.size() - 1;
         if (i == 0) {
           t.last_push_read = std::vector<uint32_t>(1);
         } else {
@@ -177,7 +204,7 @@ template <class T> struct ThreadPipeline {
     [[nodiscard]] bool empty() const { return end_ == begin_; }
 
   private:
-    friend struct ThreadPipeline<T>;
+    friend struct ThreadPipeline<T, wait_strategy>;
     Batch(std::vector<T> *const ring, uint32_t begin_, uint32_t end_)
         : ring(ring), begin_(begin_), end_(end_) {}
     std::vector<T> *const ring;
@@ -223,8 +250,21 @@
       if (!mayBlock) {
         return 0;
       }
-      // Wait for lastPush to change and try again
-      lastPush.wait(thread.last_push_read[i], std::memory_order_relaxed);
+
+      if constexpr (wait_strategy == WaitStrategy::Never) {
+        // Empty
+      } else if constexpr (wait_strategy ==
+                           WaitStrategy::WaitIfUpstreamIdle) {
+        auto push = pushes.load(std::memory_order_relaxed);
+        if (push == thread.local_pops) {
+          pushes.wait(push, std::memory_order_relaxed);
+        }
+      } else {
+        static_assert(wait_strategy == WaitStrategy::WaitIfStageEmpty);
+        // Wait for lastPush to change and try again
+        lastPush.wait(thread.last_push_read[i], std::memory_order_relaxed);
+      }
+
       thread.last_push_read[i] = lastPush.load(std::memory_order_acquire);
     }
   }
@@ -236,10 +276,12 @@
   struct ThreadState {
     // Where this thread has published up to
    alignas(128) std::atomic<uint32_t> pops{0};
-    // Where this thread will publish to the next time it publishes
+    // Where this thread will publish to the next time it publishes, or, if
+    // idle, where it has published to
     uint32_t local_pops{0};
     // Where the previous stage's threads have published up to last we checked
     std::vector<uint32_t> last_push_read;
+    bool last_stage;
   };
   // threadState[i][j] is the state for thread j in stage i
   std::vector<std::vector<ThreadState>> threadState;
@@ -251,9 +293,13 @@
   public:
     Batch batch;
     ~StageGuard() {
       if (ts != nullptr) {
-        // seq_cst so that the notify can't be ordered before the store
-        ts->pops.store(local_pops, std::memory_order_seq_cst);
-        ts->pops.notify_all();
+        if (wait_strategy == WaitStrategy::WaitIfStageEmpty || ts->last_stage) {
+          // seq_cst so that the notify can't be ordered before the store
+          ts->pops.store(local_pops, std::memory_order_seq_cst);
+          ts->pops.notify_all();
+        } else {
+          ts->pops.store(local_pops, std::memory_order_release);
+        }
       }
     }
@@ -304,10 +350,10 @@
   private:
     friend struct ThreadPipeline;
     ProducerGuard() : batch(), tp() {}
-    ProducerGuard(Batch batch, ThreadPipeline<T> *tp, uint32_t old_slot,
-                  uint32_t new_slot)
+    ProducerGuard(Batch batch, ThreadPipeline<T, wait_strategy> *tp,
+                  uint32_t old_slot, uint32_t new_slot)
        : batch(batch), tp(tp), old_slot(old_slot), new_slot(new_slot) {}
-    ThreadPipeline<T> *const tp;
+    ThreadPipeline<T, wait_strategy> *const tp;
    uint32_t old_slot;
    uint32_t new_slot;
  };
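
The dispatch above relies on wait_strategy being a compile-time value: each
strategy compiles down to a different wait loop with no runtime branching.
Below is a minimal standalone sketch of that idiom - an `if constexpr` over a
WaitStrategy non-type template parameter combined with C++20 std::atomic
wait/notify. It is not part of the patch: ToyStage and its pushes / awaitWork
/ push members are illustrative stand-ins for the real ThreadPipeline
internals, and the angle-bracket spellings shown in the hunks above (such as
template <class T, WaitStrategy wait_strategy>) are themselves reconstructed
from context, since the archived copy of this patch stripped bracket contents.

#include <atomic>
#include <cstdint>
#include <thread>

enum class WaitStrategy { Never, WaitIfUpstreamIdle, WaitIfStageEmpty };

template <WaitStrategy wait_strategy>
struct ToyStage {
  std::atomic<uint32_t> pushes{0}; // producer-side publish counter
  uint32_t consumed{0};            // consumer-side progress (single consumer)

  // Returns once at least one unconsumed item is visible.
  void awaitWork() {
    for (;;) {
      uint32_t p = pushes.load(std::memory_order_acquire);
      if (p != consumed) {
        return;
      }
      if constexpr (wait_strategy == WaitStrategy::Never) {
        // Busy-wait: burn CPU for the lowest wake-up latency.
      } else {
        // Block on the futex until the producer bumps `pushes`. In the real
        // pipeline, WaitIfUpstreamIdle waits on the pipeline-entry counter
        // while WaitIfStageEmpty waits on the previous stage's counter; in
        // this single-stage toy the two collapse into the same wait.
        pushes.wait(p, std::memory_order_relaxed);
      }
      // Loop and re-check: atomic waits can wake spuriously.
    }
  }

  void push() {
    pushes.fetch_add(1, std::memory_order_release);
    if constexpr (wait_strategy != WaitStrategy::Never) {
      // Only needed when a consumer may be blocked in pushes.wait().
      pushes.notify_all();
    }
  }
};

int main() {
  ToyStage<WaitStrategy::WaitIfStageEmpty> stage;
  std::thread consumer([&] {
    stage.awaitWork();
    ++stage.consumed;
  });
  stage.push();
  consumer.join();
}

The same reasoning explains the split in StageGuard's destructor: when nothing
can ever block on a stage's pops counter (non-last stages under Never or
WaitIfUpstreamIdle), the seq_cst store and notify_all are pure overhead, and a
plain release store is enough to publish the thread's progress.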