Files
weaseldb/src/thread_pipeline.hpp

558 lines
20 KiB
C++

#pragma once
#include <algorithm>
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <functional>
#include <iterator>
#include <new>
#include <numeric>
#include <utility>
#include <vector>
// Topology configuration - separates stage layout from pipeline logic.
//
// Describes how many stages a pipeline has and how many worker threads run in
// each one, and maps a (stage, thread) pair onto an index into a flat,
// stage-major per-thread array.
struct PipelineTopology {
  // Worker-thread count of each stage, indexed by stage number.
  std::vector<int> threads_per_stage;
  // Number of stages (size of threads_per_stage).
  int num_stages;
  // Sum of all entries in threads_per_stage.
  int total_threads;

  // Builds a topology from per-stage thread counts.
  // `threads` must be non-empty and contain only positive counts (asserted).
  PipelineTopology(const std::vector<int> &threads)
      : threads_per_stage(threads),
        num_stages(static_cast<int>(threads.size())),
        total_threads(std::accumulate(threads.begin(), threads.end(), 0)) {
    assert(!threads.empty());
    assert(std::all_of(threads.begin(), threads.end(),
                       [](int t) { return t > 0; }));
  }

  // Get the flat array offset for the first thread of a stage, i.e. the total
  // number of threads in all earlier stages.
  int stage_offset(int stage) const {
    assert(stage >= 0 && stage < num_stages);
    return std::accumulate(threads_per_stage.begin(),
                           threads_per_stage.begin() + stage, 0);
  }

  // Get flat array index for a specific thread in a stage.
  int thread_index(int stage, int thread) const {
    assert(stage >= 0 && stage < num_stages);
    assert(thread >= 0 && thread < threads_per_stage[stage]);
    return stage_offset(stage) + thread;
  }

