From 4e9e4d634c3a2917ad31551bb92e4056c4a44637 Mon Sep 17 00:00:00 2001 From: Andrew Noyes Date: Tue, 26 Aug 2025 13:11:20 -0400 Subject: [PATCH] Add initial thread pipeline benchmark --- CMakeLists.txt | 5 ++ benchmarks/bench_thread_pipeline.cpp | 93 ++++++++++++++++++++++++++++ 2 files changed, 98 insertions(+) create mode 100644 benchmarks/bench_thread_pipeline.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 1318856..d001404 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -212,6 +212,10 @@ target_link_libraries(bench_parser_comparison nanobench weaseljson test_data target_include_directories(bench_parser_comparison PRIVATE src ${rapidjson_SOURCE_DIR}/include) +add_executable(bench_thread_pipeline benchmarks/bench_thread_pipeline.cpp) +target_link_libraries(bench_thread_pipeline nanobench Threads::Threads) +target_include_directories(bench_thread_pipeline PRIVATE src) + # Debug tools add_executable( debug_arena tools/debug_arena.cpp src/json_commit_request_parser.cpp @@ -232,3 +236,4 @@ add_test(NAME server_connection_return_tests add_test(NAME arena_allocator_benchmarks COMMAND bench_arena_allocator) add_test(NAME commit_request_benchmarks COMMAND bench_commit_request) add_test(NAME parser_comparison_benchmarks COMMAND bench_parser_comparison) +add_test(NAME thread_pipeline_benchmarks COMMAND bench_thread_pipeline) diff --git a/benchmarks/bench_thread_pipeline.cpp b/benchmarks/bench_thread_pipeline.cpp new file mode 100644 index 0000000..c77115a --- /dev/null +++ b/benchmarks/bench_thread_pipeline.cpp @@ -0,0 +1,93 @@ +#include "thread_pipeline.hpp" + +#include +#include +#include +#include + +int main() { + { + constexpr int LOG_PIPELINE_SIZE = 10; // 2^10 = 1024 slots + constexpr int NUM_ITEMS = 100'000; + constexpr int BATCH_SIZE = 16; + constexpr int BUSY_ITERS = 100; + + auto bench = ankerl::nanobench::Bench() + .title("Pipeline Throughput") + .unit("item") + .batch(NUM_ITEMS) + .relative(true) + .warmup(100); + bench.run("Zero stage pipeline", [&] { + for (int i = 0; i < NUM_ITEMS; ++i) { + for (volatile int i = 0; i < BUSY_ITERS; i = i + 1) { + } + } + }); + + std::vector threads_per_stage = {1}; + ThreadPipeline pipeline(LOG_PIPELINE_SIZE, threads_per_stage); + + std::latch done{0}; + + // Stage 0 consumer thread + std::thread stage0_thread([&pipeline, &done]() { + const int stage = 0; + const int thread_id = 0; + + for (;;) { + auto guard = pipeline.acquire(stage, thread_id); + + for (auto &item : guard.batch) { + for (volatile int i = 0; i < BUSY_ITERS; i = i + 1) { + } + if (item == &done) { + return; + } + if (item) { + item->count_down(); + } + } + } + }); + + bench.run("One stage pipeline", [&] { + // Producer (main thread) + int items_pushed = 0; + while (items_pushed < NUM_ITEMS - 1) { + auto guard = pipeline.push( + std::min(NUM_ITEMS - 1 - items_pushed, BATCH_SIZE), true); + + auto it = guard.batch.begin(); + items_pushed += guard.batch.size(); + for (size_t i = 0; i < guard.batch.size(); ++i, ++it) { + *it = nullptr; + } + } + std::latch finish{1}; + { + auto guard = pipeline.push(1, true); + *guard.batch.begin() = &finish; + } + finish.wait(); + }); + + { + auto guard = pipeline.push(1, true); + *guard.batch.begin() = &done; + } + + stage0_thread.join(); + } + + // TODO: Add more benchmarks for: + // - Multi-stage pipelines (3+ stages) + // - Multiple threads per stage + // - Different batch sizes + // - Pipeline contention under load + // - Memory usage patterns + // - Latency measurements + // - Different wait strategies + + return 0; +}