diff --git a/benchmarks/bench_thread_pipeline.cpp b/benchmarks/bench_thread_pipeline.cpp index e9ba307..2d5b3f0 100644 --- a/benchmarks/bench_thread_pipeline.cpp +++ b/benchmarks/bench_thread_pipeline.cpp @@ -24,15 +24,15 @@ int main() { } }); - StaticThreadPipeline - pipeline(LOG_PIPELINE_SIZE); + ThreadPipeline pipeline(WaitStrategy::WaitIfStageEmpty, {1}, + LOG_PIPELINE_SIZE); std::latch done{0}; // Stage 0 consumer thread std::thread stage0_thread([&pipeline, &done]() { for (;;) { - auto guard = pipeline.acquire<0, 0>(); + auto guard = pipeline.acquire(0, 0); for (auto &item : guard.batch) { spend_cpu_cycles(BUSY_ITERS); @@ -89,15 +89,15 @@ int main() { .warmup(100); for (int batch_size : {1, 4, 16, 64, 256}) { - StaticThreadPipeline - pipeline(LOG_PIPELINE_SIZE); + ThreadPipeline pipeline(WaitStrategy::WaitIfStageEmpty, {1}, + LOG_PIPELINE_SIZE); std::latch done{0}; // Stage 0 consumer thread std::thread stage0_thread([&pipeline, &done]() { for (;;) { - auto guard = pipeline.acquire<0, 0>(); + auto guard = pipeline.acquire(0, 0); for (auto &item : guard.batch) { spend_cpu_cycles(BUSY_ITERS); @@ -142,74 +142,73 @@ int main() { } // Helper function for wait strategy benchmarks - auto benchmark_wait_strategy = - [](const std::string &name, - ankerl::nanobench::Bench &bench) { - constexpr int LOG_PIPELINE_SIZE = - 8; // Smaller buffer to increase contention - constexpr int NUM_ITEMS = 50'000; - constexpr int BATCH_SIZE = 4; // Small batches to increase coordination - constexpr int BUSY_ITERS = - 10; // Light work to emphasize coordination overhead + auto benchmark_wait_strategy = [](WaitStrategy strategy, + const std::string &name, + ankerl::nanobench::Bench &bench) { + constexpr int LOG_PIPELINE_SIZE = + 8; // Smaller buffer to increase contention + constexpr int NUM_ITEMS = 50'000; + constexpr int BATCH_SIZE = 4; // Small batches to increase coordination + constexpr int BUSY_ITERS = + 10; // Light work to emphasize coordination overhead - StaticThreadPipeline pipeline( - LOG_PIPELINE_SIZE); + ThreadPipeline pipeline(strategy, {1, 1}, LOG_PIPELINE_SIZE); - std::latch done{0}; + std::latch done{0}; - // Stage 0 worker - std::thread stage0_thread([&pipeline, &done]() { - for (;;) { - auto guard = pipeline.template acquire<0, 0>(); - for (auto &item : guard.batch) { - spend_cpu_cycles(BUSY_ITERS); - if (item == &done) - return; - } - } - }); - - // Stage 1 worker (final stage - always calls futex wake) - std::thread stage1_thread([&pipeline, &done]() { - for (;;) { - auto guard = pipeline.template acquire<1, 0>(); - for (auto &item : guard.batch) { - spend_cpu_cycles(BUSY_ITERS); - if (item == &done) - return; - if (item) - item->count_down(); - } - } - }); - - bench.run(name, [&] { - int items_pushed = 0; - while (items_pushed < NUM_ITEMS - 1) { - auto guard = pipeline.push( - std::min(NUM_ITEMS - 1 - items_pushed, BATCH_SIZE), true); - auto it = guard.batch.begin(); - items_pushed += guard.batch.size(); - for (size_t i = 0; i < guard.batch.size(); ++i, ++it) { - *it = nullptr; - } - } - std::latch finish{1}; - { - auto guard = pipeline.push(1, true); - guard.batch[0] = &finish; - } - finish.wait(); - }); - - // Shutdown - { - auto guard = pipeline.push(1, true); - guard.batch[0] = &done; + // Stage 0 worker + std::thread stage0_thread([&pipeline, &done]() { + for (;;) { + auto guard = pipeline.acquire(0, 0); + for (auto &item : guard.batch) { + spend_cpu_cycles(BUSY_ITERS); + if (item == &done) + return; } - stage0_thread.join(); - stage1_thread.join(); - }; + } + }); + + // Stage 1 worker (final stage - always calls futex wake) + std::thread stage1_thread([&pipeline, &done]() { + for (;;) { + auto guard = pipeline.acquire(1, 0); + for (auto &item : guard.batch) { + spend_cpu_cycles(BUSY_ITERS); + if (item == &done) + return; + if (item) + item->count_down(); + } + } + }); + + bench.run(name, [&] { + int items_pushed = 0; + while (items_pushed < NUM_ITEMS - 1) { + auto guard = pipeline.push( + std::min(NUM_ITEMS - 1 - items_pushed, BATCH_SIZE), true); + auto it = guard.batch.begin(); + items_pushed += guard.batch.size(); + for (size_t i = 0; i < guard.batch.size(); ++i, ++it) { + *it = nullptr; + } + } + std::latch finish{1}; + { + auto guard = pipeline.push(1, true); + guard.batch[0] = &finish; + } + finish.wait(); + }); + + // Shutdown + { + auto guard = pipeline.push(1, true); + guard.batch[0] = &done; + } + stage0_thread.join(); + stage1_thread.join(); + }; // Wait strategy comparison benchmark - multiple stages to trigger futex wakes { @@ -220,12 +219,11 @@ int main() { .relative(true) .warmup(50); - benchmark_wait_strategy.template operator()( - "WaitIfStageEmpty", bench); - benchmark_wait_strategy.template - operator()("WaitIfUpstreamIdle", bench); - benchmark_wait_strategy.template operator()("Never", - bench); + benchmark_wait_strategy(WaitStrategy::WaitIfStageEmpty, "WaitIfStageEmpty", + bench); + benchmark_wait_strategy(WaitStrategy::WaitIfUpstreamIdle, + "WaitIfUpstreamIdle", bench); + benchmark_wait_strategy(WaitStrategy::Never, "Never", bench); } // TODO: Add more benchmarks for: diff --git a/config.md b/config.md index 6c629da..4629c7c 100644 --- a/config.md +++ b/config.md @@ -28,13 +28,15 @@ Controls server networking, threading, and request handling behavior. ### Commit Configuration (`[commit]`) -Controls behavior of the `/v1/commit` endpoint and request ID management. +Controls behavior of the `/v1/commit` endpoint, request ID management, and commit pipeline threading. | Parameter | Type | Default | Description | |-----------|------|---------|-------------| | `min_request_id_length` | integer | `20` | Minimum length required for client-provided `request_id` fields to ensure sufficient entropy for collision avoidance | | `request_id_retention_hours` | integer | `24` | How long to retain request IDs in memory for `/v1/status` queries. Longer retention reduces the chance of `log_truncated` responses | | `request_id_retention_versions` | integer | `100000000` | Minimum number of versions to retain request IDs for, regardless of time. Provides additional protection against `log_truncated` responses | +| `pipeline_wait_strategy` | string | `"WaitIfUpstreamIdle"` | Wait strategy for the commit pipeline. `"WaitIfStageEmpty"` = block when individual stages are empty (safe for shared CPUs), `"WaitIfUpstreamIdle"` = block only when all upstream stages are idle (requires dedicated cores, highest throughput), `"Never"` = never block, busy-wait continuously (requires dedicated cores, lowest latency) | +| `pipeline_release_threads` | integer | `1` | Number of threads in the release stage (final stage of commit pipeline). Higher values increase parallelism for connection release and response transmission | ### Subscription Configuration (`[subscription]`) @@ -77,6 +79,8 @@ read_buffer_size = 32768 # 32KB min_request_id_length = 32 request_id_retention_hours = 48 request_id_retention_versions = 50000 +pipeline_wait_strategy = "WaitIfUpstreamIdle" # Options: "WaitIfStageEmpty", "WaitIfUpstreamIdle", "Never" +pipeline_release_threads = 4 # Default: 1, increase for higher throughput [subscription] max_buffer_size_bytes = 52428800 # 50MB @@ -115,6 +119,11 @@ These configuration parameters directly affect server and API behavior: - **`request_id_retention_*`**: Affects availability of data for `/v1/status` queries and likelihood of `log_truncated` responses +**Commit Pipeline Performance:** + +- **`pipeline_wait_strategy`**: Controls CPU usage vs latency tradeoff in commit processing. `WaitIfStageEmpty` is safest for shared CPUs, `WaitIfUpstreamIdle` provides highest throughput with dedicated cores, `Never` provides lowest latency but uses 100% CPU +- **`pipeline_release_threads`**: Determines parallelism in the final stage of commit processing. More threads can improve throughput when processing many concurrent requests + **Subscription Streaming:** - **`max_buffer_size_bytes`**: Controls when `/v1/subscribe` connections are terminated due to slow consumption @@ -137,6 +146,8 @@ The configuration system includes comprehensive validation with specific bounds - **`min_request_id_length`**: Must be between 8 and 256 characters - **`request_id_retention_hours`**: Must be between 1 and 8760 hours (1 year) - **`request_id_retention_versions`**: Must be > 0 +- **`pipeline_wait_strategy`**: Must be one of: `"WaitIfStageEmpty"`, `"WaitIfUpstreamIdle"`, or `"Never"` +- **`pipeline_release_threads`**: Must be between 1 and 64 ### Subscription Configuration Limits diff --git a/src/commit_pipeline.cpp b/src/commit_pipeline.cpp index d3ac123..c551d08 100644 --- a/src/commit_pipeline.cpp +++ b/src/commit_pipeline.cpp @@ -17,7 +17,9 @@ auto banned_request_ids_memory_gauge = .create({}); CommitPipeline::CommitPipeline(const weaseldb::Config &config) - : config_(config), pipeline_(lg_size) { + : config_(config), + pipeline_(config.commit.pipeline_wait_strategy, + {1, 1, 1, config.commit.pipeline_release_threads}, lg_size) { // Stage 0: Sequence assignment thread sequence_thread_ = std::thread{[this]() { @@ -37,32 +39,35 @@ CommitPipeline::CommitPipeline(const weaseldb::Config &config) run_persist_stage(); }}; - // Stage 3: Connection return to server threads (2 threads) - release_thread_1_ = std::thread{[this]() { - pthread_setname_np(pthread_self(), "txn-release-1"); - run_release_stage<0>(); - }}; - - release_thread_2_ = std::thread{[this]() { - pthread_setname_np(pthread_self(), "txn-release-2"); - run_release_stage<1>(); - }}; + // Stage 3: Connection return to server threads (configurable count) + release_threads_.reserve(config.commit.pipeline_release_threads); + for (int i = 0; i < config.commit.pipeline_release_threads; ++i) { + release_threads_.emplace_back([this, i]() { + char name[16]; + std::snprintf(name, sizeof(name), "txn-release-%d", i); + pthread_setname_np(pthread_self(), name); + run_release_stage(i); + }); + } } CommitPipeline::~CommitPipeline() { - // Send two shutdown signals for both release threads (adjacent in same batch) + // Send shutdown signals for all release threads (adjacent in same batch) { - auto guard = pipeline_.push(2, true); - guard.batch[0] = ShutdownEntry{}; - guard.batch[1] = ShutdownEntry{}; + int num_release_threads = static_cast(release_threads_.size()); + auto guard = pipeline_.push(num_release_threads, true); + for (int i = 0; i < num_release_threads; ++i) { + guard.batch[i] = ShutdownEntry{}; + } } // Join all pipeline threads sequence_thread_.join(); resolve_thread_.join(); persist_thread_.join(); - release_thread_1_.join(); - release_thread_2_.join(); + for (auto &thread : release_threads_) { + thread.join(); + } } void CommitPipeline::submit_batch(std::span entries) { @@ -92,8 +97,9 @@ void CommitPipeline::run_sequence_stage() { BannedRequestIdSet banned_request_ids{ ArenaStlAllocator(&banned_request_arena)}; - for (int shutdowns_received = 0; shutdowns_received < 2;) { - auto guard = pipeline_.acquire<0, 0>(); + int expected_shutdowns = config_.commit.pipeline_release_threads; + for (int shutdowns_received = 0; shutdowns_received < expected_shutdowns;) { + auto guard = pipeline_.acquire(0, 0); auto &batch = guard.batch; // Stage 0: Sequence assignment @@ -160,8 +166,9 @@ void CommitPipeline::run_sequence_stage() { // AVOID BLOCKING IN THIS STAGE! void CommitPipeline::run_resolve_stage() { - for (int shutdowns_received = 0; shutdowns_received < 2;) { - auto guard = pipeline_.acquire<1, 0>(/*maxBatch*/ 1); + int expected_shutdowns = config_.commit.pipeline_release_threads; + for (int shutdowns_received = 0; shutdowns_received < expected_shutdowns;) { + auto guard = pipeline_.acquire(1, 0, /*maxBatch*/ 1); auto &batch = guard.batch; // Stage 1: Precondition resolution @@ -197,8 +204,9 @@ void CommitPipeline::run_resolve_stage() { } void CommitPipeline::run_persist_stage() { - for (int shutdowns_received = 0; shutdowns_received < 2;) { - auto guard = pipeline_.acquire<2, 0>(); + int expected_shutdowns = config_.commit.pipeline_release_threads; + for (int shutdowns_received = 0; shutdowns_received < expected_shutdowns;) { + auto guard = pipeline_.acquire(2, 0); auto &batch = guard.batch; // Stage 2: Transaction persistence @@ -289,9 +297,9 @@ void CommitPipeline::run_persist_stage() { } } -template void CommitPipeline::run_release_stage() { +void CommitPipeline::run_release_stage(int thread_index) { for (int shutdowns_received = 0; shutdowns_received < 1;) { - auto guard = pipeline_.acquire<3, thread_index>(); + auto guard = pipeline_.acquire(3, thread_index); auto &batch = guard.batch; // Stage 3: Connection release @@ -302,7 +310,9 @@ template void CommitPipeline::run_release_stage() { // Partition work: thread 0 handles even indices, thread 1 handles odd // indices - if (static_cast(it.index() % 2) != thread_index) { + if (static_cast(it.index() % + config_.commit.pipeline_release_threads) != + thread_index) { continue; } diff --git a/src/commit_pipeline.hpp b/src/commit_pipeline.hpp index fd50ed5..941abcb 100644 --- a/src/commit_pipeline.hpp +++ b/src/commit_pipeline.hpp @@ -93,27 +93,24 @@ private: // Lock-free pipeline configuration static constexpr int lg_size = 16; // Ring buffer size (2^16 slots) - static constexpr auto wait_strategy = WaitStrategy::WaitIfUpstreamIdle; // 4-stage pipeline: sequence -> resolve -> persist -> release - StaticThreadPipeline pipeline_; + ThreadPipeline pipeline_; // Stage processing threads std::thread sequence_thread_; std::thread resolve_thread_; std::thread persist_thread_; - std::thread release_thread_1_; - std::thread release_thread_2_; + std::vector release_threads_; // Pipeline stage main loops void run_sequence_stage(); void run_resolve_stage(); void run_persist_stage(); - template void run_release_stage(); + void run_release_stage(int thread_index); // Pipeline batch type alias - using BatchType = - StaticThreadPipeline::Batch; + using BatchType = ThreadPipeline::Batch; // Make non-copyable and non-movable CommitPipeline(const CommitPipeline &) = delete; diff --git a/src/config.cpp b/src/config.cpp index c0aeb86..12f9e54 100644 --- a/src/config.cpp +++ b/src/config.cpp @@ -126,6 +126,25 @@ void ConfigParser::parse_commit_config(const auto &toml_data, config.request_id_retention_hours); parse_field(commit, "request_id_retention_versions", config.request_id_retention_versions); + + // Parse wait strategy + if (commit.contains("pipeline_wait_strategy")) { + std::string strategy_str = + toml::get(commit.at("pipeline_wait_strategy")); + if (strategy_str == "WaitIfStageEmpty") { + config.pipeline_wait_strategy = WaitStrategy::WaitIfStageEmpty; + } else if (strategy_str == "WaitIfUpstreamIdle") { + config.pipeline_wait_strategy = WaitStrategy::WaitIfUpstreamIdle; + } else if (strategy_str == "Never") { + config.pipeline_wait_strategy = WaitStrategy::Never; + } else { + std::cerr << "Warning: Unknown pipeline_wait_strategy '" << strategy_str + << "', using default (WaitIfUpstreamIdle)" << std::endl; + } + } + + parse_field(commit, "pipeline_release_threads", + config.pipeline_release_threads); }); } @@ -253,6 +272,14 @@ bool ConfigParser::validate_config(const Config &config) { valid = false; } + if (config.commit.pipeline_release_threads < 1 || + config.commit.pipeline_release_threads > 64) { + std::cerr << "Configuration error: commit.pipeline_release_threads must be " + "between 1 and 64, got " + << config.commit.pipeline_release_threads << std::endl; + valid = false; + } + // Validate subscription configuration if (config.subscription.max_buffer_size_bytes == 0) { std::cerr << "Configuration error: subscription.max_buffer_size_bytes must " diff --git a/src/config.hpp b/src/config.hpp index 6f499de..01544e1 100644 --- a/src/config.hpp +++ b/src/config.hpp @@ -5,6 +5,8 @@ #include #include +#include "thread_pipeline.hpp" + namespace weaseldb { /** @@ -60,6 +62,16 @@ struct CommitConfig { std::chrono::hours request_id_retention_hours{24}; /// Minimum number of commit versions to retain request IDs for int64_t request_id_retention_versions = 100000000; + /// Wait strategy for the commit pipeline + /// - WaitIfStageEmpty: Block when individual stages are empty (default, safe + /// for shared CPUs) + /// - WaitIfUpstreamIdle: Block only when all upstream stages are idle + /// (requires dedicated cores) + /// - Never: Never block, busy-wait continuously (requires dedicated cores) + WaitStrategy pipeline_wait_strategy = WaitStrategy::WaitIfUpstreamIdle; + /// Number of threads in the release stage (final stage of commit pipeline) + /// Default: 1 thread for simplicity (can increase for higher throughput) + int pipeline_release_threads = 1; }; /** diff --git a/src/main.cpp b/src/main.cpp index 34ca057..6f9439d 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -246,6 +246,24 @@ int main(int argc, char *argv[]) { std::cout << "Request ID retention: " << config->commit.request_id_retention_hours.count() << " hours" << std::endl; + + // Print pipeline configuration + std::string wait_strategy_str; + switch (config->commit.pipeline_wait_strategy) { + case WaitStrategy::WaitIfStageEmpty: + wait_strategy_str = "WaitIfStageEmpty"; + break; + case WaitStrategy::WaitIfUpstreamIdle: + wait_strategy_str = "WaitIfUpstreamIdle"; + break; + case WaitStrategy::Never: + wait_strategy_str = "Never"; + break; + } + std::cout << "Pipeline wait strategy: " << wait_strategy_str << std::endl; + std::cout << "Pipeline release threads: " + << config->commit.pipeline_release_threads << std::endl; + std::cout << "Subscription buffer size: " << config->subscription.max_buffer_size_bytes << " bytes" << std::endl; diff --git a/src/thread_pipeline.hpp b/src/thread_pipeline.hpp index 3018468..48d4557 100644 --- a/src/thread_pipeline.hpp +++ b/src/thread_pipeline.hpp @@ -1,6 +1,5 @@ #pragma once -#include #include #include #include @@ -48,151 +47,174 @@ struct ThreadState { bool last_stage; }; -// Compile-time topology configuration for static pipelines +// Runtime topology configuration for dynamic pipelines // -// This template defines a pipeline topology at compile-time: -// - Stage and thread calculations done at compile-time -// - Type-safe indexing: Stage and thread indices validated at compile-time -// - Fixed-size arrays with known bounds -// - Code specialization for each topology +// This class defines a pipeline topology at runtime: +// - Stage and thread calculations done at runtime +// - Flexible configuration: topology can be set via constructor +// - Dynamic arrays with runtime bounds checking +// - Single implementation works for any topology // -// Example: StaticPipelineTopology<1, 4, 2> creates: +// Example: PipelineTopology({1, 4, 2}) creates: // - Stage 0: 1 thread (index 0) // - Stage 1: 4 threads (indices 1-4) // - Stage 2: 2 threads (indices 5-6) // - Total: 7 threads across 3 stages -template struct StaticPipelineTopology { - static_assert(sizeof...(ThreadsPerStage) > 0, - "Must specify at least one stage"); - static_assert(((ThreadsPerStage > 0) && ...), - "All stages must have at least one thread"); +struct PipelineTopology { + const std::vector threads_per_stage; + const int num_stages; + const std::vector stage_offsets; + const int total_threads; - static constexpr int num_stages = sizeof...(ThreadsPerStage); - static constexpr std::array threads_per_stage = { - ThreadsPerStage...}; - static constexpr int total_threads = (ThreadsPerStage + ...); + explicit PipelineTopology(std::vector threads_per_stage_) + : threads_per_stage(validate_and_move(std::move(threads_per_stage_))), + num_stages(static_cast(threads_per_stage.size())), + stage_offsets(build_stage_offsets(threads_per_stage)), + total_threads(build_total_threads(threads_per_stage)) {} - // Compile-time stage offset calculation - template static constexpr int stage_offset() { - static_assert(Stage >= 0 && Stage < num_stages, - "Stage index out of bounds"); - if constexpr (Stage == 0) { - return 0; - } else { - return stage_offset() + threads_per_stage[Stage - 1]; + // Runtime stage offset calculation + int stage_offset(int stage) const { + if (stage < 0 || stage >= num_stages) { + std::abort(); // Stage index out of bounds } + return stage_offsets[stage]; } - // Compile-time thread index calculation - template static constexpr int thread_index() { - static_assert(Stage >= 0 && Stage < num_stages, - "Stage index out of bounds"); - static_assert(Thread >= 0 && Thread < threads_per_stage[Stage], - "Thread index out of bounds"); - return stage_offset() + Thread; + // Runtime thread index calculation + int thread_index(int stage, int thread) const { + if (stage < 0 || stage >= num_stages) { + std::abort(); // Stage index out of bounds + } + if (thread < 0 || thread >= threads_per_stage[stage]) { + std::abort(); // Thread index out of bounds + } + return stage_offsets[stage] + thread; } - // Compile-time previous stage thread count - template static constexpr int prev_stage_thread_count() { - static_assert(Stage >= 0 && Stage < num_stages, - "Stage index out of bounds"); - if constexpr (Stage == 0) { + // Runtime previous stage thread count + int prev_stage_thread_count(int stage) const { + if (stage < 0 || stage >= num_stages) { + std::abort(); // Stage index out of bounds + } + if (stage == 0) { return 1; } else { - return threads_per_stage[Stage - 1]; + return threads_per_stage[stage - 1]; } } + +private: + static std::vector validate_and_move(std::vector threads) { + if (threads.empty()) { + std::abort(); // Must specify at least one stage + } + for (int count : threads) { + if (count <= 0) { + std::abort(); // All stages must have at least one thread + } + } + return threads; + } + + static std::vector + build_stage_offsets(const std::vector &threads_per_stage) { + std::vector offsets(threads_per_stage.size()); + int offset = 0; + for (size_t i = 0; i < threads_per_stage.size(); ++i) { + offsets[i] = offset; + offset += threads_per_stage[i]; + } + return offsets; + } + + static int build_total_threads(const std::vector &threads_per_stage) { + int total = 0; + for (int count : threads_per_stage) { + total += count; + } + return total; + } }; -// Static pipeline algorithms - compile-time specialized versions -namespace StaticPipelineAlgorithms { +// Pipeline algorithms - runtime configurable versions +namespace PipelineAlgorithms { -template -uint32_t calculate_safe_len( - std::array &all_threads, - std::atomic &pushes, bool may_block) { - constexpr int thread_idx = - Topology::template thread_index(); +inline uint32_t calculate_safe_len(WaitStrategy wait_strategy, + const PipelineTopology &topology, int stage, + int thread_in_stage, + std::vector &all_threads, + std::atomic &pushes, + bool may_block) { + int thread_idx = topology.thread_index(stage, thread_in_stage); auto &thread = all_threads[thread_idx]; uint32_t safe_len = UINT32_MAX; - constexpr int prev_stage_threads = - Topology::template prev_stage_thread_count(); + int prev_stage_threads = topology.prev_stage_thread_count(stage); - // Compile-time loop over previous stage threads - [&](std::index_sequence) { - ( - [&] { - auto &last_push = [&]() -> std::atomic & { - if constexpr (Stage == 0) { - return pushes; - } else { - constexpr int prev_thread_idx = - Topology::template thread_index(); - return all_threads[prev_thread_idx].pops; + // Runtime loop over previous stage threads + for (int i = 0; i < prev_stage_threads; ++i) { + std::atomic &last_push = [&]() -> std::atomic & { + if (stage == 0) { + return pushes; + } else { + int prev_thread_idx = topology.thread_index(stage - 1, i); + return all_threads[prev_thread_idx].pops; + } + }(); + + if (thread.last_push_read[i] == thread.local_pops) { + thread.last_push_read[i] = last_push.load(std::memory_order_acquire); + if (thread.last_push_read[i] == thread.local_pops) { + if (!may_block) { + safe_len = 0; + return safe_len; + } + + if (wait_strategy == WaitStrategy::Never) { + // Empty - busy wait + } else if (wait_strategy == WaitStrategy::WaitIfUpstreamIdle) { + // We're allowed to spin as long as we eventually go to 0% cpu + // usage on idle + uint32_t push; + bool should_wait = true; + for (int j = 0; j < 100000; ++j) { + push = pushes.load(std::memory_order_relaxed); + if (push != thread.local_pops) { + should_wait = false; + break; } - }(); - - if (thread.last_push_read[Is] == thread.local_pops) { - thread.last_push_read[Is] = - last_push.load(std::memory_order_acquire); - if (thread.last_push_read[Is] == thread.local_pops) { - if (!may_block) { - safe_len = 0; - return; - } - - if constexpr (wait_strategy == WaitStrategy::Never) { - // Empty - busy wait - } else if constexpr (wait_strategy == - WaitStrategy::WaitIfUpstreamIdle) { - // We're allowed to spin as long as we eventually go to 0% cpu - // usage on idle - uint32_t push; - for (int i = 0; i < 100000; ++i) { - push = pushes.load(std::memory_order_relaxed); - if (push != thread.local_pops) { - goto dont_wait; - } #if defined(__x86_64__) || defined(_M_X64) - _mm_pause(); + _mm_pause(); #endif - } - pushes.wait(push, std::memory_order_relaxed); - dont_wait:; - } else { - static_assert(wait_strategy == WaitStrategy::WaitIfStageEmpty); - last_push.wait(thread.last_push_read[Is], - std::memory_order_relaxed); - } - - thread.last_push_read[Is] = - last_push.load(std::memory_order_acquire); - } } - safe_len = - std::min(safe_len, thread.last_push_read[Is] - thread.local_pops); - }(), - ...); - }(std::make_index_sequence{}); + if (should_wait) { + pushes.wait(push, std::memory_order_relaxed); + } + } else { // WaitStrategy::WaitIfStageEmpty + last_push.wait(thread.last_push_read[i], std::memory_order_relaxed); + } + + thread.last_push_read[i] = last_push.load(std::memory_order_acquire); + } + } + safe_len = std::min(safe_len, thread.last_push_read[i] - thread.local_pops); + } return safe_len; } -template -void update_thread_pops( - std::array &all_threads, - uint32_t local_pops) { - constexpr int thread_idx = - Topology::template thread_index(); +inline void update_thread_pops(WaitStrategy wait_strategy, + const PipelineTopology &topology, int stage, + int thread_in_stage, + std::vector &all_threads, + uint32_t local_pops) { + int thread_idx = topology.thread_index(stage, thread_in_stage); auto &thread_state = all_threads[thread_idx]; - if constexpr (wait_strategy == WaitStrategy::WaitIfStageEmpty) { + if (wait_strategy == WaitStrategy::WaitIfStageEmpty) { thread_state.pops.store(local_pops, std::memory_order_seq_cst); thread_state.pops.notify_all(); - } else if constexpr (Stage == Topology::num_stages - 1) { // last stage + } else if (stage == topology.num_stages - 1) { // last stage thread_state.pops.store(local_pops, std::memory_order_seq_cst); thread_state.pops.notify_all(); } else { @@ -200,15 +222,13 @@ void update_thread_pops( } } -template -int check_producer_capacity( - std::array &all_threads, - uint32_t slot, uint32_t size, uint32_t slot_count, bool block) { - constexpr int last_stage = Topology::num_stages - 1; - constexpr int last_stage_offset = - Topology::template stage_offset(); - constexpr int last_stage_thread_count = - Topology::threads_per_stage[last_stage]; +inline int check_producer_capacity(const PipelineTopology &topology, + std::vector &all_threads, + uint32_t slot, uint32_t size, + uint32_t slot_count, bool block) { + int last_stage = topology.num_stages - 1; + int last_stage_offset = topology.stage_offset(last_stage); + int last_stage_thread_count = topology.threads_per_stage[last_stage]; for (int i = 0; i < last_stage_thread_count; ++i) { auto &thread = all_threads[last_stage_offset + i]; @@ -223,10 +243,10 @@ int check_producer_capacity( } return 0; // Can proceed } -} // namespace StaticPipelineAlgorithms +} // namespace PipelineAlgorithms -// Static multi-stage lock-free pipeline for inter-thread communication -// with compile-time topology specification. +// Multi-stage lock-free pipeline for inter-thread communication +// with runtime-configurable topology and wait strategy. // // Overview: // - Items flow from producers through multiple processing stages (stage 0 -> @@ -234,25 +254,17 @@ int check_producer_capacity( // - Each stage can have multiple worker threads processing items in parallel // - Uses a shared ring buffer with atomic counters for lock-free coordination // - Supports batch processing for efficiency -// - Compile-time topology specification via template parameters +// - Runtime-configurable topology and wait strategy via constructor parameters // // Architecture: // - Producers: External threads that add items to the pipeline via push() // - Stages: Processing stages numbered 0, 1, 2, ... that consume items via -// acquire() +// acquire(stage, thread) // - Items flow: Producers -> Stage 0 -> Stage 1 -> ... -> Final Stage // -// Differences from Dynamic Version: -// - Template parameters specify topology at compile-time (e.g., ) -// - Stage and thread indices are template parameters, validated at compile-time -// - Fixed-size arrays replace dynamic vectors -// - Specialized algorithms for each stage/thread combination -// - Type-safe guards prevent runtime indexing errors -// // Usage Pattern: -// using Pipeline = StaticThreadPipeline; Pipeline pipeline(lgSlotCount); +// ThreadPipeline pipeline(WaitStrategy::WaitIfStageEmpty, {1, 4, 2}, +// lgSlotCount); // // // Producer threads (add items for stage 0 to consume): // auto guard = pipeline.push(batchSize, /*block=*/true); @@ -262,20 +274,21 @@ int check_producer_capacity( // // Guard destructor publishes batch to stage 0 consumers // // // Stage worker threads (process items and pass to next stage): -// auto guard = pipeline.acquire(maxBatch, /*may_block=*/true); +// auto guard = pipeline.acquire(stage, thread, maxBatch, /*may_block=*/true); // for (auto& item : guard.batch) { // // Process item // } // // Guard destructor marks items as consumed and available to next stage // // Multi-Thread Stage Processing: -// When a stage has multiple threads (e.g., 1, 1, 1, 2 = 2 threads in stage 3): +// When a stage has multiple threads (e.g., {1, 1, 1, 2} = 2 threads in stage +// 3): // // OVERLAPPING BATCHES - EACH THREAD SEES EVERY ENTRY: // - Multiple threads in the same stage get OVERLAPPING batches from the ring // buffer -// - Thread 0: calls acquire<3, 0>() - gets batch from ring positions 100-110 -// - Thread 1: calls acquire<3, 1>() - gets batch from ring positions 100-110 +// - Thread 0: calls acquire(3, 0) - gets batch from ring positions 100-110 +// - Thread 1: calls acquire(3, 1) - gets batch from ring positions 100-110 // (SAME) // - Both threads see the same entries and must coordinate processing // @@ -319,27 +332,27 @@ int check_producer_capacity( // ordering // - Uses C++20 atomic wait/notify for efficient blocking when no work available // - RAII guards ensure proper cleanup even with exceptions -template -struct StaticThreadPipeline { - using Topology = StaticPipelineTopology; - +template struct ThreadPipeline { // Constructor + // wait_strategy: blocking behavior when no work is available + // threads_per_stage: number of threads in each stage (e.g., {1, 4, 2}) // lgSlotCount: log2 of ring buffer size (e.g., 10 -> 1024 slots) - // Template parameters specify pipeline topology (e.g., ) Note: Producer threads are external to the pipeline and not counted in - // ThreadsPerStage - explicit StaticThreadPipeline(int lgSlotCount) - : slot_count(1 << lgSlotCount), slot_count_mask(slot_count - 1), - ring(slot_count) { + // Note: Producer threads are external to the pipeline and not counted in + // threads_per_stage + explicit ThreadPipeline(WaitStrategy wait_strategy, + std::vector threads_per_stage, int lgSlotCount) + : wait_strategy_(wait_strategy), topology_(std::move(threads_per_stage)), + slot_count(1 << lgSlotCount), slot_count_mask(slot_count - 1), + ring(slot_count), all_threads(topology_.total_threads) { // Otherwise we can't tell the difference between full and empty. assert(!(slot_count_mask & 0x80000000)); initialize_all_threads(); } - StaticThreadPipeline(StaticThreadPipeline const &) = delete; - StaticThreadPipeline &operator=(StaticThreadPipeline const &) = delete; - StaticThreadPipeline(StaticThreadPipeline &&) = delete; - StaticThreadPipeline &operator=(StaticThreadPipeline &&) = delete; + ThreadPipeline(ThreadPipeline const &) = delete; + ThreadPipeline &operator=(ThreadPipeline const &) = delete; + ThreadPipeline(ThreadPipeline &&) = delete; + ThreadPipeline &operator=(ThreadPipeline &&) = delete; struct Batch { Batch() : ring(), begin_(), end_() {} @@ -442,7 +455,7 @@ struct StaticThreadPipeline { } private: - friend struct StaticThreadPipeline; + friend struct ThreadPipeline; Batch(std::vector *const ring, uint32_t begin_, uint32_t end_) : ring(ring), begin_(begin_), end_(end_) {} std::vector *const ring; @@ -450,29 +463,29 @@ struct StaticThreadPipeline { uint32_t end_; }; - // Static thread storage - fixed size array - std::array all_threads; - private: + WaitStrategy wait_strategy_; + PipelineTopology topology_; + alignas(128) std::atomic slots{0}; alignas(128) std::atomic pushes{0}; const uint32_t slot_count; const uint32_t slot_count_mask; std::vector ring; + std::vector all_threads; void initialize_all_threads() { - [&](std::index_sequence) { - (init_stage_threads(), ...); - }(std::make_index_sequence{}); + for (int stage = 0; stage < topology_.num_stages; ++stage) { + init_stage_threads(stage); + } } - template void init_stage_threads() { - constexpr int stage_offset = Topology::template stage_offset(); - constexpr int stage_thread_count = Topology::threads_per_stage[Stage]; - constexpr int prev_stage_threads = - Topology::template prev_stage_thread_count(); - constexpr bool is_last_stage = (Stage == Topology::num_stages - 1); + void init_stage_threads(int stage) { + int stage_offset = topology_.stage_offset(stage); + int stage_thread_count = topology_.threads_per_stage[stage]; + int prev_stage_threads = topology_.prev_stage_thread_count(stage); + bool is_last_stage = (stage == topology_.num_stages - 1); for (int thread = 0; thread < stage_thread_count; ++thread) { auto &thread_state = all_threads[stage_offset + thread]; @@ -481,14 +494,15 @@ private: } } - template - Batch acquire_helper(uint32_t maxBatch, bool mayBlock) { - constexpr int thread_idx = Topology::template thread_index(); + Batch acquire_helper(int stage, int thread, uint32_t maxBatch, + bool may_block) { + int thread_idx = topology_.thread_index(stage, thread); auto &thread_state = all_threads[thread_idx]; uint32_t begin = thread_state.local_pops & slot_count_mask; - uint32_t len = StaticPipelineAlgorithms::calculate_safe_len< - wait_strategy, Topology, Stage, Thread>(all_threads, pushes, mayBlock); + uint32_t len = PipelineAlgorithms::calculate_safe_len( + wait_strategy_, topology_, stage, thread, all_threads, pushes, + may_block); if (maxBatch != 0) { len = std::min(len, maxBatch); @@ -503,13 +517,13 @@ private: } public: - template struct StageGuard { + struct StageGuard { Batch batch; ~StageGuard() { if (!batch.empty()) { - StaticPipelineAlgorithms::update_thread_pops( + PipelineAlgorithms::update_thread_pops( + pipeline->wait_strategy_, pipeline->topology_, stage, thread, pipeline->all_threads, local_pops); } } @@ -517,22 +531,28 @@ public: StageGuard(StageGuard const &) = delete; StageGuard &operator=(StageGuard const &) = delete; StageGuard(StageGuard &&other) noexcept - : batch(other.batch), local_pops(other.local_pops), + : batch(other.batch), local_pops(other.local_pops), stage(other.stage), + thread(other.thread), pipeline(std::exchange(other.pipeline, nullptr)) {} StageGuard &operator=(StageGuard &&other) noexcept { batch = other.batch; local_pops = other.local_pops; + stage = other.stage; + thread = other.thread; pipeline = std::exchange(other.pipeline, nullptr); return *this; } private: - friend struct StaticThreadPipeline; + friend struct ThreadPipeline; uint32_t local_pops; - StaticThreadPipeline *pipeline; + int stage; + int thread; + ThreadPipeline *pipeline; - StageGuard(Batch batch, uint32_t local_pops, StaticThreadPipeline *pipeline) - : batch(batch), local_pops(local_pops), + StageGuard(Batch batch, uint32_t local_pops, int stage, int thread, + ThreadPipeline *pipeline) + : batch(batch), local_pops(local_pops), stage(stage), thread(thread), pipeline(batch.empty() ? nullptr : pipeline) {} }; @@ -555,37 +575,30 @@ public: } private: - friend struct StaticThreadPipeline; + friend struct ThreadPipeline; ProducerGuard() : batch(), tp() {} - ProducerGuard(Batch batch, StaticThreadPipeline *tp, uint32_t old_slot, + ProducerGuard(Batch batch, ThreadPipeline *tp, uint32_t old_slot, uint32_t new_slot) : batch(batch), tp(tp), old_slot(old_slot), new_slot(new_slot) {} - StaticThreadPipeline *const tp; + ThreadPipeline *const tp; uint32_t old_slot; uint32_t new_slot; }; // Acquire a batch of items for processing by a consumer thread. - // Stage: which processing stage (0 = first consumer stage after producers) - - // compile-time parameter Thread: thread ID within the stage (0 to - // ThreadsPerStage[Stage]-1) - compile-time parameter maxBatch: maximum items - // to acquire (0 = no limit) may_block: whether to block waiting for items - // (false = return empty batch if none available) Returns: StageGuard with batch of items to process and compile-time type safety - template - [[nodiscard]] StageGuard acquire(int maxBatch = 0, - bool may_block = true) { - static_assert(Stage >= 0 && Stage < Topology::num_stages, - "Stage index out of bounds"); - static_assert(Thread >= 0 && Thread < Topology::threads_per_stage[Stage], - "Thread index out of bounds"); + // stage: which processing stage (0 = first consumer stage after producers) + // thread: thread ID within the stage (0 to threads_per_stage[stage]-1) + // maxBatch: maximum items to acquire (0 = no limit) + // may_block: whether to block waiting for items (false = return empty batch + // if none available) Returns: StageGuard with batch of items to process + [[nodiscard]] StageGuard acquire(int stage, int thread, int maxBatch = 0, + bool may_block = true) { + auto batch = acquire_helper(stage, thread, maxBatch, may_block); - auto batch = acquire_helper(maxBatch, may_block); - - constexpr int thread_idx = Topology::template thread_index(); + int thread_idx = topology_.thread_index(stage, thread); uint32_t local_pops = all_threads[thread_idx].local_pops; - return StageGuard{std::move(batch), local_pops, this}; + return StageGuard{std::move(batch), local_pops, stage, thread, this}; } // Reserve slots in the ring buffer for a producer thread to fill with items. @@ -618,9 +631,8 @@ public: slot = slots.load(std::memory_order_relaxed); begin = slot & slot_count_mask; - int capacity_result = - StaticPipelineAlgorithms::check_producer_capacity( - all_threads, slot, size, slot_count, block); + int capacity_result = PipelineAlgorithms::check_producer_capacity( + topology_, all_threads, slot, size, slot_count, block); if (capacity_result == 1) { continue; }