Claude's docstrings

This commit is contained in:
2025-08-18 12:47:24 -04:00
parent 085e3c3fce
commit 0920193e5c

View File

@@ -9,7 +9,48 @@
#include <utility>
#include <vector>
// Multi-stage lock-free pipeline for high-throughput inter-thread
// communication.
//
// Overview:
// - Items flow through multiple processing stages (stage 0 -> stage 1 -> ... ->
// final stage)
// - Each stage can have multiple worker threads processing items in parallel
// - Uses a shared ring buffer with atomic counters for lock-free coordination
// - Supports batch processing for efficiency
//
// Usage Pattern:
// // Producer threads (add items to stage 0):
// auto guard = pipeline.push(batchSize, /*block=*/true);
// for (auto& item : guard.batch) {
// // Initialize item data
// }
// // Guard destructor publishes batch to consumers
//
// // Consumer threads (process items from any stage):
// auto guard = pipeline.acquire(stageNum, threadId, maxBatch,
// /*mayBlock=*/true); for (auto& item : guard.batch) {
// // Process item
// }
// // Guard destructor marks items as consumed and available to next stage
//
// Memory Model:
// - Ring buffer size must be power of 2 for efficient masking
// - Uses 64-bit indices to avoid ABA problems (indices never repeat until
// uint32_t overflow)
// - Actual ring slots accessed via: index & (slotCount - 1)
// - 128-byte aligned atomics prevent false sharing between CPU cache lines
//
// Thread Safety:
// - Fully lock-free using atomic operations with acquire/release memory
// ordering
// - Uses C++20 atomic wait/notify for efficient blocking when no work available
// - RAII guards ensure proper cleanup even with exceptions
template <class T> struct ThreadPipeline {
// Constructor
// lgSlotCount: log2 of ring buffer size (e.g., 10 -> 1024 slots)
// threadsPerStage: number of threads for each stage (e.g., {1, 4, 2} = 1
// producer, 4 stage-1 workers, 2 stage-2 workers)
ThreadPipeline(int lgSlotCount, const std::vector<int> &threadsPerStage)
: slotCount(1 << lgSlotCount), slotCountMask(slotCount - 1),
threadState(threadsPerStage.size()), ring(slotCount) {
@@ -218,6 +259,12 @@ public:
uint32_t newSlot;
};
// Acquire a batch of items for processing by a consumer thread.
// stage: which processing stage (0 = first consumer stage after producers)
// thread: thread ID within the stage (0 to threadsPerStage[stage]-1)
// maxBatch: maximum items to acquire (0 = no limit)
// mayBlock: whether to block waiting for items (false = return empty batch if
// none available) Returns: StageGuard with batch of items to process
[[nodiscard]] StageGuard acquire(int stage, int thread, int maxBatch = 0,
bool mayBlock = true) {
assert(stage < threadState.size());
@@ -226,10 +273,17 @@ public:
return StageGuard{std::move(batch), &threadState[stage][thread]};
}
// Grants exclusive access to a producer thread to a span of up to `size`. If
// `block` is true, then this call will block if the queue is full, until the
// queue is not full. Otherwise it will return an empty batch if the queue is
// full.
// Reserve slots in the ring buffer for a producer thread to fill with items.
// This is used by producer threads to add new items to stage 0 of the
// pipeline.
//
// size: number of slots to reserve (must be > 0 and <= ring buffer capacity)
// block: if true, blocks when ring buffer is full; if false, returns empty
// guard Returns: ProducerGuard with exclusive access to reserved slots
//
// Usage: Fill items in the returned batch, then let guard destructor publish
// them. The guard destructor ensures items are published in the correct
// order.
//
// Preconditions:
// - size > 0 (must request at least one slot)