#include "thread_pipeline.hpp" #include #include #include int main() { { constexpr int LOG_PIPELINE_SIZE = 10; // 2^10 = 1024 slots constexpr int NUM_ITEMS = 100'000; constexpr int BATCH_SIZE = 16; constexpr int BUSY_ITERS = 100; auto bench = ankerl::nanobench::Bench() .title("Pipeline Throughput") .unit("item") .batch(NUM_ITEMS) .relative(true) .warmup(100); bench.run("Zero stage pipeline", [&] { for (int i = 0; i < NUM_ITEMS; ++i) { for (volatile int i = 0; i < BUSY_ITERS; i = i + 1) { } } }); StaticThreadPipeline pipeline(LOG_PIPELINE_SIZE); std::latch done{0}; // Stage 0 consumer thread std::thread stage0_thread([&pipeline, &done]() { for (;;) { auto guard = pipeline.acquire<0, 0>(); for (auto &item : guard.batch) { for (volatile int i = 0; i < BUSY_ITERS; i = i + 1) { } if (item == &done) { return; } if (item) { item->count_down(); } } } }); bench.run("One stage pipeline", [&] { // Producer (main thread) int items_pushed = 0; while (items_pushed < NUM_ITEMS - 1) { auto guard = pipeline.push( std::min(NUM_ITEMS - 1 - items_pushed, BATCH_SIZE), true); auto it = guard.batch.begin(); items_pushed += guard.batch.size(); for (size_t i = 0; i < guard.batch.size(); ++i, ++it) { *it = nullptr; } } std::latch finish{1}; { auto guard = pipeline.push(1, true); guard.batch[0] = &finish; } finish.wait(); }); { auto guard = pipeline.push(1, true); guard.batch[0] = &done; } stage0_thread.join(); } // Batch size comparison benchmark { constexpr int LOG_PIPELINE_SIZE = 10; // 2^10 = 1024 slots constexpr int NUM_ITEMS = 100'000; constexpr int BUSY_ITERS = 100; auto bench = ankerl::nanobench::Bench() .title("Batch Size Impact") .unit("item") .batch(NUM_ITEMS) .relative(true) .warmup(100); for (int batch_size : {1, 4, 16, 64, 256}) { StaticThreadPipeline pipeline(LOG_PIPELINE_SIZE); std::latch done{0}; // Stage 0 consumer thread std::thread stage0_thread([&pipeline, &done]() { for (;;) { auto guard = pipeline.acquire<0, 0>(); for (auto &item : guard.batch) { for (volatile int i = 0; i < BUSY_ITERS; i = i + 1) { } if (item == &done) { return; } if (item) { item->count_down(); } } } }); bench.run("Batch size " + std::to_string(batch_size), [&] { // Producer (main thread) int items_pushed = 0; while (items_pushed < NUM_ITEMS - 1) { auto guard = pipeline.push( std::min(NUM_ITEMS - 1 - items_pushed, batch_size), true); auto it = guard.batch.begin(); items_pushed += guard.batch.size(); for (size_t i = 0; i < guard.batch.size(); ++i, ++it) { *it = nullptr; } } std::latch finish{1}; { auto guard = pipeline.push(1, true); guard.batch[0] = &finish; } finish.wait(); }); { auto guard = pipeline.push(1, true); guard.batch[0] = &done; } stage0_thread.join(); } } // Helper function for wait strategy benchmarks auto benchmark_wait_strategy = [](const std::string &name, ankerl::nanobench::Bench &bench) { constexpr int LOG_PIPELINE_SIZE = 8; // Smaller buffer to increase contention constexpr int NUM_ITEMS = 50'000; constexpr int BATCH_SIZE = 4; // Small batches to increase coordination constexpr int BUSY_ITERS = 10; // Light work to emphasize coordination overhead StaticThreadPipeline pipeline( LOG_PIPELINE_SIZE); std::latch done{0}; // Stage 0 worker std::thread stage0_thread([&pipeline, &done]() { for (;;) { auto guard = pipeline.template acquire<0, 0>(); for (auto &item : guard.batch) { for (volatile int i = 0; i < BUSY_ITERS; i = i + 1) { } if (item == &done) return; } } }); // Stage 1 worker (final stage - always calls futex wake) std::thread stage1_thread([&pipeline, &done]() { for (;;) { auto guard = 
pipeline.template acquire<1, 0>(); for (auto &item : guard.batch) { for (volatile int i = 0; i < BUSY_ITERS; i = i + 1) { } if (item == &done) return; if (item) item->count_down(); } } }); bench.run(name, [&] { int items_pushed = 0; while (items_pushed < NUM_ITEMS - 1) { auto guard = pipeline.push( std::min(NUM_ITEMS - 1 - items_pushed, BATCH_SIZE), true); auto it = guard.batch.begin(); items_pushed += guard.batch.size(); for (size_t i = 0; i < guard.batch.size(); ++i, ++it) { *it = nullptr; } } std::latch finish{1}; { auto guard = pipeline.push(1, true); guard.batch[0] = &finish; } finish.wait(); }); // Shutdown { auto guard = pipeline.push(1, true); guard.batch[0] = &done; } stage0_thread.join(); stage1_thread.join(); }; // Wait strategy comparison benchmark - multiple stages to trigger futex wakes { auto bench = ankerl::nanobench::Bench() .title("Wait Strategy Comparison") .unit("item") .batch(50'000) .relative(true) .warmup(50); benchmark_wait_strategy.template operator()( "WaitIfStageEmpty", bench); benchmark_wait_strategy.template operator()("WaitIfUpstreamIdle", bench); benchmark_wait_strategy.template operator()("Never", bench); } // TODO: Add more benchmarks for: // - Multi-stage pipelines (3+ stages) // - Multiple threads per stage // - Different batch sizes // - Pipeline contention under load // - Memory usage patterns // - Latency measurements // - Different wait strategies return 0; }