Make pipeline policy/topology configurable

This commit is contained in:
2025-11-06 15:55:27 -05:00
parent 9f8562e30f
commit f458c6b249
8 changed files with 396 additions and 311 deletions

View File

@@ -24,15 +24,15 @@ int main() {
} }
}); });
StaticThreadPipeline<std::latch *, WaitStrategy::WaitIfStageEmpty, 1> ThreadPipeline<std::latch *> pipeline(WaitStrategy::WaitIfStageEmpty, {1},
pipeline(LOG_PIPELINE_SIZE); LOG_PIPELINE_SIZE);
std::latch done{0}; std::latch done{0};
// Stage 0 consumer thread // Stage 0 consumer thread
std::thread stage0_thread([&pipeline, &done]() { std::thread stage0_thread([&pipeline, &done]() {
for (;;) { for (;;) {
auto guard = pipeline.acquire<0, 0>(); auto guard = pipeline.acquire(0, 0);
for (auto &item : guard.batch) { for (auto &item : guard.batch) {
spend_cpu_cycles(BUSY_ITERS); spend_cpu_cycles(BUSY_ITERS);
@@ -89,15 +89,15 @@ int main() {
.warmup(100); .warmup(100);
for (int batch_size : {1, 4, 16, 64, 256}) { for (int batch_size : {1, 4, 16, 64, 256}) {
StaticThreadPipeline<std::latch *, WaitStrategy::WaitIfStageEmpty, 1> ThreadPipeline<std::latch *> pipeline(WaitStrategy::WaitIfStageEmpty, {1},
pipeline(LOG_PIPELINE_SIZE); LOG_PIPELINE_SIZE);
std::latch done{0}; std::latch done{0};
// Stage 0 consumer thread // Stage 0 consumer thread
std::thread stage0_thread([&pipeline, &done]() { std::thread stage0_thread([&pipeline, &done]() {
for (;;) { for (;;) {
auto guard = pipeline.acquire<0, 0>(); auto guard = pipeline.acquire(0, 0);
for (auto &item : guard.batch) { for (auto &item : guard.batch) {
spend_cpu_cycles(BUSY_ITERS); spend_cpu_cycles(BUSY_ITERS);
@@ -142,74 +142,73 @@ int main() {
} }
// Helper function for wait strategy benchmarks // Helper function for wait strategy benchmarks
auto benchmark_wait_strategy = auto benchmark_wait_strategy = [](WaitStrategy strategy,
[]<WaitStrategy strategy>(const std::string &name, const std::string &name,
ankerl::nanobench::Bench &bench) { ankerl::nanobench::Bench &bench) {
constexpr int LOG_PIPELINE_SIZE = constexpr int LOG_PIPELINE_SIZE =
8; // Smaller buffer to increase contention 8; // Smaller buffer to increase contention
constexpr int NUM_ITEMS = 50'000; constexpr int NUM_ITEMS = 50'000;
constexpr int BATCH_SIZE = 4; // Small batches to increase coordination constexpr int BATCH_SIZE = 4; // Small batches to increase coordination
constexpr int BUSY_ITERS = constexpr int BUSY_ITERS =
10; // Light work to emphasize coordination overhead 10; // Light work to emphasize coordination overhead
StaticThreadPipeline<std::latch *, strategy, 1, 1> pipeline( ThreadPipeline<std::latch *> pipeline(strategy, {1, 1}, LOG_PIPELINE_SIZE);
LOG_PIPELINE_SIZE);
std::latch done{0}; std::latch done{0};
// Stage 0 worker // Stage 0 worker
std::thread stage0_thread([&pipeline, &done]() { std::thread stage0_thread([&pipeline, &done]() {
for (;;) { for (;;) {
auto guard = pipeline.template acquire<0, 0>(); auto guard = pipeline.acquire(0, 0);
for (auto &item : guard.batch) { for (auto &item : guard.batch) {
spend_cpu_cycles(BUSY_ITERS); spend_cpu_cycles(BUSY_ITERS);
if (item == &done) if (item == &done)
return; return;
}
}
});
// Stage 1 worker (final stage - always calls futex wake)
std::thread stage1_thread([&pipeline, &done]() {
for (;;) {
auto guard = pipeline.template acquire<1, 0>();
for (auto &item : guard.batch) {
spend_cpu_cycles(BUSY_ITERS);
if (item == &done)
return;
if (item)
item->count_down();
}
}
});
bench.run(name, [&] {
int items_pushed = 0;
while (items_pushed < NUM_ITEMS - 1) {
auto guard = pipeline.push(
std::min(NUM_ITEMS - 1 - items_pushed, BATCH_SIZE), true);
auto it = guard.batch.begin();
items_pushed += guard.batch.size();
for (size_t i = 0; i < guard.batch.size(); ++i, ++it) {
*it = nullptr;
}
}
std::latch finish{1};
{
auto guard = pipeline.push(1, true);
guard.batch[0] = &finish;
}
finish.wait();
});
// Shutdown
{
auto guard = pipeline.push(1, true);
guard.batch[0] = &done;
} }
stage0_thread.join(); }
stage1_thread.join(); });
};
// Stage 1 worker (final stage - always calls futex wake)
std::thread stage1_thread([&pipeline, &done]() {
for (;;) {
auto guard = pipeline.acquire(1, 0);
for (auto &item : guard.batch) {
spend_cpu_cycles(BUSY_ITERS);
if (item == &done)
return;
if (item)
item->count_down();
}
}
});
bench.run(name, [&] {
int items_pushed = 0;
while (items_pushed < NUM_ITEMS - 1) {
auto guard = pipeline.push(
std::min(NUM_ITEMS - 1 - items_pushed, BATCH_SIZE), true);
auto it = guard.batch.begin();
items_pushed += guard.batch.size();
for (size_t i = 0; i < guard.batch.size(); ++i, ++it) {
*it = nullptr;
}
}
std::latch finish{1};
{
auto guard = pipeline.push(1, true);
guard.batch[0] = &finish;
}
finish.wait();
});
// Shutdown
{
auto guard = pipeline.push(1, true);
guard.batch[0] = &done;
}
stage0_thread.join();
stage1_thread.join();
};
// Wait strategy comparison benchmark - multiple stages to trigger futex wakes // Wait strategy comparison benchmark - multiple stages to trigger futex wakes
{ {
@@ -220,12 +219,11 @@ int main() {
.relative(true) .relative(true)
.warmup(50); .warmup(50);
benchmark_wait_strategy.template operator()<WaitStrategy::WaitIfStageEmpty>( benchmark_wait_strategy(WaitStrategy::WaitIfStageEmpty, "WaitIfStageEmpty",
"WaitIfStageEmpty", bench); bench);
benchmark_wait_strategy.template benchmark_wait_strategy(WaitStrategy::WaitIfUpstreamIdle,
operator()<WaitStrategy::WaitIfUpstreamIdle>("WaitIfUpstreamIdle", bench); "WaitIfUpstreamIdle", bench);
benchmark_wait_strategy.template operator()<WaitStrategy::Never>("Never", benchmark_wait_strategy(WaitStrategy::Never, "Never", bench);
bench);
} }
// TODO: Add more benchmarks for: // TODO: Add more benchmarks for:

View File

@@ -28,13 +28,15 @@ Controls server networking, threading, and request handling behavior.
### Commit Configuration (`[commit]`) ### Commit Configuration (`[commit]`)
Controls behavior of the `/v1/commit` endpoint and request ID management. Controls behavior of the `/v1/commit` endpoint, request ID management, and commit pipeline threading.
| Parameter | Type | Default | Description | | Parameter | Type | Default | Description |
|-----------|------|---------|-------------| |-----------|------|---------|-------------|
| `min_request_id_length` | integer | `20` | Minimum length required for client-provided `request_id` fields to ensure sufficient entropy for collision avoidance | | `min_request_id_length` | integer | `20` | Minimum length required for client-provided `request_id` fields to ensure sufficient entropy for collision avoidance |
| `request_id_retention_hours` | integer | `24` | How long to retain request IDs in memory for `/v1/status` queries. Longer retention reduces the chance of `log_truncated` responses | | `request_id_retention_hours` | integer | `24` | How long to retain request IDs in memory for `/v1/status` queries. Longer retention reduces the chance of `log_truncated` responses |
| `request_id_retention_versions` | integer | `100000000` | Minimum number of versions to retain request IDs for, regardless of time. Provides additional protection against `log_truncated` responses | | `request_id_retention_versions` | integer | `100000000` | Minimum number of versions to retain request IDs for, regardless of time. Provides additional protection against `log_truncated` responses |
| `pipeline_wait_strategy` | string | `"WaitIfUpstreamIdle"` | Wait strategy for the commit pipeline. `"WaitIfStageEmpty"` = block when individual stages are empty (safe for shared CPUs), `"WaitIfUpstreamIdle"` = block only when all upstream stages are idle (requires dedicated cores, highest throughput), `"Never"` = never block, busy-wait continuously (requires dedicated cores, lowest latency) |
| `pipeline_release_threads` | integer | `1` | Number of threads in the release stage (final stage of commit pipeline). Higher values increase parallelism for connection release and response transmission |
### Subscription Configuration (`[subscription]`) ### Subscription Configuration (`[subscription]`)
@@ -77,6 +79,8 @@ read_buffer_size = 32768 # 32KB
min_request_id_length = 32 min_request_id_length = 32
request_id_retention_hours = 48 request_id_retention_hours = 48
request_id_retention_versions = 50000 request_id_retention_versions = 50000
pipeline_wait_strategy = "WaitIfUpstreamIdle" # Options: "WaitIfStageEmpty", "WaitIfUpstreamIdle", "Never"
pipeline_release_threads = 4 # Default: 1, increase for higher throughput
[subscription] [subscription]
max_buffer_size_bytes = 52428800 # 50MB max_buffer_size_bytes = 52428800 # 50MB
@@ -115,6 +119,11 @@ These configuration parameters directly affect server and API behavior:
- **`request_id_retention_*`**: Affects availability of data for `/v1/status` queries and likelihood of `log_truncated` responses - **`request_id_retention_*`**: Affects availability of data for `/v1/status` queries and likelihood of `log_truncated` responses
**Commit Pipeline Performance:**
- **`pipeline_wait_strategy`**: Controls CPU usage vs latency tradeoff in commit processing. `WaitIfStageEmpty` is safest for shared CPUs, `WaitIfUpstreamIdle` provides highest throughput with dedicated cores, `Never` provides lowest latency but uses 100% CPU
- **`pipeline_release_threads`**: Determines parallelism in the final stage of commit processing. More threads can improve throughput when processing many concurrent requests
**Subscription Streaming:** **Subscription Streaming:**
- **`max_buffer_size_bytes`**: Controls when `/v1/subscribe` connections are terminated due to slow consumption - **`max_buffer_size_bytes`**: Controls when `/v1/subscribe` connections are terminated due to slow consumption
@@ -137,6 +146,8 @@ The configuration system includes comprehensive validation with specific bounds
- **`min_request_id_length`**: Must be between 8 and 256 characters - **`min_request_id_length`**: Must be between 8 and 256 characters
- **`request_id_retention_hours`**: Must be between 1 and 8760 hours (1 year) - **`request_id_retention_hours`**: Must be between 1 and 8760 hours (1 year)
- **`request_id_retention_versions`**: Must be > 0 - **`request_id_retention_versions`**: Must be > 0
- **`pipeline_wait_strategy`**: Must be one of: `"WaitIfStageEmpty"`, `"WaitIfUpstreamIdle"`, or `"Never"`
- **`pipeline_release_threads`**: Must be between 1 and 64
### Subscription Configuration Limits ### Subscription Configuration Limits

View File

@@ -17,7 +17,9 @@ auto banned_request_ids_memory_gauge =
.create({}); .create({});
CommitPipeline::CommitPipeline(const weaseldb::Config &config) CommitPipeline::CommitPipeline(const weaseldb::Config &config)
: config_(config), pipeline_(lg_size) { : config_(config),
pipeline_(config.commit.pipeline_wait_strategy,
{1, 1, 1, config.commit.pipeline_release_threads}, lg_size) {
// Stage 0: Sequence assignment thread // Stage 0: Sequence assignment thread
sequence_thread_ = std::thread{[this]() { sequence_thread_ = std::thread{[this]() {
@@ -37,32 +39,35 @@ CommitPipeline::CommitPipeline(const weaseldb::Config &config)
run_persist_stage(); run_persist_stage();
}}; }};
// Stage 3: Connection return to server threads (2 threads) // Stage 3: Connection return to server threads (configurable count)
release_thread_1_ = std::thread{[this]() { release_threads_.reserve(config.commit.pipeline_release_threads);
pthread_setname_np(pthread_self(), "txn-release-1"); for (int i = 0; i < config.commit.pipeline_release_threads; ++i) {
run_release_stage<0>(); release_threads_.emplace_back([this, i]() {
}}; char name[16];
std::snprintf(name, sizeof(name), "txn-release-%d", i);
release_thread_2_ = std::thread{[this]() { pthread_setname_np(pthread_self(), name);
pthread_setname_np(pthread_self(), "txn-release-2"); run_release_stage(i);
run_release_stage<1>(); });
}}; }
} }
CommitPipeline::~CommitPipeline() { CommitPipeline::~CommitPipeline() {
// Send two shutdown signals for both release threads (adjacent in same batch) // Send shutdown signals for all release threads (adjacent in same batch)
{ {
auto guard = pipeline_.push(2, true); int num_release_threads = static_cast<int>(release_threads_.size());
guard.batch[0] = ShutdownEntry{}; auto guard = pipeline_.push(num_release_threads, true);
guard.batch[1] = ShutdownEntry{}; for (int i = 0; i < num_release_threads; ++i) {
guard.batch[i] = ShutdownEntry{};
}
} }
// Join all pipeline threads // Join all pipeline threads
sequence_thread_.join(); sequence_thread_.join();
resolve_thread_.join(); resolve_thread_.join();
persist_thread_.join(); persist_thread_.join();
release_thread_1_.join(); for (auto &thread : release_threads_) {
release_thread_2_.join(); thread.join();
}
} }
void CommitPipeline::submit_batch(std::span<PipelineEntry> entries) { void CommitPipeline::submit_batch(std::span<PipelineEntry> entries) {
@@ -92,8 +97,9 @@ void CommitPipeline::run_sequence_stage() {
BannedRequestIdSet banned_request_ids{ BannedRequestIdSet banned_request_ids{
ArenaStlAllocator<std::string_view>(&banned_request_arena)}; ArenaStlAllocator<std::string_view>(&banned_request_arena)};
for (int shutdowns_received = 0; shutdowns_received < 2;) { int expected_shutdowns = config_.commit.pipeline_release_threads;
auto guard = pipeline_.acquire<0, 0>(); for (int shutdowns_received = 0; shutdowns_received < expected_shutdowns;) {
auto guard = pipeline_.acquire(0, 0);
auto &batch = guard.batch; auto &batch = guard.batch;
// Stage 0: Sequence assignment // Stage 0: Sequence assignment
@@ -160,8 +166,9 @@ void CommitPipeline::run_sequence_stage() {
// AVOID BLOCKING IN THIS STAGE! // AVOID BLOCKING IN THIS STAGE!
void CommitPipeline::run_resolve_stage() { void CommitPipeline::run_resolve_stage() {
for (int shutdowns_received = 0; shutdowns_received < 2;) { int expected_shutdowns = config_.commit.pipeline_release_threads;
auto guard = pipeline_.acquire<1, 0>(/*maxBatch*/ 1); for (int shutdowns_received = 0; shutdowns_received < expected_shutdowns;) {
auto guard = pipeline_.acquire(1, 0, /*maxBatch*/ 1);
auto &batch = guard.batch; auto &batch = guard.batch;
// Stage 1: Precondition resolution // Stage 1: Precondition resolution
@@ -197,8 +204,9 @@ void CommitPipeline::run_resolve_stage() {
} }
void CommitPipeline::run_persist_stage() { void CommitPipeline::run_persist_stage() {
for (int shutdowns_received = 0; shutdowns_received < 2;) { int expected_shutdowns = config_.commit.pipeline_release_threads;
auto guard = pipeline_.acquire<2, 0>(); for (int shutdowns_received = 0; shutdowns_received < expected_shutdowns;) {
auto guard = pipeline_.acquire(2, 0);
auto &batch = guard.batch; auto &batch = guard.batch;
// Stage 2: Transaction persistence // Stage 2: Transaction persistence
@@ -289,9 +297,9 @@ void CommitPipeline::run_persist_stage() {
} }
} }
template <int thread_index> void CommitPipeline::run_release_stage() { void CommitPipeline::run_release_stage(int thread_index) {
for (int shutdowns_received = 0; shutdowns_received < 1;) { for (int shutdowns_received = 0; shutdowns_received < 1;) {
auto guard = pipeline_.acquire<3, thread_index>(); auto guard = pipeline_.acquire(3, thread_index);
auto &batch = guard.batch; auto &batch = guard.batch;
// Stage 3: Connection release // Stage 3: Connection release
@@ -302,7 +310,9 @@ template <int thread_index> void CommitPipeline::run_release_stage() {
// Partition work: thread 0 handles even indices, thread 1 handles odd // Partition work: thread 0 handles even indices, thread 1 handles odd
// indices // indices
if (static_cast<int>(it.index() % 2) != thread_index) { if (static_cast<int>(it.index() %
config_.commit.pipeline_release_threads) !=
thread_index) {
continue; continue;
} }

View File

@@ -93,27 +93,24 @@ private:
// Lock-free pipeline configuration // Lock-free pipeline configuration
static constexpr int lg_size = 16; // Ring buffer size (2^16 slots) static constexpr int lg_size = 16; // Ring buffer size (2^16 slots)
static constexpr auto wait_strategy = WaitStrategy::WaitIfUpstreamIdle;
// 4-stage pipeline: sequence -> resolve -> persist -> release // 4-stage pipeline: sequence -> resolve -> persist -> release
StaticThreadPipeline<PipelineEntry, wait_strategy, 1, 1, 1, 2> pipeline_; ThreadPipeline<PipelineEntry> pipeline_;
// Stage processing threads // Stage processing threads
std::thread sequence_thread_; std::thread sequence_thread_;
std::thread resolve_thread_; std::thread resolve_thread_;
std::thread persist_thread_; std::thread persist_thread_;
std::thread release_thread_1_; std::vector<std::thread> release_threads_;
std::thread release_thread_2_;
// Pipeline stage main loops // Pipeline stage main loops
void run_sequence_stage(); void run_sequence_stage();
void run_resolve_stage(); void run_resolve_stage();
void run_persist_stage(); void run_persist_stage();
template <int thread_index> void run_release_stage(); void run_release_stage(int thread_index);
// Pipeline batch type alias // Pipeline batch type alias
using BatchType = using BatchType = ThreadPipeline<PipelineEntry>::Batch;
StaticThreadPipeline<PipelineEntry, wait_strategy, 1, 1, 1, 2>::Batch;
// Make non-copyable and non-movable // Make non-copyable and non-movable
CommitPipeline(const CommitPipeline &) = delete; CommitPipeline(const CommitPipeline &) = delete;

View File

@@ -126,6 +126,25 @@ void ConfigParser::parse_commit_config(const auto &toml_data,
config.request_id_retention_hours); config.request_id_retention_hours);
parse_field(commit, "request_id_retention_versions", parse_field(commit, "request_id_retention_versions",
config.request_id_retention_versions); config.request_id_retention_versions);
// Parse wait strategy
if (commit.contains("pipeline_wait_strategy")) {
std::string strategy_str =
toml::get<std::string>(commit.at("pipeline_wait_strategy"));
if (strategy_str == "WaitIfStageEmpty") {
config.pipeline_wait_strategy = WaitStrategy::WaitIfStageEmpty;
} else if (strategy_str == "WaitIfUpstreamIdle") {
config.pipeline_wait_strategy = WaitStrategy::WaitIfUpstreamIdle;
} else if (strategy_str == "Never") {
config.pipeline_wait_strategy = WaitStrategy::Never;
} else {
std::cerr << "Warning: Unknown pipeline_wait_strategy '" << strategy_str
<< "', using default (WaitIfUpstreamIdle)" << std::endl;
}
}
parse_field(commit, "pipeline_release_threads",
config.pipeline_release_threads);
}); });
} }
@@ -253,6 +272,14 @@ bool ConfigParser::validate_config(const Config &config) {
valid = false; valid = false;
} }
if (config.commit.pipeline_release_threads < 1 ||
config.commit.pipeline_release_threads > 64) {
std::cerr << "Configuration error: commit.pipeline_release_threads must be "
"between 1 and 64, got "
<< config.commit.pipeline_release_threads << std::endl;
valid = false;
}
// Validate subscription configuration // Validate subscription configuration
if (config.subscription.max_buffer_size_bytes == 0) { if (config.subscription.max_buffer_size_bytes == 0) {
std::cerr << "Configuration error: subscription.max_buffer_size_bytes must " std::cerr << "Configuration error: subscription.max_buffer_size_bytes must "

View File

@@ -5,6 +5,8 @@
#include <string> #include <string>
#include <vector> #include <vector>
#include "thread_pipeline.hpp"
namespace weaseldb { namespace weaseldb {
/** /**
@@ -60,6 +62,16 @@ struct CommitConfig {
std::chrono::hours request_id_retention_hours{24}; std::chrono::hours request_id_retention_hours{24};
/// Minimum number of commit versions to retain request IDs for /// Minimum number of commit versions to retain request IDs for
int64_t request_id_retention_versions = 100000000; int64_t request_id_retention_versions = 100000000;
/// Wait strategy for the commit pipeline
/// - WaitIfStageEmpty: Block when individual stages are empty (safe for
/// shared CPUs)
/// - WaitIfUpstreamIdle: Block only when all upstream stages are idle
/// (default, requires dedicated cores)
/// - Never: Never block, busy-wait continuously (requires dedicated cores)
WaitStrategy pipeline_wait_strategy = WaitStrategy::WaitIfUpstreamIdle;
/// Number of threads in the release stage (final stage of commit pipeline)
/// Default: 1 thread for simplicity (can increase for higher throughput)
int pipeline_release_threads = 1;
}; };
/** /**

View File

@@ -246,6 +246,24 @@ int main(int argc, char *argv[]) {
std::cout << "Request ID retention: " std::cout << "Request ID retention: "
<< config->commit.request_id_retention_hours.count() << " hours" << config->commit.request_id_retention_hours.count() << " hours"
<< std::endl; << std::endl;
// Print pipeline configuration
std::string wait_strategy_str;
switch (config->commit.pipeline_wait_strategy) {
case WaitStrategy::WaitIfStageEmpty:
wait_strategy_str = "WaitIfStageEmpty";
break;
case WaitStrategy::WaitIfUpstreamIdle:
wait_strategy_str = "WaitIfUpstreamIdle";
break;
case WaitStrategy::Never:
wait_strategy_str = "Never";
break;
}
std::cout << "Pipeline wait strategy: " << wait_strategy_str << std::endl;
std::cout << "Pipeline release threads: "
<< config->commit.pipeline_release_threads << std::endl;
std::cout << "Subscription buffer size: " std::cout << "Subscription buffer size: "
<< config->subscription.max_buffer_size_bytes << " bytes" << config->subscription.max_buffer_size_bytes << " bytes"
<< std::endl; << std::endl;

View File

@@ -1,6 +1,5 @@
#pragma once #pragma once
#include <array>
#include <atomic> #include <atomic>
#include <cassert> #include <cassert>
#include <cstddef> #include <cstddef>
@@ -48,151 +47,174 @@ struct ThreadState {
bool last_stage; bool last_stage;
}; };
// Compile-time topology configuration for static pipelines // Runtime topology configuration for dynamic pipelines
// //
// This template defines a pipeline topology at compile-time: // This class defines a pipeline topology at runtime:
// - Stage and thread calculations done at compile-time // - Stage and thread calculations done at runtime
// - Type-safe indexing: Stage and thread indices validated at compile-time // - Flexible configuration: topology can be set via constructor
// - Fixed-size arrays with known bounds // - Dynamic arrays with runtime bounds checking
// - Code specialization for each topology // - Single implementation works for any topology
// //
// Example: StaticPipelineTopology<1, 4, 2> creates: // Example: PipelineTopology({1, 4, 2}) creates:
// - Stage 0: 1 thread (index 0) // - Stage 0: 1 thread (index 0)
// - Stage 1: 4 threads (indices 1-4) // - Stage 1: 4 threads (indices 1-4)
// - Stage 2: 2 threads (indices 5-6) // - Stage 2: 2 threads (indices 5-6)
// - Total: 7 threads across 3 stages // - Total: 7 threads across 3 stages
template <int... ThreadsPerStage> struct StaticPipelineTopology { struct PipelineTopology {
static_assert(sizeof...(ThreadsPerStage) > 0, const std::vector<int> threads_per_stage;
"Must specify at least one stage"); const int num_stages;
static_assert(((ThreadsPerStage > 0) && ...), const std::vector<int> stage_offsets;
"All stages must have at least one thread"); const int total_threads;
static constexpr int num_stages = sizeof...(ThreadsPerStage); explicit PipelineTopology(std::vector<int> threads_per_stage_)
static constexpr std::array<int, num_stages> threads_per_stage = { : threads_per_stage(validate_and_move(std::move(threads_per_stage_))),
ThreadsPerStage...}; num_stages(static_cast<int>(threads_per_stage.size())),
static constexpr int total_threads = (ThreadsPerStage + ...); stage_offsets(build_stage_offsets(threads_per_stage)),
total_threads(build_total_threads(threads_per_stage)) {}
// Compile-time stage offset calculation // Runtime stage offset calculation
template <int Stage> static constexpr int stage_offset() { int stage_offset(int stage) const {
static_assert(Stage >= 0 && Stage < num_stages, if (stage < 0 || stage >= num_stages) {
"Stage index out of bounds"); std::abort(); // Stage index out of bounds
if constexpr (Stage == 0) {
return 0;
} else {
return stage_offset<Stage - 1>() + threads_per_stage[Stage - 1];
} }
return stage_offsets[stage];
} }
// Compile-time thread index calculation // Runtime thread index calculation
template <int Stage, int Thread> static constexpr int thread_index() { int thread_index(int stage, int thread) const {
static_assert(Stage >= 0 && Stage < num_stages, if (stage < 0 || stage >= num_stages) {
"Stage index out of bounds"); std::abort(); // Stage index out of bounds
static_assert(Thread >= 0 && Thread < threads_per_stage[Stage], }
"Thread index out of bounds"); if (thread < 0 || thread >= threads_per_stage[stage]) {
return stage_offset<Stage>() + Thread; std::abort(); // Thread index out of bounds
}
return stage_offsets[stage] + thread;
} }
// Compile-time previous stage thread count // Runtime previous stage thread count
template <int Stage> static constexpr int prev_stage_thread_count() { int prev_stage_thread_count(int stage) const {
static_assert(Stage >= 0 && Stage < num_stages, if (stage < 0 || stage >= num_stages) {
"Stage index out of bounds"); std::abort(); // Stage index out of bounds
if constexpr (Stage == 0) { }
if (stage == 0) {
return 1; return 1;
} else { } else {
return threads_per_stage[Stage - 1]; return threads_per_stage[stage - 1];
} }
} }
private:
// Checks a per-stage thread-count list and hands it back to the caller.
//
// The list is accepted only if it is non-empty and every entry is at least 1.
// Any violation terminates the process via std::abort(); there is no error
// return. On success the (by-value) argument is returned unchanged.
static std::vector<int> validate_and_move(std::vector<int> threads) {
  // An empty topology is invalid; otherwise scan until the first bad entry.
  bool topology_ok = !threads.empty();
  for (std::size_t stage = 0; topology_ok && stage < threads.size(); ++stage) {
    topology_ok = threads[stage] >= 1;
  }
  if (!topology_ok) {
    std::abort(); // No stages given, or a stage with fewer than one thread
  }
  return threads;
}
// Computes, for each stage, the index of its first thread in the flattened
// thread list: element i of the result is the sum of threads_per_stage[0..i).
// The result has the same length as threads_per_stage (empty in, empty out).
static std::vector<int>
build_stage_offsets(const std::vector<int> &threads_per_stage) {
  std::vector<int> first_thread_of_stage;
  first_thread_of_stage.reserve(threads_per_stage.size());
  int threads_so_far = 0;
  for (int stage_threads : threads_per_stage) {
    // Record where this stage starts before accounting for its own threads.
    first_thread_of_stage.push_back(threads_so_far);
    threads_so_far += stage_threads;
  }
  return first_thread_of_stage;
}
// Returns the sum of all per-stage thread counts (0 for an empty list).
static int build_total_threads(const std::vector<int> &threads_per_stage) {
  int thread_sum = 0;
  for (std::size_t stage = 0; stage < threads_per_stage.size(); ++stage) {
    thread_sum += threads_per_stage[stage];
  }
  return thread_sum;
}
}; };
// Static pipeline algorithms - compile-time specialized versions // Pipeline algorithms - runtime configurable versions
namespace StaticPipelineAlgorithms { namespace PipelineAlgorithms {
template <WaitStrategy wait_strategy, typename Topology, int Stage, inline uint32_t calculate_safe_len(WaitStrategy wait_strategy,
int ThreadInStage> const PipelineTopology &topology, int stage,
uint32_t calculate_safe_len( int thread_in_stage,
std::array<ThreadState, Topology::total_threads> &all_threads, std::vector<ThreadState> &all_threads,
std::atomic<uint32_t> &pushes, bool may_block) { std::atomic<uint32_t> &pushes,
constexpr int thread_idx = bool may_block) {
Topology::template thread_index<Stage, ThreadInStage>(); int thread_idx = topology.thread_index(stage, thread_in_stage);
auto &thread = all_threads[thread_idx]; auto &thread = all_threads[thread_idx];
uint32_t safe_len = UINT32_MAX; uint32_t safe_len = UINT32_MAX;
constexpr int prev_stage_threads = int prev_stage_threads = topology.prev_stage_thread_count(stage);
Topology::template prev_stage_thread_count<Stage>();
// Compile-time loop over previous stage threads // Runtime loop over previous stage threads
[&]<std::size_t... Is>(std::index_sequence<Is...>) { for (int i = 0; i < prev_stage_threads; ++i) {
( std::atomic<uint32_t> &last_push = [&]() -> std::atomic<uint32_t> & {
[&] { if (stage == 0) {
auto &last_push = [&]() -> std::atomic<uint32_t> & { return pushes;
if constexpr (Stage == 0) { } else {
return pushes; int prev_thread_idx = topology.thread_index(stage - 1, i);
} else { return all_threads[prev_thread_idx].pops;
constexpr int prev_thread_idx = }
Topology::template thread_index<Stage - 1, Is>(); }();
return all_threads[prev_thread_idx].pops;
if (thread.last_push_read[i] == thread.local_pops) {
thread.last_push_read[i] = last_push.load(std::memory_order_acquire);
if (thread.last_push_read[i] == thread.local_pops) {
if (!may_block) {
safe_len = 0;
return safe_len;
}
if (wait_strategy == WaitStrategy::Never) {
// Empty - busy wait
} else if (wait_strategy == WaitStrategy::WaitIfUpstreamIdle) {
// We're allowed to spin as long as we eventually go to 0% cpu
// usage on idle
uint32_t push;
bool should_wait = true;
for (int j = 0; j < 100000; ++j) {
push = pushes.load(std::memory_order_relaxed);
if (push != thread.local_pops) {
should_wait = false;
break;
} }
}();
if (thread.last_push_read[Is] == thread.local_pops) {
thread.last_push_read[Is] =
last_push.load(std::memory_order_acquire);
if (thread.last_push_read[Is] == thread.local_pops) {
if (!may_block) {
safe_len = 0;
return;
}
if constexpr (wait_strategy == WaitStrategy::Never) {
// Empty - busy wait
} else if constexpr (wait_strategy ==
WaitStrategy::WaitIfUpstreamIdle) {
// We're allowed to spin as long as we eventually go to 0% cpu
// usage on idle
uint32_t push;
for (int i = 0; i < 100000; ++i) {
push = pushes.load(std::memory_order_relaxed);
if (push != thread.local_pops) {
goto dont_wait;
}
#if defined(__x86_64__) || defined(_M_X64) #if defined(__x86_64__) || defined(_M_X64)
_mm_pause(); _mm_pause();
#endif #endif
}
pushes.wait(push, std::memory_order_relaxed);
dont_wait:;
} else {
static_assert(wait_strategy == WaitStrategy::WaitIfStageEmpty);
last_push.wait(thread.last_push_read[Is],
std::memory_order_relaxed);
}
thread.last_push_read[Is] =
last_push.load(std::memory_order_acquire);
}
} }
safe_len = if (should_wait) {
std::min(safe_len, thread.last_push_read[Is] - thread.local_pops); pushes.wait(push, std::memory_order_relaxed);
}(), }
...); } else { // WaitStrategy::WaitIfStageEmpty
}(std::make_index_sequence<prev_stage_threads>{}); last_push.wait(thread.last_push_read[i], std::memory_order_relaxed);
}
thread.last_push_read[i] = last_push.load(std::memory_order_acquire);
}
}
safe_len = std::min(safe_len, thread.last_push_read[i] - thread.local_pops);
}
return safe_len; return safe_len;
} }
template <WaitStrategy wait_strategy, typename Topology, int Stage, inline void update_thread_pops(WaitStrategy wait_strategy,
int ThreadInStage> const PipelineTopology &topology, int stage,
void update_thread_pops( int thread_in_stage,
std::array<ThreadState, Topology::total_threads> &all_threads, std::vector<ThreadState> &all_threads,
uint32_t local_pops) { uint32_t local_pops) {
constexpr int thread_idx = int thread_idx = topology.thread_index(stage, thread_in_stage);
Topology::template thread_index<Stage, ThreadInStage>();
auto &thread_state = all_threads[thread_idx]; auto &thread_state = all_threads[thread_idx];
if constexpr (wait_strategy == WaitStrategy::WaitIfStageEmpty) { if (wait_strategy == WaitStrategy::WaitIfStageEmpty) {
thread_state.pops.store(local_pops, std::memory_order_seq_cst); thread_state.pops.store(local_pops, std::memory_order_seq_cst);
thread_state.pops.notify_all(); thread_state.pops.notify_all();
} else if constexpr (Stage == Topology::num_stages - 1) { // last stage } else if (stage == topology.num_stages - 1) { // last stage
thread_state.pops.store(local_pops, std::memory_order_seq_cst); thread_state.pops.store(local_pops, std::memory_order_seq_cst);
thread_state.pops.notify_all(); thread_state.pops.notify_all();
} else { } else {
@@ -200,15 +222,13 @@ void update_thread_pops(
} }
} }
template <typename Topology> inline int check_producer_capacity(const PipelineTopology &topology,
int check_producer_capacity( std::vector<ThreadState> &all_threads,
std::array<ThreadState, Topology::total_threads> &all_threads, uint32_t slot, uint32_t size,
uint32_t slot, uint32_t size, uint32_t slot_count, bool block) { uint32_t slot_count, bool block) {
constexpr int last_stage = Topology::num_stages - 1; int last_stage = topology.num_stages - 1;
constexpr int last_stage_offset = int last_stage_offset = topology.stage_offset(last_stage);
Topology::template stage_offset<last_stage>(); int last_stage_thread_count = topology.threads_per_stage[last_stage];
constexpr int last_stage_thread_count =
Topology::threads_per_stage[last_stage];
for (int i = 0; i < last_stage_thread_count; ++i) { for (int i = 0; i < last_stage_thread_count; ++i) {
auto &thread = all_threads[last_stage_offset + i]; auto &thread = all_threads[last_stage_offset + i];
@@ -223,10 +243,10 @@ int check_producer_capacity(
} }
return 0; // Can proceed return 0; // Can proceed
} }
} // namespace StaticPipelineAlgorithms } // namespace PipelineAlgorithms
// Static multi-stage lock-free pipeline for inter-thread communication // Multi-stage lock-free pipeline for inter-thread communication
// with compile-time topology specification. // with runtime-configurable topology and wait strategy.
// //
// Overview: // Overview:
// - Items flow from producers through multiple processing stages (stage 0 -> // - Items flow from producers through multiple processing stages (stage 0 ->
@@ -234,25 +254,17 @@ int check_producer_capacity(
// - Each stage can have multiple worker threads processing items in parallel // - Each stage can have multiple worker threads processing items in parallel
// - Uses a shared ring buffer with atomic counters for lock-free coordination // - Uses a shared ring buffer with atomic counters for lock-free coordination
// - Supports batch processing for efficiency // - Supports batch processing for efficiency
// - Compile-time topology specification via template parameters // - Runtime-configurable topology and wait strategy via constructor parameters
// //
// Architecture: // Architecture:
// - Producers: External threads that add items to the pipeline via push() // - Producers: External threads that add items to the pipeline via push()
// - Stages: Processing stages numbered 0, 1, 2, ... that consume items via // - Stages: Processing stages numbered 0, 1, 2, ... that consume items via
// acquire<Stage, Thread>() // acquire(stage, thread)
// - Items flow: Producers -> Stage 0 -> Stage 1 -> ... -> Final Stage // - Items flow: Producers -> Stage 0 -> Stage 1 -> ... -> Final Stage
// //
// Differences from Dynamic Version:
// - Template parameters specify topology at compile-time (e.g., <Item,
// WaitStrategy::Never, 1, 4, 2>)
// - Stage and thread indices are template parameters, validated at compile-time
// - Fixed-size arrays replace dynamic vectors
// - Specialized algorithms for each stage/thread combination
// - Type-safe guards prevent runtime indexing errors
//
// Usage Pattern: // Usage Pattern:
// using Pipeline = StaticThreadPipeline<Item, WaitStrategy::WaitIfStageEmpty, // ThreadPipeline<Item> pipeline(WaitStrategy::WaitIfStageEmpty, {1, 4, 2},
// 1, 4, 2>; Pipeline pipeline(lgSlotCount); // lgSlotCount);
// //
// // Producer threads (add items for stage 0 to consume): // // Producer threads (add items for stage 0 to consume):
// auto guard = pipeline.push(batchSize, /*block=*/true); // auto guard = pipeline.push(batchSize, /*block=*/true);
@@ -262,20 +274,21 @@ int check_producer_capacity(
// // Guard destructor publishes batch to stage 0 consumers // // Guard destructor publishes batch to stage 0 consumers
// //
// // Stage worker threads (process items and pass to next stage): // // Stage worker threads (process items and pass to next stage):
// auto guard = pipeline.acquire<Stage, Thread>(maxBatch, /*may_block=*/true); // auto guard = pipeline.acquire(stage, thread, maxBatch, /*may_block=*/true);
// for (auto& item : guard.batch) { // for (auto& item : guard.batch) {
// // Process item // // Process item
// } // }
// // Guard destructor marks items as consumed and available to next stage // // Guard destructor marks items as consumed and available to next stage
// //
// Multi-Thread Stage Processing: // Multi-Thread Stage Processing:
// When a stage has multiple threads (e.g., 1, 1, 1, 2 = 2 threads in stage 3): // When a stage has multiple threads (e.g., {1, 1, 1, 2} = 2 threads in stage
// 3):
// //
// OVERLAPPING BATCHES - EACH THREAD SEES EVERY ENTRY: // OVERLAPPING BATCHES - EACH THREAD SEES EVERY ENTRY:
// - Multiple threads in the same stage get OVERLAPPING batches from the ring // - Multiple threads in the same stage get OVERLAPPING batches from the ring
// buffer // buffer
// - Thread 0: calls acquire<3, 0>() - gets batch from ring positions 100-110 // - Thread 0: calls acquire(3, 0) - gets batch from ring positions 100-110
// - Thread 1: calls acquire<3, 1>() - gets batch from ring positions 100-110 // - Thread 1: calls acquire(3, 1) - gets batch from ring positions 100-110
// (SAME) // (SAME)
// - Both threads see the same entries and must coordinate processing // - Both threads see the same entries and must coordinate processing
// //
@@ -319,27 +332,27 @@ int check_producer_capacity(
// ordering // ordering
// - Uses C++20 atomic wait/notify for efficient blocking when no work available // - Uses C++20 atomic wait/notify for efficient blocking when no work available
// - RAII guards ensure proper cleanup even with exceptions // - RAII guards ensure proper cleanup even with exceptions
template <class T, WaitStrategy wait_strategy, int... ThreadsPerStage> template <class T> struct ThreadPipeline {
struct StaticThreadPipeline {
using Topology = StaticPipelineTopology<ThreadsPerStage...>;
// Constructor // Constructor
// wait_strategy: blocking behavior when no work is available
// threads_per_stage: number of threads in each stage (e.g., {1, 4, 2})
// lgSlotCount: log2 of ring buffer size (e.g., 10 -> 1024 slots) // lgSlotCount: log2 of ring buffer size (e.g., 10 -> 1024 slots)
// Template parameters specify pipeline topology (e.g., <Item, Never, 1, 4, // Note: Producer threads are external to the pipeline and not counted in
// 2>) Note: Producer threads are external to the pipeline and not counted in // threads_per_stage
// ThreadsPerStage explicit ThreadPipeline(WaitStrategy wait_strategy,
explicit StaticThreadPipeline(int lgSlotCount) std::vector<int> threads_per_stage, int lgSlotCount)
: slot_count(1 << lgSlotCount), slot_count_mask(slot_count - 1), : wait_strategy_(wait_strategy), topology_(std::move(threads_per_stage)),
ring(slot_count) { slot_count(1 << lgSlotCount), slot_count_mask(slot_count - 1),
ring(slot_count), all_threads(topology_.total_threads) {
// Otherwise we can't tell the difference between full and empty. // Otherwise we can't tell the difference between full and empty.
assert(!(slot_count_mask & 0x80000000)); assert(!(slot_count_mask & 0x80000000));
initialize_all_threads(); initialize_all_threads();
} }
StaticThreadPipeline(StaticThreadPipeline const &) = delete; ThreadPipeline(ThreadPipeline const &) = delete;
StaticThreadPipeline &operator=(StaticThreadPipeline const &) = delete; ThreadPipeline &operator=(ThreadPipeline const &) = delete;
StaticThreadPipeline(StaticThreadPipeline &&) = delete; ThreadPipeline(ThreadPipeline &&) = delete;
StaticThreadPipeline &operator=(StaticThreadPipeline &&) = delete; ThreadPipeline &operator=(ThreadPipeline &&) = delete;
struct Batch { struct Batch {
Batch() : ring(), begin_(), end_() {} Batch() : ring(), begin_(), end_() {}
@@ -442,7 +455,7 @@ struct StaticThreadPipeline {
} }
private: private:
friend struct StaticThreadPipeline; friend struct ThreadPipeline;
Batch(std::vector<T> *const ring, uint32_t begin_, uint32_t end_) Batch(std::vector<T> *const ring, uint32_t begin_, uint32_t end_)
: ring(ring), begin_(begin_), end_(end_) {} : ring(ring), begin_(begin_), end_(end_) {}
std::vector<T> *const ring; std::vector<T> *const ring;
@@ -450,29 +463,29 @@ struct StaticThreadPipeline {
uint32_t end_; uint32_t end_;
}; };
// Static thread storage - fixed size array
std::array<ThreadState, Topology::total_threads> all_threads;
private: private:
WaitStrategy wait_strategy_;
PipelineTopology topology_;
alignas(128) std::atomic<uint32_t> slots{0}; alignas(128) std::atomic<uint32_t> slots{0};
alignas(128) std::atomic<uint32_t> pushes{0}; alignas(128) std::atomic<uint32_t> pushes{0};
const uint32_t slot_count; const uint32_t slot_count;
const uint32_t slot_count_mask; const uint32_t slot_count_mask;
std::vector<T> ring; std::vector<T> ring;
std::vector<ThreadState> all_threads;
void initialize_all_threads() { void initialize_all_threads() {
[&]<std::size_t... StageIndices>(std::index_sequence<StageIndices...>) { for (int stage = 0; stage < topology_.num_stages; ++stage) {
(init_stage_threads<StageIndices>(), ...); init_stage_threads(stage);
}(std::make_index_sequence<Topology::num_stages>{}); }
} }
template <int Stage> void init_stage_threads() { void init_stage_threads(int stage) {
constexpr int stage_offset = Topology::template stage_offset<Stage>(); int stage_offset = topology_.stage_offset(stage);
constexpr int stage_thread_count = Topology::threads_per_stage[Stage]; int stage_thread_count = topology_.threads_per_stage[stage];
constexpr int prev_stage_threads = int prev_stage_threads = topology_.prev_stage_thread_count(stage);
Topology::template prev_stage_thread_count<Stage>(); bool is_last_stage = (stage == topology_.num_stages - 1);
constexpr bool is_last_stage = (Stage == Topology::num_stages - 1);
for (int thread = 0; thread < stage_thread_count; ++thread) { for (int thread = 0; thread < stage_thread_count; ++thread) {
auto &thread_state = all_threads[stage_offset + thread]; auto &thread_state = all_threads[stage_offset + thread];
@@ -481,14 +494,15 @@ private:
} }
} }
template <int Stage, int Thread> Batch acquire_helper(int stage, int thread, uint32_t maxBatch,
Batch acquire_helper(uint32_t maxBatch, bool mayBlock) { bool may_block) {
constexpr int thread_idx = Topology::template thread_index<Stage, Thread>(); int thread_idx = topology_.thread_index(stage, thread);
auto &thread_state = all_threads[thread_idx]; auto &thread_state = all_threads[thread_idx];
uint32_t begin = thread_state.local_pops & slot_count_mask; uint32_t begin = thread_state.local_pops & slot_count_mask;
uint32_t len = StaticPipelineAlgorithms::calculate_safe_len< uint32_t len = PipelineAlgorithms::calculate_safe_len(
wait_strategy, Topology, Stage, Thread>(all_threads, pushes, mayBlock); wait_strategy_, topology_, stage, thread, all_threads, pushes,
may_block);
if (maxBatch != 0) { if (maxBatch != 0) {
len = std::min(len, maxBatch); len = std::min(len, maxBatch);
@@ -503,13 +517,13 @@ private:
} }
public: public:
template <int Stage, int Thread> struct StageGuard { struct StageGuard {
Batch batch; Batch batch;
~StageGuard() { ~StageGuard() {
if (!batch.empty()) { if (!batch.empty()) {
StaticPipelineAlgorithms::update_thread_pops<wait_strategy, Topology, PipelineAlgorithms::update_thread_pops(
Stage, Thread>( pipeline->wait_strategy_, pipeline->topology_, stage, thread,
pipeline->all_threads, local_pops); pipeline->all_threads, local_pops);
} }
} }
@@ -517,22 +531,28 @@ public:
StageGuard(StageGuard const &) = delete; StageGuard(StageGuard const &) = delete;
StageGuard &operator=(StageGuard const &) = delete; StageGuard &operator=(StageGuard const &) = delete;
StageGuard(StageGuard &&other) noexcept StageGuard(StageGuard &&other) noexcept
: batch(other.batch), local_pops(other.local_pops), : batch(other.batch), local_pops(other.local_pops), stage(other.stage),
thread(other.thread),
pipeline(std::exchange(other.pipeline, nullptr)) {} pipeline(std::exchange(other.pipeline, nullptr)) {}
StageGuard &operator=(StageGuard &&other) noexcept { StageGuard &operator=(StageGuard &&other) noexcept {
batch = other.batch; batch = other.batch;
local_pops = other.local_pops; local_pops = other.local_pops;
stage = other.stage;
thread = other.thread;
pipeline = std::exchange(other.pipeline, nullptr); pipeline = std::exchange(other.pipeline, nullptr);
return *this; return *this;
} }
private: private:
friend struct StaticThreadPipeline; friend struct ThreadPipeline;
uint32_t local_pops; uint32_t local_pops;
StaticThreadPipeline *pipeline; int stage;
int thread;
ThreadPipeline *pipeline;
StageGuard(Batch batch, uint32_t local_pops, StaticThreadPipeline *pipeline) StageGuard(Batch batch, uint32_t local_pops, int stage, int thread,
: batch(batch), local_pops(local_pops), ThreadPipeline *pipeline)
: batch(batch), local_pops(local_pops), stage(stage), thread(thread),
pipeline(batch.empty() ? nullptr : pipeline) {} pipeline(batch.empty() ? nullptr : pipeline) {}
}; };
@@ -555,37 +575,30 @@ public:
} }
private: private:
friend struct StaticThreadPipeline; friend struct ThreadPipeline;
ProducerGuard() : batch(), tp() {} ProducerGuard() : batch(), tp() {}
ProducerGuard(Batch batch, StaticThreadPipeline *tp, uint32_t old_slot, ProducerGuard(Batch batch, ThreadPipeline *tp, uint32_t old_slot,
uint32_t new_slot) uint32_t new_slot)
: batch(batch), tp(tp), old_slot(old_slot), new_slot(new_slot) {} : batch(batch), tp(tp), old_slot(old_slot), new_slot(new_slot) {}
StaticThreadPipeline *const tp; ThreadPipeline *const tp;
uint32_t old_slot; uint32_t old_slot;
uint32_t new_slot; uint32_t new_slot;
}; };
// Acquire a batch of items for processing by a consumer thread. // Acquire a batch of items for processing by a consumer thread.
// Stage: which processing stage (0 = first consumer stage after producers) - // stage: which processing stage (0 = first consumer stage after producers)
// compile-time parameter Thread: thread ID within the stage (0 to // thread: thread ID within the stage (0 to threads_per_stage[stage]-1)
// ThreadsPerStage[Stage]-1) - compile-time parameter maxBatch: maximum items // maxBatch: maximum items to acquire (0 = no limit)
// to acquire (0 = no limit) may_block: whether to block waiting for items // may_block: whether to block waiting for items (false = return empty batch
// (false = return empty batch if none available) Returns: StageGuard<Stage, // if none available) Returns: StageGuard with batch of items to process
// Thread> with batch of items to process and compile-time type safety [[nodiscard]] StageGuard acquire(int stage, int thread, int maxBatch = 0,
template <int Stage, int Thread> bool may_block = true) {
[[nodiscard]] StageGuard<Stage, Thread> acquire(int maxBatch = 0, auto batch = acquire_helper(stage, thread, maxBatch, may_block);
bool may_block = true) {
static_assert(Stage >= 0 && Stage < Topology::num_stages,
"Stage index out of bounds");
static_assert(Thread >= 0 && Thread < Topology::threads_per_stage[Stage],
"Thread index out of bounds");
auto batch = acquire_helper<Stage, Thread>(maxBatch, may_block); int thread_idx = topology_.thread_index(stage, thread);
constexpr int thread_idx = Topology::template thread_index<Stage, Thread>();
uint32_t local_pops = all_threads[thread_idx].local_pops; uint32_t local_pops = all_threads[thread_idx].local_pops;
return StageGuard<Stage, Thread>{std::move(batch), local_pops, this}; return StageGuard{std::move(batch), local_pops, stage, thread, this};
} }
// Reserve slots in the ring buffer for a producer thread to fill with items. // Reserve slots in the ring buffer for a producer thread to fill with items.
@@ -618,9 +631,8 @@ public:
slot = slots.load(std::memory_order_relaxed); slot = slots.load(std::memory_order_relaxed);
begin = slot & slot_count_mask; begin = slot & slot_count_mask;
int capacity_result = int capacity_result = PipelineAlgorithms::check_producer_capacity(
StaticPipelineAlgorithms::check_producer_capacity<Topology>( topology_, all_threads, slot, size, slot_count, block);
all_threads, slot, size, slot_count, block);
if (capacity_result == 1) { if (capacity_result == 1) {
continue; continue;
} }