Improve ThreadPipeline docs

This commit is contained in:
2025-08-26 13:10:58 -04:00
parent 6ddba37e60
commit eaeffff620

View File

@@ -38,21 +38,27 @@ enum class WaitStrategy {
// communication. // communication.
// //
// Overview: // Overview:
// - Items flow through multiple processing stages (stage 0 -> stage 1 -> ... -> // - Items flow from producers through multiple processing stages (stage 0 ->
// final stage) // stage 1 -> ... -> final stage)
// - Each stage can have multiple worker threads processing items in parallel // - Each stage can have multiple worker threads processing items in parallel
// - Uses a shared ring buffer with atomic counters for lock-free coordination // - Uses a shared ring buffer with atomic counters for lock-free coordination
// - Supports batch processing for efficiency // - Supports batch processing for efficiency
// //
// Architecture:
// - Producers: External threads that add items to the pipeline via push()
// - Stages: Processing stages numbered 0, 1, 2, ... that consume items via
// acquire()
// - Items flow: Producers -> Stage 0 -> Stage 1 -> ... -> Final Stage
//
// Usage Pattern: // Usage Pattern:
// // Producer threads (add items to stage 0): // // Producer threads (external to pipeline stages - add items for stage 0 to
// auto guard = pipeline.push(batchSize, /*block=*/true); // consume): auto guard = pipeline.push(batchSize, /*block=*/true); for (auto&
// for (auto& item : guard.batch) { // item : guard.batch) {
// // Initialize item data // // Initialize item data
// } // }
// // Guard destructor publishes batch to consumers // // Guard destructor publishes batch to stage 0 consumers
// //
// // Consumer threads (process items from any stage): // // Stage worker threads (process items and pass to next stage):
// auto guard = pipeline.acquire(stageNum, threadId, maxBatch, // auto guard = pipeline.acquire(stageNum, threadId, maxBatch,
// /*mayBlock=*/true); for (auto& item : guard.batch) { // /*mayBlock=*/true); for (auto& item : guard.batch) {
// // Process item // // Process item
@@ -73,8 +79,11 @@ template <class T, WaitStrategy wait_strategy = WaitStrategy::WaitIfStageEmpty>
struct ThreadPipeline { struct ThreadPipeline {
// Constructor // Constructor
// lgSlotCount: log2 of ring buffer size (e.g., 10 -> 1024 slots) // lgSlotCount: log2 of ring buffer size (e.g., 10 -> 1024 slots)
// threadsPerStage: number of threads for each stage (e.g., {1, 4, 2} = 1 // threadsPerStage: number of worker threads for each processing stage (e.g.,
// stage-0 worker, 4 stage-1 workers, 2 stage-2 workers) // {1, 4, 2} =
// 1 stage-0 worker, 4 stage-1 workers, 2 stage-2 workers)
// Note: Producer threads are external to the pipeline and not counted in
// threadsPerStage
ThreadPipeline(int lgSlotCount, const std::vector<int> &threadsPerStage) ThreadPipeline(int lgSlotCount, const std::vector<int> &threadsPerStage)
: slot_count(1 << lgSlotCount), slot_count_mask(slot_count - 1), : slot_count(1 << lgSlotCount), slot_count_mask(slot_count - 1),
threadState(threadsPerStage.size()), ring(slot_count) { threadState(threadsPerStage.size()), ring(slot_count) {