From 0920193e5cd1e1d15796373954d2f2aea188e30e Mon Sep 17 00:00:00 2001
From: Andrew Noyes
Date: Mon, 18 Aug 2025 12:47:24 -0400
Subject: [PATCH] Claude's docstrings

---
 src/ThreadPipeline.h | 62 +++++++++++++++++++++++++++++++++++++++++---
 1 file changed, 58 insertions(+), 4 deletions(-)

diff --git a/src/ThreadPipeline.h b/src/ThreadPipeline.h
index 422a6c9..e6c6874 100644
--- a/src/ThreadPipeline.h
+++ b/src/ThreadPipeline.h
@@ -9,7 +9,48 @@
 #include
 #include
 
+// Multi-stage lock-free pipeline for high-throughput inter-thread
+// communication.
+//
+// Overview:
+// - Items flow through multiple processing stages (stage 0 -> stage 1 -> ... ->
+//   final stage)
+// - Each stage can have multiple worker threads processing items in parallel
+// - Uses a shared ring buffer with atomic counters for lock-free coordination
+// - Supports batch processing for efficiency
+//
+// Usage Pattern:
+//   // Producer threads (add items to stage 0):
+//   auto guard = pipeline.push(batchSize, /*block=*/true);
+//   for (auto& item : guard.batch) {
+//     // Initialize item data
+//   }
+//   // Guard destructor publishes batch to consumers
+//
+//   // Consumer threads (process items from any stage):
+//   auto guard = pipeline.acquire(stageNum, threadId, maxBatch, /*mayBlock=*/true);
+//   for (auto& item : guard.batch) {
+//     // Process item
+//   }
+//   // Guard destructor marks items as consumed and available to next stage
+//
+// Memory Model:
+// - Ring buffer size must be power of 2 for efficient masking
+// - Uses 64-bit indices to avoid ABA problems (indices never repeat until
+//   uint32_t overflow)
+// - Actual ring slots accessed via: index & (slotCount - 1)
+// - 128-byte aligned atomics prevent false sharing between CPU cache lines
+//
+// Thread Safety:
+// - Fully lock-free using atomic operations with acquire/release memory
+//   ordering
+// - Uses C++20 atomic wait/notify for efficient blocking when no work available
+// - RAII guards ensure proper cleanup even with exceptions
 template  struct ThreadPipeline {
+  // Constructor
+  // lgSlotCount: log2 of ring buffer size (e.g., 10 -> 1024 slots)
+  // threadsPerStage: number of threads for each stage (e.g., {1, 4, 2} = 1
+  //   producer, 4 stage-1 workers, 2 stage-2 workers)
   ThreadPipeline(int lgSlotCount, const std::vector &threadsPerStage)
       : slotCount(1 << lgSlotCount), slotCountMask(slotCount - 1),
         threadState(threadsPerStage.size()), ring(slotCount) {
@@ -218,6 +259,12 @@ public:
     uint32_t newSlot;
   };
 
+  // Acquire a batch of items for processing by a consumer thread.
+  // stage: which processing stage (0 = first consumer stage after producers)
+  // thread: thread ID within the stage (0 to threadsPerStage[stage]-1)
+  // maxBatch: maximum items to acquire (0 = no limit)
+  // mayBlock: whether to block waiting for items (false = return an empty batch if none available)
+  // Returns: StageGuard with batch of items to process
   [[nodiscard]] StageGuard acquire(int stage, int thread, int maxBatch = 0,
                                    bool mayBlock = true) {
     assert(stage < threadState.size());
@@ -226,10 +273,17 @@ public:
     return StageGuard{std::move(batch), &threadState[stage][thread]};
   }
 
-  // Grants exclusive access to a producer thread to a span of up to `size`. If
-  // `block` is true, then this call will block if the queue is full, until the
-  // queue is not full. Otherwise it will return an empty batch if the queue is
-  // full.
+  // Reserve slots in the ring buffer for a producer thread to fill with items.
+  // This is used by producer threads to add new items to stage 0 of the
+  // pipeline.
+  //
+  // size: number of slots to reserve (must be > 0 and <= ring buffer capacity)
+  // block: if true, blocks when the ring buffer is full; if false, returns an empty guard
+  // Returns: ProducerGuard with exclusive access to reserved slots
+  //
+  // Usage: Fill items in the returned batch, then let the guard destructor publish
+  //   them. The guard destructor ensures items are published in the correct
+  //   order.
   //
   // Preconditions:
   //   - size > 0 (must request at least one slot)
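
For readers trying the API documented above, a minimal end-to-end sketch follows. It is written against the docstrings in this patch only, not against the full header: the Item type, the {1, 1} stage configuration, and the assumption that ThreadPipeline's template parameter is the item type (the quoted hunks do not show it) are illustrative choices, while the push()/acquire() calls and the guards' `batch` member mirror what the comments describe.

// Usage sketch only: assumes the template parameter is the item type, that
// guard.batch is iterable, and a {1, 1} configuration (one producer thread,
// one consumer thread) so the exact stage/thread numbering does not matter.
#include <atomic>
#include <cstdint>
#include <thread>
#include "ThreadPipeline.h"

struct Item {
  uint64_t id = 0;
  uint64_t result = 0;
};

int main() {
  // lgSlotCount = 12 -> 1 << 12 = 4096 ring slots.
  ThreadPipeline<Item> pipeline(/*lgSlotCount=*/12, /*threadsPerStage=*/{1, 1});
  std::atomic<bool> done{false};

  // Producer: reserve up to 64 slots per call, fill them, and let the
  // ProducerGuard destructor publish the batch to the first consumer stage.
  std::thread producer([&] {
    uint64_t next = 0;
    while (next < 1'000'000) {
      auto guard = pipeline.push(/*size=*/64, /*block=*/true);
      for (auto &item : guard.batch) {
        item.id = next++;
      }
      // guard goes out of scope here, publishing the batch
    }
    done.store(true, std::memory_order_release);
  });

  // Consumer: drain batches from stage 0 as thread 0. mayBlock=false so the
  // loop can also poll the shutdown flag; the StageGuard destructor marks
  // each batch consumed.
  std::thread consumer([&] {
    while (true) {
      auto guard = pipeline.acquire(/*stage=*/0, /*thread=*/0, /*maxBatch=*/64,
                                    /*mayBlock=*/false);
      bool sawWork = false;
      for (auto &item : guard.batch) {
        sawWork = true;
        item.result = item.id * 2;  // stand-in for real per-item work
      }
      if (!sawWork) {
        if (done.load(std::memory_order_acquire)) break;
        std::this_thread::yield();
      }
    }
  });

  producer.join();
  consumer.join();
  return 0;
}

The guard scopes are the important design point the docstrings emphasize: publishing a batch to consumers and handing items to the next stage both happen in destructors, so each batch should be fully filled or processed before its guard goes out of scope.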
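
The "Memory Model" bullets come down to standard power-of-two ring arithmetic using the slotCount and slotCountMask members initialized in the constructor. A small standalone illustration (sample values chosen arbitrarily, independent of the header):

#include <cstdint>
#include <cstdio>

int main() {
  const int lgSlotCount = 10;                              // as passed to the constructor
  const uint64_t slotCount = uint64_t{1} << lgSlotCount;   // 1024 slots
  const uint64_t slotCountMask = slotCount - 1;            // 0x3ff

  // Publish indices only ever increase; the physical ring slot is the low bits.
  const uint64_t samples[] = {7, 1023, 1024, 5000};
  for (uint64_t index : samples) {
    std::printf("index %llu -> slot %llu\n",
                (unsigned long long)index,
                (unsigned long long)(index & slotCountMask));  // e.g. 5000 -> 904
  }
  return 0;
}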