From eaeffff620d7826270636664e77ab5ed8d4c0666 Mon Sep 17 00:00:00 2001 From: Andrew Noyes Date: Tue, 26 Aug 2025 13:10:58 -0400 Subject: [PATCH] Improve ThreadPipeline docs --- src/thread_pipeline.hpp | 27 ++++++++++++++++++--------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/src/thread_pipeline.hpp b/src/thread_pipeline.hpp index 53fe2a3..ac13c1b 100644 --- a/src/thread_pipeline.hpp +++ b/src/thread_pipeline.hpp @@ -38,21 +38,27 @@ enum class WaitStrategy { // communication. // // Overview: -// - Items flow through multiple processing stages (stage 0 -> stage 1 -> ... -> -// final stage) +// - Items flow from producers through multiple processing stages (stage 0 -> +// stage 1 -> ... -> final stage) // - Each stage can have multiple worker threads processing items in parallel // - Uses a shared ring buffer with atomic counters for lock-free coordination // - Supports batch processing for efficiency // +// Architecture: +// - Producers: External threads that add items to the pipeline via push() +// - Stages: Processing stages numbered 0, 1, 2, ... that consume items via +// acquire() +// - Items flow: Producers -> Stage 0 -> Stage 1 -> ... -> Final Stage +// // Usage Pattern: -// // Producer threads (add items to stage 0): -// auto guard = pipeline.push(batchSize, /*block=*/true); -// for (auto& item : guard.batch) { +// // Producer threads (external to pipeline stages - add items for stage 0 to +// consume): auto guard = pipeline.push(batchSize, /*block=*/true); for (auto& +// item : guard.batch) { // // Initialize item data // } -// // Guard destructor publishes batch to consumers +// // Guard destructor publishes batch to stage 0 consumers // -// // Consumer threads (process items from any stage): +// // Stage worker threads (process items and pass to next stage): // auto guard = pipeline.acquire(stageNum, threadId, maxBatch, // /*mayBlock=*/true); for (auto& item : guard.batch) { // // Process item @@ -73,8 +79,11 @@ template struct ThreadPipeline { // Constructor // lgSlotCount: log2 of ring buffer size (e.g., 10 -> 1024 slots) - // threadsPerStage: number of threads for each stage (e.g., {1, 4, 2} = 1 - // stage-0 worker, 4 stage-1 workers, 2 stage-2 workers) + // threadsPerStage: number of worker threads for each processing stage (e.g., + // {1, 4, 2} = + // 1 stage-0 worker, 4 stage-1 workers, 2 stage-2 workers) + // Note: Producer threads are external to the pipeline and not counted in + // threadsPerStage ThreadPipeline(int lgSlotCount, const std::vector &threadsPerStage) : slot_count(1 << lgSlotCount), slot_count_mask(slot_count - 1), threadState(threadsPerStage.size()), ring(slot_count) {