  // Get number of threads in the previous stage (for initialization).
  // Stage 0 has no previous stage; it reports 1 because it has exactly one
  // upstream counter to watch.
  int prev_stage_thread_count(int stage) const {
    // Same range check as the other accessors: without it an out-of-range
    // stage silently indexes outside threads_per_stage.
    assert(stage >= 0 && stage < num_stages);
    return (stage == 0) ? 1 : threads_per_stage[stage - 1];
  }
};
// Wait strategies for controlling thread blocking behavior when no work is
// available. The strategy is a compile-time template parameter of the
// pipeline and of the algorithms that consume/publish positions.
enum class WaitStrategy {
// Never block - threads busy-wait (spin) when no work available.
// Stage threads will always use 100% CPU even when idle.
// Requires dedicated CPU cores to avoid scheduler thrashing.
// Use when: latency is critical and you have spare cores.
Never,
// Block only when all upstream stages are idle (no new work entering
// pipeline): a stage thread sleeps on the producers' publish counter, and
// only if that counter equals the thread's own position.
// Downstream threads busy-wait if upstream has work but not for their stage.
// Eliminates futex notifications between stages, reduces to 0% CPU when idle.
// Requires dedicated cores to avoid priority inversion when pipeline has
// work.
// Use when: high throughput with spare cores and sustained workloads.
WaitIfUpstreamIdle,
// Block when individual stages are empty (original behavior).
// Each stage waits independently on its input sources (the counter of the
// upstream thread that has nothing new), and every stage thread notifies
// after publishing.
// Safe for shared CPU environments, works well with variable workloads.
// Use when: general purpose, shared cores, or unpredictable workloads.
WaitIfStageEmpty,
};
// Core thread state - extracted from pipeline concerns.
//
// Bookkeeping for a single stage worker thread. `pops` is the only field read
// by other threads; it is over-aligned to 128 bytes to keep it away from
// neighbouring threads' counters.
struct ThreadState {
  // Where this thread has published up to
  alignas(128) std::atomic<uint32_t> pops{0};
  // Where this thread will publish to the next time it publishes, or if idle
  // where it has published to
  uint32_t local_pops{0};
  // Where the previous stage's threads have published up to last we checked
  // (one entry per upstream counter)
  std::vector<uint32_t> last_push_read;
  // True for threads of the final stage. Explicitly initialised so that a
  // default-initialised ThreadState never carries an indeterminate flag.
  bool last_stage{false};
};
// Core pipeline algorithms - independent of storage layout
namespace PipelineAlgorithms {
// Calculate how many items can be safely acquired from a stage.
//
// For the thread identified by (stage, thread_in_stage) this returns the
// minimum, over every upstream counter, of (published position - the thread's
// local_pops). Upstream counters are `pushes` for stage 0 and the `pops` of
// each thread of the previous stage otherwise.
//
// Each upstream position is cached in thread.last_push_read[i]; the shared
// atomic is only re-read once the cached value has been fully consumed.
//
// may_block == false: returns 0 as soon as one upstream counter has nothing
// new. may_block == true: waits according to wait_strategy, re-reads the
// counter once and uses whatever it then holds, so the result can still be 0
// and callers must be prepared to retry.
template <WaitStrategy wait_strategy>
uint32_t calculate_safe_len(const PipelineTopology &topology,
std::vector<ThreadState> &all_threads,
std::atomic<uint32_t> &pushes, int stage,
int thread_in_stage, bool may_block) {
int thread_idx = topology.thread_index(stage, thread_in_stage);
auto &thread = all_threads[thread_idx];
// Start from "unbounded" and shrink to the most restrictive upstream counter
uint32_t safe_len = UINT32_MAX;
// Check all threads from the previous stage (or pushes for stage 0)
int prev_stage_threads = topology.prev_stage_thread_count(stage);
for (int i = 0; i < prev_stage_threads; ++i) {
auto &last_push =
(stage == 0) ? pushes
: all_threads[topology.thread_index(stage - 1, i)].pops;
// Cached view says nothing is left from this upstream counter
if (thread.last_push_read[i] == thread.local_pops) {
// Re-read with memory order and try again
thread.last_push_read[i] = last_push.load(std::memory_order_acquire);
if (thread.last_push_read[i] == thread.local_pops) {
// Still nothing new: bail out, spin, or sleep depending on the caller's
// request and the compile-time strategy
if (!may_block) {
return 0;
}
if constexpr (wait_strategy == WaitStrategy::Never) {
// Empty - busy wait
} else if constexpr (wait_strategy ==
WaitStrategy::WaitIfUpstreamIdle) {
// Sleep only if the producers themselves have published nothing beyond
// our position; otherwise fall through without sleeping
auto push = pushes.load(std::memory_order_relaxed);
if (push == thread.local_pops) {
pushes.wait(push, std::memory_order_relaxed);
}
} else {
static_assert(wait_strategy == WaitStrategy::WaitIfStageEmpty);
// Sleep until this specific upstream counter moves off the value we saw
last_push.wait(thread.last_push_read[i], std::memory_order_relaxed);
}
// Single re-read after waiting; may still equal local_pops
thread.last_push_read[i] = last_push.load(std::memory_order_acquire);
}
}
// Unsigned subtraction, so the distance stays correct across counter wrap
safe_len = std::min(safe_len, thread.last_push_read[i] - thread.local_pops);
}
return safe_len;
}
// Update thread pops counter after processing
template <WaitStrategy wait_strategy>
void update_thread_pops(const PipelineTopology &topology,
std::vector<ThreadState> &all_threads, int stage,
int thread_in_stage, uint32_t local_pops) {
int thread_idx = topology.thread_index(stage, thread_in_stage);
auto &thread_state = all_threads[thread_idx];
if constexpr (wait_strategy == WaitStrategy::WaitIfStageEmpty) {
thread_state.pops.store(local_pops, std::memory_order_seq_cst);
thread_state.pops.notify_all();
} else if (thread_state.last_stage) {
thread_state.pops.store(local_pops, std::memory_order_seq_cst);
thread_state.pops.notify_all();
} else {
thread_state.pops.store(local_pops, std::memory_order_release);
}
}
// Check if producer can proceed without stomping the ring buffer
// Returns: 0 = can proceed, 1 = should retry, 2 = cannot proceed (return empty
// guard)
inline int check_producer_capacity(const PipelineTopology &topology,
std::vector<ThreadState> &all_threads,
uint32_t slot, uint32_t size,
uint32_t slot_count, bool block) {
// Check against last stage threads
int last_stage = topology.num_stages - 1;
int last_stage_offset = topology.stage_offset(last_stage);
int last_stage_thread_count = topology.threads_per_stage[last_stage];
for (int i = 0; i < last_stage_thread_count; ++i) {
auto &thread = all_threads[last_stage_offset + i];
uint32_t pops = thread.pops.load(std::memory_order_acquire);
if (slot + size - pops > slot_count) {
if (!block) {
return 2; // Cannot proceed, caller should return empty guard
}
thread.pops.wait(pops, std::memory_order_relaxed);
return 1; // Should retry
}
}
return 0; // Can proceed
}
} // namespace PipelineAlgorithms
// Multi-stage lock-free pipeline for high-throughput inter-thread
// communication.
//
// Overview:
// - Items flow from producers through multiple processing stages (stage 0 ->
// stage 1 -> ... -> final stage)
// - Each stage can have multiple worker threads processing items in parallel
// - Uses a shared ring buffer with atomic counters for lock-free coordination
// - Supports batch processing for efficiency
//
// Architecture:
// - Producers: External threads that add items to the pipeline via push()
// - Stages: Processing stages numbered 0, 1, 2, ... that consume items via
// acquire()
// - Items flow: Producers -> Stage 0 -> Stage 1 -> ... -> Final Stage
//
// Usage Pattern:
//   // Producer threads (external to the pipeline stages) add items for
//   // stage 0 to consume:
//   auto guard = pipeline.push(batchSize, /*block=*/true);
//   for (auto &item : guard.batch) {
//     // Initialize item data
//   }
//   // Guard destructor publishes batch to stage 0 consumers
//
//   // Stage worker threads (process items and pass to next stage):
//   auto guard = pipeline.acquire(stageNum, threadId, maxBatch,
//                                 /*mayBlock=*/true);
//   for (auto &item : guard.batch) {
//     // Process item
//   }
//   // Guard destructor marks items as consumed and available to next stage
//
// Memory Model:
// - Ring buffer size must be power of 2 for efficient masking
// - Actual ring slots accessed via: index & (slotCount - 1)
// - 128-byte aligned atomics prevent false sharing between CPU cache lines
//
// Thread Safety:
// - Fully lock-free using atomic operations with acquire/release memory
// ordering
// - Uses C++20 atomic wait/notify for efficient blocking when no work available
// - RAII guards ensure proper cleanup even with exceptions
//
// Refactored Design:
// - Topology configuration separated from pipeline logic
// - Flattened thread storage replaces nested vectors
// - Core algorithms extracted into pure functions
// - Ready for compile-time static version implementation
template <class T, WaitStrategy wait_strategy = WaitStrategy::WaitIfStageEmpty>
struct ThreadPipeline {
// Constructor now takes topology configuration.
//
// lgSlotCount: base-2 logarithm of the ring size; the ring holds
//              2^lgSlotCount slots. Must be in [0, 31] (asserted): the shift
//              is undefined outside that range, and a 2^32-slot ring could
//              not distinguish full from empty with 32-bit counters.
// topo:        stage layout; copied into the pipeline.
ThreadPipeline(int lgSlotCount, const PipelineTopology &topo)
    : topology(topo),
      // Validate before shifting, and shift an unsigned value so that
      // lgSlotCount == 31 is well defined.
      slot_count((assert(lgSlotCount >= 0 && lgSlotCount < 32),
                  uint32_t{1} << lgSlotCount)),
      slot_count_mask(slot_count - 1), all_threads(topology.total_threads),
      ring(slot_count) {
  // Otherwise we can't tell the difference between full and empty
  assert(!(slot_count_mask & 0x80000000));
  initialize_all_threads();
}
// Legacy constructor for backward compatibility: builds the topology from a
// plain list of per-stage thread counts.
ThreadPipeline(int lgSlotCount, const std::vector<int> &threadsPerStage)
    : ThreadPipeline(lgSlotCount, PipelineTopology(threadsPerStage)) {}
// The pipeline is neither copyable nor movable.
ThreadPipeline(ThreadPipeline const &) = delete;
ThreadPipeline &operator=(ThreadPipeline const &) = delete;
ThreadPipeline(ThreadPipeline &&) = delete;
ThreadPipeline &operator=(ThreadPipeline &&) = delete;
// A contiguous run of ring slots [begin_, end_) handed to one thread.
//
// Positions are plain uint32_t values; they are reduced to a ring index by
// masking with (ring->size() - 1) on every element access, so a batch may
// straddle the physical end of the ring. A default-constructed Batch is empty
// and has no ring attached.
struct Batch {
  Batch() : ring(), begin_(), end_() {}

