diff --git a/benchmarks/bench_thread_pipeline.cpp b/benchmarks/bench_thread_pipeline.cpp
index cb99116..fcffae8 100644
--- a/benchmarks/bench_thread_pipeline.cpp
+++ b/benchmarks/bench_thread_pipeline.cpp
@@ -3,7 +3,6 @@
 #include
 #include
 #include
-#include

 int main() {
   {
@@ -25,18 +24,15 @@ int main() {
       }
     });

-    std::vector threads_per_stage = {1};
-    ThreadPipeline pipeline(LOG_PIPELINE_SIZE, threads_per_stage);
+    StaticThreadPipeline
+        pipeline(LOG_PIPELINE_SIZE);
     std::latch done{0};

     // Stage 0 consumer thread
     std::thread stage0_thread([&pipeline, &done]() {
-      const int stage = 0;
-      const int thread_id = 0;
-      for (;;) {
-        auto guard = pipeline.acquire(stage, thread_id);
+      auto guard = pipeline.acquire<0, 0>();
       for (auto &item : guard.batch) {
         for (volatile int i = 0; i < BUSY_ITERS; i = i + 1) {
@@ -94,19 +90,15 @@ int main() {
       .warmup(100);

   for (int batch_size : {1, 4, 16, 64, 256}) {
-    std::vector threads_per_stage = {1};
-    ThreadPipeline pipeline(LOG_PIPELINE_SIZE,
-                            threads_per_stage);
+    StaticThreadPipeline
+        pipeline(LOG_PIPELINE_SIZE);
     std::latch done{0};

     // Stage 0 consumer thread
     std::thread stage0_thread([&pipeline, &done]() {
-      const int stage = 0;
-      const int thread_id = 0;
-      for (;;) {
-        auto guard = pipeline.acquire(stage, thread_id);
+      auto guard = pipeline.acquire<0, 0>();
       for (auto &item : guard.batch) {
         for (volatile int i = 0; i < BUSY_ITERS; i = i + 1) {
@@ -162,18 +154,15 @@ int main() {
     constexpr int BUSY_ITERS = 10; // Light work to emphasize coordination overhead

-    std::vector threads_per_stage = {1, 1}; // Two stages
-    ThreadPipeline pipeline(LOG_PIPELINE_SIZE,
-                            threads_per_stage);
+    StaticThreadPipeline pipeline(
+        LOG_PIPELINE_SIZE);
     std::latch done{0};

     // Stage 0 worker
     std::thread stage0_thread([&pipeline, &done]() {
-      const int stage = 0;
-      const int thread_id = 0;
       for (;;) {
-        auto guard = pipeline.acquire(stage, thread_id);
+        auto guard = pipeline.template acquire<0, 0>();
         for (auto &item : guard.batch) {
           for (volatile int i = 0; i < BUSY_ITERS; i = i + 1) {
           }
@@ -185,10 +174,8 @@ int main() {

     // Stage 1 worker (final stage - always calls futex wake)
     std::thread stage1_thread([&pipeline, &done]() {
-      const int stage = 1;
-      const int thread_id = 0;
       for (;;) {
-        auto guard = pipeline.acquire(stage, thread_id);
+        auto guard = pipeline.template acquire<1, 0>();
         for (auto &item : guard.batch) {
           for (volatile int i = 0; i < BUSY_ITERS; i = i + 1) {
           }
diff --git a/src/http_handler.hpp b/src/http_handler.hpp
index 24a9b86..f1fd3c8 100644
--- a/src/http_handler.hpp
+++ b/src/http_handler.hpp
@@ -64,33 +64,47 @@ struct HttpConnectionState {
  */
 struct HttpHandler : ConnectionHandler {
   HttpHandler() {
-    for (int threadId = 0; threadId < kFinalStageThreads; ++threadId) {
-      finalStageThreads.emplace_back([this, threadId]() {
-        pthread_setname_np(pthread_self(),
-                           ("stage-1-" + std::to_string(threadId)).c_str());
-        for (;;) {
-          auto guard = pipeline.acquire(1, threadId);
-          for (auto it = guard.batch.begin(); it != guard.batch.end(); ++it) {
-            if ((it.index() % kFinalStageThreads) == threadId) {
-              auto &c = *it;
-              if (!c) {
-                return;
-              }
-              auto *state = static_cast(c->user_data);
-              TRACE_EVENT("http", "pipeline thread",
-                          perfetto::Flow::Global(state->request_id));
-              Server::release_back_to_server(std::move(c));
+    finalStageThreads.emplace_back([this]() {
+      pthread_setname_np(pthread_self(), "stage-1-0");
+      for (;;) {
+        auto guard = pipeline.acquire<1, 0>();
+        for (auto it = guard.batch.begin(); it != guard.batch.end(); ++it) {
+          if ((it.index() % 2) == 0) { // Thread 0 handles even indices
+            auto &c = *it;
+            if (!c) {
+              return;
             }
+            auto *state = static_cast(c->user_data);
+            TRACE_EVENT("http", "release",
+                        perfetto::Flow::Global(state->request_id));
+            Server::release_back_to_server(std::move(c));
           }
         }
-      });
-    }
+      }
+    });
+    finalStageThreads.emplace_back([this]() {
+      pthread_setname_np(pthread_self(), "stage-1-1");
+      for (;;) {
+        auto guard = pipeline.acquire<1, 1>();
+        for (auto it = guard.batch.begin(); it != guard.batch.end(); ++it) {
+          if ((it.index() % 2) == 1) { // Thread 1 handles odd indices
+            auto &c = *it;
+            if (!c) {
+              return;
+            }
+            auto *state = static_cast(c->user_data);
+            TRACE_EVENT("http", "release",
+                        perfetto::Flow::Global(state->request_id));
+            Server::release_back_to_server(std::move(c));
+          }
+        }
+      }
+    });
     stage0Thread = std::thread{[this]() {
       pthread_setname_np(pthread_self(), "stage-0");
       for (;;) {
-        auto guard = pipeline.acquire(0, 0, 0, false);
-        for (auto it = guard.batch.begin(); it != guard.batch.end(); ++it) {
-          auto &c = *it;
+        auto guard = pipeline.acquire<0, 0>();
+        for (auto &c : guard.batch) {
           if (!c) {
             return;
           }
@@ -102,7 +116,7 @@ struct HttpHandler : ConnectionHandler {
   }
   ~HttpHandler() {
     {
-      auto guard = pipeline.push(kFinalStageThreads, true);
+      auto guard = pipeline.push(2, true);
       for (auto &c : guard.batch) {
         c = {};
       }
@@ -137,8 +151,9 @@ struct HttpHandler : ConnectionHandler {
 private:
   static constexpr int kFinalStageThreads = 2;
   static constexpr int kLogSize = 12;
-  ThreadPipeline> pipeline{
-      kLogSize, {/*noop serial thread*/ 1, kFinalStageThreads}};
+  StaticThreadPipeline,
+                       WaitStrategy::WaitIfStageEmpty, 1, 2>
+      pipeline{kLogSize};
   std::thread stage0Thread;
   std::vector finalStageThreads;
diff --git a/src/thread_pipeline.hpp b/src/thread_pipeline.hpp
index 79b4f0a..b6652b5 100644
--- a/src/thread_pipeline.hpp
+++ b/src/thread_pipeline.hpp
@@ -1,152 +1,155 @@
 #pragma once
+#include
 #include
 #include
 #include
 #include
 #include
 #include
-#include
 #include
-#include
 #include
 #include

-// Topology configuration - separates stage layout from pipeline logic
-struct PipelineTopology {
-  std::vector threads_per_stage;
-  int num_stages;
-  int total_threads;
-
-  PipelineTopology(const std::vector &threads)
-      : threads_per_stage(threads), num_stages(threads.size()) {
-    assert(!threads.empty());
-    assert(std::all_of(threads.begin(), threads.end(),
-                       [](int t) { return t > 0; }));
-    total_threads = std::accumulate(threads.begin(), threads.end(), 0);
-  }
-
-  // Get the flat array offset for the first thread of a stage
-  int stage_offset(int stage) const {
-    assert(stage >= 0 && stage < num_stages);
-    int offset = 0;
-    for (int i = 0; i < stage; ++i) {
-      offset += threads_per_stage[i];
-    }
-    return offset;
-  }
-
-  // Get flat array index for a specific thread in a stage
-  int thread_index(int stage, int thread) const {
-    assert(stage >= 0 && stage < num_stages);
-    assert(thread >= 0 && thread < threads_per_stage[stage]);
-    return stage_offset(stage) + thread;
-  }
-
-  // Get number of threads in the previous stage (for initialization)
-  int prev_stage_thread_count(int stage) const {
-    return (stage == 0) ? 1 : threads_per_stage[stage - 1];
-  }
-};
-
 // Wait strategies for controlling thread blocking behavior when no work is
 // available
 enum class WaitStrategy {
-  // Never block - threads busy-wait (spin) when no work available.
-  // Stage threads will always use 100% CPU even when idle.
-  // Requires dedicated CPU cores to avoid scheduler thrashing.
-  // Use when: latency is critical and you have spare cores.
   Never,
-
-  // Block only when all upstream stages are idle (no new work entering
-  // pipeline).
-  // Downstream threads busy-wait if upstream has work but not for their stage.
-  // Eliminates futex notifications between stages, reduces to 0% CPU when idle.
-  // Requires dedicated cores to avoid priority inversion when pipeline has
-  // work.
-  // Use when: high throughput with spare cores and sustained workloads.
   WaitIfUpstreamIdle,
-
-  // Block when individual stages are empty (original behavior).
-  // Each stage waits independently on its input sources.
-  // Safe for shared CPU environments, works well with variable workloads.
-  // Use when: general purpose, shared cores, or unpredictable workloads.
   WaitIfStageEmpty,
 };

-// Core thread state - extracted from pipeline concerns
+// Core thread state
 struct ThreadState {
-  // Where this thread has published up to
   alignas(128) std::atomic pops{0};
-  // Where this thread will publish to the next time it publishes, or if idle
-  // where it has published to
   uint32_t local_pops{0};
-  // Where the previous stage's threads have published up to last we checked
   std::vector last_push_read;
   bool last_stage;
 };

-// Core pipeline algorithms - independent of storage layout
-namespace PipelineAlgorithms {
+// Compile-time topology configuration
+template struct StaticPipelineTopology {
+  static_assert(sizeof...(ThreadsPerStage) > 0,
+                "Must specify at least one stage");
+  static_assert(((ThreadsPerStage > 0) && ...),
+                "All stages must have at least one thread");

-// Calculate how many items can be safely acquired from a stage
-template
-uint32_t calculate_safe_len(const PipelineTopology &topology,
-                            std::vector &all_threads,
-                            std::atomic &pushes, int stage,
-                            int thread_in_stage, bool may_block) {
-  int thread_idx = topology.thread_index(stage, thread_in_stage);
+  static constexpr int num_stages = sizeof...(ThreadsPerStage);
+  static constexpr std::array threads_per_stage = {
+      ThreadsPerStage...};
+  static constexpr int total_threads = (ThreadsPerStage + ...);
+
+  // Compile-time stage offset calculation
+  template static constexpr int stage_offset() {
+    static_assert(Stage >= 0 && Stage < num_stages,
+                  "Stage index out of bounds");
+    if constexpr (Stage == 0) {
+      return 0;
+    } else {
+      return stage_offset() + threads_per_stage[Stage - 1];
+    }
+  }
+
+  // Compile-time thread index calculation
+  template static constexpr int thread_index() {
+    static_assert(Stage >= 0 && Stage < num_stages,
+                  "Stage index out of bounds");
+    static_assert(Thread >= 0 && Thread < threads_per_stage[Stage],
+                  "Thread index out of bounds");
+    return stage_offset() + Thread;
+  }
+
+  // Compile-time previous stage thread count
+  template static constexpr int prev_stage_thread_count() {
+    static_assert(Stage >= 0 && Stage < num_stages,
+                  "Stage index out of bounds");
+    if constexpr (Stage == 0) {
+      return 1;
+    } else {
+      return threads_per_stage[Stage - 1];
+    }
+  }
+};
+
+// Static pipeline algorithms - compile-time specialized versions
+namespace StaticPipelineAlgorithms {
+
+template
+uint32_t calculate_safe_len(
+    std::array &all_threads,
+    std::atomic &pushes, bool may_block) {
+  constexpr int thread_idx =
+      Topology::template thread_index();
   auto &thread = all_threads[thread_idx];
   uint32_t safe_len = UINT32_MAX;
-  // Check all threads from the previous stage (or pushes for stage 0)
-  int prev_stage_threads = topology.prev_stage_thread_count(stage);
-  for (int i = 0; i < prev_stage_threads; ++i) {
-    auto &last_push =
-        (stage == 0) ? pushes
-                     : all_threads[topology.thread_index(stage - 1, i)].pops;
+  constexpr int prev_stage_threads =
+      Topology::template prev_stage_thread_count();

-    if (thread.last_push_read[i] == thread.local_pops) {
-      // Re-read with memory order and try again
-      thread.last_push_read[i] = last_push.load(std::memory_order_acquire);
-      if (thread.last_push_read[i] == thread.local_pops) {
-        if (!may_block) {
-          return 0;
-        }
+  // Compile-time loop over previous stage threads
+  [&](std::index_sequence) {
+    (
+        [&] {
+          auto &last_push = [&]() -> std::atomic & {
+            if constexpr (Stage == 0) {
+              return pushes;
+            } else {
+              constexpr int prev_thread_idx =
+                  Topology::template thread_index();
+              return all_threads[prev_thread_idx].pops;
+            }
+          }();

-        if constexpr (wait_strategy == WaitStrategy::Never) {
-          // Empty - busy wait
-        } else if constexpr (wait_strategy ==
-                             WaitStrategy::WaitIfUpstreamIdle) {
-          auto push = pushes.load(std::memory_order_relaxed);
-          if (push == thread.local_pops) {
-            pushes.wait(push, std::memory_order_relaxed);
+          if (thread.last_push_read[Is] == thread.local_pops) {
+            thread.last_push_read[Is] =
+                last_push.load(std::memory_order_acquire);
+            if (thread.last_push_read[Is] == thread.local_pops) {
+              if (!may_block) {
+                safe_len = 0;
+                return;
+              }
+
+              if constexpr (wait_strategy == WaitStrategy::Never) {
+                // Empty - busy wait
+              } else if constexpr (wait_strategy ==
+                                   WaitStrategy::WaitIfUpstreamIdle) {
+                auto push = pushes.load(std::memory_order_relaxed);
+                if (push == thread.local_pops) {
+                  pushes.wait(push, std::memory_order_relaxed);
+                }
+              } else {
+                static_assert(wait_strategy == WaitStrategy::WaitIfStageEmpty);
+                last_push.wait(thread.last_push_read[Is],
+                               std::memory_order_relaxed);
+              }
+
+              thread.last_push_read[Is] =
+                  last_push.load(std::memory_order_acquire);
+            }
           }
-        } else {
-          static_assert(wait_strategy == WaitStrategy::WaitIfStageEmpty);
-          last_push.wait(thread.last_push_read[i], std::memory_order_relaxed);
-        }
+          safe_len =
+              std::min(safe_len, thread.last_push_read[Is] - thread.local_pops);
+        }(),
+        ...);
+  }(std::make_index_sequence{});

-      thread.last_push_read[i] = last_push.load(std::memory_order_acquire);
-    }
-  }
-  safe_len = std::min(safe_len, thread.last_push_read[i] - thread.local_pops);
-  }
   return safe_len;
 }

-// Update thread pops counter after processing
-template
-void update_thread_pops(const PipelineTopology &topology,
-                        std::vector &all_threads, int stage,
-                        int thread_in_stage, uint32_t local_pops) {
-  int thread_idx = topology.thread_index(stage, thread_in_stage);
+template
+void update_thread_pops(
+    std::array &all_threads,
+    uint32_t local_pops) {
+  constexpr int thread_idx =
+      Topology::template thread_index();
   auto &thread_state = all_threads[thread_idx];
   if constexpr (wait_strategy == WaitStrategy::WaitIfStageEmpty) {
     thread_state.pops.store(local_pops, std::memory_order_seq_cst);
     thread_state.pops.notify_all();
-  } else if (thread_state.last_stage) {
+  } else if constexpr (Stage == Topology::num_stages - 1) { // last stage
     thread_state.pops.store(local_pops, std::memory_order_seq_cst);
     thread_state.pops.notify_all();
   } else {
@@ -154,24 +157,22 @@ void update_thread_pops(const PipelineTopology &topology,
   }
 }

-// Check if producer can proceed without stomping the ring buffer
-// Returns: 0 = can proceed, 1 = should retry, 2 = cannot proceed (return empty
-// guard)
-inline int check_producer_capacity(const PipelineTopology &topology,
-                                   std::vector &all_threads,
-                                   uint32_t slot, uint32_t size,
-                                   uint32_t slot_count, bool block) {
-  // Check against last stage threads
-  int last_stage = topology.num_stages - 1;
-  int last_stage_offset = topology.stage_offset(last_stage);
-  int last_stage_thread_count = topology.threads_per_stage[last_stage];
+template
+int check_producer_capacity(
+    std::array &all_threads,
+    uint32_t slot, uint32_t size, uint32_t slot_count, bool block) {
+  constexpr int last_stage = Topology::num_stages - 1;
+  constexpr int last_stage_offset =
+      Topology::template stage_offset();
+  constexpr int last_stage_thread_count =
+      Topology::threads_per_stage[last_stage];
   for (int i = 0; i < last_stage_thread_count; ++i) {
     auto &thread = all_threads[last_stage_offset + i];
     uint32_t pops = thread.pops.load(std::memory_order_acquire);
     if (slot + size - pops > slot_count) {
       if (!block) {
-        return 2; // Cannot proceed, caller should return empty guard
+        return 2; // Cannot proceed
       }
       thread.pops.wait(pops, std::memory_order_relaxed);
       return 1; // Should retry
@@ -179,77 +180,24 @@ inline int check_producer_capacity(const PipelineTopology &topology,
     }
   }
   return 0; // Can proceed
 }
-} // namespace PipelineAlgorithms
+} // namespace StaticPipelineAlgorithms

-// Multi-stage lock-free pipeline for high-throughput inter-thread
-// communication.
-//
-// Overview:
-// - Items flow from producers through multiple processing stages (stage 0 ->
-//   stage 1 -> ... -> final stage)
-// - Each stage can have multiple worker threads processing items in parallel
-// - Uses a shared ring buffer with atomic counters for lock-free coordination
-// - Supports batch processing for efficiency
-//
-// Architecture:
-// - Producers: External threads that add items to the pipeline via push()
-// - Stages: Processing stages numbered 0, 1, 2, ... that consume items via
-//   acquire()
-// - Items flow: Producers -> Stage 0 -> Stage 1 -> ... -> Final Stage
-//
-// Usage Pattern:
-//   // Producer threads (external to pipeline stages - add items for stage 0 to
-//   consume): auto guard = pipeline.push(batchSize, /*block=*/true); for (auto&
-//   item : guard.batch) {
-//     // Initialize item data
-//   }
-//   // Guard destructor publishes batch to stage 0 consumers
-//
-//   // Stage worker threads (process items and pass to next stage):
-//   auto guard = pipeline.acquire(stageNum, threadId, maxBatch,
-//   /*mayBlock=*/true); for (auto& item : guard.batch) {
-//     // Process item
-//   }
-//   // Guard destructor marks items as consumed and available to next stage
-//
-// Memory Model:
-// - Ring buffer size must be power of 2 for efficient masking
-// - Actual ring slots accessed via: index & (slotCount - 1)
-// - 128-byte aligned atomics prevent false sharing between CPU cache lines
-//
-// Thread Safety:
-// - Fully lock-free using atomic operations with acquire/release memory
-//   ordering
-// - Uses C++20 atomic wait/notify for efficient blocking when no work available
-// - RAII guards ensure proper cleanup even with exceptions
-//
-// Refactored Design:
-// - Topology configuration separated from pipeline logic
-// - Flattened thread storage replaces nested vectors
-// - Core algorithms extracted into pure functions
-// - Ready for compile-time static version implementation
-template
-struct ThreadPipeline {
+// Static multi-stage lock-free pipeline
+template
+struct StaticThreadPipeline {
+  using Topology = StaticPipelineTopology;

-  // Constructor now takes topology configuration
-  ThreadPipeline(int lgSlotCount, const PipelineTopology &topo)
-      : topology(topo), slot_count(1 << lgSlotCount),
-        slot_count_mask(slot_count - 1), all_threads(topology.total_threads),
+  explicit StaticThreadPipeline(int lgSlotCount)
+      : slot_count(1 << lgSlotCount), slot_count_mask(slot_count - 1),
         ring(slot_count) {
-    // Otherwise we can't tell the difference between full and empty
     assert(!(slot_count_mask & 0x80000000));
     initialize_all_threads();
   }

-  // Legacy constructor for backward compatibility
-  ThreadPipeline(int lgSlotCount, const std::vector &threadsPerStage)
-      : ThreadPipeline(lgSlotCount, PipelineTopology(threadsPerStage)) {}
-
-  ThreadPipeline(ThreadPipeline const &) = delete;
-  ThreadPipeline &operator=(ThreadPipeline const &) = delete;
-  ThreadPipeline(ThreadPipeline &&) = delete;
-  ThreadPipeline &operator=(ThreadPipeline &&) = delete;
+  StaticThreadPipeline(StaticThreadPipeline const &) = delete;
+  StaticThreadPipeline &operator=(StaticThreadPipeline const &) = delete;
+  StaticThreadPipeline(StaticThreadPipeline &&) = delete;
+  StaticThreadPipeline &operator=(StaticThreadPipeline &&) = delete;

   struct Batch {
     Batch() : ring(), begin_(), end_() {}
@@ -317,7 +265,6 @@
     }
     friend bool operator<(const Iterator &lhs, const Iterator &rhs) {
       assert(lhs.ring == rhs.ring);
-      // Handle potential uint32_t wraparound by using signed difference
       return static_cast(lhs.index_ - rhs.index_) < 0;
     }
     friend bool operator<=(const Iterator &lhs, const Iterator &rhs) {
@@ -333,9 +280,6 @@
       return static_cast(lhs.index_ - rhs.index_) >= 0;
     }

-    /// Returns the ring buffer index (0 to ring->size()-1) for this iterator
-    /// position. Useful for distributing work across multiple threads by
-    /// using modulo operations.
     uint32_t index() const { return index_ & (ring->size() - 1); }

   private:
@@ -356,7 +300,7 @@
     }

   private:
-    friend struct ThreadPipeline;
+    friend struct StaticThreadPipeline;
     Batch(std::vector *const ring, uint32_t begin_, uint32_t end_)
         : ring(ring), begin_(begin_), end_(end_) {}
     std::vector *const ring;
@@ -364,45 +308,45 @@
     uint32_t end_;
   };

-private:
-  // Pipeline configuration
-  PipelineTopology topology;
+  // Static thread storage - fixed size array
+  std::array all_threads;

-  // Core state
+private:
   alignas(128) std::atomic slots{0};
   alignas(128) std::atomic pushes{0};
   const uint32_t slot_count;
   const uint32_t slot_count_mask;

-  // Flattened thread storage - single array instead of nested vectors
-  std::vector all_threads;
-
-  // Ring buffer
   std::vector ring;

   void initialize_all_threads() {
-    for (int stage = 0; stage < topology.num_stages; ++stage) {
-      int stage_offset = topology.stage_offset(stage);
-      int stage_thread_count = topology.threads_per_stage[stage];
-      int prev_stage_threads = topology.prev_stage_thread_count(stage);
-      bool is_last_stage = (stage == topology.num_stages - 1);
+    [&](std::index_sequence) {
+      (init_stage_threads(), ...);
+    }(std::make_index_sequence{});
+  }

-      for (int thread = 0; thread < stage_thread_count; ++thread) {
-        auto &thread_state = all_threads[stage_offset + thread];
-        thread_state.last_stage = is_last_stage;
-        thread_state.last_push_read = std::vector(prev_stage_threads);
-      }
+  template void init_stage_threads() {
+    constexpr int stage_offset = Topology::template stage_offset();
+    constexpr int stage_thread_count = Topology::threads_per_stage[Stage];
+    constexpr int prev_stage_threads =
+        Topology::template prev_stage_thread_count();
+    constexpr bool is_last_stage = (Stage == Topology::num_stages - 1);
+
+    for (int thread = 0; thread < stage_thread_count; ++thread) {
+      auto &thread_state = all_threads[stage_offset + thread];
+      thread_state.last_stage = is_last_stage;
+      thread_state.last_push_read = std::vector(prev_stage_threads);
     }
   }

-  Batch acquire_helper(int stage, int thread, uint32_t maxBatch,
-                       bool may_block) {
-    int thread_idx = topology.thread_index(stage, thread);
+  template
+  Batch acquire_helper(uint32_t maxBatch, bool mayBlock) {
+    constexpr int thread_idx = Topology::template thread_index();
     auto &thread_state = all_threads[thread_idx];
     uint32_t begin = thread_state.local_pops & slot_count_mask;
-    uint32_t len = PipelineAlgorithms::calculate_safe_len(
-        topology, all_threads, pushes, stage, thread, may_block);
+    uint32_t len = StaticPipelineAlgorithms::calculate_safe_len<
+        wait_strategy, Topology, Stage, Thread>(all_threads, pushes, mayBlock);

     if (maxBatch != 0) {
       len = std::min(len, maxBatch);
@@ -417,32 +361,37 @@
   }

 public:
-  struct StageGuard {
+  template struct StageGuard {
     Batch batch;
     ~StageGuard() {
-      if (cleanup_func) {
-        cleanup_func();
+      if (!batch.empty()) {
+        StaticPipelineAlgorithms::update_thread_pops(
+            pipeline->all_threads, local_pops);
       }
     }
     StageGuard(StageGuard const &) = delete;
     StageGuard &operator=(StageGuard const &) = delete;
     StageGuard(StageGuard &&other) noexcept
-        : batch(other.batch),
-          cleanup_func(std::exchange(other.cleanup_func, nullptr)) {}
+        : batch(other.batch), local_pops(other.local_pops),
+          pipeline(std::exchange(other.pipeline, nullptr)) {}
     StageGuard &operator=(StageGuard &&other) noexcept {
       batch = other.batch;
-      cleanup_func = std::exchange(other.cleanup_func, nullptr);
+      local_pops = other.local_pops;
+      pipeline = std::exchange(other.pipeline, nullptr);
       return *this;
     }

   private:
-    friend struct ThreadPipeline;
-    std::function cleanup_func;
+    friend struct StaticThreadPipeline;
+    uint32_t local_pops;
+    StaticThreadPipeline *pipeline;

-    StageGuard(Batch batch, std::function cleanup)
-        : batch(batch), cleanup_func(batch.empty() ? nullptr : cleanup) {}
+    StageGuard(Batch batch, uint32_t local_pops, StaticThreadPipeline *pipeline)
+        : batch(batch), local_pops(local_pops),
+          pipeline(batch.empty() ? nullptr : pipeline) {}
   };

   struct ProducerGuard {
@@ -452,8 +401,6 @@
       if (tp == nullptr) {
         return;
       }
-      // Wait for earlier slots to finish being published, since publishing
-      // implies that all previous slots were also published.
       for (;;) {
         uint32_t p = tp->pushes.load(std::memory_order_acquire);
         if (p == old_slot) {
@@ -461,65 +408,38 @@
         }
         tp->pushes.wait(p, std::memory_order_relaxed);
       }
-      // Publish. seq_cst so that the notify can't be ordered before the store
       tp->pushes.store(new_slot, std::memory_order_seq_cst);
-      // We have to notify every time, since we don't know if this is the last
-      // push ever
       tp->pushes.notify_all();
     }

   private:
-    friend struct ThreadPipeline;
+    friend struct StaticThreadPipeline;
     ProducerGuard() : batch(), tp() {}
-    ProducerGuard(Batch batch, ThreadPipeline *tp,
-                  uint32_t old_slot, uint32_t new_slot)
+    ProducerGuard(Batch batch, StaticThreadPipeline *tp, uint32_t old_slot,
+                  uint32_t new_slot)
         : batch(batch), tp(tp), old_slot(old_slot), new_slot(new_slot) {}
-    ThreadPipeline *const tp;
+    StaticThreadPipeline *const tp;
     uint32_t old_slot;
     uint32_t new_slot;
   };

-  // Acquire a batch of items for processing by a consumer thread.
-  // stage: which processing stage (0 = first consumer stage after producers)
-  // thread: thread ID within the stage (0 to threadsPerStage[stage]-1)
-  // maxBatch: maximum items to acquire (0 = no limit)
-  // mayBlock: whether to block waiting for items (false = return empty batch if
-  // none available) Returns: StageGuard with batch of items to process
-  [[nodiscard]] StageGuard acquire(int stage, int thread, int maxBatch = 0,
-                                   bool mayBlock = true) {
-    assert(stage >= 0 && stage < topology.num_stages);
-    assert(thread >= 0 && thread < topology.threads_per_stage[stage]);
+  // Static acquire - Stage and Thread are compile-time parameters
+  template
+  [[nodiscard]] StageGuard acquire(int maxBatch = 0,
+                                   bool may_block = true) {
+    static_assert(Stage >= 0 && Stage < Topology::num_stages,
+                  "Stage index out of bounds");
+    static_assert(Thread >= 0 && Thread < Topology::threads_per_stage[Stage],
+                  "Thread index out of bounds");

-    auto batch = acquire_helper(stage, thread, maxBatch, mayBlock);
+    auto batch = acquire_helper(maxBatch, may_block);

-    // Create cleanup function that will update thread state on destruction
-    int thread_idx = topology.thread_index(stage, thread);
+    constexpr int thread_idx = Topology::template thread_index();
     uint32_t local_pops = all_threads[thread_idx].local_pops;
-    auto cleanup = [this, stage, thread, local_pops]() {
-      PipelineAlgorithms::update_thread_pops(
-          topology, all_threads, stage, thread, local_pops);
-    };
-
-    return StageGuard{std::move(batch), cleanup};
+    return StageGuard{std::move(batch), local_pops, this};
   }

-  // Reserve slots in the ring buffer for a producer thread to fill with items.
-  // This is used by producer threads to add new items to stage 0 of the
-  // pipeline.
-  //
-  // size: number of slots to reserve (must be > 0 and <= ring buffer capacity)
-  // block: if true, blocks when ring buffer is full; if false, returns empty
-  // guard Returns: ProducerGuard with exclusive access to reserved slots
-  //
-  // Usage: Fill items in the returned batch, then let guard destructor publish
-  // them. The guard destructor ensures items are published in the correct
-  // order.
-  //
-  // Preconditions:
-  // - size > 0 (must request at least one slot)
-  // - size <= slotCount (cannot request more slots than ring buffer capacity)
-  // Violating preconditions results in program termination via abort().
   [[nodiscard]] ProducerGuard push(uint32_t const size, bool block) {
     if (size == 0) {
       std::abort();
@@ -534,16 +454,15 @@
     slot = slots.load(std::memory_order_relaxed);
     begin = slot & slot_count_mask;
-    // Use algorithm function to check capacity
-    int capacity_result = PipelineAlgorithms::check_producer_capacity(
-        topology, all_threads, slot, size, slot_count, block);
+    int capacity_result =
+        StaticPipelineAlgorithms::check_producer_capacity(
+            all_threads, slot, size, slot_count, block);
     if (capacity_result == 1) {
-      continue; // Retry
+      continue;
     }
     if (capacity_result == 2) {
-      return ProducerGuard{}; // Cannot proceed, return empty guard
+      return ProducerGuard{};
     }
-    // capacity_result == 0, can proceed
     if (slots.compare_exchange_weak(slot, slot + size,
                                     std::memory_order_relaxed,
diff --git a/tests/test_server_connection_return.cpp b/tests/test_server_connection_return.cpp
index 9b51ef7..1752227 100644
--- a/tests/test_server_connection_return.cpp
+++ b/tests/test_server_connection_return.cpp
@@ -18,10 +18,12 @@ struct Message {
 struct EchoHandler : public ConnectionHandler {
 private:
-  ThreadPipeline &pipeline;
+  StaticThreadPipeline &pipeline;

 public:
-  explicit EchoHandler(ThreadPipeline &pipeline)
+  explicit EchoHandler(
+      StaticThreadPipeline
+          &pipeline)
       : pipeline(pipeline) {}

   void on_data_arrived(std::string_view data,
@@ -42,11 +44,11 @@ TEST_CASE(
   config.server.io_threads = 1;
   config.server.epoll_instances = 1;
-  ThreadPipeline pipeline{10, {1}};
+  StaticThreadPipeline pipeline{10};
   EchoHandler handler{pipeline};
   auto echoThread = std::thread{[&]() {
     for (;;) {
-      auto guard = pipeline.acquire(0, 0);
+      auto guard = pipeline.acquire<0, 0>();
       for (auto &message : guard.batch) {
         bool done = message.done;
         if (done) {
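Usage sketch (illustrative only, not part of the patch): the snippet below shows how a producer and a single stage-0 worker drive the new compile-time API, mirroring the benchmark and test changes above. It assumes the template argument order <Item, WaitStrategy, ThreadsPerStage...> that http_handler.hpp appears to use, a hypothetical Job item type, and the include path "thread_pipeline.hpp"; the authoritative parameter names and order live in src/thread_pipeline.hpp.

// Sketch only: Job, the include path, and the template argument order
// <Item, WaitStrategy, ThreadsPerStage...> are assumptions for illustration.
#include <thread>
#include "thread_pipeline.hpp"

struct Job {
  int payload = 0;
  bool stop = false;
};

int main() {
  // 2^10 ring slots, one stage with a single worker thread.
  StaticThreadPipeline<Job, WaitStrategy::WaitIfStageEmpty, 1> pipeline{10};

  // Stage 0 worker: the stage and thread id are template arguments,
  // checked against the topology at compile time.
  std::thread worker([&] {
    for (;;) {
      auto guard = pipeline.acquire<0, 0>();
      for (auto &job : guard.batch) {
        if (job.stop) {
          return;
        }
        job.payload += 1; // stage-0 work
      }
      // Guard destructor publishes consumption when it goes out of scope.
    }
  });

  // Producer: reserve slots, fill them, publish when the guard is destroyed.
  {
    auto guard = pipeline.push(/*size=*/4, /*block=*/true);
    for (auto &job : guard.batch) {
      job.payload = 42;
    }
  }

  // Stop the worker with a sentinel item.
  {
    auto guard = pipeline.push(1, /*block=*/true);
    for (auto &job : guard.batch) {
      job.stop = true;
    }
  }
  worker.join();
  return 0;
}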