Add different wait strategies to pipeline

This commit is contained in:
2025-08-26 12:10:26 -04:00
parent f8be6941bb
commit 6ddba37e60

View File

@@ -9,6 +9,31 @@
#include <utility> #include <utility>
#include <vector> #include <vector>
// Wait strategies controlling how pipeline stage threads block when no work
// is available. Chosen at compile time (a ThreadPipeline template parameter),
// so the unused branches compile away via `if constexpr`.
enum class WaitStrategy {
  // Never block - threads busy-wait (spin) when no work is available.
  // Stage threads will always use 100% CPU even when idle.
  // Requires dedicated CPU cores to avoid scheduler thrashing.
  // Use when: latency is critical and you have spare cores.
  Never,
  // Block only when all upstream stages are idle (no new work entering the
  // pipeline).
  // Downstream threads busy-wait if upstream has work but not for their stage.
  // Eliminates futex notifications between stages; drops to 0% CPU when idle.
  // Requires dedicated cores to avoid priority inversion when the pipeline
  // has work in flight.
  // Use when: high throughput with spare cores and sustained workloads.
  WaitIfUpstreamIdle,
  // Block whenever an individual stage's input is empty (original behavior).
  // Each stage waits independently on its input sources.
  // Safe for shared CPU environments; works well with variable workloads.
  // Use when: general purpose, shared cores, or unpredictable workloads.
  WaitIfStageEmpty,
};
// Multi-stage lock-free pipeline for high-throughput inter-thread // Multi-stage lock-free pipeline for high-throughput inter-thread
// communication. // communication.
// //
@@ -44,7 +69,8 @@
// ordering // ordering
// - Uses C++20 atomic wait/notify for efficient blocking when no work available // - Uses C++20 atomic wait/notify for efficient blocking when no work available
// - RAII guards ensure proper cleanup even with exceptions // - RAII guards ensure proper cleanup even with exceptions
template <class T> struct ThreadPipeline { template <class T, WaitStrategy wait_strategy = WaitStrategy::WaitIfStageEmpty>
struct ThreadPipeline {
// Constructor // Constructor
// lgSlotCount: log2 of ring buffer size (e.g., 10 -> 1024 slots) // lgSlotCount: log2 of ring buffer size (e.g., 10 -> 1024 slots)
// threadsPerStage: number of threads for each stage (e.g., {1, 4, 2} = 1 // threadsPerStage: number of threads for each stage (e.g., {1, 4, 2} = 1
@@ -57,6 +83,7 @@ template <class T> struct ThreadPipeline {
for (size_t i = 0; i < threadsPerStage.size(); ++i) { for (size_t i = 0; i < threadsPerStage.size(); ++i) {
threadState[i] = std::vector<ThreadState>(threadsPerStage[i]); threadState[i] = std::vector<ThreadState>(threadsPerStage[i]);
for (auto &t : threadState[i]) { for (auto &t : threadState[i]) {
t.last_stage = i == threadsPerStage.size() - 1;
if (i == 0) { if (i == 0) {
t.last_push_read = std::vector<uint32_t>(1); t.last_push_read = std::vector<uint32_t>(1);
} else { } else {
@@ -177,7 +204,7 @@ template <class T> struct ThreadPipeline {
[[nodiscard]] bool empty() const { return end_ == begin_; } [[nodiscard]] bool empty() const { return end_ == begin_; }
private: private:
friend struct ThreadPipeline<T>; friend struct ThreadPipeline;
Batch(std::vector<T> *const ring, uint32_t begin_, uint32_t end_) Batch(std::vector<T> *const ring, uint32_t begin_, uint32_t end_)
: ring(ring), begin_(begin_), end_(end_) {} : ring(ring), begin_(begin_), end_(end_) {}
std::vector<T> *const ring; std::vector<T> *const ring;
@@ -223,8 +250,21 @@ private:
if (!mayBlock) { if (!mayBlock) {
return 0; return 0;
} }
// Wait for lastPush to change and try again
lastPush.wait(thread.last_push_read[i], std::memory_order_relaxed); if constexpr (wait_strategy == WaitStrategy::Never) {
// Empty
} else if constexpr (wait_strategy ==
WaitStrategy::WaitIfUpstreamIdle) {
auto push = pushes.load(std::memory_order_relaxed);
if (push == thread.local_pops) {
pushes.wait(push, std::memory_order_relaxed);
}
} else {
static_assert(wait_strategy == WaitStrategy::WaitIfStageEmpty);
// Wait for lastPush to change and try again
lastPush.wait(thread.last_push_read[i], std::memory_order_relaxed);
}
thread.last_push_read[i] = lastPush.load(std::memory_order_acquire); thread.last_push_read[i] = lastPush.load(std::memory_order_acquire);
} }
} }
@@ -236,10 +276,12 @@ private:
struct ThreadState { struct ThreadState {
// Where this thread has published up to // Where this thread has published up to
alignas(128) std::atomic<uint32_t> pops{0}; alignas(128) std::atomic<uint32_t> pops{0};
// Where this thread will publish to the next time it publishes // Where this thread will publish to the next time it publishes, or if idle
// where it has published to
uint32_t local_pops{0}; uint32_t local_pops{0};
// Where the previous stage's threads have published up to last we checked // Where the previous stage's threads have published up to last we checked
std::vector<uint32_t> last_push_read; std::vector<uint32_t> last_push_read;
bool last_stage;
}; };
// threadState[i][j] is the state for thread j in stage i // threadState[i][j] is the state for thread j in stage i
std::vector<std::vector<ThreadState>> threadState; std::vector<std::vector<ThreadState>> threadState;
@@ -251,9 +293,13 @@ public:
Batch batch; Batch batch;
~StageGuard() { ~StageGuard() {
if (ts != nullptr) { if (ts != nullptr) {
// seq_cst so that the notify can't be ordered before the store if (wait_strategy == WaitStrategy::WaitIfStageEmpty || ts->last_stage) {
ts->pops.store(local_pops, std::memory_order_seq_cst); // seq_cst so that the notify can't be ordered before the store
ts->pops.notify_all(); ts->pops.store(local_pops, std::memory_order_seq_cst);
ts->pops.notify_all();
} else {
ts->pops.store(local_pops, std::memory_order_release);
}
} }
} }
@@ -304,10 +350,10 @@ public:
private: private:
friend struct ThreadPipeline; friend struct ThreadPipeline;
ProducerGuard() : batch(), tp() {} ProducerGuard() : batch(), tp() {}
ProducerGuard(Batch batch, ThreadPipeline<T> *tp, uint32_t old_slot, ProducerGuard(Batch batch, ThreadPipeline<T, wait_strategy> *tp,
uint32_t new_slot) uint32_t old_slot, uint32_t new_slot)
: batch(batch), tp(tp), old_slot(old_slot), new_slot(new_slot) {} : batch(batch), tp(tp), old_slot(old_slot), new_slot(new_slot) {}
ThreadPipeline<T> *const tp; ThreadPipeline<T, wait_strategy> *const tp;
uint32_t old_slot; uint32_t old_slot;
uint32_t new_slot; uint32_t new_slot;
}; };