  // Random-access iterator over the slots of a batch.
  //
  // The iterator is default-constructible, copy-assignable and provides
  // operator[], so it can be handed to standard algorithms that require a
  // random-access iterator.
  struct Iterator {
    using iterator_category = std::random_access_iterator_tag;
    using difference_type = std::ptrdiff_t;
    using value_type = T;
    using pointer = value_type *;
    using reference = value_type &;

    // Singular iterator: not attached to a ring, must not be dereferenced.
    Iterator() = default;

    reference operator*() const {
      return (*ring)[index_ & (ring->size() - 1)];
    }
    pointer operator->() const {
      return &(*ring)[index_ & (ring->size() - 1)];
    }
    // Element n positions away from this iterator.
    reference operator[](difference_type n) const { return *(*this + n); }

    Iterator &operator++() {
      ++index_;
      return *this;
    }
    Iterator operator++(int) {
      auto tmp = *this;
      ++(*this);
      return tmp;
    }
    Iterator &operator--() {
      --index_;
      return *this;
    }
    Iterator operator--(int) {
      auto tmp = *this;
      --(*this);
      return tmp;
    }
    // Position arithmetic is modulo 2^32, matching the pipeline counters.
    Iterator &operator+=(difference_type n) {
      index_ += static_cast<uint32_t>(n);
      return *this;
    }
    Iterator &operator-=(difference_type n) {
      index_ -= static_cast<uint32_t>(n);
      return *this;
    }
    Iterator operator+(difference_type n) const {
      return Iterator(static_cast<uint32_t>(index_ + n), ring);
    }
    Iterator operator-(difference_type n) const {
      return Iterator(static_cast<uint32_t>(index_ - n), ring);
    }
    difference_type operator-(const Iterator &rhs) const {
      assert(ring == rhs.ring);
      return static_cast<difference_type>(index_) -
             static_cast<difference_type>(rhs.index_);
    }
    friend Iterator operator+(difference_type n, const Iterator &iter) {
      return iter + n;
    }
    friend bool operator==(const Iterator &lhs, const Iterator &rhs) {
      assert(lhs.ring == rhs.ring);
      return lhs.index_ == rhs.index_;
    }
    friend bool operator!=(const Iterator &lhs, const Iterator &rhs) {
      assert(lhs.ring == rhs.ring);
      return lhs.index_ != rhs.index_;
    }
    friend bool operator<(const Iterator &lhs, const Iterator &rhs) {
      assert(lhs.ring == rhs.ring);
      // Handle potential uint32_t wraparound by using signed difference
      return static_cast<int32_t>(lhs.index_ - rhs.index_) < 0;
    }
    friend bool operator<=(const Iterator &lhs, const Iterator &rhs) {
      assert(lhs.ring == rhs.ring);
      return static_cast<int32_t>(lhs.index_ - rhs.index_) <= 0;
    }
    friend bool operator>(const Iterator &lhs, const Iterator &rhs) {
      assert(lhs.ring == rhs.ring);
      return static_cast<int32_t>(lhs.index_ - rhs.index_) > 0;
    }
    friend bool operator>=(const Iterator &lhs, const Iterator &rhs) {
      assert(lhs.ring == rhs.ring);
      return static_cast<int32_t>(lhs.index_ - rhs.index_) >= 0;
    }
    /// Returns the ring buffer index (0 to ring->size()-1) for this iterator
    /// position. Useful for distributing work across multiple threads by
    /// using modulo operations.
    uint32_t index() const { return index_ & (ring->size() - 1); }

