StaticThreadPipeline
@@ -64,33 +64,47 @@ struct HttpConnectionState {
 */
 struct HttpHandler : ConnectionHandler {
   HttpHandler() {
-    for (int threadId = 0; threadId < kFinalStageThreads; ++threadId) {
-      finalStageThreads.emplace_back([this, threadId]() {
-        pthread_setname_np(pthread_self(),
-                           ("stage-1-" + std::to_string(threadId)).c_str());
-        for (;;) {
-          auto guard = pipeline.acquire(1, threadId);
-          for (auto it = guard.batch.begin(); it != guard.batch.end(); ++it) {
-            if ((it.index() % kFinalStageThreads) == threadId) {
-              auto &c = *it;
-              if (!c) {
-                return;
-              }
-              auto *state = static_cast<HttpConnectionState *>(c->user_data);
-              TRACE_EVENT("http", "pipeline thread",
-                          perfetto::Flow::Global(state->request_id));
-              Server::release_back_to_server(std::move(c));
-            }
-          }
-        }
-      });
-    }
+    finalStageThreads.emplace_back([this]() {
+      pthread_setname_np(pthread_self(), "stage-1-0");
+      for (;;) {
+        auto guard = pipeline.acquire<1, 0>();
+        for (auto it = guard.batch.begin(); it != guard.batch.end(); ++it) {
+          if ((it.index() % 2) == 0) { // Thread 0 handles even indices
+            auto &c = *it;
+            if (!c) {
+              return;
+            }
+            auto *state = static_cast<HttpConnectionState *>(c->user_data);
+            TRACE_EVENT("http", "release",
+                        perfetto::Flow::Global(state->request_id));
+            Server::release_back_to_server(std::move(c));
+          }
+        }
+      }
+    });
+    finalStageThreads.emplace_back([this]() {
+      pthread_setname_np(pthread_self(), "stage-1-1");
+      for (;;) {
+        auto guard = pipeline.acquire<1, 1>();
+        for (auto it = guard.batch.begin(); it != guard.batch.end(); ++it) {
+          if ((it.index() % 2) == 1) { // Thread 1 handles odd indices
+            auto &c = *it;
+            if (!c) {
+              return;
+            }
+            auto *state = static_cast<HttpConnectionState *>(c->user_data);
+            TRACE_EVENT("http", "release",
+                        perfetto::Flow::Global(state->request_id));
+            Server::release_back_to_server(std::move(c));
+          }
+        }
+      }
+    });
     stage0Thread = std::thread{[this]() {
       pthread_setname_np(pthread_self(), "stage-0");
       for (;;) {
-        auto guard = pipeline.acquire(0, 0, 0, false);
-        for (auto it = guard.batch.begin(); it != guard.batch.end(); ++it) {
-          auto &c = *it;
+        auto guard = pipeline.acquire<0, 0>();
+        for (auto &c : guard.batch) {
           if (!c) {
             return;
           }
@@ -102,7 +116,7 @@ struct HttpHandler : ConnectionHandler {
   }

   ~HttpHandler() {
     {
-      auto guard = pipeline.push(kFinalStageThreads, true);
+      auto guard = pipeline.push(2, true);
       for (auto &c : guard.batch) {
         c = {};
       }
@@ -137,8 +151,9 @@ struct HttpHandler : ConnectionHandler {
 private:
-  static constexpr int kFinalStageThreads = 2;
   static constexpr int kLogSize = 12;
-  ThreadPipeline<std::unique_ptr<Connection>> pipeline{
-      kLogSize, {/*noop serial thread*/ 1, kFinalStageThreads}};
+  StaticThreadPipeline<std::unique_ptr<Connection>,
+                       WaitStrategy::WaitIfStageEmpty, 1, 2>
+      pipeline{kLogSize};
   std::thread stage0Thread;
   std::vector<std::thread> finalStageThreads;
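
For illustration (an editor's sketch, not part of the commit): the call-site change above moves stage and thread from runtime arguments to template arguments, so each worker lambda is stamped out for a fixed (stage, thread) pair at compile time:

  auto guard = pipeline.acquire(1, threadId);  // before: stage/thread checked at runtime
  auto guard = pipeline.acquire<1, 0>();       // after: checked by static_assert

With the thread index fixed per lambda, the work split (it.index() % 2 == 0 for thread 0, == 1 for thread 1) compares against a literal, and kFinalStageThreads is subsumed by the <1, 2> stage layout baked into the pipeline's type.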
@@ -1,152 +1,155 @@
 #pragma once

 #include <array>
 #include <atomic>
 #include <cassert>
 #include <cstddef>
 #include <cstdint>
 #include <cstdio>
 #include <cstdlib>
 #include <functional>
 #include <iterator>
 #include <numeric>
 #include <utility>
 #include <vector>

-// Topology configuration - separates stage layout from pipeline logic
-struct PipelineTopology {
-  std::vector<int> threads_per_stage;
-  int num_stages;
-  int total_threads;
-
-  PipelineTopology(const std::vector<int> &threads)
-      : threads_per_stage(threads), num_stages(threads.size()) {
-    assert(!threads.empty());
-    assert(std::all_of(threads.begin(), threads.end(),
-                       [](int t) { return t > 0; }));
-    total_threads = std::accumulate(threads.begin(), threads.end(), 0);
-  }
-
-  // Get the flat array offset for the first thread of a stage
-  int stage_offset(int stage) const {
-    assert(stage >= 0 && stage < num_stages);
-    int offset = 0;
-    for (int i = 0; i < stage; ++i) {
-      offset += threads_per_stage[i];
-    }
-    return offset;
-  }
-
-  // Get flat array index for a specific thread in a stage
-  int thread_index(int stage, int thread) const {
-    assert(stage >= 0 && stage < num_stages);
-    assert(thread >= 0 && thread < threads_per_stage[stage]);
-    return stage_offset(stage) + thread;
-  }
-
-  // Get number of threads in the previous stage (for initialization)
-  int prev_stage_thread_count(int stage) const {
-    return (stage == 0) ? 1 : threads_per_stage[stage - 1];
-  }
-};
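
A worked illustration of the flat layout (editor's sketch): for threads_per_stage = {1, 2} — the layout this commit uses — the flat thread array is

  index 0: stage 0, thread 0
  index 1: stage 1, thread 0
  index 2: stage 1, thread 1

so stage_offset(1) == 1, thread_index(1, 1) == 2, and total_threads == 3. prev_stage_thread_count(0) == 1 because stage 0 is fed by the single producer-side pushes counter rather than by a stage of worker threads.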

 // Wait strategies for controlling thread blocking behavior when no work is
 // available
 enum class WaitStrategy {
   // Never block - threads busy-wait (spin) when no work available.
   // Stage threads will always use 100% CPU even when idle.
   // Requires dedicated CPU cores to avoid scheduler thrashing.
   // Use when: latency is critical and you have spare cores.
   Never,

   // Block only when all upstream stages are idle (no new work entering
   // pipeline).
   // Downstream threads busy-wait if upstream has work but not for their stage.
   // Eliminates futex notifications between stages, reduces to 0% CPU when idle.
   // Requires dedicated cores to avoid priority inversion when pipeline has
   // work.
   // Use when: high throughput with spare cores and sustained workloads.
   WaitIfUpstreamIdle,

   // Block when individual stages are empty (original behavior).
   // Each stage waits independently on its input sources.
   // Safe for shared CPU environments, works well with variable workloads.
   // Use when: general purpose, shared cores, or unpredictable workloads.
   WaitIfStageEmpty,
 };
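
For illustration, the strategy is now a type-level choice; a hypothetical pair of aliases (Item is a placeholder type, StaticThreadPipeline is defined further down in this file):

  using LowLatencyPipeline =
      StaticThreadPipeline<Item, WaitStrategy::Never, 1, 2>;            // spins; needs dedicated cores
  using GeneralPurposePipeline =
      StaticThreadPipeline<Item, WaitStrategy::WaitIfStageEmpty, 1, 2>; // blocks per stage when idle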

-// Core thread state - extracted from pipeline concerns
+// Core thread state
 struct ThreadState {
   // Where this thread has published up to
   alignas(128) std::atomic<uint32_t> pops{0};
   // Where this thread will publish to the next time it publishes, or if idle
   // where it has published to
   uint32_t local_pops{0};
   // Where the previous stage's threads have published up to last we checked
   std::vector<uint32_t> last_push_read;
   bool last_stage;
 };
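
A note on the protocol these fields implement (editor's summary): a consumer advances local_pops privately while it works through a batch, and only publishes it into the atomic pops when its guard is destroyed; the next stage, and producers checking ring capacity, observe progress through acquire loads of pops. last_push_read caches the upstream counters, so when the cached value already shows available work the hot path skips the atomic re-read entirely.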

-// Core pipeline algorithms - independent of storage layout
-namespace PipelineAlgorithms {
-
-// Calculate how many items can be safely acquired from a stage
-template <WaitStrategy wait_strategy>
-uint32_t calculate_safe_len(const PipelineTopology &topology,
-                            std::vector<ThreadState> &all_threads,
-                            std::atomic<uint32_t> &pushes, int stage,
-                            int thread_in_stage, bool may_block) {
-  int thread_idx = topology.thread_index(stage, thread_in_stage);
+// Compile-time topology configuration
+template <int... ThreadsPerStage> struct StaticPipelineTopology {
+  static_assert(sizeof...(ThreadsPerStage) > 0,
+                "Must specify at least one stage");
+  static_assert(((ThreadsPerStage > 0) && ...),
+                "All stages must have at least one thread");
+
+  static constexpr int num_stages = sizeof...(ThreadsPerStage);
+  static constexpr std::array<int, num_stages> threads_per_stage = {
+      ThreadsPerStage...};
+  static constexpr int total_threads = (ThreadsPerStage + ...);
+
+  // Compile-time stage offset calculation
+  template <int Stage> static constexpr int stage_offset() {
+    static_assert(Stage >= 0 && Stage < num_stages,
+                  "Stage index out of bounds");
+    if constexpr (Stage == 0) {
+      return 0;
+    } else {
+      return stage_offset<Stage - 1>() + threads_per_stage[Stage - 1];
+    }
+  }
+
+  // Compile-time thread index calculation
+  template <int Stage, int Thread> static constexpr int thread_index() {
+    static_assert(Stage >= 0 && Stage < num_stages,
+                  "Stage index out of bounds");
+    static_assert(Thread >= 0 && Thread < threads_per_stage[Stage],
+                  "Thread index out of bounds");
+    return stage_offset<Stage>() + Thread;
+  }
+
+  // Compile-time previous stage thread count
+  template <int Stage> static constexpr int prev_stage_thread_count() {
+    static_assert(Stage >= 0 && Stage < num_stages,
+                  "Stage index out of bounds");
+    if constexpr (Stage == 0) {
+      return 1;
+    } else {
+      return threads_per_stage[Stage - 1];
+    }
+  }
+};
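
For illustration, what the recursion evaluates to for the <1, 2> layout the HttpHandler uses (a sketch that should compile against this header):

  using Topo = StaticPipelineTopology<1, 2>;
  static_assert(Topo::num_stages == 2);
  static_assert(Topo::total_threads == 3);
  static_assert(Topo::stage_offset<1>() == 1);
  static_assert(Topo::thread_index<1, 1>() == 2);
  static_assert(Topo::prev_stage_thread_count<1>() == 1);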

+// Static pipeline algorithms - compile-time specialized versions
+namespace StaticPipelineAlgorithms {
+
+template <WaitStrategy wait_strategy, typename Topology, int Stage,
+          int ThreadInStage>
+uint32_t calculate_safe_len(
+    std::array<ThreadState, Topology::total_threads> &all_threads,
+    std::atomic<uint32_t> &pushes, bool may_block) {
+  constexpr int thread_idx =
+      Topology::template thread_index<Stage, ThreadInStage>();
   auto &thread = all_threads[thread_idx];
   uint32_t safe_len = UINT32_MAX;

   // Check all threads from the previous stage (or pushes for stage 0)
-  int prev_stage_threads = topology.prev_stage_thread_count(stage);
-  for (int i = 0; i < prev_stage_threads; ++i) {
-    auto &last_push =
-        (stage == 0) ? pushes
-                     : all_threads[topology.thread_index(stage - 1, i)].pops;
-    if (thread.last_push_read[i] == thread.local_pops) {
-      // Re-read with memory order and try again
-      thread.last_push_read[i] = last_push.load(std::memory_order_acquire);
-      if (thread.last_push_read[i] == thread.local_pops) {
-        if (!may_block) {
-          return 0;
-        }
-        if constexpr (wait_strategy == WaitStrategy::Never) {
-          // Empty - busy wait
-        } else if constexpr (wait_strategy ==
-                             WaitStrategy::WaitIfUpstreamIdle) {
-          auto push = pushes.load(std::memory_order_relaxed);
-          if (push == thread.local_pops) {
-            pushes.wait(push, std::memory_order_relaxed);
-          }
-        } else {
-          static_assert(wait_strategy == WaitStrategy::WaitIfStageEmpty);
-          last_push.wait(thread.last_push_read[i], std::memory_order_relaxed);
-        }
-        thread.last_push_read[i] = last_push.load(std::memory_order_acquire);
-      }
-    }
-    safe_len = std::min(safe_len, thread.last_push_read[i] - thread.local_pops);
-  }
+  constexpr int prev_stage_threads =
+      Topology::template prev_stage_thread_count<Stage>();
+  // Compile-time loop over previous stage threads
+  [&]<std::size_t... Is>(std::index_sequence<Is...>) {
+    (
+        [&] {
+          auto &last_push = [&]() -> std::atomic<uint32_t> & {
+            if constexpr (Stage == 0) {
+              return pushes;
+            } else {
+              constexpr int prev_thread_idx =
+                  Topology::template thread_index<Stage - 1, Is>();
+              return all_threads[prev_thread_idx].pops;
+            }
+          }();
+
+          if (thread.last_push_read[Is] == thread.local_pops) {
+            thread.last_push_read[Is] =
+                last_push.load(std::memory_order_acquire);
+            if (thread.last_push_read[Is] == thread.local_pops) {
+              if (!may_block) {
+                safe_len = 0;
+                return;
+              }
+
+              if constexpr (wait_strategy == WaitStrategy::Never) {
+                // Empty - busy wait
+              } else if constexpr (wait_strategy ==
+                                   WaitStrategy::WaitIfUpstreamIdle) {
+                auto push = pushes.load(std::memory_order_relaxed);
+                if (push == thread.local_pops) {
+                  pushes.wait(push, std::memory_order_relaxed);
+                }
+              } else {
+                static_assert(wait_strategy == WaitStrategy::WaitIfStageEmpty);
+                last_push.wait(thread.last_push_read[Is],
+                               std::memory_order_relaxed);
+              }
+
+              thread.last_push_read[Is] =
+                  last_push.load(std::memory_order_acquire);
+            }
+          }
+          safe_len =
+              std::min(safe_len, thread.last_push_read[Is] - thread.local_pops);
+        }(),
+        ...);
+  }(std::make_index_sequence<prev_stage_threads>{});
   return safe_len;
 }
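
The "compile-time loop" above is a fold over an index_sequence. For illustration, the pattern in isolation (a self-contained sketch, not from the commit):

  #include <cstdio>
  #include <utility>

  template <int N> void visit_all() {
    [&]<std::size_t... Is>(std::index_sequence<Is...>) {
      // The comma-operator fold expands the inner lambda once per Is; each
      // expansion sees Is as a constant expression, which is what lets
      // calculate_safe_len use it as the <Stage - 1, Is> template argument.
      ([&] { std::printf("visiting %zu\n", Is); }(), ...);
    }(std::make_index_sequence<N>{});
  }

visit_all<3>() prints indices 0, 1, 2 with no runtime loop counter. Note that a return inside the per-Is lambda ends only that iteration, which is why the code above sets safe_len = 0 and returns from the lambda instead of returning 0 from the enclosing function as the removed runtime version did.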

 // Update thread pops counter after processing
-template <WaitStrategy wait_strategy>
-void update_thread_pops(const PipelineTopology &topology,
-                        std::vector<ThreadState> &all_threads, int stage,
-                        int thread_in_stage, uint32_t local_pops) {
-  int thread_idx = topology.thread_index(stage, thread_in_stage);
+template <WaitStrategy wait_strategy, typename Topology, int Stage,
+          int ThreadInStage>
+void update_thread_pops(
+    std::array<ThreadState, Topology::total_threads> &all_threads,
+    uint32_t local_pops) {
+  constexpr int thread_idx =
+      Topology::template thread_index<Stage, ThreadInStage>();
   auto &thread_state = all_threads[thread_idx];

   if constexpr (wait_strategy == WaitStrategy::WaitIfStageEmpty) {
     thread_state.pops.store(local_pops, std::memory_order_seq_cst);
     thread_state.pops.notify_all();
-  } else if (thread_state.last_stage) {
+  } else if constexpr (Stage == Topology::num_stages - 1) { // last stage
     thread_state.pops.store(local_pops, std::memory_order_seq_cst);
     thread_state.pops.notify_all();
   } else {
@@ -154,24 +157,22 @@ void update_thread_pops(const PipelineTopology &topology,
   }
 }

 // Check if producer can proceed without stomping the ring buffer
 // Returns: 0 = can proceed, 1 = should retry, 2 = cannot proceed (return empty
 // guard)
-inline int check_producer_capacity(const PipelineTopology &topology,
-                                   std::vector<ThreadState> &all_threads,
-                                   uint32_t slot, uint32_t size,
-                                   uint32_t slot_count, bool block) {
-  // Check against last stage threads
-  int last_stage = topology.num_stages - 1;
-  int last_stage_offset = topology.stage_offset(last_stage);
-  int last_stage_thread_count = topology.threads_per_stage[last_stage];
+template <typename Topology>
+int check_producer_capacity(
+    std::array<ThreadState, Topology::total_threads> &all_threads,
+    uint32_t slot, uint32_t size, uint32_t slot_count, bool block) {
+  constexpr int last_stage = Topology::num_stages - 1;
+  constexpr int last_stage_offset =
+      Topology::template stage_offset<last_stage>();
+  constexpr int last_stage_thread_count =
+      Topology::threads_per_stage[last_stage];

   for (int i = 0; i < last_stage_thread_count; ++i) {
     auto &thread = all_threads[last_stage_offset + i];
     uint32_t pops = thread.pops.load(std::memory_order_acquire);
     if (slot + size - pops > slot_count) {
       if (!block) {
-        return 2; // Cannot proceed, caller should return empty guard
+        return 2; // Cannot proceed
       }
       thread.pops.wait(pops, std::memory_order_relaxed);
       return 1; // Should retry
@@ -179,77 +180,24 @@ inline int check_producer_capacity(const PipelineTopology &topology,
     }
   }
   return 0; // Can proceed
 }
-} // namespace PipelineAlgorithms
+} // namespace StaticPipelineAlgorithms

-// Multi-stage lock-free pipeline for high-throughput inter-thread
-// communication.
-//
-// Overview:
-// - Items flow from producers through multiple processing stages (stage 0 ->
-//   stage 1 -> ... -> final stage)
-// - Each stage can have multiple worker threads processing items in parallel
-// - Uses a shared ring buffer with atomic counters for lock-free coordination
-// - Supports batch processing for efficiency
-//
-// Architecture:
-// - Producers: External threads that add items to the pipeline via push()
-// - Stages: Processing stages numbered 0, 1, 2, ... that consume items via
-//   acquire()
-// - Items flow: Producers -> Stage 0 -> Stage 1 -> ... -> Final Stage
-//
-// Usage Pattern:
-// // Producer threads (external to pipeline stages - add items for stage 0
-// // to consume):
-// auto guard = pipeline.push(batchSize, /*block=*/true);
-// for (auto& item : guard.batch) {
-//   // Initialize item data
-// }
-// // Guard destructor publishes batch to stage 0 consumers
-//
-// // Stage worker threads (process items and pass to next stage):
-// auto guard = pipeline.acquire(stageNum, threadId, maxBatch,
-//                               /*mayBlock=*/true);
-// for (auto& item : guard.batch) {
-//   // Process item
-// }
-// // Guard destructor marks items as consumed and available to next stage
-//
-// Memory Model:
-// - Ring buffer size must be power of 2 for efficient masking
-// - Actual ring slots accessed via: index & (slotCount - 1)
-// - 128-byte aligned atomics prevent false sharing between CPU cache lines
-//
-// Thread Safety:
-// - Fully lock-free using atomic operations with acquire/release memory
-//   ordering
-// - Uses C++20 atomic wait/notify for efficient blocking when no work
-//   available
-// - RAII guards ensure proper cleanup even with exceptions
-//
-// Refactored Design:
-// - Topology configuration separated from pipeline logic
-// - Flattened thread storage replaces nested vectors
-// - Core algorithms extracted into pure functions
-// - Ready for compile-time static version implementation
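
The removed usage comments above describe the old runtime API; for illustration, the equivalent against the static API that follows (an editor's sketch; Work, batchSize, makeWork, and process are placeholders):

  StaticThreadPipeline<Work, WaitStrategy::WaitIfStageEmpty, 1, 2> pipe{12};

  // Producer: reserve slots, fill them, publish on guard destruction.
  {
    auto guard = pipe.push(batchSize, /*block=*/true);
    for (auto &item : guard.batch) { item = makeWork(); }
  }

  // Stage-1 worker 0: stage and thread are now template arguments.
  for (;;) {
    auto guard = pipe.acquire<1, 0>();
    for (auto &item : guard.batch) { process(item); }
    // Guard destructor publishes consumption to the next stage / producers.
  }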

-template <class T, WaitStrategy wait_strategy = WaitStrategy::WaitIfStageEmpty>
-struct ThreadPipeline {
+// Static multi-stage lock-free pipeline
+template <class T, WaitStrategy wait_strategy, int... ThreadsPerStage>
+struct StaticThreadPipeline {
+  using Topology = StaticPipelineTopology<ThreadsPerStage...>;

-  // Constructor now takes topology configuration
-  ThreadPipeline(int lgSlotCount, const PipelineTopology &topo)
-      : topology(topo), slot_count(1 << lgSlotCount),
-        slot_count_mask(slot_count - 1), all_threads(topology.total_threads),
+  explicit StaticThreadPipeline(int lgSlotCount)
+      : slot_count(1 << lgSlotCount), slot_count_mask(slot_count - 1),
         ring(slot_count) {
     // Otherwise we can't tell the difference between full and empty
     assert(!(slot_count_mask & 0x80000000));
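    // For scale (editor's illustration): the HttpHandler above passes
    // kLogSize = 12, giving slot_count = 4096 and slot_count_mask = 4095,
    // so ring positions are index & 0xFFF; the assert keeps slot_count at
    // or below 2^31 so unsigned push/pop differences stay unambiguous.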

     initialize_all_threads();
   }

-  // Legacy constructor for backward compatibility
-  ThreadPipeline(int lgSlotCount, const std::vector<int> &threadsPerStage)
-      : ThreadPipeline(lgSlotCount, PipelineTopology(threadsPerStage)) {}
-
-  ThreadPipeline(ThreadPipeline const &) = delete;
-  ThreadPipeline &operator=(ThreadPipeline const &) = delete;
-  ThreadPipeline(ThreadPipeline &&) = delete;
-  ThreadPipeline &operator=(ThreadPipeline &&) = delete;
+  StaticThreadPipeline(StaticThreadPipeline const &) = delete;
+  StaticThreadPipeline &operator=(StaticThreadPipeline const &) = delete;
+  StaticThreadPipeline(StaticThreadPipeline &&) = delete;
+  StaticThreadPipeline &operator=(StaticThreadPipeline &&) = delete;

   struct Batch {
     Batch() : ring(), begin_(), end_() {}
@@ -317,7 +265,6 @@ struct ThreadPipeline {
     }
     friend bool operator<(const Iterator &lhs, const Iterator &rhs) {
       assert(lhs.ring == rhs.ring);
-      // Handle potential uint32_t wraparound by using signed difference
      return static_cast<int32_t>(lhs.index_ - rhs.index_) < 0;
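      // Worked example (editor's illustration): with lhs.index_ = 0xFFFFFFF0
      // and rhs.index_ = 0x10 (rhs has wrapped past 2^32), the unsigned
      // difference 0xFFFFFFE0 reinterprets as int32_t -32, so lhs < rhs
      // still holds across the wrap.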
     }
     friend bool operator<=(const Iterator &lhs, const Iterator &rhs) {
@@ -333,9 +280,6 @@ struct ThreadPipeline {
       return static_cast<int32_t>(lhs.index_ - rhs.index_) >= 0;
     }

-    /// Returns the ring buffer index (0 to ring->size()-1) for this iterator
-    /// position. Useful for distributing work across multiple threads by
-    /// using modulo operations.
     uint32_t index() const { return index_ & (ring->size() - 1); }

   private:
@@ -356,7 +300,7 @@ struct ThreadPipeline {
     }

   private:
-    friend struct ThreadPipeline;
+    friend struct StaticThreadPipeline;
     Batch(std::vector<T> *const ring, uint32_t begin_, uint32_t end_)
         : ring(ring), begin_(begin_), end_(end_) {}
     std::vector<T> *const ring;
@@ -364,45 +308,45 @@ struct ThreadPipeline {
     uint32_t end_;
   };

-private:
-  // Pipeline configuration
-  PipelineTopology topology;
+  // Static thread storage - fixed size array
+  std::array<ThreadState, Topology::total_threads> all_threads;

-  // Core state
+private:
   alignas(128) std::atomic<uint32_t> slots{0};
   alignas(128) std::atomic<uint32_t> pushes{0};
   const uint32_t slot_count;
   const uint32_t slot_count_mask;

-  // Flattened thread storage - single array instead of nested vectors
-  std::vector<ThreadState> all_threads;

   // Ring buffer
   std::vector<T> ring;

   void initialize_all_threads() {
-    for (int stage = 0; stage < topology.num_stages; ++stage) {
-      int stage_offset = topology.stage_offset(stage);
-      int stage_thread_count = topology.threads_per_stage[stage];
-      int prev_stage_threads = topology.prev_stage_thread_count(stage);
-      bool is_last_stage = (stage == topology.num_stages - 1);
-
-      for (int thread = 0; thread < stage_thread_count; ++thread) {
-        auto &thread_state = all_threads[stage_offset + thread];
-        thread_state.last_stage = is_last_stage;
-        thread_state.last_push_read = std::vector<uint32_t>(prev_stage_threads);
-      }
-    }
+    [&]<std::size_t... StageIndices>(std::index_sequence<StageIndices...>) {
+      (init_stage_threads<StageIndices>(), ...);
+    }(std::make_index_sequence<Topology::num_stages>{});
   }

+  template <int Stage> void init_stage_threads() {
+    constexpr int stage_offset = Topology::template stage_offset<Stage>();
+    constexpr int stage_thread_count = Topology::threads_per_stage[Stage];
+    constexpr int prev_stage_threads =
+        Topology::template prev_stage_thread_count<Stage>();
+    constexpr bool is_last_stage = (Stage == Topology::num_stages - 1);
+
+    for (int thread = 0; thread < stage_thread_count; ++thread) {
+      auto &thread_state = all_threads[stage_offset + thread];
+      thread_state.last_stage = is_last_stage;
+      thread_state.last_push_read = std::vector<uint32_t>(prev_stage_threads);
+    }
+  }

-  Batch acquire_helper(int stage, int thread, uint32_t maxBatch,
-                       bool may_block) {
-    int thread_idx = topology.thread_index(stage, thread);
+  template <int Stage, int Thread>
+  Batch acquire_helper(uint32_t maxBatch, bool mayBlock) {
+    constexpr int thread_idx = Topology::template thread_index<Stage, Thread>();
     auto &thread_state = all_threads[thread_idx];

     uint32_t begin = thread_state.local_pops & slot_count_mask;
-    uint32_t len = PipelineAlgorithms::calculate_safe_len<wait_strategy>(
-        topology, all_threads, pushes, stage, thread, may_block);
+    uint32_t len = StaticPipelineAlgorithms::calculate_safe_len<
+        wait_strategy, Topology, Stage, Thread>(all_threads, pushes, mayBlock);

     if (maxBatch != 0) {
       len = std::min(len, maxBatch);
@@ -417,32 +361,37 @@ private:
   }

 public:
-  struct StageGuard {
+  template <int Stage, int Thread> struct StageGuard {
     Batch batch;

     ~StageGuard() {
-      if (cleanup_func) {
-        cleanup_func();
+      if (!batch.empty()) {
+        StaticPipelineAlgorithms::update_thread_pops<wait_strategy, Topology,
+                                                     Stage, Thread>(
+            pipeline->all_threads, local_pops);
       }
     }

     StageGuard(StageGuard const &) = delete;
     StageGuard &operator=(StageGuard const &) = delete;
     StageGuard(StageGuard &&other) noexcept
-        : batch(other.batch),
-          cleanup_func(std::exchange(other.cleanup_func, nullptr)) {}
+        : batch(other.batch), local_pops(other.local_pops),
+          pipeline(std::exchange(other.pipeline, nullptr)) {}
     StageGuard &operator=(StageGuard &&other) noexcept {
       batch = other.batch;
-      cleanup_func = std::exchange(other.cleanup_func, nullptr);
+      local_pops = other.local_pops;
+      pipeline = std::exchange(other.pipeline, nullptr);
       return *this;
     }

   private:
-    friend struct ThreadPipeline;
-    std::function<void()> cleanup_func;
+    friend struct StaticThreadPipeline;
+    uint32_t local_pops;
+    StaticThreadPipeline *pipeline;

-    StageGuard(Batch batch, std::function<void()> cleanup)
-        : batch(batch), cleanup_func(batch.empty() ? nullptr : cleanup) {}
+    StageGuard(Batch batch, uint32_t local_pops, StaticThreadPipeline *pipeline)
+        : batch(batch), local_pops(local_pops),
+          pipeline(batch.empty() ? nullptr : pipeline) {}
   };

   struct ProducerGuard {
@@ -452,8 +401,6 @@ public:
       if (tp == nullptr) {
         return;
       }
-      // Wait for earlier slots to finish being published, since publishing
-      // implies that all previous slots were also published.
       for (;;) {
         uint32_t p = tp->pushes.load(std::memory_order_acquire);
         if (p == old_slot) {
@@ -461,65 +408,38 @@ public:
         }
         tp->pushes.wait(p, std::memory_order_relaxed);
       }
       // Publish. seq_cst so that the notify can't be ordered before the store
       tp->pushes.store(new_slot, std::memory_order_seq_cst);
       // We have to notify every time, since we don't know if this is the last
       // push ever
       tp->pushes.notify_all();
     }

   private:
-    friend struct ThreadPipeline;
+    friend struct StaticThreadPipeline;
     ProducerGuard() : batch(), tp() {}
-    ProducerGuard(Batch batch, ThreadPipeline<T, wait_strategy> *tp,
-                  uint32_t old_slot, uint32_t new_slot)
+    ProducerGuard(Batch batch, StaticThreadPipeline *tp, uint32_t old_slot,
+                  uint32_t new_slot)
         : batch(batch), tp(tp), old_slot(old_slot), new_slot(new_slot) {}
-    ThreadPipeline<T, wait_strategy> *const tp;
+    StaticThreadPipeline *const tp;
     uint32_t old_slot;
     uint32_t new_slot;
   };

   // Acquire a batch of items for processing by a consumer thread.
-  // stage: which processing stage (0 = first consumer stage after producers)
-  // thread: thread ID within the stage (0 to threadsPerStage[stage]-1)
   // maxBatch: maximum items to acquire (0 = no limit)
   // mayBlock: whether to block waiting for items (false = return empty batch
   // if none available)
   // Returns: StageGuard with batch of items to process
-  [[nodiscard]] StageGuard acquire(int stage, int thread, int maxBatch = 0,
-                                   bool mayBlock = true) {
-    assert(stage >= 0 && stage < topology.num_stages);
-    assert(thread >= 0 && thread < topology.threads_per_stage[stage]);
+  // Static acquire - Stage and Thread are compile-time parameters
+  template <int Stage, int Thread>
+  [[nodiscard]] StageGuard<Stage, Thread> acquire(int maxBatch = 0,
+                                                  bool may_block = true) {
+    static_assert(Stage >= 0 && Stage < Topology::num_stages,
+                  "Stage index out of bounds");
+    static_assert(Thread >= 0 && Thread < Topology::threads_per_stage[Stage],
+                  "Thread index out of bounds");

-    auto batch = acquire_helper(stage, thread, maxBatch, mayBlock);
+    auto batch = acquire_helper<Stage, Thread>(maxBatch, may_block);

-    // Create cleanup function that will update thread state on destruction
-    int thread_idx = topology.thread_index(stage, thread);
+    constexpr int thread_idx = Topology::template thread_index<Stage, Thread>();
     uint32_t local_pops = all_threads[thread_idx].local_pops;

-    auto cleanup = [this, stage, thread, local_pops]() {
-      PipelineAlgorithms::update_thread_pops<wait_strategy>(
-          topology, all_threads, stage, thread, local_pops);
-    };
-
-    return StageGuard{std::move(batch), cleanup};
+    return StageGuard<Stage, Thread>{std::move(batch), local_pops, this};
   }

   // Reserve slots in the ring buffer for a producer thread to fill with items.
   // This is used by producer threads to add new items to stage 0 of the
   // pipeline.
   //
   // size: number of slots to reserve (must be > 0 and <= ring buffer capacity)
   // block: if true, blocks when ring buffer is full; if false, returns empty
   // guard
   // Returns: ProducerGuard with exclusive access to reserved slots
   //
   // Usage: Fill items in the returned batch, then let guard destructor publish
   // them. The guard destructor ensures items are published in the correct
   // order.
   //
   // Preconditions:
   // - size > 0 (must request at least one slot)
   // - size <= slotCount (cannot request more slots than ring buffer capacity)
   // Violating preconditions results in program termination via abort().
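   //
   // For illustration (editor's sketch): non-blocking producers must check for
   // the empty guard that signals a full ring:
   //   auto guard = pipeline.push(n, /*block=*/false);
   //   if (guard.batch.empty()) { /* ring full: drop or retry later */ }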
   [[nodiscard]] ProducerGuard push(uint32_t const size, bool block) {
     if (size == 0) {
       std::abort();
@@ -534,16 +454,15 @@ public:
       slot = slots.load(std::memory_order_relaxed);
       begin = slot & slot_count_mask;

-      // Use algorithm function to check capacity
-      int capacity_result = PipelineAlgorithms::check_producer_capacity(
-          topology, all_threads, slot, size, slot_count, block);
+      int capacity_result =
+          StaticPipelineAlgorithms::check_producer_capacity<Topology>(
+              all_threads, slot, size, slot_count, block);
       if (capacity_result == 1) {
-        continue; // Retry
+        continue;
       }
       if (capacity_result == 2) {
-        return ProducerGuard{}; // Cannot proceed, return empty guard
+        return ProducerGuard{};
       }
       // capacity_result == 0, can proceed

       if (slots.compare_exchange_weak(slot, slot + size,
                                       std::memory_order_relaxed,