Compare commits
13 commits: 5314448480...431befe9bd

| SHA1 |
|---|
| 431befe9bd |
| a734760b60 |
| 6e48a0ff9a |
| 6dbf29d1e1 |
| 0b63e24b98 |
| 1f050c861a |
| 3c3555da7a |
| 4e9e4d634c |
| eaeffff620 |
| 6ddba37e60 |
| f8be6941bb |
| 612497f733 |
| 50d873e8eb |
@@ -16,3 +16,19 @@ repos:
    rev: e2c2116d86a80e72e7146a06e68b7c228afc6319  # frozen: v0.6.13
    hooks:
      - id: cmake-format

  - repo: https://github.com/psf/black
    rev: 8a737e727ac5ab2f1d4cf5876720ed276dc8dc4b  # frozen: 25.1.0
    hooks:
      - id: black
        language_version: python3

  - repo: local
    hooks:
      - id: snake-case-enforcement
        name: Enforce snake_case naming
        entry: ./tools/check_snake_case.py
        language: script
        files: '\.(hpp|cpp)$'
        exclude: '^build/'
        args: ['--check-new-code-only']

@@ -212,6 +212,10 @@ target_link_libraries(bench_parser_comparison nanobench weaseljson test_data
target_include_directories(bench_parser_comparison
                           PRIVATE src ${rapidjson_SOURCE_DIR}/include)

add_executable(bench_thread_pipeline benchmarks/bench_thread_pipeline.cpp)
target_link_libraries(bench_thread_pipeline nanobench Threads::Threads)
target_include_directories(bench_thread_pipeline PRIVATE src)

# Debug tools
add_executable(
  debug_arena tools/debug_arena.cpp src/json_commit_request_parser.cpp

@@ -232,3 +236,4 @@ add_test(NAME server_connection_return_tests
add_test(NAME arena_allocator_benchmarks COMMAND bench_arena_allocator)
add_test(NAME commit_request_benchmarks COMMAND bench_commit_request)
add_test(NAME parser_comparison_benchmarks COMMAND bench_parser_comparison)
add_test(NAME thread_pipeline_benchmarks COMMAND bench_thread_pipeline)

benchmarks/bench_thread_pipeline.cpp — new file, 245 lines

@@ -0,0 +1,245 @@
#include "thread_pipeline.hpp"

#include <latch>
#include <nanobench.h>
#include <thread>

int main() {
  {
    constexpr int LOG_PIPELINE_SIZE = 10; // 2^10 = 1024 slots
    constexpr int NUM_ITEMS = 100'000;
    constexpr int BATCH_SIZE = 16;
    constexpr int BUSY_ITERS = 100;

    auto bench = ankerl::nanobench::Bench()
                     .title("Pipeline Throughput")
                     .unit("item")
                     .batch(NUM_ITEMS)
                     .relative(true)
                     .warmup(100);
    bench.run("Zero stage pipeline", [&] {
      for (int i = 0; i < NUM_ITEMS; ++i) {
        for (volatile int i = 0; i < BUSY_ITERS; i = i + 1) {
        }
      }
    });

    StaticThreadPipeline<std::latch *, WaitStrategy::WaitIfStageEmpty, 1>
        pipeline(LOG_PIPELINE_SIZE);

    std::latch done{0};

    // Stage 0 consumer thread
    std::thread stage0_thread([&pipeline, &done]() {
      for (;;) {
        auto guard = pipeline.acquire<0, 0>();

        for (auto &item : guard.batch) {
          for (volatile int i = 0; i < BUSY_ITERS; i = i + 1) {
          }
          if (item == &done) {
            return;
          }
          if (item) {
            item->count_down();
          }
        }
      }
    });

    bench.run("One stage pipeline", [&] {
      // Producer (main thread)
      int items_pushed = 0;
      while (items_pushed < NUM_ITEMS - 1) {
        auto guard = pipeline.push(
            std::min(NUM_ITEMS - 1 - items_pushed, BATCH_SIZE), true);

        auto it = guard.batch.begin();
        items_pushed += guard.batch.size();
        for (size_t i = 0; i < guard.batch.size(); ++i, ++it) {
          *it = nullptr;
        }
      }
      std::latch finish{1};
      {
        auto guard = pipeline.push(1, true);
        guard.batch[0] = &finish;
      }
      finish.wait();
    });

    {
      auto guard = pipeline.push(1, true);
      guard.batch[0] = &done;
    }

    stage0_thread.join();
  }

  // Batch size comparison benchmark
  {
    constexpr int LOG_PIPELINE_SIZE = 10; // 2^10 = 1024 slots
    constexpr int NUM_ITEMS = 100'000;
    constexpr int BUSY_ITERS = 100;

    auto bench = ankerl::nanobench::Bench()
                     .title("Batch Size Impact")
                     .unit("item")
                     .batch(NUM_ITEMS)
                     .relative(true)
                     .warmup(100);

    for (int batch_size : {1, 4, 16, 64, 256}) {
      StaticThreadPipeline<std::latch *, WaitStrategy::WaitIfStageEmpty, 1>
          pipeline(LOG_PIPELINE_SIZE);

      std::latch done{0};

      // Stage 0 consumer thread
      std::thread stage0_thread([&pipeline, &done]() {
        for (;;) {
          auto guard = pipeline.acquire<0, 0>();

          for (auto &item : guard.batch) {
            for (volatile int i = 0; i < BUSY_ITERS; i = i + 1) {
            }
            if (item == &done) {
              return;
            }
            if (item) {
              item->count_down();
            }
          }
        }
      });

      bench.run("Batch size " + std::to_string(batch_size), [&] {
        // Producer (main thread)
        int items_pushed = 0;
        while (items_pushed < NUM_ITEMS - 1) {
          auto guard = pipeline.push(
              std::min(NUM_ITEMS - 1 - items_pushed, batch_size), true);

          auto it = guard.batch.begin();
          items_pushed += guard.batch.size();
          for (size_t i = 0; i < guard.batch.size(); ++i, ++it) {
            *it = nullptr;
          }
        }
        std::latch finish{1};
        {
          auto guard = pipeline.push(1, true);
          guard.batch[0] = &finish;
        }
        finish.wait();
      });

      {
        auto guard = pipeline.push(1, true);
        guard.batch[0] = &done;
      }

      stage0_thread.join();
    }
  }

  // Helper function for wait strategy benchmarks
  auto benchmark_wait_strategy =
      []<WaitStrategy strategy>(const std::string &name,
                                ankerl::nanobench::Bench &bench) {
        constexpr int LOG_PIPELINE_SIZE =
            8; // Smaller buffer to increase contention
        constexpr int NUM_ITEMS = 50'000;
        constexpr int BATCH_SIZE = 4; // Small batches to increase coordination
        constexpr int BUSY_ITERS =
            10; // Light work to emphasize coordination overhead

        StaticThreadPipeline<std::latch *, strategy, 1, 1> pipeline(
            LOG_PIPELINE_SIZE);

        std::latch done{0};

        // Stage 0 worker
        std::thread stage0_thread([&pipeline, &done]() {
          for (;;) {
            auto guard = pipeline.template acquire<0, 0>();
            for (auto &item : guard.batch) {
              for (volatile int i = 0; i < BUSY_ITERS; i = i + 1) {
              }
              if (item == &done)
                return;
            }
          }
        });

        // Stage 1 worker (final stage - always calls futex wake)
        std::thread stage1_thread([&pipeline, &done]() {
          for (;;) {
            auto guard = pipeline.template acquire<1, 0>();
            for (auto &item : guard.batch) {
              for (volatile int i = 0; i < BUSY_ITERS; i = i + 1) {
              }
              if (item == &done)
                return;
              if (item)
                item->count_down();
            }
          }
        });

        bench.run(name, [&] {
          int items_pushed = 0;
          while (items_pushed < NUM_ITEMS - 1) {
            auto guard = pipeline.push(
                std::min(NUM_ITEMS - 1 - items_pushed, BATCH_SIZE), true);
            auto it = guard.batch.begin();
            items_pushed += guard.batch.size();
            for (size_t i = 0; i < guard.batch.size(); ++i, ++it) {
              *it = nullptr;
            }
          }
          std::latch finish{1};
          {
            auto guard = pipeline.push(1, true);
            guard.batch[0] = &finish;
          }
          finish.wait();
        });

        // Shutdown
        {
          auto guard = pipeline.push(1, true);
          guard.batch[0] = &done;
        }
        stage0_thread.join();
        stage1_thread.join();
      };

  // Wait strategy comparison benchmark - multiple stages to trigger futex wakes
  {
    auto bench = ankerl::nanobench::Bench()
                     .title("Wait Strategy Comparison")
                     .unit("item")
                     .batch(50'000)
                     .relative(true)
                     .warmup(50);

    benchmark_wait_strategy.template operator()<WaitStrategy::WaitIfStageEmpty>(
        "WaitIfStageEmpty", bench);
    benchmark_wait_strategy.template
    operator()<WaitStrategy::WaitIfUpstreamIdle>("WaitIfUpstreamIdle", bench);
    benchmark_wait_strategy.template operator()<WaitStrategy::Never>("Never",
                                                                     bench);
  }

  // TODO: Add more benchmarks for:
  // - Multi-stage pipelines (3+ stages)
  // - Multiple threads per stage
  // - Different batch sizes
  // - Pipeline contention under load
  // - Memory usage patterns
  // - Latency measurements
  // - Different wait strategies

  return 0;
}

@@ -1,12 +1,14 @@
#include <nanobench.h>

#include "../src/loop_iterations.h"

int main() {
  constexpr int loopIterations = 1200;
  ankerl::nanobench::Bench().run(
      "volatile loop to " + std::to_string(loopIterations), [&] {
        for (volatile int i = 0; i < 1200; i = i + 1)
          ;
      });
  ankerl::nanobench::Bench bench;
  bench.minEpochIterations(100000);
  bench.run("volatile loop to " + std::to_string(loopIterations), [&] {
    for (volatile int i = 0; i < loopIterations; i = i + 1)
      ;
  });

  return 0;
}

@@ -8,6 +8,7 @@

#include "connection.hpp"
#include "connection_handler.hpp"
#include "loop_iterations.h"
#include "perfetto_categories.hpp"
#include "server.hpp"
#include "thread_pipeline.hpp"
@@ -64,37 +65,51 @@ struct HttpConnectionState {
 */
struct HttpHandler : ConnectionHandler {
  HttpHandler() {
    for (int threadId = 0; threadId < kFinalStageThreads; ++threadId) {
      finalStageThreads.emplace_back([this, threadId]() {
        pthread_setname_np(pthread_self(),
                           ("stage-1-" + std::to_string(threadId)).c_str());
        for (;;) {
          auto guard = pipeline.acquire(1, threadId);
          for (auto it = guard.batch.begin(); it != guard.batch.end(); ++it) {
            if ((it.index() % kFinalStageThreads) == threadId) {
              auto &c = *it;
              if (!c) {
                return;
              }
              auto *state = static_cast<HttpConnectionState *>(c->user_data);
              TRACE_EVENT("http", "pipeline thread",
                          perfetto::Flow::Global(state->request_id));
              Server::release_back_to_server(std::move(c));
    finalStageThreads.emplace_back([this]() {
      pthread_setname_np(pthread_self(), "stage-1-0");
      for (;;) {
        auto guard = pipeline.acquire<1, 0>();
        for (auto it = guard.batch.begin(); it != guard.batch.end(); ++it) {
          if ((it.index() % 2) == 0) { // Thread 0 handles even indices
            auto &c = *it;
            if (!c) {
              return;
            }
            auto *state = static_cast<HttpConnectionState *>(c->user_data);
            TRACE_EVENT("http", "release",
                        perfetto::Flow::Global(state->request_id));
            Server::release_back_to_server(std::move(c));
          }
        }
      });
        }
      }
    });
    finalStageThreads.emplace_back([this]() {
      pthread_setname_np(pthread_self(), "stage-1-1");
      for (;;) {
        auto guard = pipeline.acquire<1, 1>();
        for (auto it = guard.batch.begin(); it != guard.batch.end(); ++it) {
          if ((it.index() % 2) == 1) { // Thread 1 handles odd indices
            auto &c = *it;
            if (!c) {
              return;
            }
            auto *state = static_cast<HttpConnectionState *>(c->user_data);
            TRACE_EVENT("http", "release",
                        perfetto::Flow::Global(state->request_id));
            Server::release_back_to_server(std::move(c));
          }
        }
      }
    });
    stage0Thread = std::thread{[this]() {
      pthread_setname_np(pthread_self(), "stage-0");
      for (;;) {
        auto guard = pipeline.acquire(0, 0, 0, false);
        for (auto it = guard.batch.begin(); it != guard.batch.end(); ++it) {
          auto &c = *it;
        auto guard = pipeline.acquire<0, 0>();
        for (auto &c : guard.batch) {
          if (!c) {
            return;
          }
          for (volatile int i = 0; i < 1200; i = i + 1)
          for (volatile int i = 0; i < loopIterations; i = i + 1)
            ;
        }
      }
@@ -102,7 +117,7 @@ struct HttpHandler : ConnectionHandler {
  }
  ~HttpHandler() {
    {
      auto guard = pipeline.push(kFinalStageThreads, true);
      auto guard = pipeline.push(2, true);
      for (auto &c : guard.batch) {
        c = {};
      }
@@ -137,8 +152,9 @@ struct HttpHandler : ConnectionHandler {
private:
  static constexpr int kFinalStageThreads = 2;
  static constexpr int kLogSize = 12;
  ThreadPipeline<std::unique_ptr<Connection>> pipeline{
      kLogSize, {/*noop serial thread*/ 1, kFinalStageThreads}};
  StaticThreadPipeline<std::unique_ptr<Connection>,
                       WaitStrategy::WaitIfUpstreamIdle, 1, 2>
      pipeline{kLogSize};
  std::thread stage0Thread;
  std::vector<std::thread> finalStageThreads;

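The two stage-1 lambdas above split one acquired batch by ring index parity. A hedged generalization of that pattern for N final-stage workers (`handle` is a hypothetical stand-in, not part of the diff):

```cpp
// Each of the N final-stage workers walks the same batch but only touches the
// slots whose ring index maps to it, so a batch is partitioned across workers
// without any extra synchronization.
template <int N, int ThreadId, class StageGuard>
void process_my_share(StageGuard &guard) {
  for (auto it = guard.batch.begin(); it != guard.batch.end(); ++it) {
    if ((it.index() % N) == ThreadId) {
      handle(*it); // hypothetical per-item handler
    }
  }
}
```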
src/loop_iterations.h — new file, 3 lines

@@ -0,0 +1,3 @@
#pragma once

constexpr int loopIterations = 1550;

@@ -1,7 +1,9 @@
#pragma once

#include <array>
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
@@ -9,27 +11,252 @@
#include <utility>
#include <vector>

// Multi-stage lock-free pipeline for high-throughput inter-thread
// communication.
// Wait strategies for controlling thread blocking behavior when no work is
// available
enum class WaitStrategy {
  // Never block - threads busy-wait (spin) when no work available.
  // Stage threads will always use 100% CPU even when idle.
  // Requires dedicated CPU cores to avoid scheduler thrashing.
  // Use when: latency is critical and you have spare cores.
  Never,

  // Block only when all upstream stages are idle (no new work entering
  // pipeline).
  // Downstream threads busy-wait if upstream has work but not for their stage.
  // Eliminates futex notifications between stages, reduces to 0% CPU when idle.
  // Requires dedicated cores to avoid priority inversion when pipeline has
  // work.
  // Use when: high throughput with spare cores and sustained workloads.
  WaitIfUpstreamIdle,

  // Block when individual stages are empty (original behavior).
  // Each stage waits independently on its input sources.
  // Safe for shared CPU environments, works well with variable workloads.
  // Use when: general purpose, shared cores, or unpredictable workloads.
  WaitIfStageEmpty,
};
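The strategy is fixed per pipeline as a template parameter, so the trade-off above is chosen at the type level; an illustrative sketch (the `Request` item type is hypothetical):

```cpp
// Dedicated cores, latency-critical path: spin instead of blocking.
StaticThreadPipeline<Request *, WaitStrategy::Never, 1, 2> hot_path(10);

// Shared cores or bursty load: block whenever a stage runs dry.
StaticThreadPipeline<Request *, WaitStrategy::WaitIfStageEmpty, 1, 2> general(10);
```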
// Core thread state
struct ThreadState {
  alignas(128) std::atomic<uint32_t> pops{0};
  uint32_t local_pops{0};
  std::vector<uint32_t> last_push_read;
  bool last_stage;
};

// Compile-time topology configuration for static pipelines
//
// This template defines a pipeline topology at compile-time:
// - Stage and thread calculations done at compile-time
// - Type-safe indexing: Stage and thread indices validated at compile-time
// - Fixed-size arrays with known bounds
// - Code specialization for each topology
//
// Example: StaticPipelineTopology<1, 4, 2> creates:
// - Stage 0: 1 thread (index 0)
// - Stage 1: 4 threads (indices 1-4)
// - Stage 2: 2 threads (indices 5-6)
// - Total: 7 threads across 3 stages
template <int... ThreadsPerStage> struct StaticPipelineTopology {
  static_assert(sizeof...(ThreadsPerStage) > 0,
                "Must specify at least one stage");
  static_assert(((ThreadsPerStage > 0) && ...),
                "All stages must have at least one thread");

  static constexpr int num_stages = sizeof...(ThreadsPerStage);
  static constexpr std::array<int, num_stages> threads_per_stage = {
      ThreadsPerStage...};
  static constexpr int total_threads = (ThreadsPerStage + ...);

  // Compile-time stage offset calculation
  template <int Stage> static constexpr int stage_offset() {
    static_assert(Stage >= 0 && Stage < num_stages,
                  "Stage index out of bounds");
    if constexpr (Stage == 0) {
      return 0;
    } else {
      return stage_offset<Stage - 1>() + threads_per_stage[Stage - 1];
    }
  }

  // Compile-time thread index calculation
  template <int Stage, int Thread> static constexpr int thread_index() {
    static_assert(Stage >= 0 && Stage < num_stages,
                  "Stage index out of bounds");
    static_assert(Thread >= 0 && Thread < threads_per_stage[Stage],
                  "Thread index out of bounds");
    return stage_offset<Stage>() + Thread;
  }

  // Compile-time previous stage thread count
  template <int Stage> static constexpr int prev_stage_thread_count() {
    static_assert(Stage >= 0 && Stage < num_stages,
                  "Stage index out of bounds");
    if constexpr (Stage == 0) {
      return 1;
    } else {
      return threads_per_stage[Stage - 1];
    }
  }
};

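For the <1, 4, 2> example in the comment above, the compile-time helpers resolve as follows; this is an illustrative check, not part of the header:

```cpp
// Compile-time sanity checks for the StaticPipelineTopology<1, 4, 2> example.
using ExampleTopology = StaticPipelineTopology<1, 4, 2>;
static_assert(ExampleTopology::num_stages == 3);
static_assert(ExampleTopology::total_threads == 7);
static_assert(ExampleTopology::stage_offset<2>() == 5);          // stage 2 starts at global index 5
static_assert(ExampleTopology::thread_index<1, 3>() == 4);       // 4th worker of stage 1
static_assert(ExampleTopology::prev_stage_thread_count<2>() == 4);
```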
// Static pipeline algorithms - compile-time specialized versions
|
||||
namespace StaticPipelineAlgorithms {
|
||||
|
||||
template <WaitStrategy wait_strategy, typename Topology, int Stage,
|
||||
int ThreadInStage>
|
||||
uint32_t calculate_safe_len(
|
||||
std::array<ThreadState, Topology::total_threads> &all_threads,
|
||||
std::atomic<uint32_t> &pushes, bool may_block) {
|
||||
constexpr int thread_idx =
|
||||
Topology::template thread_index<Stage, ThreadInStage>();
|
||||
auto &thread = all_threads[thread_idx];
|
||||
uint32_t safe_len = UINT32_MAX;
|
||||
|
||||
constexpr int prev_stage_threads =
|
||||
Topology::template prev_stage_thread_count<Stage>();
|
||||
|
||||
// Compile-time loop over previous stage threads
|
||||
[&]<std::size_t... Is>(std::index_sequence<Is...>) {
|
||||
(
|
||||
[&] {
|
||||
auto &last_push = [&]() -> std::atomic<uint32_t> & {
|
||||
if constexpr (Stage == 0) {
|
||||
return pushes;
|
||||
} else {
|
||||
constexpr int prev_thread_idx =
|
||||
Topology::template thread_index<Stage - 1, Is>();
|
||||
return all_threads[prev_thread_idx].pops;
|
||||
}
|
||||
}();
|
||||
|
||||
if (thread.last_push_read[Is] == thread.local_pops) {
|
||||
thread.last_push_read[Is] =
|
||||
last_push.load(std::memory_order_acquire);
|
||||
if (thread.last_push_read[Is] == thread.local_pops) {
|
||||
if (!may_block) {
|
||||
safe_len = 0;
|
||||
return;
|
||||
}
|
||||
|
||||
if constexpr (wait_strategy == WaitStrategy::Never) {
|
||||
// Empty - busy wait
|
||||
} else if constexpr (wait_strategy ==
|
||||
WaitStrategy::WaitIfUpstreamIdle) {
|
||||
// We're allowed to spin as long as we eventually go to 0% cpu
|
||||
// usage on idle
|
||||
uint32_t push;
|
||||
for (int i = 0; i < 100000; ++i) {
|
||||
push = pushes.load(std::memory_order_relaxed);
|
||||
if (push != thread.local_pops) {
|
||||
goto dont_wait;
|
||||
}
|
||||
}
|
||||
pushes.wait(push, std::memory_order_relaxed);
|
||||
dont_wait:;
|
||||
} else {
|
||||
static_assert(wait_strategy == WaitStrategy::WaitIfStageEmpty);
|
||||
last_push.wait(thread.last_push_read[Is],
|
||||
std::memory_order_relaxed);
|
||||
}
|
||||
|
||||
thread.last_push_read[Is] =
|
||||
last_push.load(std::memory_order_acquire);
|
||||
}
|
||||
}
|
||||
safe_len =
|
||||
std::min(safe_len, thread.last_push_read[Is] - thread.local_pops);
|
||||
}(),
|
||||
...);
|
||||
}(std::make_index_sequence<prev_stage_threads>{});
|
||||
|
||||
return safe_len;
|
||||
}
|
||||
|
||||
template <WaitStrategy wait_strategy, typename Topology, int Stage,
|
||||
int ThreadInStage>
|
||||
void update_thread_pops(
|
||||
std::array<ThreadState, Topology::total_threads> &all_threads,
|
||||
uint32_t local_pops) {
|
||||
constexpr int thread_idx =
|
||||
Topology::template thread_index<Stage, ThreadInStage>();
|
||||
auto &thread_state = all_threads[thread_idx];
|
||||
|
||||
if constexpr (wait_strategy == WaitStrategy::WaitIfStageEmpty) {
|
||||
thread_state.pops.store(local_pops, std::memory_order_seq_cst);
|
||||
thread_state.pops.notify_all();
|
||||
} else if constexpr (Stage == Topology::num_stages - 1) { // last stage
|
||||
thread_state.pops.store(local_pops, std::memory_order_seq_cst);
|
||||
thread_state.pops.notify_all();
|
||||
} else {
|
||||
thread_state.pops.store(local_pops, std::memory_order_release);
|
||||
}
|
||||
}
|
||||
|
||||
template <typename Topology>
|
||||
int check_producer_capacity(
|
||||
std::array<ThreadState, Topology::total_threads> &all_threads,
|
||||
uint32_t slot, uint32_t size, uint32_t slot_count, bool block) {
|
||||
constexpr int last_stage = Topology::num_stages - 1;
|
||||
constexpr int last_stage_offset =
|
||||
Topology::template stage_offset<last_stage>();
|
||||
constexpr int last_stage_thread_count =
|
||||
Topology::threads_per_stage[last_stage];
|
||||
|
||||
for (int i = 0; i < last_stage_thread_count; ++i) {
|
||||
auto &thread = all_threads[last_stage_offset + i];
|
||||
uint32_t pops = thread.pops.load(std::memory_order_acquire);
|
||||
if (slot + size - pops > slot_count) {
|
||||
if (!block) {
|
||||
return 2; // Cannot proceed
|
||||
}
|
||||
thread.pops.wait(pops, std::memory_order_relaxed);
|
||||
return 1; // Should retry
|
||||
}
|
||||
}
|
||||
return 0; // Can proceed
|
||||
}
|
||||
} // namespace StaticPipelineAlgorithms
|
||||
|
||||
// Static multi-stage lock-free pipeline for inter-thread communication
// with compile-time topology specification.
//
// Overview:
// - Items flow through multiple processing stages (stage 0 -> stage 1 -> ... ->
//   final stage)
// - Items flow from producers through multiple processing stages (stage 0 ->
//   stage 1 -> ... -> final stage)
// - Each stage can have multiple worker threads processing items in parallel
// - Uses a shared ring buffer with atomic counters for lock-free coordination
// - Supports batch processing for efficiency
// - Compile-time topology specification via template parameters
//
// Architecture:
// - Producers: External threads that add items to the pipeline via push()
// - Stages: Processing stages numbered 0, 1, 2, ... that consume items via
//   acquire<Stage, Thread>()
// - Items flow: Producers -> Stage 0 -> Stage 1 -> ... -> Final Stage
//
// Differences from Dynamic Version:
// - Template parameters specify topology at compile-time (e.g., <Item,
//   WaitStrategy::Never, 1, 4, 2>)
// - Stage and thread indices are template parameters, validated at compile-time
// - Fixed-size arrays replace dynamic vectors
// - Specialized algorithms for each stage/thread combination
// - Type-safe guards prevent runtime indexing errors
//
// Usage Pattern:
//   // Producer threads (add items to stage 0):
//   using Pipeline = StaticThreadPipeline<Item, WaitStrategy::WaitIfStageEmpty,
//                                         1, 4, 2>;
//   Pipeline pipeline(lgSlotCount);
//
//   // Producer threads (add items for stage 0 to consume):
//   auto guard = pipeline.push(batchSize, /*block=*/true);
//   for (auto& item : guard.batch) {
//     // Initialize item data
//   }
//   // Guard destructor publishes batch to consumers
//   // Guard destructor publishes batch to stage 0 consumers
//
//   // Consumer threads (process items from any stage):
//   auto guard = pipeline.acquire(stageNum, threadId, maxBatch,
//                                 /*mayBlock=*/true);
//   // Stage worker threads (process items and pass to next stage):
//   auto guard = pipeline.acquire<Stage, Thread>(maxBatch, /*may_block=*/true);
//   for (auto& item : guard.batch) {
//     // Process item
//   }
//   // Guard destructor marks items as consumed and available to next stage

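A compact, self-contained version of the usage pattern above (hedged sketch: `Item`, `process`, and the stop flag are placeholders, not part of the header):

```cpp
#include <thread>
#include "thread_pipeline.hpp" // assumed header name from this compare

struct Item {
  bool stop = false; // hypothetical shutdown flag
  int payload = 0;
};

inline void process(Item &) {} // hypothetical per-item work

void example() {
  // One stage, one worker thread; 2^10 = 1024 ring slots.
  StaticThreadPipeline<Item, WaitStrategy::WaitIfStageEmpty, 1> pipeline(10);

  std::thread worker([&] {
    for (;;) {
      auto guard = pipeline.acquire<0, 0>(); // blocks until a batch is published
      for (auto &item : guard.batch) {
        if (item.stop) return;
        process(item);
      }
    } // guard destructor marks the batch consumed
  });

  {
    auto guard = pipeline.push(/*size=*/16, /*block=*/true);
    for (auto &item : guard.batch) {
      item = Item{.stop = false, .payload = 42};
    }
  } // guard destructor publishes the batch to stage 0

  {
    auto guard = pipeline.push(1, true);
    guard.batch[0] = Item{.stop = true}; // sentinel ends the worker
  }
  worker.join();
}
```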
@@ -44,35 +271,29 @@
|
||||
// ordering
|
||||
// - Uses C++20 atomic wait/notify for efficient blocking when no work available
|
||||
// - RAII guards ensure proper cleanup even with exceptions
|
||||
template <class T> struct ThreadPipeline {
|
||||
template <class T, WaitStrategy wait_strategy, int... ThreadsPerStage>
|
||||
struct StaticThreadPipeline {
|
||||
using Topology = StaticPipelineTopology<ThreadsPerStage...>;
|
||||
|
||||
// Constructor
|
||||
// lgSlotCount: log2 of ring buffer size (e.g., 10 -> 1024 slots)
|
||||
// threadsPerStage: number of threads for each stage (e.g., {1, 4, 2} = 1
|
||||
// stage-0 worker, 4 stage-1 workers, 2 stage-2 workers)
|
||||
ThreadPipeline(int lgSlotCount, const std::vector<int> &threadsPerStage)
|
||||
// Template parameters specify pipeline topology (e.g., <Item, Never, 1, 4,
|
||||
// 2>) Note: Producer threads are external to the pipeline and not counted in
|
||||
// ThreadsPerStage
|
||||
explicit StaticThreadPipeline(int lgSlotCount)
|
||||
: slot_count(1 << lgSlotCount), slot_count_mask(slot_count - 1),
|
||||
threadState(threadsPerStage.size()), ring(slot_count) {
|
||||
ring(slot_count) {
|
||||
// Otherwise we can't tell the difference between full and empty.
|
||||
assert(!(slot_count_mask & 0x80000000));
|
||||
for (size_t i = 0; i < threadsPerStage.size(); ++i) {
|
||||
threadState[i] = std::vector<ThreadState>(threadsPerStage[i]);
|
||||
for (auto &t : threadState[i]) {
|
||||
if (i == 0) {
|
||||
t.last_push_read = std::vector<uint32_t>(1);
|
||||
} else {
|
||||
t.last_push_read = std::vector<uint32_t>(threadsPerStage[i - 1]);
|
||||
}
|
||||
}
|
||||
}
|
||||
initialize_all_threads();
|
||||
}
|
||||
|
||||
ThreadPipeline(ThreadPipeline const &) = delete;
|
||||
ThreadPipeline &operator=(ThreadPipeline const &) = delete;
|
||||
ThreadPipeline(ThreadPipeline &&) = delete;
|
||||
ThreadPipeline &operator=(ThreadPipeline &&) = delete;
|
||||
StaticThreadPipeline(StaticThreadPipeline const &) = delete;
|
||||
StaticThreadPipeline &operator=(StaticThreadPipeline const &) = delete;
|
||||
StaticThreadPipeline(StaticThreadPipeline &&) = delete;
|
||||
StaticThreadPipeline &operator=(StaticThreadPipeline &&) = delete;
|
||||
|
||||
struct Batch {
|
||||
|
||||
Batch() : ring(), begin_(), end_() {}
|
||||
|
||||
struct Iterator {
|
||||
@@ -125,9 +346,6 @@ template <class T> struct ThreadPipeline {
|
||||
return static_cast<difference_type>(index_) -
|
||||
static_cast<difference_type>(rhs.index_);
|
||||
}
|
||||
reference operator[](difference_type n) const {
|
||||
return (*ring)[(index_ + n) & (ring->size() - 1)];
|
||||
}
|
||||
friend Iterator operator+(difference_type n, const Iterator &iter) {
|
||||
return iter + n;
|
||||
}
|
||||
@@ -141,7 +359,6 @@ template <class T> struct ThreadPipeline {
|
||||
}
|
||||
friend bool operator<(const Iterator &lhs, const Iterator &rhs) {
|
||||
assert(lhs.ring == rhs.ring);
|
||||
// Handle potential uint32_t wraparound by using signed difference
|
||||
return static_cast<int32_t>(lhs.index_ - rhs.index_) < 0;
|
||||
}
|
||||
friend bool operator<=(const Iterator &lhs, const Iterator &rhs) {
|
||||
@@ -157,9 +374,6 @@ template <class T> struct ThreadPipeline {
|
||||
return static_cast<int32_t>(lhs.index_ - rhs.index_) >= 0;
|
||||
}
|
||||
|
||||
/// Returns the ring buffer index (0 to ring->size()-1) for this iterator
|
||||
/// position. Useful for distributing work across multiple threads by
|
||||
/// using modulo operations.
|
||||
uint32_t index() const { return index_ & (ring->size() - 1); }
|
||||
|
||||
private:
|
||||
@@ -175,9 +389,12 @@ template <class T> struct ThreadPipeline {
|
||||
|
||||
[[nodiscard]] size_t size() const { return end_ - begin_; }
|
||||
[[nodiscard]] bool empty() const { return end_ == begin_; }
|
||||
T &operator[](uint32_t n) {
|
||||
return (*ring)[(begin_ + n) & (ring->size() - 1)];
|
||||
}
|
||||
|
||||
private:
|
||||
friend struct ThreadPipeline<T>;
|
||||
friend struct StaticThreadPipeline;
|
||||
Batch(std::vector<T> *const ring, uint32_t begin_, uint32_t end_)
|
||||
: ring(ring), begin_(begin_), end_(end_) {}
|
||||
std::vector<T> *const ring;
|
||||
@@ -185,97 +402,90 @@ template <class T> struct ThreadPipeline {
|
||||
uint32_t end_;
|
||||
};
|
||||
|
||||
// Static thread storage - fixed size array
|
||||
std::array<ThreadState, Topology::total_threads> all_threads;
|
||||
|
||||
private:
|
||||
Batch acquireHelper(int stage, int thread, uint32_t maxBatch, bool mayBlock) {
|
||||
uint32_t begin = threadState[stage][thread].local_pops & slot_count_mask;
|
||||
uint32_t len = getSafeLen(stage, thread, mayBlock);
|
||||
alignas(128) std::atomic<uint32_t> slots{0};
|
||||
alignas(128) std::atomic<uint32_t> pushes{0};
|
||||
const uint32_t slot_count;
|
||||
const uint32_t slot_count_mask;
|
||||
|
||||
std::vector<T> ring;
|
||||
|
||||
void initialize_all_threads() {
|
||||
[&]<std::size_t... StageIndices>(std::index_sequence<StageIndices...>) {
|
||||
(init_stage_threads<StageIndices>(), ...);
|
||||
}(std::make_index_sequence<Topology::num_stages>{});
|
||||
}
|
||||
|
||||
template <int Stage> void init_stage_threads() {
|
||||
constexpr int stage_offset = Topology::template stage_offset<Stage>();
|
||||
constexpr int stage_thread_count = Topology::threads_per_stage[Stage];
|
||||
constexpr int prev_stage_threads =
|
||||
Topology::template prev_stage_thread_count<Stage>();
|
||||
constexpr bool is_last_stage = (Stage == Topology::num_stages - 1);
|
||||
|
||||
for (int thread = 0; thread < stage_thread_count; ++thread) {
|
||||
auto &thread_state = all_threads[stage_offset + thread];
|
||||
thread_state.last_stage = is_last_stage;
|
||||
thread_state.last_push_read = std::vector<uint32_t>(prev_stage_threads);
|
||||
}
|
||||
}
|
||||
|
||||
template <int Stage, int Thread>
|
||||
Batch acquire_helper(uint32_t maxBatch, bool mayBlock) {
|
||||
constexpr int thread_idx = Topology::template thread_index<Stage, Thread>();
|
||||
auto &thread_state = all_threads[thread_idx];
|
||||
|
||||
uint32_t begin = thread_state.local_pops & slot_count_mask;
|
||||
uint32_t len = StaticPipelineAlgorithms::calculate_safe_len<
|
||||
wait_strategy, Topology, Stage, Thread>(all_threads, pushes, mayBlock);
|
||||
|
||||
if (maxBatch != 0) {
|
||||
len = std::min(len, maxBatch);
|
||||
}
|
||||
if (len == 0) {
|
||||
return Batch{};
|
||||
}
|
||||
|
||||
auto result = Batch{&ring, begin, begin + len};
|
||||
threadState[stage][thread].local_pops += len;
|
||||
thread_state.local_pops += len;
|
||||
return result;
|
||||
}
|
||||
|
||||
// Used by producer threads to reserve slots in the ring buffer
|
||||
alignas(128) std::atomic<uint32_t> slots{0};
|
||||
// Used for producers to publish
|
||||
alignas(128) std::atomic<uint32_t> pushes{0};
|
||||
|
||||
const uint32_t slot_count;
|
||||
const uint32_t slot_count_mask;
|
||||
|
||||
// We can safely acquire this many items
|
||||
uint32_t getSafeLen(int stage, int threadIndex, bool mayBlock) {
|
||||
uint32_t safeLen = UINT32_MAX;
|
||||
auto &thread = threadState[stage][threadIndex];
|
||||
// See if we can determine that there are entries we can acquire entirely
|
||||
// from state local to the thread
|
||||
for (int i = 0; i < int(thread.last_push_read.size()); ++i) {
|
||||
auto &lastPush = stage == 0 ? pushes : threadState[stage - 1][i].pops;
|
||||
if (thread.last_push_read[i] == thread.local_pops) {
|
||||
// Re-read lastPush with memory order and try again
|
||||
thread.last_push_read[i] = lastPush.load(std::memory_order_acquire);
|
||||
if (thread.last_push_read[i] == thread.local_pops) {
|
||||
if (!mayBlock) {
|
||||
return 0;
|
||||
}
|
||||
// Wait for lastPush to change and try again
|
||||
lastPush.wait(thread.last_push_read[i], std::memory_order_relaxed);
|
||||
thread.last_push_read[i] = lastPush.load(std::memory_order_acquire);
|
||||
}
|
||||
}
|
||||
safeLen = std::min(safeLen, thread.last_push_read[i] - thread.local_pops);
|
||||
}
|
||||
return safeLen;
|
||||
}
|
||||
|
||||
struct ThreadState {
|
||||
// Where this thread has published up to
|
||||
alignas(128) std::atomic<uint32_t> pops{0};
|
||||
// Where this thread will publish to the next time it publishes
|
||||
uint32_t local_pops{0};
|
||||
// Where the previous stage's threads have published up to last we checked
|
||||
std::vector<uint32_t> last_push_read;
|
||||
};
|
||||
// threadState[i][j] is the state for thread j in stage i
|
||||
std::vector<std::vector<ThreadState>> threadState;
|
||||
// Shared ring buffer
|
||||
std::vector<T> ring;
|
||||
|
||||
public:
|
||||
struct StageGuard {
|
||||
template <int Stage, int Thread> struct StageGuard {
|
||||
Batch batch;
|
||||
|
||||
~StageGuard() {
|
||||
if (ts != nullptr) {
|
||||
// seq_cst so that the notify can't be ordered before the store
|
||||
ts->pops.store(local_pops, std::memory_order_seq_cst);
|
||||
ts->pops.notify_all();
|
||||
if (!batch.empty()) {
|
||||
StaticPipelineAlgorithms::update_thread_pops<wait_strategy, Topology,
|
||||
Stage, Thread>(
|
||||
pipeline->all_threads, local_pops);
|
||||
}
|
||||
}
|
||||
|
||||
StageGuard(StageGuard const &) = delete;
|
||||
StageGuard &operator=(StageGuard const &) = delete;
|
||||
StageGuard(StageGuard &&other)
|
||||
StageGuard(StageGuard &&other) noexcept
|
||||
: batch(other.batch), local_pops(other.local_pops),
|
||||
ts(std::exchange(other.ts, nullptr)) {}
|
||||
StageGuard &operator=(StageGuard &&other) {
|
||||
pipeline(std::exchange(other.pipeline, nullptr)) {}
|
||||
StageGuard &operator=(StageGuard &&other) noexcept {
|
||||
batch = other.batch;
|
||||
local_pops = other.local_pops;
|
||||
ts = std::exchange(other.ts, nullptr);
|
||||
pipeline = std::exchange(other.pipeline, nullptr);
|
||||
return *this;
|
||||
}
|
||||
|
||||
private:
|
||||
friend struct StaticThreadPipeline;
|
||||
uint32_t local_pops;
|
||||
friend struct ThreadPipeline;
|
||||
StageGuard(Batch batch, ThreadState *ts)
|
||||
: batch(batch), local_pops(ts->local_pops),
|
||||
ts(batch.empty() ? nullptr : ts) {}
|
||||
ThreadState *ts;
|
||||
StaticThreadPipeline *pipeline;
|
||||
|
||||
StageGuard(Batch batch, uint32_t local_pops, StaticThreadPipeline *pipeline)
|
||||
: batch(batch), local_pops(local_pops),
|
||||
pipeline(batch.empty() ? nullptr : pipeline) {}
|
||||
};
|
||||
|
||||
struct ProducerGuard {
|
||||
@@ -285,8 +495,6 @@ public:
|
||||
if (tp == nullptr) {
|
||||
return;
|
||||
}
|
||||
// Wait for earlier slots to finish being published, since publishing
|
||||
// implies that all previous slots were also published.
|
||||
for (;;) {
|
||||
uint32_t p = tp->pushes.load(std::memory_order_acquire);
|
||||
if (p == old_slot) {
|
||||
@@ -294,36 +502,42 @@ public:
|
||||
}
|
||||
tp->pushes.wait(p, std::memory_order_relaxed);
|
||||
}
|
||||
// Publish. seq_cst so that the notify can't be ordered before the store
|
||||
tp->pushes.store(new_slot, std::memory_order_seq_cst);
|
||||
// We have to notify every time, since we don't know if this is the last
|
||||
// push ever
|
||||
tp->pushes.notify_all();
|
||||
}
|
||||
|
||||
private:
|
||||
friend struct ThreadPipeline;
|
||||
friend struct StaticThreadPipeline;
|
||||
ProducerGuard() : batch(), tp() {}
|
||||
ProducerGuard(Batch batch, ThreadPipeline<T> *tp, uint32_t old_slot,
|
||||
ProducerGuard(Batch batch, StaticThreadPipeline *tp, uint32_t old_slot,
|
||||
uint32_t new_slot)
|
||||
: batch(batch), tp(tp), old_slot(old_slot), new_slot(new_slot) {}
|
||||
ThreadPipeline<T> *const tp;
|
||||
StaticThreadPipeline *const tp;
|
||||
uint32_t old_slot;
|
||||
uint32_t new_slot;
|
||||
};
|
||||
|
||||
// Acquire a batch of items for processing by a consumer thread.
|
||||
// stage: which processing stage (0 = first consumer stage after producers)
|
||||
// thread: thread ID within the stage (0 to threadsPerStage[stage]-1)
|
||||
// maxBatch: maximum items to acquire (0 = no limit)
|
||||
// mayBlock: whether to block waiting for items (false = return empty batch if
|
||||
// none available) Returns: StageGuard with batch of items to process
|
||||
[[nodiscard]] StageGuard acquire(int stage, int thread, int maxBatch = 0,
|
||||
bool mayBlock = true) {
|
||||
assert(stage < int(threadState.size()));
|
||||
assert(thread < int(threadState[stage].size()));
|
||||
auto batch = acquireHelper(stage, thread, maxBatch, mayBlock);
|
||||
return StageGuard{std::move(batch), &threadState[stage][thread]};
|
||||
// Stage: which processing stage (0 = first consumer stage after producers) -
|
||||
// compile-time parameter Thread: thread ID within the stage (0 to
|
||||
// ThreadsPerStage[Stage]-1) - compile-time parameter maxBatch: maximum items
|
||||
// to acquire (0 = no limit) may_block: whether to block waiting for items
|
||||
// (false = return empty batch if none available) Returns: StageGuard<Stage,
|
||||
// Thread> with batch of items to process and compile-time type safety
|
||||
template <int Stage, int Thread>
|
||||
[[nodiscard]] StageGuard<Stage, Thread> acquire(int maxBatch = 0,
|
||||
bool may_block = true) {
|
||||
static_assert(Stage >= 0 && Stage < Topology::num_stages,
|
||||
"Stage index out of bounds");
|
||||
static_assert(Thread >= 0 && Thread < Topology::threads_per_stage[Stage],
|
||||
"Thread index out of bounds");
|
||||
|
||||
auto batch = acquire_helper<Stage, Thread>(maxBatch, may_block);
|
||||
|
||||
constexpr int thread_idx = Topology::template thread_index<Stage, Thread>();
|
||||
uint32_t local_pops = all_threads[thread_idx].local_pops;
|
||||
|
||||
return StageGuard<Stage, Thread>{std::move(batch), local_pops, this};
|
||||
}
|
||||
|
||||
// Reserve slots in the ring buffer for a producer thread to fill with items.
|
||||
@@ -349,24 +563,23 @@ public:
|
||||
if (size > slot_count) {
|
||||
std::abort();
|
||||
}
|
||||
// Reserve a slot to construct an item, but don't publish to consumer yet
|
||||
|
||||
uint32_t slot;
|
||||
uint32_t begin;
|
||||
for (;;) {
|
||||
begin_loop:
|
||||
slot = slots.load(std::memory_order_relaxed);
|
||||
begin = slot & slot_count_mask;
|
||||
// Make sure we won't stomp the back of the ring buffer
|
||||
for (auto &thread : threadState.back()) {
|
||||
uint32_t pops = thread.pops.load(std::memory_order_acquire);
|
||||
if (slot + size - pops > slot_count) {
|
||||
if (!block) {
|
||||
return ProducerGuard{};
|
||||
}
|
||||
thread.pops.wait(pops, std::memory_order_relaxed);
|
||||
goto begin_loop;
|
||||
}
|
||||
|
||||
int capacity_result =
|
||||
StaticPipelineAlgorithms::check_producer_capacity<Topology>(
|
||||
all_threads, slot, size, slot_count, block);
|
||||
if (capacity_result == 1) {
|
||||
continue;
|
||||
}
|
||||
if (capacity_result == 2) {
|
||||
return ProducerGuard{};
|
||||
}
|
||||
|
||||
if (slots.compare_exchange_weak(slot, slot + size,
|
||||
std::memory_order_relaxed,
|
||||
std::memory_order_relaxed)) {
|
||||
|
||||
style.md — 1 line changed
@@ -119,6 +119,7 @@ auto addr = reinterpret_cast<uintptr_t>(ptr); // Pointer to integer conv

### Variables and Functions
- **snake_case** for all variables, functions, and member functions
- **Legacy camelCase exists** - the codebase currently contains mixed naming due to historical development. New code should use snake_case. Existing camelCase should be converted to snake_case during natural refactoring (not mass renaming).
```cpp
int64_t used_bytes() const;
void add_block(int64_t size);
```

@@ -18,10 +18,12 @@ struct Message {

struct EchoHandler : public ConnectionHandler {
private:
  ThreadPipeline<Message> &pipeline;
  StaticThreadPipeline<Message, WaitStrategy::WaitIfStageEmpty, 1> &pipeline;

public:
  explicit EchoHandler(ThreadPipeline<Message> &pipeline)
  explicit EchoHandler(
      StaticThreadPipeline<Message, WaitStrategy::WaitIfStageEmpty, 1>
          &pipeline)
      : pipeline(pipeline) {}

  void on_data_arrived(std::string_view data,
@@ -42,11 +44,11 @@ TEST_CASE(
  config.server.io_threads = 1;
  config.server.epoll_instances = 1;

  ThreadPipeline<Message> pipeline{10, {1}};
  StaticThreadPipeline<Message, WaitStrategy::WaitIfStageEmpty, 1> pipeline{10};
  EchoHandler handler{pipeline};
  auto echoThread = std::thread{[&]() {
    for (;;) {
      auto guard = pipeline.acquire(0, 0);
      auto guard = pipeline.acquire<0, 0>();
      for (auto &message : guard.batch) {
        bool done = message.done;
        if (done) {

@@ -2,60 +2,51 @@

## Summary

WeaselDB achieved 1.3M requests/second throughput using a two-stage ThreadPipeline, with 396ns serial CPU time per request. Higher serial CPU time means more CPU budget available for serial processing.
WeaselDB achieved 1.3M requests/second throughput using a two-stage ThreadPipeline with futex wake optimization, delivering 488ns serial CPU time per request while maintaining 0% CPU usage when idle. Higher serial CPU time means more CPU budget available for serial processing.

## Performance Metrics

### Throughput
- Non-blocking: 1.3M requests/second over unix socket
- Blocking: 1.1M requests/second over unix socket
- **1.3M requests/second** over unix socket
- 8 I/O threads with 8 epoll instances
- Load tester used 12 network threads
- Max latency: 4ms out of 90M requests
- **0% CPU usage when idle** (optimized futex wake implementation)

### Threading Architecture
- Two-stage pipeline: Stage-0 (noop) → Stage-1 (connection return)
- Lock-free coordination using atomic ring buffer
- **Optimized futex wake**: Only wake on final pipeline stage
- Each request "processed" serially on single thread

### Non-blocking vs Blocking Acquisition
### Performance Characteristics

**Non-blocking acquisition (`mayBlock=false`)**:
- Throughput: 1.3M requests/second (maintained with up to 1200 loop iterations)
- Stage-0 CPU: 100% (10% futex wake, 90% other)
- Serial CPU time per request: 396ns (1200 iterations, validated with nanobench)
- Theoretical maximum serial CPU time: 769ns (1,000,000,000ns ÷ 1,300,000 req/s)
- Serial efficiency: 51.5% (396ns ÷ 769ns)
- 100% CPU usage when idle
**Optimized Pipeline Mode**:
- **Throughput**: 1.3M requests/second
- **Serial CPU time per request**: 488ns (validated with nanobench)
- **Theoretical maximum serial CPU time**: 769ns (1,000,000,000ns ÷ 1,300,000 req/s)
- **Serial efficiency**: 63.4% (488ns ÷ 769ns)
- **CPU usage when idle**: 0%

**Blocking acquisition (`mayBlock=true`)**:
- Throughput: 1.1M requests/second (800 loop iterations)
- Stage-0 CPU: 100% total (18% sched_yield, 8% futex wait, 7% futex wake, 67% other)
- Serial CPU time per request: 266ns (800 iterations, validated with nanobench)
- Theoretical maximum serial CPU time: 909ns (1,000,000,000ns ÷ 1,100,000 req/s)
- Serial efficiency: 29.3% (266ns ÷ 909ns)
- 0% CPU usage when idle
### Key Optimization: Futex Wake Reduction
- **Previous approach**: Futex wake at every pipeline stage (10% CPU overhead)
- **Optimized approach**: Futex wake only at final stage to wake producers. Stages now do their futex wait on the beginning of the pipeline instead of the previous stage.
- **Result**: 23% increase in serial CPU budget (396ns → 488ns)
- **Benefits**: Higher throughput per CPU cycle + idle efficiency
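
The wake reduction described above corresponds to the `update_thread_pops` logic earlier in this compare; a simplified sketch of the publish step (not the exact WeaselDB code):

```cpp
#include <atomic>
#include <cstdint>

// Intermediate stages publish progress with a plain release store; only the
// final stage, whose counter producers block on, pays for the futex wake.
void publish_pops(std::atomic<uint32_t> &pops, uint32_t value, bool last_stage) {
  if (last_stage) {
    // seq_cst so the notify cannot be reordered before the store.
    pops.store(value, std::memory_order_seq_cst);
    pops.notify_all(); // futex wake for producers waiting on ring capacity
  } else {
    pops.store(value, std::memory_order_release); // no futex wake between stages
  }
}
```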

### Request Flow
```
I/O Threads (8) → HttpHandler::on_batch_complete() → ThreadPipeline
       ↑                                                  ↓
       |                                        Stage 0: Noop thread
       |                               (396ns serial CPU per request)
       |                               (488ns serial CPU per request)
       |                                                  ↓
       |                                    Stage 1: Connection return
       |                                      (optimized futex wake)
       |                                                  ↓
       └─────────────────────── Server::release_back_to_server()
```

### Pipeline Configuration
- Stage 0: 1 noop thread
- Stage 1: 2 worker threads for connection return
- Atomic counters with shared ring buffer

### Memory Management
- Transfer ownership of the connection along the pipeline

## Test Configuration

- Server: test_config.toml with 8 io_threads, 8 epoll_instances

tools/check_snake_case.py — new executable file, 166 lines
@@ -0,0 +1,166 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Pre-commit hook to enforce snake_case naming conventions in C++ code.
|
||||
Only checks modified lines to avoid breaking existing camelCase code.
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import re
|
||||
import subprocess
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
def get_modified_lines(filepath):
|
||||
"""Get line numbers of modified lines using git diff."""
|
||||
try:
|
||||
# Get diff for staged changes (what's being committed)
|
||||
result = subprocess.run(
|
||||
["git", "diff", "--cached", "--unified=0", filepath],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
check=True,
|
||||
)
|
||||
|
||||
modified_lines = set()
|
||||
for line in result.stdout.split("\n"):
|
||||
# Look for diff hunks like "@@ -10,3 +15,4 @@"
|
||||
if line.startswith("@@"):
|
||||
# Extract new line numbers from hunk header
|
||||
match = re.search(r"\+(\d+)(?:,(\d+))?", line)
|
||||
if match:
|
||||
start = int(match.group(1))
|
||||
count = int(match.group(2)) if match.group(2) else 1
|
||||
modified_lines.update(range(start, start + count))
|
||||
|
||||
return modified_lines
|
||||
except subprocess.CalledProcessError:
|
||||
# If git diff fails, check all lines (fallback)
|
||||
return None
|
||||
|
||||
|
||||
def check_snake_case_violations(filepath, check_new_only=True):
|
||||
"""Check for camelCase violations in C++ code."""
|
||||
violations = []
|
||||
|
||||
# Get modified lines if checking new code only
|
||||
modified_lines = get_modified_lines(filepath) if check_new_only else None
|
||||
|
||||
# Patterns that should use snake_case
|
||||
patterns = [
|
||||
# Function definitions and declarations
|
||||
(r"\b([a-z]+[A-Z][a-zA-Z0-9_]*)\s*\(", "function"),
|
||||
# Variable declarations (including member variables)
|
||||
(
|
||||
r"\b(?:auto|int|int64_t|int32_t|size_t|bool|char|float|double|std::\w+)\s+([a-z]+[A-Z][a-zA-Z0-9_]*)\b",
|
||||
"variable",
|
||||
),
|
||||
# Member variable access (obj.camelCase)
|
||||
(r"\.([a-z]+[A-Z][a-zA-Z0-9_]*)\b", "member_access"),
|
||||
# Assignment to camelCase variables
|
||||
(r"\b([a-z]+[A-Z][a-zA-Z0-9_]*)\s*=", "assignment"),
|
||||
]
|
||||
|
||||
# Exclusions - things that should NOT be flagged
|
||||
exclusions = [
|
||||
# C++ standard library and common libraries
|
||||
r"\b(std::|weaseljson|simdutf|doctest)",
|
||||
# Template parameters and concepts
|
||||
r"\b[A-Z][a-zA-Z0-9_]*\b",
|
||||
# Class/struct names (PascalCase is correct)
|
||||
r"\bstruct\s+[A-Z][a-zA-Z0-9_]*",
|
||||
r"\bclass\s+[A-Z][a-zA-Z0-9_]*",
|
||||
# Enum values (PascalCase is correct per style guide)
|
||||
r"\b[A-Z][a-zA-Z0-9_]*::[A-Z][a-zA-Z0-9_]*",
|
||||
# Common HTTP parser callback names (external API)
|
||||
r"\b(onUrl|onHeaderField|onHeaderFieldComplete|onHeaderValue|onHeaderValueComplete|onHeadersComplete|onBody|onMessageComplete)\b",
|
||||
# Known legacy APIs we can't easily change
|
||||
r"\b(user_data|get_arena|append_message)\b",
|
||||
]
|
||||
|
||||
try:
|
||||
with open(filepath, "r", encoding="utf-8") as f:
|
||||
lines = f.readlines()
|
||||
except (IOError, UnicodeDecodeError):
|
||||
return violations
|
||||
|
||||
for line_num, line in enumerate(lines, 1):
|
||||
# Skip if we're only checking modified lines and this isn't one
|
||||
if modified_lines is not None and line_num not in modified_lines:
|
||||
continue
|
||||
|
||||
# Skip comments and strings
|
||||
if re.search(r'^\s*(?://|/\*|\*|")', line.strip()):
|
||||
continue
|
||||
|
||||
# Check if line should be excluded
|
||||
if any(re.search(exclusion, line) for exclusion in exclusions):
|
||||
continue
|
||||
|
||||
# Check for violations
|
||||
for pattern, violation_type in patterns:
|
||||
matches = re.finditer(pattern, line)
|
||||
for match in matches:
|
||||
camel_case_name = match.group(1)
|
||||
# Convert to snake_case suggestion
|
||||
snake_case = re.sub(
|
||||
"([a-z0-9])([A-Z])", r"\1_\2", camel_case_name
|
||||
).lower()
|
||||
|
||||
violations.append(
|
||||
{
|
||||
"file": filepath,
|
||||
"line": line_num,
|
||||
"column": match.start(1) + 1,
|
||||
"type": violation_type,
|
||||
"camelCase": camel_case_name,
|
||||
"snake_case": snake_case,
|
||||
"context": line.strip(),
|
||||
}
|
||||
)
|
||||
|
||||
return violations
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="Check snake_case naming in C++ files")
|
||||
parser.add_argument("files", nargs="*", help="Files to check")
|
||||
parser.add_argument(
|
||||
"--check-new-code-only",
|
||||
action="store_true",
|
||||
help="Only check modified lines (git diff)",
|
||||
)
|
||||
args = parser.parse_args()
|
||||
|
||||
if not args.files:
|
||||
return 0
|
||||
|
||||
total_violations = 0
|
||||
|
||||
for filepath in args.files:
|
||||
path = Path(filepath)
|
||||
if not path.exists() or path.suffix not in [".hpp", ".cpp"]:
|
||||
continue
|
||||
|
||||
violations = check_snake_case_violations(filepath, args.check_new_code_only)
|
||||
|
||||
if violations:
|
||||
print(f"\n❌ {filepath}:")
|
||||
for v in violations:
|
||||
print(
|
||||
f" Line {v['line']}:{v['column']} - {v['type']} '{v['camelCase']}' should be '{v['snake_case']}'"
|
||||
)
|
||||
print(f" Context: {v['context']}")
|
||||
total_violations += len(violations)
|
||||
|
||||
if total_violations > 0:
|
||||
print(f"\n💡 Found {total_violations} naming violations.")
|
||||
print(" New code should use snake_case per style.md")
|
||||
print(" To convert: s/([a-z0-9])([A-Z])/\\1_\\2/g and lowercase")
|
||||
return 1
|
||||
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
@@ -38,6 +38,7 @@ class PersistencePattern(Enum):
|
||||
@dataclass
|
||||
class Transaction:
|
||||
"""Represents a database transaction/commit"""
|
||||
|
||||
txn_id: int
|
||||
arrival_time: float
|
||||
size_bytes: int = 1024
|
||||
@@ -47,6 +48,7 @@ class Transaction:
|
||||
@dataclass
|
||||
class PersistenceMetrics:
|
||||
"""Metrics for a persistence approach"""
|
||||
|
||||
pattern: PersistencePattern
|
||||
total_transactions: int
|
||||
completed_transactions: int
|
||||
@@ -81,9 +83,9 @@ class PersistenceMetrics:
|
||||
class PersistenceSimulator(ABC):
|
||||
"""Abstract base class for persistence pattern simulators"""
|
||||
|
||||
def __init__(self,
|
||||
simulation_duration: float = 60.0,
|
||||
arrival_rate_per_sec: float = 1000.0):
|
||||
def __init__(
|
||||
self, simulation_duration: float = 60.0, arrival_rate_per_sec: float = 1000.0
|
||||
):
|
||||
self.simulation_duration = simulation_duration
|
||||
self.arrival_rate_per_sec = arrival_rate_per_sec
|
||||
|
||||
@@ -115,7 +117,7 @@ class PersistenceSimulator(ABC):
|
||||
txn_id=self.next_txn_id,
|
||||
arrival_time=arrival_time,
|
||||
size_bytes=size,
|
||||
requires_durability=True
|
||||
requires_durability=True,
|
||||
)
|
||||
|
||||
@abstractmethod
|
||||
@@ -138,9 +140,9 @@ class PersistenceSimulator(ABC):
|
||||
time, event_type, data = heapq.heappop(self.event_queue)
|
||||
self.current_time = time
|
||||
|
||||
if event_type == 'transaction_arrival':
|
||||
if event_type == "transaction_arrival":
|
||||
self.process_transaction(data)
|
||||
elif event_type == 'custom':
|
||||
elif event_type == "custom":
|
||||
self._handle_custom_event(data)
|
||||
|
||||
return self._calculate_metrics()
|
||||
@@ -158,7 +160,7 @@ class PersistenceSimulator(ABC):
|
||||
txn = self.generate_transaction(time)
|
||||
self.next_txn_id += 1
|
||||
|
||||
heapq.heappush(self.event_queue, (time, 'transaction_arrival', txn))
|
||||
heapq.heappush(self.event_queue, (time, "transaction_arrival", txn))
|
||||
|
||||
def _handle_custom_event(self, data):
|
||||
"""Handle custom events - override in subclasses"""
|
||||
@@ -173,13 +175,12 @@ class PersistenceSimulator(ABC):
|
||||
if not self.completed_transactions:
|
||||
raise ValueError("No transactions completed")
|
||||
|
||||
latencies = [txn['latency_ms'] for txn in self.completed_transactions]
|
||||
latencies = [txn["latency_ms"] for txn in self.completed_transactions]
|
||||
|
||||
return PersistenceMetrics(
|
||||
pattern=self.get_pattern_name(),
|
||||
total_transactions=self.next_txn_id,
|
||||
completed_transactions=len(self.completed_transactions),
|
||||
|
||||
# Latency metrics
|
||||
min_latency=min(latencies),
|
||||
mean_latency=statistics.mean(latencies),
|
||||
@@ -187,22 +188,22 @@ class PersistenceSimulator(ABC):
|
||||
p95_latency=np.percentile(latencies, 95),
|
||||
p99_latency=np.percentile(latencies, 99),
|
||||
max_latency=max(latencies),
|
||||
|
||||
# Throughput metrics
|
||||
avg_throughput_tps=len(self.completed_transactions) / self.simulation_duration,
|
||||
avg_throughput_tps=len(self.completed_transactions)
|
||||
/ self.simulation_duration,
|
||||
peak_throughput_tps=self._calculate_peak_throughput(),
|
||||
|
||||
# Resource metrics - implemented by subclasses
|
||||
avg_disk_iops=statistics.mean(self.disk_iops) if self.disk_iops else 0,
|
||||
avg_network_mbps=statistics.mean(self.network_usage) if self.network_usage else 0,
|
||||
avg_network_mbps=(
|
||||
statistics.mean(self.network_usage) if self.network_usage else 0
|
||||
),
|
||||
storage_efficiency=self._calculate_storage_efficiency(),
|
||||
|
||||
# Pattern-specific characteristics
|
||||
durability_guarantee=self._get_durability_guarantee(),
|
||||
consistency_model=self._get_consistency_model(),
|
||||
recovery_time_estimate=self._get_recovery_time(),
|
||||
operational_complexity=self._get_operational_complexity(),
|
||||
infrastructure_cost=self._get_infrastructure_cost()
|
||||
infrastructure_cost=self._get_infrastructure_cost(),
|
||||
)
|
||||
|
||||
def _calculate_peak_throughput(self) -> float:
|
||||
@@ -213,7 +214,7 @@ class PersistenceSimulator(ABC):
|
||||
# Group transactions by second
|
||||
throughput_by_second = defaultdict(int)
|
||||
for txn in self.completed_transactions:
|
||||
second = int(txn['completion_time'])
|
||||
second = int(txn["completion_time"])
|
||||
throughput_by_second[second] += 1
|
||||
|
||||
return max(throughput_by_second.values()) if throughput_by_second else 0
|
||||
@@ -247,11 +248,13 @@ class PersistenceSimulator(ABC):
|
||||
class WeaselDBSimulator(PersistenceSimulator):
|
||||
"""WeaselDB's batched S3 persistence simulation"""
|
||||
|
||||
def __init__(self,
|
||||
batch_timeout_ms: float = 1.0,
|
||||
batch_size_threshold: int = 800000,
|
||||
max_in_flight: int = 50,
|
||||
**kwargs):
|
||||
def __init__(
|
||||
self,
|
||||
batch_timeout_ms: float = 1.0,
|
||||
batch_size_threshold: int = 800000,
|
||||
max_in_flight: int = 50,
|
||||
**kwargs,
|
||||
):
|
||||
super().__init__(**kwargs)
|
||||
|
||||
self.batch_timeout_ms = batch_timeout_ms
|
||||
@@ -280,7 +283,10 @@ class WeaselDBSimulator(PersistenceSimulator):
|
||||
self.current_batch.append(txn)
|
||||
|
||||
# Check if we should send batch
|
||||
if self._should_send_batch() and len(self.in_flight_batches) < self.max_in_flight:
|
||||
if (
|
||||
self._should_send_batch()
|
||||
and len(self.in_flight_batches) < self.max_in_flight
|
||||
):
|
||||
self._send_current_batch()
|
||||
|
||||
def _should_send_batch(self) -> bool:
|
||||
@@ -294,7 +300,9 @@ class WeaselDBSimulator(PersistenceSimulator):
|
||||
return True
|
||||
|
||||
# Time trigger
|
||||
if self.batch_start_time and (self.current_time - self.batch_start_time) >= (self.batch_timeout_ms / 1000.0):
|
||||
if self.batch_start_time and (self.current_time - self.batch_start_time) >= (
|
||||
self.batch_timeout_ms / 1000.0
|
||||
):
|
||||
return True
|
||||
|
||||
return False
|
||||
@@ -315,14 +323,16 @@ class WeaselDBSimulator(PersistenceSimulator):
|
||||
|
||||
# Track batch
|
||||
self.in_flight_batches[batch_id] = {
|
||||
'transactions': self.current_batch.copy(),
|
||||
'sent_time': self.current_time,
|
||||
'completion_time': completion_time,
|
||||
'size_bytes': batch_size
|
||||
"transactions": self.current_batch.copy(),
|
||||
"sent_time": self.current_time,
|
||||
"completion_time": completion_time,
|
||||
"size_bytes": batch_size,
|
||||
}
|
||||
|
||||
# Schedule completion
|
||||
self.schedule_event(completion_time, 'custom', {'type': 'batch_complete', 'batch_id': batch_id})
|
||||
self.schedule_event(
|
||||
completion_time, "custom", {"type": "batch_complete", "batch_id": batch_id}
|
||||
)
|
||||
|
||||
# Track network usage
|
||||
self.network_usage.append(batch_size / (1024 * 1024)) # MB
|
||||
@@ -340,21 +350,23 @@ class WeaselDBSimulator(PersistenceSimulator):
|
||||
|
||||
def _handle_custom_event(self, data):
|
||||
"""Handle batch completion events"""
|
||||
if data['type'] == 'batch_complete':
|
||||
batch_id = data['batch_id']
|
||||
if data["type"] == "batch_complete":
|
||||
batch_id = data["batch_id"]
|
||||
if batch_id in self.in_flight_batches:
|
||||
batch = self.in_flight_batches.pop(batch_id)
|
||||
|
||||
# Mark transactions complete
|
||||
for txn in batch['transactions']:
|
||||
for txn in batch["transactions"]:
|
||||
latency_ms = (self.current_time - txn.arrival_time) * 1000
|
||||
self.completed_transactions.append({
|
||||
'txn_id': txn.txn_id,
|
||||
'arrival_time': txn.arrival_time,
|
||||
'completion_time': self.current_time,
|
||||
'latency_ms': latency_ms,
|
||||
'size_bytes': txn.size_bytes
|
||||
})
|
||||
self.completed_transactions.append(
|
||||
{
|
||||
"txn_id": txn.txn_id,
|
||||
"arrival_time": txn.arrival_time,
|
||||
"completion_time": self.current_time,
|
||||
"latency_ms": latency_ms,
|
||||
"size_bytes": txn.size_bytes,
|
||||
}
|
||||
)
|
||||
|
||||
def _calculate_storage_efficiency(self) -> float:
|
||||
return 1.0 # S3 has no storage overhead
|
||||
@@ -378,10 +390,12 @@ class WeaselDBSimulator(PersistenceSimulator):
|
||||
class TraditionalWALSimulator(PersistenceSimulator):
|
||||
"""Traditional Write-Ahead Log with periodic sync"""
|
||||
|
||||
def __init__(self,
|
||||
wal_sync_interval_ms: float = 10.0,
|
||||
checkpoint_interval_sec: float = 30.0,
|
||||
**kwargs):
|
||||
def __init__(
|
||||
self,
|
||||
wal_sync_interval_ms: float = 10.0,
|
||||
checkpoint_interval_sec: float = 30.0,
|
||||
**kwargs,
|
||||
):
|
||||
super().__init__(**kwargs)
|
||||
|
||||
self.wal_sync_interval_ms = wal_sync_interval_ms
|
||||
@@ -415,12 +429,12 @@ class TraditionalWALSimulator(PersistenceSimulator):
|
||||
"""Schedule periodic WAL sync events"""
|
||||
sync_time = self.wal_sync_interval_ms / 1000.0
|
||||
while sync_time < self.simulation_duration:
|
||||
self.schedule_event(sync_time, 'custom', {'type': 'wal_sync'})
|
||||
self.schedule_event(sync_time, "custom", {"type": "wal_sync"})
|
||||
sync_time += self.wal_sync_interval_ms / 1000.0
|
||||
|
||||
def _handle_custom_event(self, data):
|
||||
"""Handle WAL sync events"""
|
||||
if data['type'] == 'wal_sync':
|
||||
if data["type"] == "wal_sync":
|
||||
self._perform_wal_sync()
|
||||
|
||||
def _perform_wal_sync(self):
|
||||
@@ -436,10 +450,11 @@ class TraditionalWALSimulator(PersistenceSimulator):
|
||||
|
||||
# Schedule sync completion
|
||||
syncing_txns = list(self.wal_buffer)
|
||||
self.schedule_event(completion_time, 'custom', {
|
||||
'type': 'sync_complete',
|
||||
'transactions': syncing_txns
|
||||
})
|
||||
self.schedule_event(
|
||||
completion_time,
|
||||
"custom",
|
||||
{"type": "sync_complete", "transactions": syncing_txns},
|
||||
)
|
||||
|
||||
# Track IOPS for sync operation
|
||||
self.disk_iops.append(len(self.wal_buffer))
|
||||
@@ -474,22 +489,24 @@ class TraditionalWALSimulator(PersistenceSimulator):
|
||||
|
||||
def _handle_custom_event(self, data):
|
||||
"""Handle sync completion"""
|
||||
if data['type'] == 'wal_sync':
|
||||
if data["type"] == "wal_sync":
|
||||
self._perform_wal_sync()
|
||||
elif data['type'] == 'sync_complete':
|
||||
elif data["type"] == "sync_complete":
|
||||
# Mark transactions as durable
|
||||
for txn in data['transactions']:
|
||||
for txn in data["transactions"]:
|
||||
if txn.txn_id in self.pending_transactions:
|
||||
del self.pending_transactions[txn.txn_id]
|
||||
|
||||
latency_ms = (self.current_time - txn.arrival_time) * 1000
|
||||
self.completed_transactions.append({
|
||||
'txn_id': txn.txn_id,
|
||||
'arrival_time': txn.arrival_time,
|
||||
'completion_time': self.current_time,
|
||||
'latency_ms': latency_ms,
|
||||
'size_bytes': txn.size_bytes
|
||||
})
|
||||
self.completed_transactions.append(
|
||||
{
|
||||
"txn_id": txn.txn_id,
|
||||
"arrival_time": txn.arrival_time,
|
||||
"completion_time": self.current_time,
|
||||
"latency_ms": latency_ms,
|
||||
"size_bytes": txn.size_bytes,
|
||||
}
|
||||
)
|
||||
|
||||
def _calculate_storage_efficiency(self) -> float:
|
||||
return 2.0 # WAL + main storage
|
||||
@@ -527,10 +544,11 @@ class SynchronousSimulator(PersistenceSimulator):
|
||||
completion_time = self.current_time + disk_latency
|
||||
|
||||
# Schedule completion
|
||||
self.schedule_event(completion_time, 'custom', {
|
||||
'type': 'disk_write_complete',
|
||||
'transaction': txn
|
||||
})
|
||||
self.schedule_event(
|
||||
completion_time,
|
||||
"custom",
|
||||
{"type": "disk_write_complete", "transaction": txn},
|
||||
)
|
||||
|
||||
# Track disk IOPS
|
||||
self.disk_iops.append(1.0)
|
||||
@@ -559,17 +577,19 @@ class SynchronousSimulator(PersistenceSimulator):
|
||||
|
||||
def _handle_custom_event(self, data):
|
||||
"""Handle disk write completion"""
|
||||
if data['type'] == 'disk_write_complete':
|
||||
txn = data['transaction']
|
||||
if data["type"] == "disk_write_complete":
|
||||
txn = data["transaction"]
|
||||
latency_ms = (self.current_time - txn.arrival_time) * 1000
|
||||
|
||||
self.completed_transactions.append({
|
||||
'txn_id': txn.txn_id,
|
||||
'arrival_time': txn.arrival_time,
|
||||
'completion_time': self.current_time,
|
||||
'latency_ms': latency_ms,
|
||||
'size_bytes': txn.size_bytes
|
||||
})
|
||||
self.completed_transactions.append(
|
||||
{
|
||||
"txn_id": txn.txn_id,
|
||||
"arrival_time": txn.arrival_time,
|
||||
"completion_time": self.current_time,
|
||||
"latency_ms": latency_ms,
|
||||
"size_bytes": txn.size_bytes,
|
||||
}
|
||||
)
|
||||
|
||||
def _calculate_storage_efficiency(self) -> float:
|
||||
return 1.5 # Some overhead for metadata
|
||||
@@ -593,11 +613,13 @@ class SynchronousSimulator(PersistenceSimulator):
|
||||
class WeaselDBDiskSimulator(PersistenceSimulator):
|
||||
"""WeaselDB's batched disk persistence simulation"""
|
||||
|
||||
def __init__(self,
|
||||
batch_timeout_ms: float = 1.0,
|
||||
batch_size_threshold: int = 800000,
|
||||
max_in_flight: int = 50,
|
||||
**kwargs):
|
||||
def __init__(
|
||||
self,
|
||||
batch_timeout_ms: float = 1.0,
|
||||
batch_size_threshold: int = 800000,
|
||||
max_in_flight: int = 50,
|
||||
**kwargs,
|
||||
):
|
||||
super().__init__(**kwargs)
|
||||
|
||||
self.batch_timeout_ms = batch_timeout_ms
|
||||
@@ -626,7 +648,10 @@ class WeaselDBDiskSimulator(PersistenceSimulator):
|
||||
self.current_batch.append(txn)
|
||||
|
||||
# Check if we should send batch
|
||||
if self._should_send_batch() and len(self.in_flight_batches) < self.max_in_flight:
|
||||
if (
|
||||
self._should_send_batch()
|
||||
and len(self.in_flight_batches) < self.max_in_flight
|
||||
):
|
||||
self._send_current_batch()
|
||||
|
||||
def _should_send_batch(self) -> bool:
|
||||
@@ -640,7 +665,9 @@ class WeaselDBDiskSimulator(PersistenceSimulator):
|
||||
return True
|
||||
|
||||
# Time trigger
|
||||
if self.batch_start_time and (self.current_time - self.batch_start_time) >= (self.batch_timeout_ms / 1000.0):
|
||||
if self.batch_start_time and (self.current_time - self.batch_start_time) >= (
|
||||
self.batch_timeout_ms / 1000.0
|
||||
):
|
||||
return True
|
||||
|
||||
return False
|
||||
@@ -661,14 +688,16 @@ class WeaselDBDiskSimulator(PersistenceSimulator):
|
||||
|
||||
# Track batch
|
||||
self.in_flight_batches[batch_id] = {
|
||||
'transactions': self.current_batch.copy(),
|
||||
'sent_time': self.current_time,
|
||||
'completion_time': completion_time,
|
||||
'size_bytes': batch_size
|
||||
"transactions": self.current_batch.copy(),
|
||||
"sent_time": self.current_time,
|
||||
"completion_time": completion_time,
|
||||
"size_bytes": batch_size,
|
||||
}
|
||||
|
||||
# Schedule completion
|
||||
self.schedule_event(completion_time, 'custom', {'type': 'batch_complete', 'batch_id': batch_id})
|
||||
self.schedule_event(
|
||||
completion_time, "custom", {"type": "batch_complete", "batch_id": batch_id}
|
||||
)
|
||||
|
||||
# Track disk IOPS (one write operation per batch)
|
||||
self.disk_iops.append(1.0)
|
||||
@@ -684,7 +713,9 @@ class WeaselDBDiskSimulator(PersistenceSimulator):
|
||||
|
||||
# Throughput-based latency for data transfer to page cache
|
||||
size_mb = batch_size_bytes / (1024 * 1024)
|
||||
transfer_latency = (size_mb / self.disk_throughput_mbps) * 1000.0 # Convert to ms
|
||||
transfer_latency = (
|
||||
size_mb / self.disk_throughput_mbps
|
||||
) * 1000.0 # Convert to ms
|
||||
|
||||
# FSYNC latency for EBS - forces write to replicated storage
|
||||
# EBS has higher fsync latency due to network replication
|
||||
@@ -709,21 +740,23 @@ class WeaselDBDiskSimulator(PersistenceSimulator):
|
||||
|
||||
def _handle_custom_event(self, data):
|
||||
"""Handle batch completion events"""
|
||||
if data['type'] == 'batch_complete':
|
||||
batch_id = data['batch_id']
|
||||
if data["type"] == "batch_complete":
|
||||
batch_id = data["batch_id"]
|
||||
if batch_id in self.in_flight_batches:
|
||||
batch = self.in_flight_batches.pop(batch_id)
|
||||
|
||||
# Mark transactions complete
|
||||
for txn in batch['transactions']:
|
||||
for txn in batch["transactions"]:
|
||||
latency_ms = (self.current_time - txn.arrival_time) * 1000
|
||||
self.completed_transactions.append({
|
||||
'txn_id': txn.txn_id,
|
||||
'arrival_time': txn.arrival_time,
|
||||
'completion_time': self.current_time,
|
||||
'latency_ms': latency_ms,
|
||||
'size_bytes': txn.size_bytes
|
||||
})
|
||||
self.completed_transactions.append(
|
||||
{
|
||||
"txn_id": txn.txn_id,
|
||||
"arrival_time": txn.arrival_time,
|
||||
"completion_time": self.current_time,
|
||||
"latency_ms": latency_ms,
|
||||
"size_bytes": txn.size_bytes,
|
||||
}
|
||||
)
|
||||
|
||||
def _calculate_storage_efficiency(self) -> float:
|
||||
return 1.2 # Some overhead for batching metadata
|
||||
@@ -750,8 +783,9 @@ class DatabaseComparisonFramework:
|
||||
def __init__(self, simulation_duration: float = 30.0):
|
||||
self.simulation_duration = simulation_duration
|
||||
|
||||
def run_comparison(self,
|
||||
arrival_rates: List[float] = [100, 500, 1000, 2000]) -> Dict[str, List[PersistenceMetrics]]:
|
||||
def run_comparison(
|
||||
self, arrival_rates: List[float] = [100, 500, 1000, 2000]
|
||||
) -> Dict[str, List[PersistenceMetrics]]:
|
||||
"""Run comparison across multiple arrival rates"""
|
||||
|
||||
results = defaultdict(list)
|
||||
@@ -765,10 +799,10 @@ class DatabaseComparisonFramework:
|
||||
batch_size_threshold=800000,
|
||||
max_in_flight=50,
|
||||
simulation_duration=self.simulation_duration,
|
||||
arrival_rate_per_sec=rate
|
||||
arrival_rate_per_sec=rate,
|
||||
)
|
||||
weasel_s3_metrics = weasel_s3.run_simulation()
|
||||
results['WeaselDB S3'].append(weasel_s3_metrics)
|
||||
results["WeaselDB S3"].append(weasel_s3_metrics)
|
||||
print(f" WeaselDB S3 P95: {weasel_s3_metrics.p95_latency:.1f}ms")
|
||||
|
||||
# WeaselDB EBS (same batching, EBS storage)
|
||||
@@ -777,38 +811,37 @@ class DatabaseComparisonFramework:
|
||||
batch_size_threshold=800000,
|
||||
max_in_flight=50,
|
||||
simulation_duration=self.simulation_duration,
|
||||
arrival_rate_per_sec=rate
|
||||
arrival_rate_per_sec=rate,
|
||||
)
|
||||
weasel_ebs_metrics = weasel_ebs.run_simulation()
|
||||
results['WeaselDB EBS'].append(weasel_ebs_metrics)
|
||||
results["WeaselDB EBS"].append(weasel_ebs_metrics)
|
||||
print(f" WeaselDB EBS P95: {weasel_ebs_metrics.p95_latency:.1f}ms")
|
||||
|
||||
# Traditional WAL
|
||||
wal = TraditionalWALSimulator(
|
||||
wal_sync_interval_ms=10.0,
|
||||
simulation_duration=self.simulation_duration,
|
||||
arrival_rate_per_sec=rate
|
||||
arrival_rate_per_sec=rate,
|
||||
)
|
||||
wal_metrics = wal.run_simulation()
|
||||
results['Traditional WAL'].append(wal_metrics)
|
||||
results["Traditional WAL"].append(wal_metrics)
|
||||
print(f" WAL P95: {wal_metrics.p95_latency:.1f}ms")
|
||||
|
||||
# Synchronous
|
||||
sync = SynchronousSimulator(
|
||||
simulation_duration=self.simulation_duration,
|
||||
arrival_rate_per_sec=rate
|
||||
simulation_duration=self.simulation_duration, arrival_rate_per_sec=rate
|
||||
)
|
||||
sync_metrics = sync.run_simulation()
|
||||
results['Synchronous'].append(sync_metrics)
|
||||
results["Synchronous"].append(sync_metrics)
|
||||
print(f" Synchronous P95: {sync_metrics.p95_latency:.1f}ms")
|
||||
|
||||
return dict(results)
|
||||
|
||||
def print_comparison_report(self, results: Dict[str, List[PersistenceMetrics]]):
|
||||
"""Print comprehensive comparison report"""
|
||||
print("\n" + "="*80)
|
||||
print("\n" + "=" * 80)
|
||||
print("DATABASE PERSISTENCE PATTERN COMPARISON")
|
||||
print("="*80)
|
||||
print("=" * 80)
|
||||
|
||||
# Get arrival rates for headers
|
||||
arrival_rates = [100, 500, 1000, 2000]
|
||||
@@ -843,14 +876,18 @@ class DatabaseComparisonFramework:
|
||||
|
||||
# Characteristics comparison
|
||||
print(f"\nSYSTEM CHARACTERISTICS")
|
||||
print(f"{'Pattern':<20} {'Durability':<25} {'Consistency':<20} {'OpComplx':<8} {'Cost':<6}")
|
||||
print(
|
||||
f"{'Pattern':<20} {'Durability':<25} {'Consistency':<20} {'OpComplx':<8} {'Cost':<6}"
|
||||
)
|
||||
print("-" * 85)
|
||||
|
||||
for pattern_name, metrics_list in results.items():
|
||||
metrics = metrics_list[0] # Use first metrics for characteristics
|
||||
print(f"{pattern_name:<20} {metrics.durability_guarantee:<25} "
|
||||
f"{metrics.consistency_model:<20} {metrics.operational_complexity:<8} "
|
||||
f"{metrics.infrastructure_cost:<6.1f}")
|
||||
print(
|
||||
f"{pattern_name:<20} {metrics.durability_guarantee:<25} "
|
||||
f"{metrics.consistency_model:<20} {metrics.operational_complexity:<8} "
|
||||
f"{metrics.infrastructure_cost:<6.1f}"
|
||||
)
|
||||
|
||||
# Performance sweet spots
|
||||
print(f"\nPERFORMANCE SWEET SPOTS")
|
||||
@@ -858,49 +895,76 @@ class DatabaseComparisonFramework:
|
||||
|
||||
for rate in arrival_rates:
|
||||
print(f"\nAt {rate} TPS:")
|
||||
rate_results = [(name, metrics_list[arrival_rates.index(rate)])
|
||||
for name, metrics_list in results.items()]
|
||||
rate_results = [
|
||||
(name, metrics_list[arrival_rates.index(rate)])
|
||||
for name, metrics_list in results.items()
|
||||
]
|
||||
|
||||
# Sort by P95 latency
|
||||
rate_results.sort(key=lambda x: x[1].p95_latency)
|
||||
|
||||
for i, (name, metrics) in enumerate(rate_results):
|
||||
rank_symbol = "🥇" if i == 0 else "🥈" if i == 1 else "🥉" if i == 2 else " "
|
||||
print(f" {rank_symbol} {name}: {metrics.p95_latency:.1f}ms P95, "
|
||||
f"{metrics.avg_throughput_tps:.0f} TPS achieved")
|
||||
rank_symbol = (
|
||||
"🥇" if i == 0 else "🥈" if i == 1 else "🥉" if i == 2 else " "
|
||||
)
|
||||
print(
|
||||
f" {rank_symbol} {name}: {metrics.p95_latency:.1f}ms P95, "
|
||||
f"{metrics.avg_throughput_tps:.0f} TPS achieved"
|
||||
)
|
||||
|
||||
def plot_comparison_results(self, results: Dict[str, List[PersistenceMetrics]],
|
||||
save_path: Optional[str] = None):
|
||||
def plot_comparison_results(
|
||||
self,
|
||||
results: Dict[str, List[PersistenceMetrics]],
|
||||
save_path: Optional[str] = None,
|
||||
):
|
||||
"""Plot comparison results"""
|
||||
try:
|
||||
arrival_rates = [100, 500, 1000, 2000]
|
||||
|
||||
fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(15, 12))
|
||||
fig.suptitle('Database Persistence Pattern Comparison', fontsize=16)
|
||||
fig.suptitle("Database Persistence Pattern Comparison", fontsize=16)
|
||||
|
||||
# Plot 1: P95 Latency vs Load
|
||||
for pattern_name, metrics_list in results.items():
|
||||
p95_latencies = [m.p95_latency for m in metrics_list]
|
||||
ax1.plot(arrival_rates, p95_latencies, marker='o', linewidth=2, label=pattern_name)
|
||||
ax1.plot(
|
||||
arrival_rates,
|
||||
p95_latencies,
|
||||
marker="o",
|
||||
linewidth=2,
|
||||
label=pattern_name,
|
||||
)
|
||||
|
||||
ax1.set_xlabel('Arrival Rate (TPS)')
|
||||
ax1.set_ylabel('P95 Latency (ms)')
|
||||
ax1.set_title('P95 Latency vs Load')
|
||||
ax1.set_xlabel("Arrival Rate (TPS)")
|
||||
ax1.set_ylabel("P95 Latency (ms)")
|
||||
ax1.set_title("P95 Latency vs Load")
|
||||
ax1.legend()
|
||||
ax1.grid(True, alpha=0.3)
|
||||
ax1.set_yscale('log')
|
||||
ax1.set_yscale("log")
|
||||
|
||||
# Plot 2: Throughput Achieved
|
||||
for pattern_name, metrics_list in results.items():
|
||||
throughputs = [m.avg_throughput_tps for m in metrics_list]
|
||||
ax2.plot(arrival_rates, throughputs, marker='s', linewidth=2, label=pattern_name)
|
||||
ax2.plot(
|
||||
arrival_rates,
|
||||
throughputs,
|
||||
marker="s",
|
||||
linewidth=2,
|
||||
label=pattern_name,
|
||||
)
|
||||
|
||||
# Perfect throughput line
|
||||
ax2.plot(arrival_rates, arrival_rates, 'k--', alpha=0.5, label='Perfect (no loss)')
|
||||
ax2.plot(
|
||||
arrival_rates,
|
||||
arrival_rates,
|
||||
"k--",
|
||||
alpha=0.5,
|
||||
label="Perfect (no loss)",
|
||||
)
|
||||
|
||||
ax2.set_xlabel('Target Rate (TPS)')
|
||||
ax2.set_ylabel('Achieved Throughput (TPS)')
|
||||
ax2.set_title('Throughput: Target vs Achieved')
|
||||
ax2.set_xlabel("Target Rate (TPS)")
|
||||
ax2.set_ylabel("Achieved Throughput (TPS)")
|
||||
ax2.set_title("Throughput: Target vs Achieved")
|
||||
ax2.legend()
|
||||
ax2.grid(True, alpha=0.3)
|
||||
|
||||
@@ -910,13 +974,21 @@ class DatabaseComparisonFramework:
|
||||
metrics = metrics_list[rate_idx]
|
||||
# Plot latency percentiles
|
||||
percentiles = [50, 95, 99]
|
||||
values = [metrics.median_latency, metrics.p95_latency, metrics.p99_latency]
|
||||
ax3.bar([f"{pattern_name}\nP{p}" for p in percentiles], values,
|
||||
alpha=0.7, label=pattern_name)
|
||||
values = [
|
||||
metrics.median_latency,
|
||||
metrics.p95_latency,
|
||||
metrics.p99_latency,
|
||||
]
|
||||
ax3.bar(
|
||||
[f"{pattern_name}\nP{p}" for p in percentiles],
|
||||
values,
|
||||
alpha=0.7,
|
||||
label=pattern_name,
|
||||
)
|
||||
|
||||
ax3.set_ylabel('Latency (ms)')
|
||||
ax3.set_title('Latency Percentiles at 1000 TPS')
|
||||
ax3.set_yscale('log')
|
||||
ax3.set_ylabel("Latency (ms)")
|
||||
ax3.set_title("Latency Percentiles at 1000 TPS")
|
||||
ax3.set_yscale("log")
|
||||
ax3.grid(True, alpha=0.3)
|
||||
|
||||
# Plot 4: Cost vs Performance
|
||||
@@ -925,22 +997,24 @@ class DatabaseComparisonFramework:
|
||||
p95s = [m.p95_latency for m in metrics_list]
|
||||
|
||||
# Use different markers for different patterns
|
||||
markers = {'WeaselDB': 'o', 'Traditional WAL': 's', 'Synchronous': '^'}
|
||||
marker = markers.get(pattern_name, 'o')
|
||||
markers = {"WeaselDB": "o", "Traditional WAL": "s", "Synchronous": "^"}
|
||||
marker = markers.get(pattern_name, "o")
|
||||
|
||||
ax4.scatter(costs, p95s, s=100, marker=marker, alpha=0.7, label=pattern_name)
|
||||
ax4.scatter(
|
||||
costs, p95s, s=100, marker=marker, alpha=0.7, label=pattern_name
|
||||
)
|
||||
|
||||
ax4.set_xlabel('Infrastructure Cost (relative)')
|
||||
ax4.set_ylabel('P95 Latency (ms)')
|
||||
ax4.set_title('Cost vs Performance Trade-off')
|
||||
ax4.set_xlabel("Infrastructure Cost (relative)")
|
||||
ax4.set_ylabel("P95 Latency (ms)")
|
||||
ax4.set_title("Cost vs Performance Trade-off")
|
||||
ax4.legend()
|
||||
ax4.grid(True, alpha=0.3)
|
||||
ax4.set_yscale('log')
|
||||
ax4.set_yscale("log")
|
||||
|
||||
plt.tight_layout()
|
||||
|
||||
if save_path:
|
||||
plt.savefig(save_path, dpi=300, bbox_inches='tight')
|
||||
plt.savefig(save_path, dpi=300, bbox_inches="tight")
|
||||
print(f"Comparison plots saved to {save_path}")
|
||||
else:
|
||||
plt.show()
|
||||
@@ -961,10 +1035,10 @@ def main():
|
||||
comparison.print_comparison_report(results)
|
||||
|
||||
try:
|
||||
comparison.plot_comparison_results(results, 'database_comparison.png')
|
||||
comparison.plot_comparison_results(results, "database_comparison.png")
|
||||
except Exception as e:
|
||||
print(f"Could not generate plots: {e}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
main()
|
||||
@@ -25,6 +25,7 @@ try:
|
||||
from skopt.utils import use_named_args
|
||||
from skopt.plots import plot_convergence, plot_objective
|
||||
import matplotlib.pyplot as plt
|
||||
|
||||
OPTIMIZE_AVAILABLE = True
|
||||
except ImportError:
|
||||
print("scikit-optimize not available. Install with: pip install scikit-optimize")
|
||||
@@ -40,12 +41,14 @@ class PersistenceOptimizer:
|
||||
by intelligently exploring the parameter space using Gaussian Process models.
|
||||
"""
|
||||
|
||||
def __init__(self,
|
||||
optimization_budget: int = 50,
|
||||
simulation_duration: float = 20.0,
|
||||
arrival_rate: float = 1000.0,
|
||||
objective_metric: str = "p95_latency",
|
||||
random_seed: int = 42):
|
||||
def __init__(
|
||||
self,
|
||||
optimization_budget: int = 50,
|
||||
simulation_duration: float = 20.0,
|
||||
arrival_rate: float = 1000.0,
|
||||
objective_metric: str = "p95_latency",
|
||||
random_seed: int = 42,
|
||||
):
|
||||
|
||||
self.optimization_budget = optimization_budget
|
||||
self.simulation_duration = simulation_duration
|
||||
@@ -56,7 +59,7 @@ class PersistenceOptimizer:
|
||||
# Track optimization history
|
||||
self.optimization_history = []
|
||||
self.best_params = None
|
||||
self.best_score = float('inf')
|
||||
self.best_score = float("inf")
|
||||
|
||||
# Define parameter search space
|
||||
self.parameter_space = self._define_search_space()
|
||||
@@ -71,31 +74,35 @@ class PersistenceOptimizer:
|
||||
"""
|
||||
return [
|
||||
# Core batching parameters
|
||||
Real(1.0, 50.0, name='batch_timeout_ms',
|
||||
prior='log-uniform'), # Log scale since small changes matter
|
||||
Integer(64 * 1024, 4 * 1024 * 1024, name='batch_size_threshold', # 64KB - 4MB
|
||||
prior='log-uniform'),
|
||||
|
||||
Real(
|
||||
1.0, 50.0, name="batch_timeout_ms", prior="log-uniform"
|
||||
), # Log scale since small changes matter
|
||||
Integer(
|
||||
64 * 1024,
|
||||
4 * 1024 * 1024,
|
||||
name="batch_size_threshold", # 64KB - 4MB
|
||||
prior="log-uniform",
|
||||
),
|
||||
# Flow control parameters - likely the most impactful
|
||||
Integer(1, 50, name='max_in_flight_requests'),
|
||||
Integer(1, 50, name="max_in_flight_requests"),
|
||||
]
|
||||
|
||||
def _run_simulation_with_params(self, params: Dict[str, float]) -> Dict:
|
||||
"""Run simulation with given parameters and return results"""
|
||||
try:
|
||||
sim = PersistenceSimulation(
|
||||
batch_timeout_ms=params['batch_timeout_ms'],
|
||||
batch_size_threshold=int(params['batch_size_threshold']),
|
||||
max_in_flight_requests=int(params['max_in_flight_requests']),
|
||||
batch_timeout_ms=params["batch_timeout_ms"],
|
||||
batch_size_threshold=int(params["batch_size_threshold"]),
|
||||
max_in_flight_requests=int(params["max_in_flight_requests"]),
|
||||
# Retry parameters fixed since S3 is 100% reliable
|
||||
max_retry_attempts=0, # No retries needed
|
||||
retry_base_delay_ms=100.0, # Irrelevant but needs a value
|
||||
max_retry_attempts=0, # No retries needed
|
||||
retry_base_delay_ms=100.0, # Irrelevant but needs a value
|
||||
# S3 parameters kept fixed - 100% reliable for optimization focus
|
||||
s3_latency_shape=2.0, # Fixed Gamma shape
|
||||
s3_latency_scale=15.0, # Fixed Gamma scale (30ms RTT + ~30ms variable = ~60ms mean)
|
||||
s3_failure_rate=0.0, # 100% reliable S3
|
||||
s3_latency_shape=2.0, # Fixed Gamma shape
|
||||
s3_latency_scale=15.0, # Fixed Gamma scale (30ms RTT + ~30ms variable = ~60ms mean)
|
||||
s3_failure_rate=0.0, # 100% reliable S3
|
||||
arrival_rate_per_sec=self.arrival_rate,
|
||||
simulation_duration_sec=self.simulation_duration
|
||||
simulation_duration_sec=self.simulation_duration,
|
||||
)
|
||||
|
||||
return sim.run_simulation()
|
||||
@@ -104,34 +111,32 @@ class PersistenceOptimizer:
|
||||
print(f"Simulation failed with params {params}: {e}")
|
||||
# Return a high penalty score for failed simulations
|
||||
return {
|
||||
'commit_metrics': {
|
||||
'latency_ms': {
|
||||
'mean': 10000,
|
||||
'p95': 10000,
|
||||
'p99': 10000
|
||||
}
|
||||
"commit_metrics": {
|
||||
"latency_ms": {"mean": 10000, "p95": 10000, "p99": 10000}
|
||||
},
|
||||
'error': str(e)
|
||||
"error": str(e),
|
||||
}
|
||||
|
||||
def _extract_objective_value(self, results: Dict) -> float:
|
||||
"""Extract the objective value to minimize from simulation results"""
|
||||
try:
|
||||
commit_metrics = results['commit_metrics']['latency_ms']
|
||||
commit_metrics = results["commit_metrics"]["latency_ms"]
|
||||
|
||||
if self.objective_metric == "mean_latency":
|
||||
return commit_metrics['mean']
|
||||
return commit_metrics["mean"]
|
||||
elif self.objective_metric == "p95_latency":
|
||||
return commit_metrics['p95']
|
||||
return commit_metrics["p95"]
|
||||
elif self.objective_metric == "p99_latency":
|
||||
return commit_metrics['p99']
|
||||
return commit_metrics["p99"]
|
||||
elif self.objective_metric == "weighted_latency":
|
||||
# Weighted combination emphasizing tail latencies
|
||||
return (0.3 * commit_metrics['mean'] +
|
||||
0.5 * commit_metrics['p95'] +
|
||||
0.2 * commit_metrics['p99'])
|
||||
return (
|
||||
0.3 * commit_metrics["mean"]
|
||||
+ 0.5 * commit_metrics["p95"]
|
||||
+ 0.2 * commit_metrics["p99"]
|
||||
)
|
||||
else:
|
||||
return commit_metrics['p95'] # Default to P95
|
||||
return commit_metrics["p95"] # Default to P95
|
||||
|
||||
except KeyError as e:
|
||||
print(f"Failed to extract objective from results: {e}")
|
||||
@@ -147,7 +152,9 @@ class PersistenceOptimizer:
|
||||
if not OPTIMIZE_AVAILABLE:
|
||||
return self.optimize_with_grid_search()
|
||||
|
||||
print(f"Starting Bayesian Optimization with {self.optimization_budget} evaluations")
|
||||
print(
|
||||
f"Starting Bayesian Optimization with {self.optimization_budget} evaluations"
|
||||
)
|
||||
print(f"Objective: Minimize {self.objective_metric}")
|
||||
print(f"Parameter space: {len(self.parameter_space)} dimensions")
|
||||
print()
|
||||
@@ -165,11 +172,11 @@ class PersistenceOptimizer:
|
||||
|
||||
# Track optimization history
|
||||
history_entry = {
|
||||
'params': params.copy(),
|
||||
'objective_value': objective_value,
|
||||
'results': results,
|
||||
'eval_time': eval_time,
|
||||
'iteration': len(self.optimization_history) + 1
|
||||
"params": params.copy(),
|
||||
"objective_value": objective_value,
|
||||
"results": results,
|
||||
"eval_time": eval_time,
|
||||
"iteration": len(self.optimization_history) + 1,
|
||||
}
|
||||
self.optimization_history.append(history_entry)
|
||||
|
||||
@@ -177,7 +184,9 @@ class PersistenceOptimizer:
|
||||
if objective_value < self.best_score:
|
||||
self.best_score = objective_value
|
||||
self.best_params = params.copy()
|
||||
print(f"✓ NEW BEST: {objective_value:.2f}ms (evaluation {history_entry['iteration']})")
|
||||
print(
|
||||
f"✓ NEW BEST: {objective_value:.2f}ms (evaluation {history_entry['iteration']})"
|
||||
)
|
||||
else:
|
||||
print(f" Score: {objective_value:.2f}ms")
|
||||
|
||||
@@ -192,8 +201,8 @@ class PersistenceOptimizer:
|
||||
dimensions=self.parameter_space,
|
||||
n_calls=self.optimization_budget,
|
||||
n_initial_points=10, # Random exploration first
|
||||
acq_func='EI', # Expected Improvement acquisition
|
||||
random_state=self.random_seed
|
||||
acq_func="EI", # Expected Improvement acquisition
|
||||
random_state=self.random_seed,
|
||||
)
|
||||
|
||||
# Extract best parameters
|
||||
@@ -205,32 +214,34 @@ class PersistenceOptimizer:
|
||||
|
||||
def optimize_with_grid_search(self) -> Tuple[Dict, float]:
|
||||
"""Fallback grid search optimization if scikit-optimize not available"""
|
||||
print("Using grid search optimization (install scikit-optimize for better results)")
|
||||
print(
|
||||
"Using grid search optimization (install scikit-optimize for better results)"
|
||||
)
|
||||
print()
|
||||
|
||||
# Define a smaller grid for key parameters
|
||||
grid_configs = [
|
||||
# Vary max_in_flight and batch_timeout
|
||||
{'max_in_flight_requests': 5, 'batch_timeout_ms': 5.0},
|
||||
{'max_in_flight_requests': 10, 'batch_timeout_ms': 5.0},
|
||||
{'max_in_flight_requests': 20, 'batch_timeout_ms': 5.0},
|
||||
{'max_in_flight_requests': 10, 'batch_timeout_ms': 2.0},
|
||||
{'max_in_flight_requests': 10, 'batch_timeout_ms': 10.0},
|
||||
{'max_in_flight_requests': 15, 'batch_timeout_ms': 3.0},
|
||||
{'max_in_flight_requests': 25, 'batch_timeout_ms': 7.0},
|
||||
{"max_in_flight_requests": 5, "batch_timeout_ms": 5.0},
|
||||
{"max_in_flight_requests": 10, "batch_timeout_ms": 5.0},
|
||||
{"max_in_flight_requests": 20, "batch_timeout_ms": 5.0},
|
||||
{"max_in_flight_requests": 10, "batch_timeout_ms": 2.0},
|
||||
{"max_in_flight_requests": 10, "batch_timeout_ms": 10.0},
|
||||
{"max_in_flight_requests": 15, "batch_timeout_ms": 3.0},
|
||||
{"max_in_flight_requests": 25, "batch_timeout_ms": 7.0},
|
||||
]
|
||||
|
||||
best_params = None
|
||||
best_score = float('inf')
|
||||
best_score = float("inf")
|
||||
|
||||
for i, config in enumerate(grid_configs):
|
||||
print(f"Evaluating config {i+1}/{len(grid_configs)}: {config}")
|
||||
|
||||
# Use default values for unspecified parameters
|
||||
full_params = {
|
||||
'batch_timeout_ms': 5.0,
|
||||
'batch_size_threshold': 1024 * 1024,
|
||||
'max_in_flight_requests': 5
|
||||
"batch_timeout_ms": 5.0,
|
||||
"batch_size_threshold": 1024 * 1024,
|
||||
"max_in_flight_requests": 5,
|
||||
}
|
||||
full_params.update(config)
|
||||
|
||||
@@ -261,8 +272,8 @@ class PersistenceOptimizer:
|
||||
objectives = []
|
||||
|
||||
for entry in self.optimization_history:
|
||||
objectives.append(entry['objective_value'])
|
||||
for param_name, param_value in entry['params'].items():
|
||||
objectives.append(entry["objective_value"])
|
||||
for param_name, param_value in entry["params"].items():
|
||||
if param_name not in param_data:
|
||||
param_data[param_name] = []
|
||||
param_data[param_name].append(param_value)
|
||||
@@ -289,12 +300,12 @@ class PersistenceOptimizer:
|
||||
if not OPTIMIZE_AVAILABLE or not self.optimization_history:
|
||||
return
|
||||
|
||||
iterations = [entry['iteration'] for entry in self.optimization_history]
|
||||
objectives = [entry['objective_value'] for entry in self.optimization_history]
|
||||
iterations = [entry["iteration"] for entry in self.optimization_history]
|
||||
objectives = [entry["objective_value"] for entry in self.optimization_history]
|
||||
|
||||
# Calculate running minimum (best so far)
|
||||
running_min = []
|
||||
current_min = float('inf')
|
||||
current_min = float("inf")
|
||||
for obj in objectives:
|
||||
current_min = min(current_min, obj)
|
||||
running_min.append(current_min)
|
||||
@@ -304,34 +315,38 @@ class PersistenceOptimizer:
|
||||
# Plot 1: Objective value over iterations
|
||||
plt.subplot(2, 2, 1)
|
||||
plt.scatter(iterations, objectives, alpha=0.6, s=30)
|
||||
plt.plot(iterations, running_min, 'r-', linewidth=2, label='Best so far')
|
||||
plt.xlabel('Iteration')
|
||||
plt.ylabel(f'{self.objective_metric} (ms)')
|
||||
plt.title('Optimization Progress')
|
||||
plt.plot(iterations, running_min, "r-", linewidth=2, label="Best so far")
|
||||
plt.xlabel("Iteration")
|
||||
plt.ylabel(f"{self.objective_metric} (ms)")
|
||||
plt.title("Optimization Progress")
|
||||
plt.legend()
|
||||
plt.grid(True, alpha=0.3)
|
||||
|
||||
# Plot 2: Parameter evolution for key parameters
|
||||
plt.subplot(2, 2, 2)
|
||||
key_params = ['max_in_flight_requests', 'batch_timeout_ms']
|
||||
key_params = ["max_in_flight_requests", "batch_timeout_ms"]
|
||||
for param in key_params:
|
||||
if param in self.optimization_history[0]['params']:
|
||||
values = [entry['params'][param] for entry in self.optimization_history]
|
||||
if param in self.optimization_history[0]["params"]:
|
||||
values = [entry["params"][param] for entry in self.optimization_history]
|
||||
plt.scatter(iterations, values, alpha=0.6, label=param, s=30)
|
||||
plt.xlabel('Iteration')
|
||||
plt.ylabel('Parameter Value')
|
||||
plt.title('Key Parameter Evolution')
|
||||
plt.xlabel("Iteration")
|
||||
plt.ylabel("Parameter Value")
|
||||
plt.title("Key Parameter Evolution")
|
||||
plt.legend()
|
||||
plt.grid(True, alpha=0.3)
|
||||
|
||||
# Plot 3: Objective distribution
|
||||
plt.subplot(2, 2, 3)
|
||||
plt.hist(objectives, bins=20, alpha=0.7, edgecolor='black')
|
||||
plt.axvline(self.best_score, color='red', linestyle='--',
|
||||
label=f'Best: {self.best_score:.1f}ms')
|
||||
plt.xlabel(f'{self.objective_metric} (ms)')
|
||||
plt.ylabel('Count')
|
||||
plt.title('Objective Value Distribution')
|
||||
plt.hist(objectives, bins=20, alpha=0.7, edgecolor="black")
|
||||
plt.axvline(
|
||||
self.best_score,
|
||||
color="red",
|
||||
linestyle="--",
|
||||
label=f"Best: {self.best_score:.1f}ms",
|
||||
)
|
||||
plt.xlabel(f"{self.objective_metric} (ms)")
|
||||
plt.ylabel("Count")
|
||||
plt.title("Objective Value Distribution")
|
||||
plt.legend()
|
||||
plt.grid(True, alpha=0.3)
|
||||
|
||||
@@ -342,21 +357,21 @@ class PersistenceOptimizer:
|
||||
if i == 0:
|
||||
improvements.append(0)
|
||||
else:
|
||||
prev_best = running_min[i-1]
|
||||
prev_best = running_min[i - 1]
|
||||
curr_best = running_min[i]
|
||||
improvement = prev_best - curr_best
|
||||
improvements.append(improvement)
|
||||
|
||||
plt.plot(iterations, improvements, 'g-', marker='o', markersize=3)
|
||||
plt.xlabel('Iteration')
|
||||
plt.ylabel('Improvement (ms)')
|
||||
plt.title('Per-Iteration Improvement')
|
||||
plt.plot(iterations, improvements, "g-", marker="o", markersize=3)
|
||||
plt.xlabel("Iteration")
|
||||
plt.ylabel("Improvement (ms)")
|
||||
plt.title("Per-Iteration Improvement")
|
||||
plt.grid(True, alpha=0.3)
|
||||
|
||||
plt.tight_layout()
|
||||
|
||||
if save_path:
|
||||
plt.savefig(save_path, dpi=300, bbox_inches='tight')
|
||||
plt.savefig(save_path, dpi=300, bbox_inches="tight")
|
||||
print(f"Optimization plots saved to {save_path}")
|
||||
else:
|
||||
plt.show()
|
||||
@@ -379,12 +394,12 @@ class PersistenceOptimizer:
|
||||
|
||||
# Prepare optimization summary
|
||||
optimization_summary = {
|
||||
'best_parameters': best_params,
|
||||
'best_objective_value': best_score,
|
||||
'optimization_time': total_time,
|
||||
'evaluations_performed': len(self.optimization_history),
|
||||
'final_simulation_results': final_results,
|
||||
'optimization_history': self.optimization_history
|
||||
"best_parameters": best_params,
|
||||
"best_objective_value": best_score,
|
||||
"optimization_time": total_time,
|
||||
"evaluations_performed": len(self.optimization_history),
|
||||
"final_simulation_results": final_results,
|
||||
"optimization_history": self.optimization_history,
|
||||
}
|
||||
|
||||
return optimization_summary
|
||||
@@ -402,9 +417,9 @@ class PersistenceOptimizer:
|
||||
|
||||
print("OPTIMAL PARAMETERS:")
|
||||
print("-" * 40)
|
||||
for param, value in summary['best_parameters'].items():
|
||||
for param, value in summary["best_parameters"].items():
|
||||
if isinstance(value, float):
|
||||
if param.endswith('_rate'):
|
||||
if param.endswith("_rate"):
|
||||
print(f" {param:<25}: {value:.4f}")
|
||||
else:
|
||||
print(f" {param:<25}: {value:.2f}")
|
||||
@@ -413,7 +428,7 @@ class PersistenceOptimizer:
|
||||
|
||||
print("\nDETAILED PERFORMANCE WITH OPTIMAL PARAMETERS:")
|
||||
print("-" * 50)
|
||||
final_results = summary['final_simulation_results']
|
||||
final_results = summary["final_simulation_results"]
|
||||
print_results(final_results)
|
||||
|
||||
print("\nPARAMETER IMPACT ANALYSIS:")
|
||||
@@ -440,7 +455,7 @@ def main():
|
||||
simulation_duration=15.0, # Shorter sims for faster optimization
|
||||
arrival_rate=1000.0,
|
||||
objective_metric=objective,
|
||||
random_seed=42
|
||||
random_seed=42,
|
||||
)
|
||||
|
||||
# Run optimization
|
||||
@@ -449,13 +464,13 @@ def main():
|
||||
|
||||
# Generate plots
|
||||
try:
|
||||
optimizer.plot_optimization_progress(f'optimization_{objective}.png')
|
||||
optimizer.plot_optimization_progress(f"optimization_{objective}.png")
|
||||
except Exception as e:
|
||||
print(f"Could not generate plots: {e}")
|
||||
|
||||
print(f"\nOptimization for {objective} completed!")
|
||||
print("="*80)
|
||||
print("=" * 80)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
main()
|
||||
@@ -27,6 +27,7 @@ import time
|
||||
@dataclass
|
||||
class Commit:
|
||||
"""Represents a single commit request"""
|
||||
|
||||
commit_id: int
|
||||
arrival_time: float
|
||||
size_bytes: int = 1024 # Default 1KB per commit
|
||||
@@ -35,6 +36,7 @@ class Commit:
|
||||
@dataclass
|
||||
class Batch:
|
||||
"""Represents a batch of commits being processed"""
|
||||
|
||||
batch_id: int
|
||||
commits: List[Commit]
|
||||
created_time: float
|
||||
@@ -48,6 +50,7 @@ class Batch:
|
||||
@dataclass
|
||||
class InFlightRequest:
|
||||
"""Tracks an in-flight S3 request"""
|
||||
|
||||
batch: Batch
|
||||
start_time: float
|
||||
expected_completion: float
|
||||
@@ -66,24 +69,23 @@ class PersistenceSimulation:
|
||||
- Shape parameter controls the heaviness of the tail
|
||||
"""
|
||||
|
||||
def __init__(self,
|
||||
# Configuration from persistence.md defaults
|
||||
batch_timeout_ms: float = 5.0,
|
||||
batch_size_threshold: int = 1024 * 1024, # 1MB
|
||||
max_in_flight_requests: int = 5,
|
||||
max_retry_attempts: int = 3,
|
||||
retry_base_delay_ms: float = 100.0,
|
||||
|
||||
# S3 latency modeling (Gamma distribution parameters)
|
||||
s3_latency_shape: float = 2.0, # Shape parameter (α)
|
||||
s3_latency_scale: float = 25.0, # Scale parameter (β)
|
||||
s3_failure_rate: float = 0.01, # 1% failure rate
|
||||
|
||||
# Arrival rate modeling
|
||||
arrival_rate_per_sec: float = 1000.0, # Lambda for Poisson
|
||||
|
||||
# Simulation parameters
|
||||
simulation_duration_sec: float = 60.0):
|
||||
def __init__(
|
||||
self,
|
||||
# Configuration from persistence.md defaults
|
||||
batch_timeout_ms: float = 5.0,
|
||||
batch_size_threshold: int = 1024 * 1024, # 1MB
|
||||
max_in_flight_requests: int = 5,
|
||||
max_retry_attempts: int = 3,
|
||||
retry_base_delay_ms: float = 100.0,
|
||||
# S3 latency modeling (Gamma distribution parameters)
|
||||
s3_latency_shape: float = 2.0, # Shape parameter (α)
|
||||
s3_latency_scale: float = 25.0, # Scale parameter (β)
|
||||
s3_failure_rate: float = 0.01, # 1% failure rate
|
||||
# Arrival rate modeling
|
||||
arrival_rate_per_sec: float = 1000.0, # Lambda for Poisson
|
||||
# Simulation parameters
|
||||
simulation_duration_sec: float = 60.0,
|
||||
):
|
||||
|
||||
# Configuration
|
||||
self.batch_timeout_ms = batch_timeout_ms
|
||||
@@ -142,7 +144,9 @@ class PersistenceSimulation:
|
||||
min_rtt = 30.0 # 30ms minimum round-trip time
|
||||
|
||||
# Variable latency component from Gamma distribution
|
||||
variable_latency = self.np_rng.gamma(self.s3_latency_shape, self.s3_latency_scale)
|
||||
variable_latency = self.np_rng.gamma(
|
||||
self.s3_latency_shape, self.s3_latency_scale
|
||||
)
|
||||
|
||||
# Size-dependent scaling: +20ms per MB
|
||||
size_mb = batch_size_bytes / (1024 * 1024)
|
||||
@@ -191,7 +195,9 @@ class PersistenceSimulation:
|
||||
|
||||
# Time trigger - only if we have a valid batch start time
|
||||
if self.batch_start_time is not None:
|
||||
if (self.current_time - self.batch_start_time) >= (self.batch_timeout_ms / 1000.0):
|
||||
if (self.current_time - self.batch_start_time) >= (
|
||||
self.batch_timeout_ms / 1000.0
|
||||
):
|
||||
return True
|
||||
|
||||
return False
|
||||
@@ -209,7 +215,7 @@ class PersistenceSimulation:
|
||||
batch = Batch(
|
||||
batch_id=self.next_batch_id,
|
||||
commits=self.current_batch.copy(),
|
||||
created_time=self.current_time
|
||||
created_time=self.current_time,
|
||||
)
|
||||
self.next_batch_id += 1
|
||||
|
||||
@@ -223,11 +229,13 @@ class PersistenceSimulation:
|
||||
"""Send batch to S3 and track as in-flight request"""
|
||||
if not self.can_start_new_request():
|
||||
# This shouldn't happen due to flow control, but handle gracefully
|
||||
self.schedule_event(self.current_time + 0.001, 'retry_batch', batch)
|
||||
self.schedule_event(self.current_time + 0.001, "retry_batch", batch)
|
||||
return
|
||||
|
||||
# Sample S3 response characteristics (pass batch size for latency modeling)
|
||||
s3_latency = self.sample_s3_latency(batch.size_bytes) / 1000.0 # Convert ms to seconds
|
||||
s3_latency = (
|
||||
self.sample_s3_latency(batch.size_bytes) / 1000.0
|
||||
) # Convert ms to seconds
|
||||
will_fail = self.rng.random() < self.s3_failure_rate
|
||||
|
||||
if will_fail:
|
||||
@@ -242,26 +250,28 @@ class PersistenceSimulation:
|
||||
batch=batch,
|
||||
start_time=self.current_time,
|
||||
expected_completion=completion_time,
|
||||
connection_id=connection_id
|
||||
connection_id=connection_id,
|
||||
)
|
||||
self.in_flight_requests[connection_id] = in_flight
|
||||
|
||||
# Schedule completion event
|
||||
if will_fail:
|
||||
self.schedule_event(completion_time, 'batch_failed', connection_id)
|
||||
self.schedule_event(completion_time, "batch_failed", connection_id)
|
||||
else:
|
||||
self.schedule_event(completion_time, 'batch_completed', connection_id)
|
||||
self.schedule_event(completion_time, "batch_completed", connection_id)
|
||||
|
||||
# Log event for analysis
|
||||
self.timeline_events.append({
|
||||
'time': self.current_time,
|
||||
'event': 'batch_sent',
|
||||
'batch_id': batch.batch_id,
|
||||
'batch_size': batch.size_bytes,
|
||||
'commit_count': len(batch.commits),
|
||||
'retry_count': batch.retry_count,
|
||||
'is_retry': is_retry
|
||||
})
|
||||
self.timeline_events.append(
|
||||
{
|
||||
"time": self.current_time,
|
||||
"event": "batch_sent",
|
||||
"batch_id": batch.batch_id,
|
||||
"batch_size": batch.size_bytes,
|
||||
"commit_count": len(batch.commits),
|
||||
"retry_count": batch.retry_count,
|
||||
"is_retry": is_retry,
|
||||
}
|
||||
)
|
||||
|
||||
def handle_batch_completed(self, connection_id: int):
|
||||
"""Handle successful batch completion"""
|
||||
@@ -273,33 +283,39 @@ class PersistenceSimulation:
|
||||
|
||||
# Calculate metrics
|
||||
batch_latency = self.current_time - batch.created_time
|
||||
self.batch_metrics.append({
|
||||
'batch_id': batch.batch_id,
|
||||
'latency': batch_latency,
|
||||
'size_bytes': batch.size_bytes,
|
||||
'commit_count': len(batch.commits),
|
||||
'retry_count': batch.retry_count
|
||||
})
|
||||
self.batch_metrics.append(
|
||||
{
|
||||
"batch_id": batch.batch_id,
|
||||
"latency": batch_latency,
|
||||
"size_bytes": batch.size_bytes,
|
||||
"commit_count": len(batch.commits),
|
||||
"retry_count": batch.retry_count,
|
||||
}
|
||||
)
|
||||
|
||||
# Mark commits as completed and calculate end-to-end latency
|
||||
for commit in batch.commits:
|
||||
commit_latency = self.current_time - commit.arrival_time
|
||||
self.completed_commits.append({
|
||||
'commit_id': commit.commit_id,
|
||||
'arrival_time': commit.arrival_time,
|
||||
'completion_time': self.current_time,
|
||||
'latency': commit_latency,
|
||||
'batch_id': batch.batch_id,
|
||||
'retry_count': batch.retry_count
|
||||
})
|
||||
self.completed_commits.append(
|
||||
{
|
||||
"commit_id": commit.commit_id,
|
||||
"arrival_time": commit.arrival_time,
|
||||
"completion_time": self.current_time,
|
||||
"latency": commit_latency,
|
||||
"batch_id": batch.batch_id,
|
||||
"retry_count": batch.retry_count,
|
||||
}
|
||||
)
|
||||
|
||||
self.timeline_events.append({
|
||||
'time': self.current_time,
|
||||
'event': 'batch_completed',
|
||||
'batch_id': batch.batch_id,
|
||||
'latency': batch_latency,
|
||||
'retry_count': batch.retry_count
|
||||
})
|
||||
self.timeline_events.append(
|
||||
{
|
||||
"time": self.current_time,
|
||||
"event": "batch_completed",
|
||||
"batch_id": batch.batch_id,
|
||||
"latency": batch_latency,
|
||||
"retry_count": batch.retry_count,
|
||||
}
|
||||
)
|
||||
|
||||
# Try to process any pending work now that we have capacity
|
||||
self.process_pending_work()
|
||||
@@ -312,29 +328,35 @@ class PersistenceSimulation:
|
||||
in_flight = self.in_flight_requests.pop(connection_id)
|
||||
batch = in_flight.batch
|
||||
|
||||
self.timeline_events.append({
|
||||
'time': self.current_time,
|
||||
'event': 'batch_failed',
|
||||
'batch_id': batch.batch_id,
|
||||
'retry_count': batch.retry_count
|
||||
})
|
||||
self.timeline_events.append(
|
||||
{
|
||||
"time": self.current_time,
|
||||
"event": "batch_failed",
|
||||
"batch_id": batch.batch_id,
|
||||
"retry_count": batch.retry_count,
|
||||
}
|
||||
)
|
||||
|
||||
if batch.retry_count < self.max_retry_attempts:
|
||||
# Exponential backoff retry
|
||||
batch.retry_count += 1
|
||||
backoff_delay = (self.retry_base_delay_ms / 1000.0) * (2 ** (batch.retry_count - 1))
|
||||
backoff_delay = (self.retry_base_delay_ms / 1000.0) * (
|
||||
2 ** (batch.retry_count - 1)
|
||||
)
|
||||
retry_time = self.current_time + backoff_delay
|
||||
|
||||
self.schedule_event(retry_time, 'retry_batch', batch)
|
||||
self.schedule_event(retry_time, "retry_batch", batch)
|
||||
self.retry_counts[batch.retry_count] += 1
|
||||
else:
|
||||
# Max retries exhausted - this would be a fatal error in real system
|
||||
self.timeline_events.append({
|
||||
'time': self.current_time,
|
||||
'event': 'batch_abandoned',
|
||||
'batch_id': batch.batch_id,
|
||||
'retry_count': batch.retry_count
|
||||
})
|
||||
self.timeline_events.append(
|
||||
{
|
||||
"time": self.current_time,
|
||||
"event": "batch_abandoned",
|
||||
"batch_id": batch.batch_id,
|
||||
"retry_count": batch.retry_count,
|
||||
}
|
||||
)
|
||||
|
||||
def handle_retry_batch(self, batch: Batch):
|
||||
"""Handle batch retry"""
|
||||
@@ -374,7 +396,7 @@ class PersistenceSimulation:
|
||||
# Schedule timeout event for current batch if it's the first commit
|
||||
if len(self.current_batch) == 1 and not self.pending_commits:
|
||||
timeout_time = self.current_time + (self.batch_timeout_ms / 1000.0)
|
||||
self.schedule_event(timeout_time, 'batch_timeout', None)
|
||||
self.schedule_event(timeout_time, "batch_timeout", None)
|
||||
|
||||
def handle_batch_timeout(self):
|
||||
"""Handle batch timeout trigger"""
|
||||
@@ -387,14 +409,19 @@ class PersistenceSimulation:
|
||||
current_batch_count = len(self.current_batch)
|
||||
in_flight_count = len(self.in_flight_requests)
|
||||
|
||||
self.queue_depth_samples.append({
|
||||
'time': self.current_time,
|
||||
'pending_commits': pending_count,
|
||||
'current_batch_size': current_batch_count,
|
||||
'in_flight_requests': in_flight_count,
|
||||
'total_unacknowledged': pending_count + current_batch_count +
|
||||
sum(len(req.batch.commits) for req in self.in_flight_requests.values())
|
||||
})
|
||||
self.queue_depth_samples.append(
|
||||
{
|
||||
"time": self.current_time,
|
||||
"pending_commits": pending_count,
|
||||
"current_batch_size": current_batch_count,
|
||||
"in_flight_requests": in_flight_count,
|
||||
"total_unacknowledged": pending_count
|
||||
+ current_batch_count
|
||||
+ sum(
|
||||
len(req.batch.commits) for req in self.in_flight_requests.values()
|
||||
),
|
||||
}
|
||||
)
|
||||
|
||||
def generate_arrivals(self):
|
||||
"""Generate Poisson arrival events for the simulation"""
|
||||
@@ -412,17 +439,17 @@ class PersistenceSimulation:
|
||||
commit = Commit(
|
||||
commit_id=self.next_commit_id,
|
||||
arrival_time=current_time,
|
||||
size_bytes=self.sample_commit_size() # Realistic size distribution
|
||||
size_bytes=self.sample_commit_size(), # Realistic size distribution
|
||||
)
|
||||
self.next_commit_id += 1
|
||||
|
||||
# Schedule arrival event
|
||||
self.schedule_event(current_time, 'commit_arrival', commit)
|
||||
self.schedule_event(current_time, "commit_arrival", commit)
|
||||
|
||||
# Schedule periodic queue depth sampling
|
||||
sample_time = 0.0
|
||||
while sample_time < self.simulation_duration_sec:
|
||||
self.schedule_event(sample_time, 'sample_queue_depth', None)
|
||||
self.schedule_event(sample_time, "sample_queue_depth", None)
|
||||
sample_time += 0.1 # Sample every 100ms
|
||||
|
||||
def run_simulation(self):
|
||||
@@ -430,7 +457,9 @@ class PersistenceSimulation:
|
||||
print(f"Starting persistence simulation...")
|
||||
print(f"Arrival rate: {self.arrival_rate_per_sec} commits/sec")
|
||||
print(f"Duration: {self.simulation_duration_sec} seconds")
|
||||
print(f"Expected commits: ~{int(self.arrival_rate_per_sec * self.simulation_duration_sec)}")
|
||||
print(
|
||||
f"Expected commits: ~{int(self.arrival_rate_per_sec * self.simulation_duration_sec)}"
|
||||
)
|
||||
print()
|
||||
|
||||
# Generate all arrival events
|
||||
@@ -445,17 +474,17 @@ class PersistenceSimulation:
|
||||
if time > self.simulation_duration_sec:
|
||||
break
|
||||
|
||||
if event_type == 'commit_arrival':
|
||||
if event_type == "commit_arrival":
|
||||
self.handle_commit_arrival(data)
|
||||
elif event_type == 'batch_completed':
|
||||
elif event_type == "batch_completed":
|
||||
self.handle_batch_completed(data)
|
||||
elif event_type == 'batch_failed':
|
||||
elif event_type == "batch_failed":
|
||||
self.handle_batch_failed(data)
|
||||
elif event_type == 'retry_batch':
|
||||
elif event_type == "retry_batch":
|
||||
self.handle_retry_batch(data)
|
||||
elif event_type == 'batch_timeout':
|
||||
elif event_type == "batch_timeout":
|
||||
self.handle_batch_timeout()
|
||||
elif event_type == 'sample_queue_depth':
|
||||
elif event_type == "sample_queue_depth":
|
||||
self.sample_queue_depth()
|
||||
|
||||
events_processed += 1
|
||||
@@ -469,38 +498,46 @@ class PersistenceSimulation:
|
||||
return {"error": "No commits completed during simulation"}
|
||||
|
||||
# Calculate latency statistics
|
||||
latencies = [c['latency'] * 1000 for c in self.completed_commits] # Convert to ms
|
||||
latencies = [
|
||||
c["latency"] * 1000 for c in self.completed_commits
|
||||
] # Convert to ms
|
||||
|
||||
results = {
|
||||
'simulation_config': {
|
||||
'duration_sec': self.simulation_duration_sec,
|
||||
'arrival_rate_per_sec': self.arrival_rate_per_sec,
|
||||
'batch_timeout_ms': self.batch_timeout_ms,
|
||||
'batch_size_threshold': self.batch_size_threshold,
|
||||
'max_in_flight_requests': self.max_in_flight_requests,
|
||||
's3_latency_params': f"Gamma(shape={self.s3_latency_shape}, scale={self.s3_latency_scale})",
|
||||
's3_failure_rate': self.s3_failure_rate
|
||||
"simulation_config": {
|
||||
"duration_sec": self.simulation_duration_sec,
|
||||
"arrival_rate_per_sec": self.arrival_rate_per_sec,
|
||||
"batch_timeout_ms": self.batch_timeout_ms,
|
||||
"batch_size_threshold": self.batch_size_threshold,
|
||||
"max_in_flight_requests": self.max_in_flight_requests,
|
||||
"s3_latency_params": f"Gamma(shape={self.s3_latency_shape}, scale={self.s3_latency_scale})",
|
||||
"s3_failure_rate": self.s3_failure_rate,
|
||||
},
|
||||
'commit_metrics': {
|
||||
'total_commits': len(self.completed_commits),
|
||||
'latency_ms': {
|
||||
'mean': statistics.mean(latencies),
|
||||
'median': statistics.median(latencies),
|
||||
'std': statistics.stdev(latencies) if len(latencies) > 1 else 0,
|
||||
'min': min(latencies),
|
||||
'max': max(latencies),
|
||||
'p95': np.percentile(latencies, 95),
|
||||
'p99': np.percentile(latencies, 99)
|
||||
}
|
||||
"commit_metrics": {
|
||||
"total_commits": len(self.completed_commits),
|
||||
"latency_ms": {
|
||||
"mean": statistics.mean(latencies),
|
||||
"median": statistics.median(latencies),
|
||||
"std": statistics.stdev(latencies) if len(latencies) > 1 else 0,
|
||||
"min": min(latencies),
|
||||
"max": max(latencies),
|
||||
"p95": np.percentile(latencies, 95),
|
||||
"p99": np.percentile(latencies, 99),
|
||||
},
|
||||
},
|
||||
'batch_metrics': {
|
||||
'total_batches': len(self.batch_metrics),
|
||||
'avg_commits_per_batch': statistics.mean([b['commit_count'] for b in self.batch_metrics]),
|
||||
'avg_batch_size_bytes': statistics.mean([b['size_bytes'] for b in self.batch_metrics]),
|
||||
'avg_batch_latency_ms': statistics.mean([b['latency'] * 1000 for b in self.batch_metrics])
|
||||
"batch_metrics": {
|
||||
"total_batches": len(self.batch_metrics),
|
||||
"avg_commits_per_batch": statistics.mean(
|
||||
[b["commit_count"] for b in self.batch_metrics]
|
||||
),
|
||||
"avg_batch_size_bytes": statistics.mean(
|
||||
[b["size_bytes"] for b in self.batch_metrics]
|
||||
),
|
||||
"avg_batch_latency_ms": statistics.mean(
|
||||
[b["latency"] * 1000 for b in self.batch_metrics]
|
||||
),
|
||||
},
|
||||
'retry_analysis': dict(self.retry_counts),
|
||||
'queue_depth_analysis': self._analyze_queue_depths()
|
||||
"retry_analysis": dict(self.retry_counts),
|
||||
"queue_depth_analysis": self._analyze_queue_depths(),
|
||||
}
|
||||
|
||||
return results
|
||||
@@ -510,26 +547,26 @@ class PersistenceSimulation:
|
||||
if not self.queue_depth_samples:
|
||||
return {}
|
||||
|
||||
pending = [s['pending_commits'] for s in self.queue_depth_samples]
|
||||
in_flight = [s['in_flight_requests'] for s in self.queue_depth_samples]
|
||||
total_unack = [s['total_unacknowledged'] for s in self.queue_depth_samples]
|
||||
pending = [s["pending_commits"] for s in self.queue_depth_samples]
|
||||
in_flight = [s["in_flight_requests"] for s in self.queue_depth_samples]
|
||||
total_unack = [s["total_unacknowledged"] for s in self.queue_depth_samples]
|
||||
|
||||
return {
|
||||
'pending_commits': {
|
||||
'mean': statistics.mean(pending),
|
||||
'max': max(pending),
|
||||
'p95': np.percentile(pending, 95)
|
||||
"pending_commits": {
|
||||
"mean": statistics.mean(pending),
|
||||
"max": max(pending),
|
||||
"p95": np.percentile(pending, 95),
|
||||
},
|
||||
'in_flight_requests': {
|
||||
'mean': statistics.mean(in_flight),
|
||||
'max': max(in_flight),
|
||||
'p95': np.percentile(in_flight, 95)
|
||||
"in_flight_requests": {
|
||||
"mean": statistics.mean(in_flight),
|
||||
"max": max(in_flight),
|
||||
"p95": np.percentile(in_flight, 95),
|
||||
},
|
||||
"total_unacknowledged": {
|
||||
"mean": statistics.mean(total_unack),
|
||||
"max": max(total_unack),
|
||||
"p95": np.percentile(total_unack, 95),
|
||||
},
|
||||
'total_unacknowledged': {
|
||||
'mean': statistics.mean(total_unack),
|
||||
'max': max(total_unack),
|
||||
'p95': np.percentile(total_unack, 95)
|
||||
}
|
||||
}
|
||||
|
||||
def plot_results(self, results: Dict, save_path: Optional[str] = None):
|
||||
@@ -539,57 +576,69 @@ class PersistenceSimulation:
return

fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(15, 12))
fig.suptitle('Persistence Thread Simulation Results', fontsize=16)
fig.suptitle("Persistence Thread Simulation Results", fontsize=16)

# Plot 1: Commit latency histogram
latencies_ms = [c['latency'] * 1000 for c in self.completed_commits]
ax1.hist(latencies_ms, bins=50, alpha=0.7, edgecolor='black')
ax1.set_xlabel('Commit Latency (ms)')
ax1.set_ylabel('Count')
ax1.set_title('Commit Latency Distribution')
ax1.axvline(results['commit_metrics']['latency_ms']['mean'], color='red',
linestyle='--', label=f"Mean: {results['commit_metrics']['latency_ms']['mean']:.1f}ms")
ax1.axvline(results['commit_metrics']['latency_ms']['p95'], color='orange',
linestyle='--', label=f"P95: {results['commit_metrics']['latency_ms']['p95']:.1f}ms")
latencies_ms = [c["latency"] * 1000 for c in self.completed_commits]
ax1.hist(latencies_ms, bins=50, alpha=0.7, edgecolor="black")
ax1.set_xlabel("Commit Latency (ms)")
ax1.set_ylabel("Count")
ax1.set_title("Commit Latency Distribution")
ax1.axvline(
results["commit_metrics"]["latency_ms"]["mean"],
color="red",
linestyle="--",
label=f"Mean: {results['commit_metrics']['latency_ms']['mean']:.1f}ms",
)
ax1.axvline(
results["commit_metrics"]["latency_ms"]["p95"],
color="orange",
linestyle="--",
label=f"P95: {results['commit_metrics']['latency_ms']['p95']:.1f}ms",
)
ax1.legend()

# Plot 2: Timeline of commit completions
completion_times = [c['completion_time'] for c in self.completed_commits]
completion_latencies = [c['latency'] * 1000 for c in self.completed_commits]
completion_times = [c["completion_time"] for c in self.completed_commits]
completion_latencies = [c["latency"] * 1000 for c in self.completed_commits]
ax2.scatter(completion_times, completion_latencies, alpha=0.6, s=10)
ax2.set_xlabel('Time (seconds)')
ax2.set_ylabel('Commit Latency (ms)')
ax2.set_title('Latency Over Time')
ax2.set_xlabel("Time (seconds)")
ax2.set_ylabel("Commit Latency (ms)")
ax2.set_title("Latency Over Time")

# Plot 3: Queue depth over time
if self.queue_depth_samples:
times = [s['time'] for s in self.queue_depth_samples]
pending = [s['pending_commits'] for s in self.queue_depth_samples]
in_flight = [s['in_flight_requests'] for s in self.queue_depth_samples]
total_unack = [s['total_unacknowledged'] for s in self.queue_depth_samples]
times = [s["time"] for s in self.queue_depth_samples]
pending = [s["pending_commits"] for s in self.queue_depth_samples]
in_flight = [s["in_flight_requests"] for s in self.queue_depth_samples]
total_unack = [s["total_unacknowledged"] for s in self.queue_depth_samples]

ax3.plot(times, pending, label='Pending Commits', alpha=0.8)
ax3.plot(times, in_flight, label='In-Flight Requests', alpha=0.8)
ax3.plot(times, total_unack, label='Total Unacknowledged', alpha=0.8)
ax3.axhline(self.max_in_flight_requests, color='red', linestyle='--',
label=f'Max In-Flight Limit ({self.max_in_flight_requests})')
ax3.set_xlabel('Time (seconds)')
ax3.set_ylabel('Count')
ax3.set_title('Queue Depths Over Time')
ax3.plot(times, pending, label="Pending Commits", alpha=0.8)
ax3.plot(times, in_flight, label="In-Flight Requests", alpha=0.8)
ax3.plot(times, total_unack, label="Total Unacknowledged", alpha=0.8)
ax3.axhline(
self.max_in_flight_requests,
color="red",
linestyle="--",
label=f"Max In-Flight Limit ({self.max_in_flight_requests})",
)
ax3.set_xlabel("Time (seconds)")
ax3.set_ylabel("Count")
ax3.set_title("Queue Depths Over Time")
ax3.legend()

# Plot 4: Batch size distribution
if self.batch_metrics:
batch_sizes = [b['commit_count'] for b in self.batch_metrics]
ax4.hist(batch_sizes, bins=20, alpha=0.7, edgecolor='black')
ax4.set_xlabel('Commits per Batch')
ax4.set_ylabel('Count')
ax4.set_title('Batch Size Distribution')
batch_sizes = [b["commit_count"] for b in self.batch_metrics]
ax4.hist(batch_sizes, bins=20, alpha=0.7, edgecolor="black")
ax4.set_xlabel("Commits per Batch")
ax4.set_ylabel("Count")
ax4.set_title("Batch Size Distribution")

plt.tight_layout()

if save_path:
plt.savefig(save_path, dpi=300, bbox_inches='tight')
plt.savefig(save_path, dpi=300, bbox_inches="tight")
print(f"Plots saved to {save_path}")
else:
plt.show()
@@ -602,7 +651,7 @@ def print_results(results: Dict):
print("=" * 80)

# Configuration
config = results['simulation_config']
config = results["simulation_config"]
print(f"\nConfiguration:")
print(f" Duration: {config['duration_sec']}s")
print(f" Arrival Rate: {config['arrival_rate_per_sec']} commits/sec")
@@ -613,8 +662,8 @@ def print_results(results: Dict):
print(f" S3 Failure Rate: {config['s3_failure_rate']:.1%}")

# Commit metrics
commit_metrics = results['commit_metrics']
latency = commit_metrics['latency_ms']
commit_metrics = results["commit_metrics"]
latency = commit_metrics["latency_ms"]
print(f"\nCommit Performance:")
print(f" Total Commits: {commit_metrics['total_commits']:,}")
print(f" Latency Mean: {latency['mean']:.2f}ms")
@@ -625,7 +674,7 @@ def print_results(results: Dict):
print(f" Latency Range: {latency['min']:.2f}ms - {latency['max']:.2f}ms")

# Batch metrics
batch_metrics = results['batch_metrics']
batch_metrics = results["batch_metrics"]
print(f"\nBatching Performance:")
print(f" Total Batches: {batch_metrics['total_batches']:,}")
print(f" Avg Commits/Batch: {batch_metrics['avg_commits_per_batch']:.1f}")
@@ -633,24 +682,30 @@ def print_results(results: Dict):
print(f" Avg Batch Latency: {batch_metrics['avg_batch_latency_ms']:.2f}ms")

# Retry analysis
if results['retry_analysis']:
if results["retry_analysis"]:
print(f"\nRetry Analysis:")
for retry_count, occurrences in results['retry_analysis'].items():
for retry_count, occurrences in results["retry_analysis"].items():
print(f" {occurrences:,} batches required {retry_count} retries")

# Queue depth analysis
if results['queue_depth_analysis']:
queue_analysis = results['queue_depth_analysis']
if results["queue_depth_analysis"]:
queue_analysis = results["queue_depth_analysis"]
print(f"\nQueue Depth Analysis:")
if 'pending_commits' in queue_analysis:
pending = queue_analysis['pending_commits']
print(f" Pending Commits - Mean: {pending['mean']:.1f}, Max: {pending['max']}, P95: {pending['p95']:.1f}")
if 'in_flight_requests' in queue_analysis:
in_flight = queue_analysis['in_flight_requests']
print(f" In-Flight Requests - Mean: {in_flight['mean']:.1f}, Max: {in_flight['max']}, P95: {in_flight['p95']:.1f}")
if 'total_unacknowledged' in queue_analysis:
total = queue_analysis['total_unacknowledged']
print(f" Total Unacknowledged - Mean: {total['mean']:.1f}, Max: {total['max']}, P95: {total['p95']:.1f}")
if "pending_commits" in queue_analysis:
pending = queue_analysis["pending_commits"]
print(
f" Pending Commits - Mean: {pending['mean']:.1f}, Max: {pending['max']}, P95: {pending['p95']:.1f}"
)
if "in_flight_requests" in queue_analysis:
in_flight = queue_analysis["in_flight_requests"]
print(
f" In-Flight Requests - Mean: {in_flight['mean']:.1f}, Max: {in_flight['max']}, P95: {in_flight['p95']:.1f}"
)
if "total_unacknowledged" in queue_analysis:
total = queue_analysis["total_unacknowledged"]
print(
f" Total Unacknowledged - Mean: {total['mean']:.1f}, Max: {total['max']}, P95: {total['p95']:.1f}"
)


if __name__ == "__main__":
@@ -664,8 +719,16 @@ if __name__ == "__main__":
{"name": "Baseline (max_in_flight=5)", "max_in_flight_requests": 5},
{"name": "Higher Parallelism (max_in_flight=10)", "max_in_flight_requests": 10},
{"name": "Much Higher (max_in_flight=20)", "max_in_flight_requests": 20},
{"name": "Lower Timeout (max_in_flight=10, timeout=2ms)", "max_in_flight_requests": 10, "batch_timeout_ms": 2.0},
{"name": "Higher Timeout (max_in_flight=10, timeout=10ms)", "max_in_flight_requests": 10, "batch_timeout_ms": 10.0},
{
"name": "Lower Timeout (max_in_flight=10, timeout=2ms)",
"max_in_flight_requests": 10,
"batch_timeout_ms": 2.0,
},
{
"name": "Higher Timeout (max_in_flight=10, timeout=10ms)",
"max_in_flight_requests": 10,
"batch_timeout_ms": 10.0,
},
]

results_comparison = []
@@ -682,7 +745,7 @@ if __name__ == "__main__":
s3_latency_scale=25.0,
s3_failure_rate=0.01,
max_in_flight_requests=config.get("max_in_flight_requests", 5),
batch_timeout_ms=config.get("batch_timeout_ms", 5.0)
batch_timeout_ms=config.get("batch_timeout_ms", 5.0),
)

results = sim.run_simulation()
@@ -690,9 +753,9 @@ if __name__ == "__main__":
results_comparison.append(results)

# Print key metrics for quick comparison
commit_metrics = results['commit_metrics']
batch_metrics = results['batch_metrics']
queue_metrics = results.get('queue_depth_analysis', {})
commit_metrics = results["commit_metrics"]
batch_metrics = results["batch_metrics"]
queue_metrics = results.get("queue_depth_analysis", {})

print(f"\nKey Metrics:")
print(f" Mean Latency: {commit_metrics['latency_ms']['mean']:.1f}ms")
@@ -701,8 +764,12 @@ if __name__ == "__main__":
print(f" Avg Commits/Batch: {batch_metrics['avg_commits_per_batch']:.1f}")
print(f" Avg Batch Size: {batch_metrics['avg_batch_size_bytes']/1024:.1f}KB")
if queue_metrics:
print(f" Avg Queue Depth: {queue_metrics.get('total_unacknowledged', {}).get('mean', 0):.1f}")
print(f" Max Queue Depth: {queue_metrics.get('total_unacknowledged', {}).get('max', 0)}")
print(
f" Avg Queue Depth: {queue_metrics.get('total_unacknowledged', {}).get('mean', 0):.1f}"
)
print(
f" Max Queue Depth: {queue_metrics.get('total_unacknowledged', {}).get('max', 0)}"
)

# Summary comparison
print(f"\n{'='*80}")
@@ -713,25 +780,33 @@ if __name__ == "__main__":

for result in results_comparison:
name = result["config_name"]
commit_metrics = result['commit_metrics']
queue_metrics = result.get('queue_depth_analysis', {})
mean_lat = commit_metrics['latency_ms']['mean']
p95_lat = commit_metrics['latency_ms']['p95']
p99_lat = commit_metrics['latency_ms']['p99']
avg_queue = queue_metrics.get('total_unacknowledged', {}).get('mean', 0)
commit_metrics = result["commit_metrics"]
queue_metrics = result.get("queue_depth_analysis", {})
mean_lat = commit_metrics["latency_ms"]["mean"]
p95_lat = commit_metrics["latency_ms"]["p95"]
p99_lat = commit_metrics["latency_ms"]["p99"]
avg_queue = queue_metrics.get("total_unacknowledged", {}).get("mean", 0)

print(f"{name:<40} {mean_lat:<8.1f} {p95_lat:<8.1f} {p99_lat:<8.1f} {avg_queue:<10.1f}")
print(
f"{name:<40} {mean_lat:<8.1f} {p95_lat:<8.1f} {p99_lat:<8.1f} {avg_queue:<10.1f}"
)

print(f"\nRecommendation: Choose config with lowest P95/P99 latencies")
print(f"Note: Higher in-flight allows more parallelism but may increase queue variability")
print(
f"Note: Higher in-flight allows more parallelism but may increase queue variability"
)

# Generate plots for best configuration
best_config = min(results_comparison, key=lambda r: r['commit_metrics']['latency_ms']['p95'])
best_config = min(
results_comparison, key=lambda r: r["commit_metrics"]["latency_ms"]["p95"]
)
print(f"\nGenerating plots for best configuration: {best_config['config_name']}")

try:
# Re-run best config to get simulation object for plotting
best_params = next(c for c in configs if c['name'] == best_config['config_name'])
best_params = next(
c for c in configs if c["name"] == best_config["config_name"]
)
sim_best = PersistenceSimulation(
arrival_rate_per_sec=1000.0,
simulation_duration_sec=30.0,
@@ -739,10 +814,10 @@ if __name__ == "__main__":
s3_latency_scale=25.0,
s3_failure_rate=0.01,
max_in_flight_requests=best_params.get("max_in_flight_requests", 5),
batch_timeout_ms=best_params.get("batch_timeout_ms", 5.0)
batch_timeout_ms=best_params.get("batch_timeout_ms", 5.0),
)
sim_best.run_simulation()
sim_best.plot_results(best_config, f'persistence_optimization_results.png')
sim_best.plot_results(best_config, f"persistence_optimization_results.png")
except Exception as e:
print(f"\nCould not generate plots: {e}")
print("Install matplotlib and numpy to enable visualization")
print("Install matplotlib and numpy to enable visualization")