From 3c3555da7adc307044715d5382c254c2c7997bce Mon Sep 17 00:00:00 2001
From: Andrew Noyes
Date: Tue, 26 Aug 2025 13:33:16 -0400
Subject: [PATCH] Benchmark different batches and wait strategies

---
 benchmarks/bench_thread_pipeline.cpp | 166 +++++++++++++++++++++++++++
 1 file changed, 166 insertions(+)

diff --git a/benchmarks/bench_thread_pipeline.cpp b/benchmarks/bench_thread_pipeline.cpp
index c77115a..2e643d3 100644
--- a/benchmarks/bench_thread_pipeline.cpp
+++ b/benchmarks/bench_thread_pipeline.cpp
@@ -80,6 +80,172 @@ int main() {
     stage0_thread.join();
   }
 
+  // Batch size comparison benchmark
+  {
+    constexpr int LOG_PIPELINE_SIZE = 10; // 2^10 = 1024 slots
+    constexpr int NUM_ITEMS = 100'000;
+    constexpr int BUSY_ITERS = 100;
+
+    auto bench = ankerl::nanobench::Bench()
+                     .title("Batch Size Impact")
+                     .unit("item")
+                     .batch(NUM_ITEMS)
+                     .relative(true)
+                     .warmup(100);
+
+    for (int batch_size : {1, 4, 16, 64, 256}) {
+      std::vector<int> threads_per_stage = {1};
+      ThreadPipeline<std::latch *> pipeline(LOG_PIPELINE_SIZE,
+                                            threads_per_stage);
+
+      // Never waited on; its address serves as the shutdown sentinel.
+      std::latch done{0};
+
+      // Stage 0 consumer thread
+      std::thread stage0_thread([&pipeline, &done]() {
+        const int stage = 0;
+        const int thread_id = 0;
+
+        for (;;) {
+          auto guard = pipeline.acquire(stage, thread_id);
+
+          for (auto &item : guard.batch) {
+            for (volatile int i = 0; i < BUSY_ITERS; i = i + 1) {
+            }
+            if (item == &done) {
+              return;
+            }
+            if (item) {
+              item->count_down();
+            }
+          }
+        }
+      });
+
+      bench.run("Batch size " + std::to_string(batch_size), [&] {
+        // Producer (main thread)
+        int items_pushed = 0;
+        while (items_pushed < NUM_ITEMS - 1) {
+          auto guard = pipeline.push(
+              std::min(NUM_ITEMS - 1 - items_pushed, batch_size), true);
+
+          auto it = guard.batch.begin();
+          items_pushed += guard.batch.size();
+          for (size_t i = 0; i < guard.batch.size(); ++i, ++it) {
+            *it = nullptr;
+          }
+        }
+        std::latch finish{1};
+        {
+          auto guard = pipeline.push(1, true);
+          *guard.batch.begin() = &finish;
+        }
+        finish.wait();
+      });
+
+      {
+        auto guard = pipeline.push(1, true);
+        *guard.batch.begin() = &done;
+      }
+
+      stage0_thread.join();
+    }
+  }
+
+  // Helper function for wait strategy benchmarks
+  auto benchmark_wait_strategy =
+      []<WaitStrategy kWaitStrategy>(const std::string &name,
+                                     ankerl::nanobench::Bench &bench) {
+        constexpr int LOG_PIPELINE_SIZE =
+            8; // Smaller buffer to increase contention
+        constexpr int NUM_ITEMS = 50'000;
+        constexpr int BATCH_SIZE = 4; // Small batches to increase coordination
+        constexpr int BUSY_ITERS =
+            10; // Light work to emphasize coordination overhead
+
+        std::vector<int> threads_per_stage = {1, 1}; // Two stages
+        ThreadPipeline<std::latch *, kWaitStrategy> pipeline(
+            LOG_PIPELINE_SIZE, threads_per_stage);
+
+        // Never waited on; its address serves as the shutdown sentinel.
+        std::latch done{0};
+
+        // Stage 0 worker
+        std::thread stage0_thread([&pipeline, &done]() {
+          const int stage = 0;
+          const int thread_id = 0;
+          for (;;) {
+            auto guard = pipeline.acquire(stage, thread_id);
+            for (auto &item : guard.batch) {
+              for (volatile int i = 0; i < BUSY_ITERS; i = i + 1) {
+              }
+              if (item == &done)
+                return;
+            }
+          }
+        });
+
+        // Stage 1 worker (final stage - always calls futex wake)
+        std::thread stage1_thread([&pipeline, &done]() {
+          const int stage = 1;
+          const int thread_id = 0;
+          for (;;) {
+            auto guard = pipeline.acquire(stage, thread_id);
+            for (auto &item : guard.batch) {
+              for (volatile int i = 0; i < BUSY_ITERS; i = i + 1) {
+              }
+              if (item == &done)
+                return;
+              if (item)
+                item->count_down();
+            }
+          }
+        });
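+        // Producer (main thread): same protocol as the batch-size benchmark
+        // above. Each timed run pushes NUM_ITEMS - 1 nullptr work items in
+        // batches of BATCH_SIZE, then one &finish sentinel, and blocks in
+        // finish.wait() until the final stage has drained everything, so
+        // the measurement covers the full two-stage pipeline.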
+        bench.run(name, [&] {
+          int items_pushed = 0;
+          while (items_pushed < NUM_ITEMS - 1) {
+            auto guard = pipeline.push(
+                std::min(NUM_ITEMS - 1 - items_pushed, BATCH_SIZE), true);
+            auto it = guard.batch.begin();
+            items_pushed += guard.batch.size();
+            for (size_t i = 0; i < guard.batch.size(); ++i, ++it) {
+              *it = nullptr;
+            }
+          }
+          std::latch finish{1};
+          {
+            auto guard = pipeline.push(1, true);
+            *guard.batch.begin() = &finish;
+          }
+          finish.wait();
+        });
+
+        // Shutdown
+        {
+          auto guard = pipeline.push(1, true);
+          auto it = guard.batch.begin();
+          *it = &done;
+        }
+        stage0_thread.join();
+        stage1_thread.join();
+      };
+
+  // Wait strategy comparison benchmark - multiple stages to trigger futex
+  // wakes
+  {
+    auto bench = ankerl::nanobench::Bench()
+                     .title("Wait Strategy Comparison")
+                     .unit("item")
+                     .batch(50'000)
+                     .relative(true)
+                     .warmup(50);
+
+    benchmark_wait_strategy
+        .template operator()<WaitStrategy::WaitIfStageEmpty>(
+            "WaitIfStageEmpty", bench);
+    benchmark_wait_strategy
+        .template operator()<WaitStrategy::WaitIfUpstreamIdle>(
+            "WaitIfUpstreamIdle", bench);
+    benchmark_wait_strategy.template operator()<WaitStrategy::Never>("Never",
+                                                                     bench);
+  }
+
   // TODO: Add more benchmarks for:
   // - Multi-stage pipelines (3+ stages)
   // - Multiple threads per stage
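
The end-of-run handshake both benchmarks rely on (push a std::latch *
sentinel, then block in finish.wait() until the final stage counts it
down) can be seen in isolation below. This is a minimal sketch for
illustration only: the single atomic slot stands in for a pipeline
slot and is not part of the ThreadPipeline API.

    // Minimal sketch of the latch-sentinel handshake: the producer
    // publishes a latch pointer, the consumer counts it down, and the
    // producer's wait() doubles as an end-of-run barrier.
    #include <atomic>
    #include <latch>
    #include <thread>

    int main() {
      std::atomic<std::latch *> slot{nullptr}; // stand-in for one pipeline slot

      std::thread consumer([&] {
        std::latch *item = nullptr;
        while ((item = slot.exchange(nullptr)) == nullptr) {
          // Spin until the producer publishes an item.
        }
        item->count_down(); // Signal that the sentinel was consumed.
      });

      std::latch finish{1};
      slot.store(&finish); // "Push" the sentinel.
      finish.wait();       // Returns only after the consumer has seen it.
      consumer.join();
    }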