Add documentation to StaticThreadPipeline

2025-08-26 16:11:02 -04:00
parent 6e48a0ff9a
commit a734760b60


@@ -14,8 +14,25 @@
// Wait strategies for controlling thread blocking behavior when no work is
// available
enum class WaitStrategy {
// Never block - threads busy-wait (spin) when no work is available.
// Stage threads will always use 100% CPU even when idle.
// Requires dedicated CPU cores to avoid scheduler thrashing.
// Use when: latency is critical and you have spare cores.
Never,
// Block only when all upstream stages are idle (no new work is entering the
// pipeline). Downstream threads busy-wait if upstream has work, even if none
// is currently available for their own stage.
// Eliminates futex notifications between stages and drops to 0% CPU when the
// whole pipeline is idle. Requires dedicated cores to avoid priority
// inversion while the pipeline has work.
// Use when: high throughput with spare cores and sustained workloads.
WaitIfUpstreamIdle,
// Block when individual stages are empty (original behavior).
// Each stage waits independently on its input sources.
// Safe for shared CPU environments, works well with variable workloads.
// Use when: general purpose, shared cores, or unpredictable workloads.
WaitIfStageEmpty,
};
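// Illustrative sketch (not part of this commit): selecting a wait strategy as
// a template argument on the StaticThreadPipeline declared later in this
// file. "Item" and the <1, 4, 2> topology are placeholder assumptions.
//
//   struct Item { int payload; };
//
//   // Latency-critical work on dedicated cores: spin instead of blocking.
//   using SpinningPipeline =
//       StaticThreadPipeline<Item, WaitStrategy::Never, 1, 4, 2>;
//
//   // Shared cores or unpredictable workloads: block per stage when empty.
//   using BlockingPipeline =
//       StaticThreadPipeline<Item, WaitStrategy::WaitIfStageEmpty, 1, 4, 2>;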
@@ -27,7 +44,19 @@ struct ThreadState {
bool last_stage;
};
// Compile-time topology configuration for static pipelines
//
// This template defines a pipeline topology at compile-time:
// - Stage and thread calculations done at compile-time
// - Type-safe indexing: Stage and thread indices validated at compile-time
// - Fixed-size arrays with known bounds
// - Code specialization for each topology
//
// Example: StaticPipelineTopology<1, 4, 2> creates:
// - Stage 0: 1 thread (index 0)
// - Stage 1: 4 threads (indices 1-4)
// - Stage 2: 2 threads (indices 5-6)
// - Total: 7 threads across 3 stages
template <int... ThreadsPerStage> struct StaticPipelineTopology {
static_assert(sizeof...(ThreadsPerStage) > 0,
"Must specify at least one stage");
@@ -189,14 +218,72 @@ int check_producer_capacity(
}
} // namespace StaticPipelineAlgorithms
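// Illustrative sketch (not part of this commit): instantiating the topology
// documented above; only the template itself is assumed, no member names.
//
//   // Stage 0: 1 thread, stage 1: 4 threads, stage 2: 2 threads -> 7 threads
//   // total across 3 stages, matching the example in the doc comment.
//   using ExampleTopology = StaticPipelineTopology<1, 4, 2>;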
// Static multi-stage lock-free pipeline for inter-thread communication
// with compile-time topology specification.
//
// Overview:
// - Items flow from producers through multiple processing stages (stage 0 ->
// stage 1 -> ... -> final stage)
// - Each stage can have multiple worker threads processing items in parallel
// - Uses a shared ring buffer with atomic counters for lock-free coordination
// - Supports batch processing for efficiency
// - Compile-time topology specification via template parameters
//
// Architecture:
// - Producers: External threads that add items to the pipeline via push()
// - Stages: Processing stages numbered 0, 1, 2, ... that consume items via
// acquire<Stage, Thread>()
// - Items flow: Producers -> Stage 0 -> Stage 1 -> ... -> Final Stage
//
// Differences from Dynamic Version:
// - Template parameters specify topology at compile-time (e.g., <Item,
// WaitStrategy::Never, 1, 4, 2>)
// - Stage and thread indices are template parameters, validated at compile-time
// - Fixed-size arrays replace dynamic vectors
// - Specialized algorithms for each stage/thread combination
// - Type-safe guards prevent runtime indexing errors
//
// Usage Pattern:
// using Pipeline = StaticThreadPipeline<Item, WaitStrategy::WaitIfStageEmpty,
// 1, 4, 2>;
// Pipeline pipeline(lgSlotCount);
//
// // Producer threads (add items for stage 0 to consume):
// auto guard = pipeline.push(batchSize, /*block=*/true);
// for (auto& item : guard.batch) {
// // Initialize item data
// }
// // Guard destructor publishes batch to stage 0 consumers
//
// // Stage worker threads (process items and pass to next stage):
// auto guard = pipeline.acquire<Stage, Thread>(maxBatch, /*may_block=*/true);
// for (auto& item : guard.batch) {
// // Process item
// }
// // Guard destructor marks items as consumed and available to next stage
//
// Memory Model:
// - Ring buffer size must be power of 2 for efficient masking
// - Actual ring slots accessed via: index & (slotCount - 1)
// - 128-byte aligned atomics prevent false sharing between CPU cache lines
//
// Thread Safety:
// - Fully lock-free using atomic operations with acquire/release memory
// ordering
// - Uses C++20 atomic wait/notify for efficient blocking when no work is
// available
// - RAII guards ensure proper cleanup even with exceptions
template <class T, WaitStrategy wait_strategy, int... ThreadsPerStage>
struct StaticThreadPipeline {
using Topology = StaticPipelineTopology<ThreadsPerStage...>;
// Constructor
// lgSlotCount: log2 of the ring buffer size (e.g., 10 -> 1024 slots)
// Template parameters specify the pipeline topology
// (e.g., <Item, Never, 1, 4, 2>).
// Note: producer threads are external to the pipeline and are not counted in
// ThreadsPerStage.
explicit StaticThreadPipeline(int lgSlotCount)
: slot_count(1 << lgSlotCount), slot_count_mask(slot_count - 1),
ring(slot_count) {
// Otherwise we can't tell the difference between full and empty.
assert(!(slot_count_mask & 0x80000000));
initialize_all_threads();
}
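// Illustrative sketch (not part of this commit): constructing a pipeline and
// pushing a batch from a producer thread, following the usage pattern in the
// class comment above. "Item", "Pipeline", and the <1, 4, 2> topology are
// placeholder assumptions.
//
//   struct Item { int value; };
//   using Pipeline =
//       StaticThreadPipeline<Item, WaitStrategy::WaitIfStageEmpty, 1, 4, 2>;
//
//   Pipeline pipeline(/*lgSlotCount=*/10);  // 2^10 = 1024 ring slots
//
//   // Producer thread: reserve 32 slots, fill them, and let the guard's
//   // destructor publish them to stage 0.
//   {
//     auto guard = pipeline.push(/*size=*/32, /*block=*/true);
//     int i = 0;
//     for (auto& item : guard.batch) {
//       item.value = i++;
//     }
//   }  // Batch becomes visible to stage 0 consumers here.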
@@ -430,7 +517,13 @@ public:
uint32_t new_slot;
};
// Acquire a batch of items for processing by a consumer thread.
// Stage: which processing stage (0 = first consumer stage after producers);
//   compile-time parameter.
// Thread: thread ID within the stage (0 to ThreadsPerStage[Stage]-1);
//   compile-time parameter.
// maxBatch: maximum items to acquire (0 = no limit).
// may_block: whether to block waiting for items (false = return an empty
//   batch if none are available).
// Returns: StageGuard<Stage, Thread> with the batch of items to process and
//   compile-time type safety.
template <int Stage, int Thread>
[[nodiscard]] StageGuard<Stage, Thread> acquire(int maxBatch = 0,
bool may_block = true) {
@@ -447,6 +540,22 @@ public:
return StageGuard<Stage, Thread>{std::move(batch), local_pops, this};
}
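// Illustrative sketch (not part of this commit): a stage worker thread
// draining batches with acquire<Stage, Thread>(), following the usage pattern
// in the class comment. The Pipeline alias from the earlier sketch, the
// <1, 2> indices, and process() are placeholder assumptions.
//
//   // Worker for stage 1, thread 2 of a <1, 4, 2> topology.
//   void stage1_worker(Pipeline& pipeline) {
//     for (;;) {
//       auto guard = pipeline.acquire<1, 2>(/*maxBatch=*/64,
//                                           /*may_block=*/true);
//       for (auto& item : guard.batch) {
//         process(item);  // placeholder for the real per-item work
//       }
//       // The guard's destructor runs here, marking the batch consumed and
//       // making it available to stage 2.
//     }
//   }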
// Reserve slots in the ring buffer for a producer thread to fill with items.
// This is used by producer threads to add new items to stage 0 of the
// pipeline.
//
// size: number of slots to reserve (must be > 0 and <= ring buffer capacity)
// block: if true, blocks when the ring buffer is full; if false, returns an
//   empty guard.
// Returns: ProducerGuard with exclusive access to the reserved slots.
//
// Usage: Fill items in the returned batch, then let guard destructor publish
// them. The guard destructor ensures items are published in the correct
// order.
//
// Preconditions:
// - size > 0 (must request at least one slot)
// - size <= slotCount (cannot request more slots than ring buffer capacity)
// Violating preconditions results in program termination via abort().
[[nodiscard]] ProducerGuard push(uint32_t const size, bool block) {
if (size == 0) {
std::abort();