#include "thread_pipeline.hpp" #include #include #include #include int main() { { constexpr int LOG_PIPELINE_SIZE = 10; // 2^10 = 1024 slots constexpr int NUM_ITEMS = 100'000; constexpr int BATCH_SIZE = 16; constexpr int BUSY_ITERS = 100; auto bench = ankerl::nanobench::Bench() .title("Pipeline Throughput") .unit("item") .batch(NUM_ITEMS) .relative(true) .warmup(100); bench.run("Zero stage pipeline", [&] { for (int i = 0; i < NUM_ITEMS; ++i) { for (volatile int i = 0; i < BUSY_ITERS; i = i + 1) { } } }); std::vector threads_per_stage = {1}; ThreadPipeline pipeline(LOG_PIPELINE_SIZE, threads_per_stage); std::latch done{0}; // Stage 0 consumer thread std::thread stage0_thread([&pipeline, &done]() { const int stage = 0; const int thread_id = 0; for (;;) { auto guard = pipeline.acquire(stage, thread_id); for (auto &item : guard.batch) { for (volatile int i = 0; i < BUSY_ITERS; i = i + 1) { } if (item == &done) { return; } if (item) { item->count_down(); } } } }); bench.run("One stage pipeline", [&] { // Producer (main thread) int items_pushed = 0; while (items_pushed < NUM_ITEMS - 1) { auto guard = pipeline.push( std::min(NUM_ITEMS - 1 - items_pushed, BATCH_SIZE), true); auto it = guard.batch.begin(); items_pushed += guard.batch.size(); for (size_t i = 0; i < guard.batch.size(); ++i, ++it) { *it = nullptr; } } std::latch finish{1}; { auto guard = pipeline.push(1, true); *guard.batch.begin() = &finish; } finish.wait(); }); { auto guard = pipeline.push(1, true); *guard.batch.begin() = &done; } stage0_thread.join(); } // TODO: Add more benchmarks for: // - Multi-stage pipelines (3+ stages) // - Multiple threads per stage // - Different batch sizes // - Pipeline contention under load // - Memory usage patterns // - Latency measurements // - Different wait strategies return 0; }