diff --git a/src/thread_pipeline.hpp b/src/thread_pipeline.hpp
index a68fab5..5434689 100644
--- a/src/thread_pipeline.hpp
+++ b/src/thread_pipeline.hpp
@@ -14,8 +14,25 @@
 // Wait strategies for controlling thread blocking behavior when no work is
 // available
 enum class WaitStrategy {
+  // Never block - threads busy-wait (spin) when no work is available.
+  // Stage threads will always use 100% CPU, even when idle.
+  // Requires dedicated CPU cores to avoid scheduler thrashing.
+  // Use when: latency is critical and you have spare cores.
   Never,
+
+  // Block only when all upstream stages are idle (no new work entering the
+  // pipeline).
+  // Downstream threads busy-wait if upstream stages have work, even if none
+  // of it has reached their own stage yet.
+  // Eliminates futex notifications between stages and drops to 0% CPU when
+  // the whole pipeline is idle.
+  // Requires dedicated cores to avoid priority inversion while the pipeline
+  // has work.
+  // Use when: high throughput matters, you have spare cores, and workloads
+  // are sustained.
   WaitIfUpstreamIdle,
+
+  // Block when individual stages are empty (original behavior).
+  // Each stage waits independently on its input sources.
+  // Safe for shared CPU environments; works well with variable workloads.
+  // Use when: general purpose, shared cores, or unpredictable workloads.
   WaitIfStageEmpty,
 };
 
@@ -27,7 +44,19 @@ struct ThreadState {
   bool last_stage;
 };
 
-// Compile-time topology configuration
+// Compile-time topology configuration for static pipelines.
+//
+// This template defines a pipeline topology at compile time:
+// - Stage and thread index calculations are done at compile time
+// - Type-safe indexing: stage and thread indices are validated at compile time
+// - Fixed-size arrays with known bounds
+// - Code specialization for each topology
+//
+// Example: StaticPipelineTopology<1, 4, 2> creates:
+// - Stage 0: 1 thread (index 0)
+// - Stage 1: 4 threads (indices 1-4)
+// - Stage 2: 2 threads (indices 5-6)
+// - Total: 7 threads across 3 stages
 template <int... ThreadsPerStage>
 struct StaticPipelineTopology {
   static_assert(sizeof...(ThreadsPerStage) > 0, "Must specify at least one stage");
@@ -189,14 +218,72 @@ int check_producer_capacity(
 }
 }  // namespace StaticPipelineAlgorithms
 
-// Static multi-stage lock-free pipeline
+// Static multi-stage lock-free pipeline for inter-thread communication with a
+// compile-time topology specification.
+//
+// Overview:
+// - Items flow from producers through multiple processing stages
+//   (stage 0 -> stage 1 -> ... -> final stage)
+// - Each stage can have multiple worker threads processing items in parallel
+// - Uses a shared ring buffer with atomic counters for lock-free coordination
+// - Supports batch processing for efficiency
+// - Compile-time topology specification via template parameters
+//
+// Architecture:
+// - Producers: external threads that add items to the pipeline via push()
+// - Stages: processing stages numbered 0, 1, 2, ... that consume items via
+//   acquire()
+// - Items flow: Producers -> Stage 0 -> Stage 1 -> ... -> Final Stage
+//
+// Differences from the dynamic version:
+// - Template parameters specify the topology at compile time (e.g., <1, 4, 2>)
+// - Stage and thread indices are template parameters, validated at compile time
+// - Fixed-size arrays replace dynamic vectors
+// - Specialized algorithms for each stage/thread combination
+// - Type-safe guards prevent runtime indexing errors
+//
+// Usage Pattern:
+//   using Pipeline = StaticThreadPipeline<1, 4, 2>;  // e.g., 1, 4, 2 threads per stage
+//   Pipeline pipeline(lgSlotCount);
+//
+//   // Producer threads (add items for stage 0 to consume):
+//   auto guard = pipeline.push(batchSize, /*block=*/true);
+//   for (auto& item : guard.batch) {
+//     // Initialize item data
+//   }
+//   // Guard destructor publishes the batch to stage 0 consumers
+//
+//   // Stage worker threads (process items and pass them to the next stage):
+//   auto guard = pipeline.acquire(maxBatch, /*may_block=*/true);
+//   for (auto& item : guard.batch) {
+//     // Process item
+//   }
+//   // Guard destructor marks items as consumed and available to the next stage
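+//
+// A slightly fuller sketch of the same pattern (illustrative only: the
+// <1, 4, 2> topology, the lgSlotCount value, the stage/thread indices, and
+// the keep_running flag below are example choices, not requirements of this
+// header; acquire is assumed to take <Stage, Thread> as its compile-time
+// parameters, as documented further down):
+//
+//   using Pipeline = StaticThreadPipeline<1, 4, 2>;
+//   Pipeline pipeline(/*lgSlotCount=*/10);  // 1 << 10 = 1024 ring slots
+//
+//   // Producer: reserve 16 slots, fill them, publish on guard destruction.
+//   {
+//     auto guard = pipeline.push(16, /*block=*/true);
+//     for (auto& item : guard.batch) {
+//       // fill item
+//     }
+//   }  // batch becomes visible to stage 0 here
+//
+//   // Worker: thread 0 of stage 1 drains batches until told to stop.
+//   while (keep_running) {
+//     auto guard = pipeline.acquire<1, 0>(/*maxBatch=*/0, /*may_block=*/true);
+//     for (auto& item : guard.batch) {
+//       // process item
+//     }
+//   }  // guard destructor hands the items on to stage 2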
+//
+// Memory Model:
+// - Ring buffer size must be a power of 2 for efficient masking
+// - Actual ring slots are accessed via: index & (slotCount - 1)
+// - 128-byte aligned atomics prevent false sharing between CPU cache lines
+//
+// Thread Safety:
+// - Fully lock-free, using atomic operations with acquire/release memory
+//   ordering
+// - Uses C++20 atomic wait/notify for efficient blocking when no work is
+//   available
+// - RAII guards ensure proper cleanup even with exceptions
 template <int... ThreadsPerStage>
 struct StaticThreadPipeline {
   using Topology = StaticPipelineTopology<ThreadsPerStage...>;
 
+  // Constructor.
+  //
+  // lgSlotCount: log2 of the ring buffer size (e.g., 10 -> 1024 slots)
+  //
+  // Template parameters specify the pipeline topology (e.g., <1, 4, 2>).
+  // Note: Producer threads are external to the pipeline and are not counted
+  // in ThreadsPerStage.
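+  //
+  // Worked example of the size arithmetic (10 is just an illustrative value):
+  // lgSlotCount = 10 gives slot_count = 1 << 10 = 1024 and slot_count_mask =
+  // 1023 (0x3FF), so an index of 2050 maps to ring slot 2050 & 1023 = 2, as
+  // described under "Memory Model" above.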
   explicit StaticThreadPipeline(int lgSlotCount)
       : slot_count(1 << lgSlotCount), slot_count_mask(slot_count - 1),
         ring(slot_count) {
+    // Otherwise we can't tell the difference between full and empty.
     assert(!(slot_count_mask & 0x80000000));
     initialize_all_threads();
   }
@@ -430,7 +517,13 @@
 public:
     uint32_t new_slot;
   };
 
-  // Static acquire - Stage and Thread are compile-time parameters
+  // Acquire a batch of items for processing by a consumer thread.
+  //
+  // Stage:     which processing stage (0 = first consumer stage after the
+  //            producers); compile-time parameter
+  // Thread:    thread ID within the stage (0 to ThreadsPerStage[Stage]-1);
+  //            compile-time parameter
+  // maxBatch:  maximum number of items to acquire (0 = no limit)
+  // may_block: whether to block waiting for items (false = return an empty
+  //            batch if none are available)
+  //
+  // Returns: a StageGuard holding the batch of items to process, with
+  // compile-time type safety.
   template <int Stage, int Thread>
   [[nodiscard]] StageGuard acquire(int maxBatch = 0, bool may_block = true) {
@@ -447,6 +540,22 @@
       return StageGuard{std::move(batch), local_pops, this};
     }
 
+  // Reserve slots in the ring buffer for a producer thread to fill with items.
+  // This is used by producer threads to add new items to stage 0 of the
+  // pipeline.
+  //
+  // size:  number of slots to reserve (must be > 0 and <= ring buffer capacity)
+  // block: if true, blocks while the ring buffer is full; if false, returns an
+  //        empty guard
+  //
+  // Returns: a ProducerGuard with exclusive access to the reserved slots.
+  //
+  // Usage: fill the items in the returned batch, then let the guard destructor
+  // publish them. The guard destructor ensures items are published in the
+  // correct order.
+  //
+  // Preconditions:
+  // - size > 0 (must request at least one slot)
+  // - size <= slotCount (cannot request more slots than the ring buffer holds)
+  // Violating these preconditions terminates the program via abort().
   [[nodiscard]] ProducerGuard push(uint32_t const size, bool block) {
     if (size == 0) {
       std::abort();