  private:
    Iterator(uint32_t index, std::vector<T> *ring)
        : index_(index), ring(ring) {}
    friend struct Batch;
    uint32_t index_{0};
    // Deliberately not a const pointer: a const member would delete copy
    // assignment, which iterators are required to support.
    std::vector<T> *ring{nullptr};
  };

  [[nodiscard]] Iterator begin() { return Iterator(begin_, ring); }
  [[nodiscard]] Iterator end() { return Iterator(end_, ring); }
  // Number of slots in the batch (unsigned distance, wrap-safe).
  [[nodiscard]] size_t size() const { return end_ - begin_; }
  [[nodiscard]] bool empty() const { return end_ == begin_; }
  // n-th slot of the batch; n is not range-checked.
  T &operator[](uint32_t n) {
    return (*ring)[(begin_ + n) & (ring->size() - 1)];
  }

private:
  friend struct ThreadPipeline;
  Batch(std::vector<T> *ring, uint32_t begin_, uint32_t end_)
      : ring(ring), begin_(begin_), end_(end_) {}
  // Non-const so that a Batch can be copy-assigned.
  std::vector<T> *ring;
  uint32_t begin_;
  uint32_t end_;
};
private:
// Pipeline configuration (copied from the constructor argument)
PipelineTopology topology;
// Core state
// NOTE: the constructor's initializer list relies on the declaration order
// below (topology before all_threads, slot_count before slot_count_mask and
// ring).
// Next position a producer will reserve; advanced by compare-exchange in
// push().
alignas(128) std::atomic<uint32_t> slots{0};
// Position up to which producers have published; stored and notified by
// ~ProducerGuard and read by stage 0.
alignas(128) std::atomic<uint32_t> pushes{0};
// Number of ring slots (1 << lgSlotCount)
const uint32_t slot_count;
// slot_count - 1; masks a position down to a ring index
const uint32_t slot_count_mask;
// Flattened thread storage - single array instead of nested vectors.
// Indexed with topology.thread_index(stage, thread).
std::vector<ThreadState> all_threads;
// Ring buffer (slot_count elements)
std::vector<T> ring;
void initialize_all_threads() {
for (int stage = 0; stage < topology.num_stages; ++stage) {
int stage_offset = topology.stage_offset(stage);
int stage_thread_count = topology.threads_per_stage[stage];
int prev_stage_threads = topology.prev_stage_thread_count(stage);
bool is_last_stage = (stage == topology.num_stages - 1);
for (int thread = 0; thread < stage_thread_count; ++thread) {
auto &thread_state = all_threads[stage_offset + thread];
thread_state.last_stage = is_last_stage;
thread_state.last_push_read = std::vector<uint32_t>(prev_stage_threads);
}
}
}
// Claim up to maxBatch items (0 = no cap) for the given stage thread.
//
// Asks calculate_safe_len how many items are available, applies the cap, and
// on success advances the thread's local_pops past the claimed range. Returns
// an empty Batch, leaving local_pops untouched, when nothing is available.
Batch acquire_helper(int stage, int thread, uint32_t maxBatch,
                     bool may_block) {
  ThreadState &self = all_threads[topology.thread_index(stage, thread)];

  const uint32_t available =
      PipelineAlgorithms::calculate_safe_len<wait_strategy>(
          topology, all_threads, pushes, stage, thread, may_block);
  const uint32_t take =
      (maxBatch == 0) ? available : std::min(available, maxBatch);
  if (take == 0) {
    return Batch{};
  }

  // Batch positions start at the ring index of the first claimed item
  const uint32_t first = self.local_pops & slot_count_mask;
  self.local_pops += take;
  return Batch{&ring, first, first + take};
}
public:
struct StageGuard {
Batch batch;
~StageGuard() {
if (cleanup_func) {
cleanup_func();
}
}
StageGuard(StageGuard const &) = delete;
StageGuard &operator=(StageGuard const &) = delete;
StageGuard(StageGuard &&other) noexcept
: batch(other.batch),
cleanup_func(std::exchange(other.cleanup_func, nullptr)) {}
StageGuard &operator=(StageGuard &&other) noexcept {
batch = other.batch;
cleanup_func = std::exchange(other.cleanup_func, nullptr);
return *this;
}
private:
friend struct ThreadPipeline;
std::function<void()> cleanup_func;
StageGuard(Batch batch, std::function<void()> cleanup)
: batch(batch), cleanup_func(batch.empty() ? nullptr : cleanup) {}
};
struct ProducerGuard {
Batch batch;
~ProducerGuard() {
if (tp == nullptr) {
return;
}
// Wait for earlier slots to finish being published, since publishing
// implies that all previous slots were also published.
for (;;) {
uint32_t p = tp->pushes.load(std::memory_order_acquire);
if (p == old_slot) {
break;
}
tp->pushes.wait(p, std::memory_order_relaxed);
}
// Publish. seq_cst so that the notify can't be ordered before the store
tp->pushes.store(new_slot, std::memory_order_seq_cst);
// We have to notify every time, since we don't know if this is the last
// push ever
tp->pushes.notify_all();
}
private:
friend struct ThreadPipeline;
ProducerGuard() : batch(), tp() {}
ProducerGuard(Batch batch, ThreadPipeline<T, wait_strategy> *tp,
uint32_t old_slot, uint32_t new_slot)
: batch(batch), tp(tp), old_slot(old_slot), new_slot(new_slot) {}
ThreadPipeline<T, wait_strategy> *const tp;
uint32_t old_slot;
uint32_t new_slot;
};
// Acquire a batch of items for processing by a consumer thread.
//
// stage:    which processing stage (0 = first consumer stage after producers)
// thread:   thread ID within the stage (0 to threadsPerStage[stage]-1)
// maxBatch: maximum items to acquire (0 = no limit)
// mayBlock: whether to block waiting for items (false = return empty batch if
//           none available)
// Returns:  StageGuard with the batch of items to process. When the guard is
//           destroyed, the thread's position is published via
//           update_thread_pops (nothing is published for an empty batch).
[[nodiscard]] StageGuard acquire(int stage, int thread, int maxBatch = 0,
                                 bool mayBlock = true) {
  assert(stage >= 0 && stage < topology.num_stages);
  assert(thread >= 0 && thread < topology.threads_per_stage[stage]);

  Batch claimed = acquire_helper(stage, thread, maxBatch, mayBlock);

  // Snapshot the position to publish now; the guard publishes this value
  // when it is released.
  const uint32_t publish_to =
      all_threads[topology.thread_index(stage, thread)].local_pops;

  return StageGuard{claimed, [this, stage, thread, publish_to]() {
                      PipelineAlgorithms::update_thread_pops<wait_strategy>(
                          topology, all_threads, stage, thread, publish_to);
                    }};
}
// Reserve slots in the ring buffer for a producer thread to fill with items.
// This is how producer threads add new items for stage 0 of the pipeline.
//
// size:    number of slots to reserve (must be > 0 and <= ring buffer
//          capacity)
// block:   if true, blocks when the ring buffer is full; if false, returns an
//          empty guard instead
// Returns: ProducerGuard with exclusive access to the reserved slots
//
// Usage: fill the items in the returned batch, then let the guard's destructor
// publish them. The guard destructor ensures items are published in
// reservation order.
//
// Preconditions:
// - size > 0 (must request at least one slot)
// - size <= slotCount (cannot request more slots than ring buffer capacity)
// Violating preconditions results in program termination via abort().
[[nodiscard]] ProducerGuard push(uint32_t const size, bool block) {
  if (size == 0 || size > slot_count) {
    std::abort();
  }

  uint32_t reserved_from;
  for (;;) {
    reserved_from = slots.load(std::memory_order_relaxed);

    const int verdict = PipelineAlgorithms::check_producer_capacity(
        topology, all_threads, reserved_from, size, slot_count, block);
    if (verdict == 2) {
      return ProducerGuard{}; // Ring is full and we may not block
    }
    if (verdict == 1) {
      continue; // Waited for a consumer; start over with a fresh position
    }

    // Enough room: try to claim [reserved_from, reserved_from + size)
    if (slots.compare_exchange_weak(reserved_from, reserved_from + size,
                                    std::memory_order_relaxed,
                                    std::memory_order_relaxed)) {
      break;
    }
  }

  const uint32_t first = reserved_from & slot_count_mask;
  return ProducerGuard{Batch{&ring, first, first + size}, this, reserved_from,
                       reserved_from + size};
}
};