diff --git a/src/thread_pipeline.hpp b/src/thread_pipeline.hpp
index 4460ea2..79b4f0a 100644
--- a/src/thread_pipeline.hpp
+++ b/src/thread_pipeline.hpp
@@ -6,10 +6,49 @@
 #include <algorithm>
 #include <atomic>
 #include <cassert>
 #include <cstdint>
+#include <functional>
+#include <numeric>
 #include <utility>
 #include <vector>
 
+// Topology configuration - separates stage layout from pipeline logic
+struct PipelineTopology {
+  std::vector<int> threads_per_stage;
+  int num_stages;
+  int total_threads;
+
+  PipelineTopology(const std::vector<int> &threads)
+      : threads_per_stage(threads), num_stages(int(threads.size())) {
+    assert(!threads.empty());
+    assert(std::all_of(threads.begin(), threads.end(),
+                       [](int t) { return t > 0; }));
+    total_threads = std::accumulate(threads.begin(), threads.end(), 0);
+  }
+
+  // Get the flat array offset for the first thread of a stage
+  int stage_offset(int stage) const {
+    assert(stage >= 0 && stage < num_stages);
+    int offset = 0;
+    for (int i = 0; i < stage; ++i) {
+      offset += threads_per_stage[i];
+    }
+    return offset;
+  }
+
+  // Get the flat array index for a specific thread in a stage
+  int thread_index(int stage, int thread) const {
+    assert(stage >= 0 && stage < num_stages);
+    assert(thread >= 0 && thread < threads_per_stage[stage]);
+    return stage_offset(stage) + thread;
+  }
+
+  // Get the number of threads in the previous stage (for initialization)
+  int prev_stage_thread_count(int stage) const {
+    return (stage == 0) ? 1 : threads_per_stage[stage - 1];
+  }
+};
+
 // Wait strategies for controlling thread blocking behavior when no work is
 // available
 enum class WaitStrategy {
@@ -35,6 +74,113 @@ enum class WaitStrategy {
   WaitIfStageEmpty,
 };
 
+// Core thread state - extracted from pipeline concerns
+struct ThreadState {
+  // Where this thread has published up to
+  alignas(128) std::atomic<uint32_t> pops{0};
+  // Where this thread will publish to the next time it publishes, or if idle
+  // where it has published to
+  uint32_t local_pops{0};
+  // Where the previous stage's threads have published up to last we checked
+  std::vector<uint32_t> last_push_read;
+  bool last_stage;
+};
+
+// Core pipeline algorithms - independent of storage layout
+namespace PipelineAlgorithms {
+
+// Calculate how many items can be safely acquired from a stage
+template <WaitStrategy wait_strategy>
+uint32_t calculate_safe_len(const PipelineTopology &topology,
+                            std::vector<ThreadState> &all_threads,
+                            std::atomic<uint32_t> &pushes, int stage,
+                            int thread_in_stage, bool may_block) {
+  int thread_idx = topology.thread_index(stage, thread_in_stage);
+  auto &thread = all_threads[thread_idx];
+  uint32_t safe_len = UINT32_MAX;
+
+  // Check all threads from the previous stage (or pushes for stage 0)
+  int prev_stage_threads = topology.prev_stage_thread_count(stage);
+  for (int i = 0; i < prev_stage_threads; ++i) {
+    auto &last_push =
+        (stage == 0) ? pushes
+                     : all_threads[topology.thread_index(stage - 1, i)].pops;
+
+    if (thread.last_push_read[i] == thread.local_pops) {
+      // Re-read last_push with acquire ordering and try again
+      thread.last_push_read[i] = last_push.load(std::memory_order_acquire);
+      if (thread.last_push_read[i] == thread.local_pops) {
+        if (!may_block) {
+          return 0;
+        }
+
+        if constexpr (wait_strategy == WaitStrategy::Never) {
+          // Empty - busy wait
+        } else if constexpr (wait_strategy ==
+                             WaitStrategy::WaitIfUpstreamIdle) {
+          auto push = pushes.load(std::memory_order_relaxed);
+          if (push == thread.local_pops) {
+            pushes.wait(push, std::memory_order_relaxed);
+          }
+        } else {
+          static_assert(wait_strategy == WaitStrategy::WaitIfStageEmpty);
+          last_push.wait(thread.last_push_read[i], std::memory_order_relaxed);
+        }
+
+        thread.last_push_read[i] = last_push.load(std::memory_order_acquire);
+      }
+    }
+    safe_len = std::min(safe_len, thread.last_push_read[i] - thread.local_pops);
+  }
+  return safe_len;
+}
+
+// Update the thread's pops counter after processing
+template <WaitStrategy wait_strategy>
+void update_thread_pops(const PipelineTopology &topology,
+                        std::vector<ThreadState> &all_threads, int stage,
+                        int thread_in_stage, uint32_t local_pops) {
+  int thread_idx = topology.thread_index(stage, thread_in_stage);
+  auto &thread_state = all_threads[thread_idx];
+
+  if constexpr (wait_strategy == WaitStrategy::WaitIfStageEmpty) {
+    thread_state.pops.store(local_pops, std::memory_order_seq_cst);
+    thread_state.pops.notify_all();
+  } else if (thread_state.last_stage) {
+    thread_state.pops.store(local_pops, std::memory_order_seq_cst);
+    thread_state.pops.notify_all();
+  } else {
+    thread_state.pops.store(local_pops, std::memory_order_release);
+  }
+}
+
+// Check whether a producer can proceed without stomping the ring buffer.
+// Returns: 0 = can proceed, 1 = should retry, 2 = cannot proceed (return empty
+// guard)
+inline int check_producer_capacity(const PipelineTopology &topology,
+                                   std::vector<ThreadState> &all_threads,
+                                   uint32_t slot, uint32_t size,
+                                   uint32_t slot_count, bool block) {
+  // Check against the last stage's threads
+  int last_stage = topology.num_stages - 1;
+  int last_stage_offset = topology.stage_offset(last_stage);
+  int last_stage_thread_count = topology.threads_per_stage[last_stage];
+
+  for (int i = 0; i < last_stage_thread_count; ++i) {
+    auto &thread = all_threads[last_stage_offset + i];
+    uint32_t pops = thread.pops.load(std::memory_order_acquire);
+    if (slot + size - pops > slot_count) {
+      if (!block) {
+        return 2; // Cannot proceed, caller should return an empty guard
+      }
+      thread.pops.wait(pops, std::memory_order_relaxed);
+      return 1; // Should retry
+    }
+  }
+  return 0; // Can proceed
+}
+} // namespace PipelineAlgorithms
+
 // Multi-stage lock-free pipeline for high-throughput inter-thread
 // communication.
 //
@@ -76,40 +222,36 @@ enum class WaitStrategy {
 // ordering
 // - Uses C++20 atomic wait/notify for efficient blocking when no work available
 // - RAII guards ensure proper cleanup even with exceptions
+//
+// Refactored Design:
+// - Topology configuration separated from pipeline logic
+// - Flattened thread storage replaces nested vectors
+// - Core algorithms extracted into pure functions
+// - Ready for a compile-time static version implementation
 template <typename T, WaitStrategy wait_strategy>
 struct ThreadPipeline {
-  // Constructor
-  // lgSlotCount: log2 of ring buffer size (e.g., 10 -> 1024 slots)
-  // threadsPerStage: number of worker threads for each processing stage (e.g.,
-  //                  {1, 4, 2} =
-  //                  1 stage-0 worker, 4 stage-1 workers, 2 stage-2 workers)
-  // Note: Producer threads are external to the pipeline and not counted in
-  //       threadsPerStage
-  ThreadPipeline(int lgSlotCount, const std::vector<int> &threadsPerStage)
-      : slot_count(1 << lgSlotCount), slot_count_mask(slot_count - 1),
-        threadState(threadsPerStage.size()), ring(slot_count) {
-    // Otherwise we can't tell the difference between full and empty.
+
+  // Constructor now takes the topology configuration
+  ThreadPipeline(int lgSlotCount, const PipelineTopology &topo)
+      : topology(topo), slot_count(1 << lgSlotCount),
+        slot_count_mask(slot_count - 1), all_threads(topology.total_threads),
+        ring(slot_count) {
+    // Otherwise we can't tell the difference between full and empty.
     assert(!(slot_count_mask & 0x80000000));
-    for (size_t i = 0; i < threadsPerStage.size(); ++i) {
-      threadState[i] = std::vector<ThreadState>(threadsPerStage[i]);
-      for (auto &t : threadState[i]) {
-        t.last_stage = i == threadsPerStage.size() - 1;
-        if (i == 0) {
-          t.last_push_read = std::vector<uint32_t>(1);
-        } else {
-          t.last_push_read = std::vector<uint32_t>(threadsPerStage[i - 1]);
-        }
-      }
-    }
+
+    initialize_all_threads();
   }
 
+  // Legacy constructor for backward compatibility
+  ThreadPipeline(int lgSlotCount, const std::vector<int> &threadsPerStage)
+      : ThreadPipeline(lgSlotCount, PipelineTopology(threadsPerStage)) {}
+
   ThreadPipeline(ThreadPipeline const &) = delete;
   ThreadPipeline &operator=(ThreadPipeline const &) = delete;
   ThreadPipeline(ThreadPipeline &&) = delete;
   ThreadPipeline &operator=(ThreadPipeline &&) = delete;
 
   struct Batch {
-
     Batch() : ring(), begin_(), end_() {}
 
     struct Iterator {
@@ -223,115 +365,84 @@
   };
 
 private:
-  Batch acquireHelper(int stage, int thread, uint32_t maxBatch, bool mayBlock) {
-    uint32_t begin = threadState[stage][thread].local_pops & slot_count_mask;
-    uint32_t len = getSafeLen(stage, thread, mayBlock);
+  // Pipeline configuration
+  PipelineTopology topology;
+
+  // Core state
+  alignas(128) std::atomic<uint32_t> slots{0};
+  alignas(128) std::atomic<uint32_t> pushes{0};
+  const uint32_t slot_count;
+  const uint32_t slot_count_mask;
+
+  // Flattened thread storage - a single array instead of nested vectors
+  std::vector<ThreadState> all_threads;
+
+  // Ring buffer
+  std::vector<T> ring;
+
+  void initialize_all_threads() {
+    for (int stage = 0; stage < topology.num_stages; ++stage) {
+      int stage_offset = topology.stage_offset(stage);
+      int stage_thread_count = topology.threads_per_stage[stage];
+      int prev_stage_threads = topology.prev_stage_thread_count(stage);
+      bool is_last_stage = (stage == topology.num_stages - 1);
+
+      for (int thread = 0; thread < stage_thread_count; ++thread) {
+        auto &thread_state = all_threads[stage_offset + thread];
+        thread_state.last_stage = is_last_stage;
+        thread_state.last_push_read = std::vector<uint32_t>(prev_stage_threads);
+      }
+    }
+  }
+
+  Batch acquire_helper(int stage, int thread, uint32_t maxBatch,
+                       bool may_block) {
+    int thread_idx = topology.thread_index(stage, thread);
+    auto &thread_state = all_threads[thread_idx];
+
+    uint32_t begin = thread_state.local_pops & slot_count_mask;
+    uint32_t len = PipelineAlgorithms::calculate_safe_len<wait_strategy>(
+        topology, all_threads, pushes, stage, thread, may_block);
+
     if (maxBatch != 0) {
       len = std::min(len, maxBatch);
     }
     if (len == 0) {
       return Batch{};
     }
+
     auto result = Batch{&ring, begin, begin + len};
-    threadState[stage][thread].local_pops += len;
+    thread_state.local_pops += len;
     return result;
   }
 
-  // Used by producer threads to reserve slots in the ring buffer
-  alignas(128) std::atomic<uint32_t> slots{0};
-  // Used for producers to publish
-  alignas(128) std::atomic<uint32_t> pushes{0};
-
-  const uint32_t slot_count;
-  const uint32_t slot_count_mask;
-
-  // We can safely acquire this many items
-  uint32_t getSafeLen(int stage, int threadIndex, bool mayBlock) {
-    uint32_t safeLen = UINT32_MAX;
-    auto &thread = threadState[stage][threadIndex];
-    // See if we can determine that there are entries we can acquire entirely
-    // from state local to the thread
-    for (int i = 0; i < int(thread.last_push_read.size()); ++i) {
-      auto &lastPush = stage == 0 ? pushes : threadState[stage - 1][i].pops;
-      if (thread.last_push_read[i] == thread.local_pops) {
-        // Re-read lastPush with memory order and try again
-        thread.last_push_read[i] = lastPush.load(std::memory_order_acquire);
-        if (thread.last_push_read[i] == thread.local_pops) {
-          if (!mayBlock) {
-            return 0;
-          }
-
-          if constexpr (wait_strategy == WaitStrategy::Never) {
-            // Empty
-          } else if constexpr (wait_strategy ==
-                               WaitStrategy::WaitIfUpstreamIdle) {
-            auto push = pushes.load(std::memory_order_relaxed);
-            if (push == thread.local_pops) {
-              pushes.wait(push, std::memory_order_relaxed);
-            }
-          } else {
-            static_assert(wait_strategy == WaitStrategy::WaitIfStageEmpty);
-            // Wait for lastPush to change and try again
-            lastPush.wait(thread.last_push_read[i], std::memory_order_relaxed);
-          }
-
-          thread.last_push_read[i] = lastPush.load(std::memory_order_acquire);
-        }
-      }
-      safeLen = std::min(safeLen, thread.last_push_read[i] - thread.local_pops);
-    }
-    return safeLen;
-  }
-
-  struct ThreadState {
-    // Where this thread has published up to
-    alignas(128) std::atomic<uint32_t> pops{0};
-    // Where this thread will publish to the next time it publishes, or if idle
-    // where it has published to
-    uint32_t local_pops{0};
-    // Where the previous stage's threads have published up to last we checked
-    std::vector<uint32_t> last_push_read;
-    bool last_stage;
-  };
-  // threadState[i][j] is the state for thread j in stage i
-  std::vector<std::vector<ThreadState>> threadState;
-  // Shared ring buffer
-  std::vector<T> ring;
-
 public:
   struct StageGuard {
     Batch batch;
+
     ~StageGuard() {
-      if (ts != nullptr) {
-        if (wait_strategy == WaitStrategy::WaitIfStageEmpty || ts->last_stage) {
-          // seq_cst so that the notify can't be ordered before the store
-          ts->pops.store(local_pops, std::memory_order_seq_cst);
-          ts->pops.notify_all();
-        } else {
-          ts->pops.store(local_pops, std::memory_order_release);
-        }
+      if (cleanup_func) {
+        cleanup_func();
       }
     }
 
     StageGuard(StageGuard const &) = delete;
     StageGuard &operator=(StageGuard const &) = delete;
-    StageGuard(StageGuard &&other)
-        : batch(other.batch), local_pops(other.local_pops),
-          ts(std::exchange(other.ts, nullptr)) {}
-    StageGuard &operator=(StageGuard &&other) {
+    StageGuard(StageGuard &&other) noexcept
+        : batch(other.batch),
+          cleanup_func(std::exchange(other.cleanup_func, nullptr)) {}
+    StageGuard &operator=(StageGuard &&other) noexcept {
       batch = other.batch;
-      local_pops = other.local_pops;
-      ts = std::exchange(other.ts, nullptr);
+      cleanup_func = std::exchange(other.cleanup_func, nullptr);
       return *this;
     }
 
   private:
-    uint32_t local_pops;
     friend struct ThreadPipeline;
-    StageGuard(Batch batch, ThreadState *ts)
-        : batch(batch), local_pops(ts->local_pops),
-          ts(batch.empty() ? nullptr : ts) {}
-    ThreadState *ts;
+    std::function<void()> cleanup_func;
+
+    StageGuard(Batch batch, std::function<void()> cleanup)
+        : batch(batch), cleanup_func(batch.empty() ? nullptr : cleanup) {}
   };
 
   struct ProducerGuard {
@@ -376,10 +487,21 @@ public:
   // none available) Returns: StageGuard with batch of items to process
   [[nodiscard]] StageGuard acquire(int stage, int thread, int maxBatch = 0,
                                    bool mayBlock = true) {
-    assert(stage < int(threadState.size()));
-    assert(thread < int(threadState[stage].size()));
-    auto batch = acquireHelper(stage, thread, maxBatch, mayBlock);
-    return StageGuard{std::move(batch), &threadState[stage][thread]};
+    assert(stage >= 0 && stage < topology.num_stages);
+    assert(thread >= 0 && thread < topology.threads_per_stage[stage]);
+
+    auto batch = acquire_helper(stage, thread, maxBatch, mayBlock);
+
+    // Create cleanup function that will update thread state on destruction
+    int thread_idx = topology.thread_index(stage, thread);
+    uint32_t local_pops = all_threads[thread_idx].local_pops;
+
+    auto cleanup = [this, stage, thread, local_pops]() {
+      PipelineAlgorithms::update_thread_pops<wait_strategy>(
+          topology, all_threads, stage, thread, local_pops);
+    };
+
+    return StageGuard{std::move(batch), cleanup};
   }
 
   // Reserve slots in the ring buffer for a producer thread to fill with items.
@@ -405,24 +527,24 @@ public:
     if (size > slot_count) {
      std::abort();
    }
-    // Reserve a slot to construct an item, but don't publish to consumer yet
+
    uint32_t slot;
    uint32_t begin;
    for (;;) {
-    begin_loop:
      slot = slots.load(std::memory_order_relaxed);
      begin = slot & slot_count_mask;
-      // Make sure we won't stomp the back of the ring buffer
-      for (auto &thread : threadState.back()) {
-        uint32_t pops = thread.pops.load(std::memory_order_acquire);
-        if (slot + size - pops > slot_count) {
-          if (!block) {
-            return ProducerGuard{};
-          }
-          thread.pops.wait(pops, std::memory_order_relaxed);
-          goto begin_loop;
-        }
+
+      // Use the algorithm function to check capacity
+      int capacity_result = PipelineAlgorithms::check_producer_capacity(
+          topology, all_threads, slot, size, slot_count, block);
+      if (capacity_result == 1) {
+        continue; // Retry
      }
+      if (capacity_result == 2) {
+        return ProducerGuard{}; // Cannot proceed, return an empty guard
+      }
+      // capacity_result == 0, can proceed
+
      if (slots.compare_exchange_weak(slot, slot + size,
                                      std::memory_order_relaxed,
                                      std::memory_order_relaxed)) {
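
Reviewer note: below is a minimal usage sketch of the refactored construction and stage-side paths, assuming the `<T, wait_strategy>` template parameters and the `PipelineTopology`, `acquire()`, and `StageGuard` APIs as they appear in this patch. The `Item` type, the stage/thread counts, and the worker loop are illustrative only and are not part of this change.

```cpp
// Sketch only (not part of the diff): exercises PipelineTopology and the
// consumer-side acquire() path shown in this patch.
#include "thread_pipeline.hpp"

#include <vector>

struct Item {
  int value;
};

// One worker for stage `stage`, identified by its index within that stage.
void stage_worker(ThreadPipeline<Item, WaitStrategy::WaitIfStageEmpty> &pipe,
                  int stage, int thread) {
  for (;;) {
    // Blocks according to the wait strategy until the previous stage
    // (or a producer, for stage 0) publishes work.
    auto guard = pipe.acquire(stage, thread);
    if (guard.batch.empty()) {
      continue;
    }
    // ... process guard.batch here ...
    // The StageGuard destructor publishes this thread's progress so the
    // next stage and the producers can advance past these slots.
  }
}

int main() {
  // 1 << 10 = 1024 ring slots; 1 worker in stage 0, 4 workers in stage 1.
  PipelineTopology topo(std::vector<int>{1, 4});
  ThreadPipeline<Item, WaitStrategy::WaitIfStageEmpty> pipeline(10, topo);
  // Spawn producer threads and stage_worker threads against `pipeline` here.
  (void)pipeline;
  return 0;
}
```

The legacy `ThreadPipeline(lgSlotCount, threadsPerStage)` constructor kept in this diff still accepts a raw `std::vector<int>`, so existing call sites should compile unchanged.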