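// Benchmarks for StaticThreadPipeline, measured with nanobench:
// throughput against a no-pipeline baseline, batch-size impact, and a
// comparison of wait strategies across a two-stage pipeline.
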
#include "thread_pipeline.hpp"

#include <algorithm>
#include <latch>
#include <string>
#include <thread>

#include <nanobench.h>

int main() {
  {
    constexpr int LOG_PIPELINE_SIZE = 10; // 2^10 = 1024 slots
    constexpr int NUM_ITEMS = 100'000;
    constexpr int BATCH_SIZE = 16;
    constexpr int BUSY_ITERS = 100;

    auto bench = ankerl::nanobench::Bench()
                     .title("Pipeline Throughput")
                     .unit("item")
                     .batch(NUM_ITEMS)
                     .relative(true)
                     .warmup(100);
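
    // Baseline: the same busy work run inline on one thread, with no
    // pipeline involved; .relative(true) reports the other runs against it.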
bench.run("Zero stage pipeline", [&] {
|
|
for (int i = 0; i < NUM_ITEMS; ++i) {
|
|
for (volatile int i = 0; i < BUSY_ITERS; i = i + 1) {
|
|
}
|
|
}
|
|
});
|

    StaticThreadPipeline<std::latch *, WaitStrategy::WaitIfStageEmpty, 1>
        pipeline(LOG_PIPELINE_SIZE);

    // Shutdown sentinel: only its address is compared; the latch itself is
    // never waited on, so an initial count of 0 is fine.
    std::latch done{0};

    // Stage 0 consumer thread: drains batches, simulates work per item, and
    // counts down any latch it receives.
    std::thread stage0_thread([&pipeline, &done]() {
      for (;;) {
        auto guard = pipeline.acquire<0, 0>();

        for (auto &item : guard.batch) {
          for (volatile int spin = 0; spin < BUSY_ITERS; spin = spin + 1) {
          }
          if (item == &done) {
            return;
          }
          if (item) {
            item->count_down();
          }
        }
      }
    });
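
    // Measured region: push NUM_ITEMS items (all null except a final real
    // latch) and block until the consumer has drained them all.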
bench.run("One stage pipeline", [&] {
|
|
// Producer (main thread)
|
|
int items_pushed = 0;
|
|
while (items_pushed < NUM_ITEMS - 1) {
|
|
auto guard = pipeline.push(
|
|
std::min(NUM_ITEMS - 1 - items_pushed, BATCH_SIZE), true);
|
|
|
|
auto it = guard.batch.begin();
|
|
items_pushed += guard.batch.size();
|
|
for (size_t i = 0; i < guard.batch.size(); ++i, ++it) {
|
|
*it = nullptr;
|
|
}
|
|
}
|
|
std::latch finish{1};
|
|
{
|
|
auto guard = pipeline.push(1, true);
|
|
guard.batch[0] = &finish;
|
|
}
|
|
finish.wait();
|
|
});
|
|
|
|

    // Shut down the consumer with the sentinel, then join.
    {
      auto guard = pipeline.push(1, true);
      guard.batch[0] = &done;
    }

    stage0_thread.join();
  }

  // Batch size comparison benchmark
  {
    constexpr int LOG_PIPELINE_SIZE = 10; // 2^10 = 1024 slots
    constexpr int NUM_ITEMS = 100'000;
    constexpr int BUSY_ITERS = 100;

    auto bench = ankerl::nanobench::Bench()
                     .title("Batch Size Impact")
                     .unit("item")
                     .batch(NUM_ITEMS)
                     .relative(true)
                     .warmup(100);
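
    // A fresh pipeline and consumer thread per batch size keeps the runs
    // independent of one another.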
    for (int batch_size : {1, 4, 16, 64, 256}) {
      StaticThreadPipeline<std::latch *, WaitStrategy::WaitIfStageEmpty, 1>
          pipeline(LOG_PIPELINE_SIZE);

      // Shutdown sentinel; see above.
      std::latch done{0};

      // Stage 0 consumer thread
      std::thread stage0_thread([&pipeline, &done]() {
        for (;;) {
          auto guard = pipeline.acquire<0, 0>();

          for (auto &item : guard.batch) {
            for (volatile int spin = 0; spin < BUSY_ITERS; spin = spin + 1) {
            }
            if (item == &done) {
              return;
            }
            if (item) {
              item->count_down();
            }
          }
        }
      });
bench.run("Batch size " + std::to_string(batch_size), [&] {
|
|
// Producer (main thread)
|
|
int items_pushed = 0;
|
|
while (items_pushed < NUM_ITEMS - 1) {
|
|
auto guard = pipeline.push(
|
|
std::min(NUM_ITEMS - 1 - items_pushed, batch_size), true);
|
|
|
|
auto it = guard.batch.begin();
|
|
items_pushed += guard.batch.size();
|
|
for (size_t i = 0; i < guard.batch.size(); ++i, ++it) {
|
|
*it = nullptr;
|
|
}
|
|
}
|
|
std::latch finish{1};
|
|
{
|
|
auto guard = pipeline.push(1, true);
|
|
guard.batch[0] = &finish;
|
|
}
|
|
finish.wait();
|
|
});
|
|
|
|
{
|
|
auto guard = pipeline.push(1, true);
|
|
guard.batch[0] = &done;
|
|
}
|
|
|
|
stage0_thread.join();
|
|
}
|
|
}
|
|
|
|

  // Helper for the wait-strategy benchmarks: the strategy is a non-type
  // template parameter of the pipeline, so this is a templated lambda that
  // gets instantiated once per strategy.
  auto benchmark_wait_strategy =
      []<WaitStrategy strategy>(const std::string &name,
                                ankerl::nanobench::Bench &bench) {
        constexpr int LOG_PIPELINE_SIZE = 8; // Smaller buffer to increase contention
        constexpr int NUM_ITEMS = 50'000;
        constexpr int BATCH_SIZE = 4;  // Small batches to increase coordination
        constexpr int BUSY_ITERS = 10; // Light work to emphasize coordination overhead

        // Two stages, one thread each.
        StaticThreadPipeline<std::latch *, strategy, 1, 1> pipeline(
            LOG_PIPELINE_SIZE);

        // Shutdown sentinel; see above.
        std::latch done{0};

        // Stage 0 worker: passes items through after simulating work.
        std::thread stage0_thread([&pipeline, &done]() {
          for (;;) {
            auto guard = pipeline.template acquire<0, 0>();
            for (auto &item : guard.batch) {
              for (volatile int spin = 0; spin < BUSY_ITERS; spin = spin + 1) {
              }
              if (item == &done)
                return;
            }
          }
        });

        // Stage 1 worker (final stage - always calls futex wake): completes
        // items by counting down any latch it receives.
        std::thread stage1_thread([&pipeline, &done]() {
          for (;;) {
            auto guard = pipeline.template acquire<1, 0>();
            for (auto &item : guard.batch) {
              for (volatile int spin = 0; spin < BUSY_ITERS; spin = spin + 1) {
              }
              if (item == &done)
                return;
              if (item)
                item->count_down();
            }
          }
        });

        bench.run(name, [&] {
          int items_pushed = 0;
          while (items_pushed < NUM_ITEMS - 1) {
            auto guard = pipeline.push(
                std::min(NUM_ITEMS - 1 - items_pushed, BATCH_SIZE), true);
            items_pushed += static_cast<int>(guard.batch.size());
            for (auto &item : guard.batch) {
              item = nullptr;
            }
          }
          std::latch finish{1};
          {
            auto guard = pipeline.push(1, true);
            guard.batch[0] = &finish;
          }
          finish.wait();
        });

        // Shutdown: the sentinel flows through both stages, stopping each in
        // turn.
        {
          auto guard = pipeline.push(1, true);
          guard.batch[0] = &done;
        }
        stage0_thread.join();
        stage1_thread.join();
      };
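
  // Each call below instantiates the helper for one strategy. The explicit
  // operator()<...> spelling is required because a template argument list
  // cannot be supplied in an ordinary lambda call expression.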
  // Wait strategy comparison benchmark - multiple stages to trigger futex
  // wakes
  {
    auto bench = ankerl::nanobench::Bench()
                     .title("Wait Strategy Comparison")
                     .unit("item")
                     .batch(50'000)
                     .relative(true)
                     .warmup(50);

    benchmark_wait_strategy
        .template operator()<WaitStrategy::WaitIfStageEmpty>(
            "WaitIfStageEmpty", bench);
    benchmark_wait_strategy
        .template operator()<WaitStrategy::WaitIfUpstreamIdle>(
            "WaitIfUpstreamIdle", bench);
    benchmark_wait_strategy.template operator()<WaitStrategy::Never>(
        "Never", bench);
  }

  // TODO: Add more benchmarks for:
  // - Multi-stage pipelines (3+ stages)
  // - Multiple threads per stage
  // - Pipeline contention under load
  // - Memory usage patterns
  // - Latency measurements
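
  // Untested sketch for the multi-stage TODO, assuming the same
  // StaticThreadPipeline API used above (one trailing thread count per
  // stage, acquire<Stage, Thread>() per worker):
  //
  //   StaticThreadPipeline<std::latch *, WaitStrategy::WaitIfStageEmpty,
  //                        1, 1, 1>
  //       pipeline(LOG_PIPELINE_SIZE);
  //   // One worker per stage: stages 0 and 1 pass items through via
  //   // acquire<0, 0>() / acquire<1, 0>(); the final stage (acquire<2, 0>())
  //   // counts down any latch it receives, mirroring the two-stage helper.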

  return 0;
}