Compare commits

...

13 Commits

14 changed files with 1465 additions and 641 deletions

View File

@@ -16,3 +16,19 @@ repos:
rev: e2c2116d86a80e72e7146a06e68b7c228afc6319 # frozen: v0.6.13
hooks:
- id: cmake-format
- repo: https://github.com/psf/black
rev: 8a737e727ac5ab2f1d4cf5876720ed276dc8dc4b # frozen: 25.1.0
hooks:
- id: black
language_version: python3
- repo: local
hooks:
- id: snake-case-enforcement
name: Enforce snake_case naming
entry: ./tools/check_snake_case.py
language: script
files: '\.(hpp|cpp)$'
exclude: '^build/'
args: ['--check-new-code-only']

View File

@@ -212,6 +212,10 @@ target_link_libraries(bench_parser_comparison nanobench weaseljson test_data
target_include_directories(bench_parser_comparison
PRIVATE src ${rapidjson_SOURCE_DIR}/include)
add_executable(bench_thread_pipeline benchmarks/bench_thread_pipeline.cpp)
target_link_libraries(bench_thread_pipeline nanobench Threads::Threads)
target_include_directories(bench_thread_pipeline PRIVATE src)
# Debug tools
add_executable(
debug_arena tools/debug_arena.cpp src/json_commit_request_parser.cpp
@@ -232,3 +236,4 @@ add_test(NAME server_connection_return_tests
add_test(NAME arena_allocator_benchmarks COMMAND bench_arena_allocator)
add_test(NAME commit_request_benchmarks COMMAND bench_commit_request)
add_test(NAME parser_comparison_benchmarks COMMAND bench_parser_comparison)
add_test(NAME thread_pipeline_benchmarks COMMAND bench_thread_pipeline)

View File

@@ -0,0 +1,245 @@
#include "thread_pipeline.hpp"
#include <latch>
#include <nanobench.h>
#include <thread>
int main() {
{
constexpr int LOG_PIPELINE_SIZE = 10; // 2^10 = 1024 slots
constexpr int NUM_ITEMS = 100'000;
constexpr int BATCH_SIZE = 16;
constexpr int BUSY_ITERS = 100;
auto bench = ankerl::nanobench::Bench()
.title("Pipeline Throughput")
.unit("item")
.batch(NUM_ITEMS)
.relative(true)
.warmup(100);
bench.run("Zero stage pipeline", [&] {
for (int i = 0; i < NUM_ITEMS; ++i) {
for (volatile int i = 0; i < BUSY_ITERS; i = i + 1) {
}
}
});
StaticThreadPipeline<std::latch *, WaitStrategy::WaitIfStageEmpty, 1>
pipeline(LOG_PIPELINE_SIZE);
std::latch done{0};
// Stage 0 consumer thread
std::thread stage0_thread([&pipeline, &done]() {
for (;;) {
auto guard = pipeline.acquire<0, 0>();
for (auto &item : guard.batch) {
for (volatile int i = 0; i < BUSY_ITERS; i = i + 1) {
}
if (item == &done) {
return;
}
if (item) {
item->count_down();
}
}
}
});
bench.run("One stage pipeline", [&] {
// Producer (main thread)
int items_pushed = 0;
while (items_pushed < NUM_ITEMS - 1) {
auto guard = pipeline.push(
std::min(NUM_ITEMS - 1 - items_pushed, BATCH_SIZE), true);
auto it = guard.batch.begin();
items_pushed += guard.batch.size();
for (size_t i = 0; i < guard.batch.size(); ++i, ++it) {
*it = nullptr;
}
}
std::latch finish{1};
{
auto guard = pipeline.push(1, true);
guard.batch[0] = &finish;
}
finish.wait();
});
{
auto guard = pipeline.push(1, true);
guard.batch[0] = &done;
}
stage0_thread.join();
}
// Batch size comparison benchmark
{
constexpr int LOG_PIPELINE_SIZE = 10; // 2^10 = 1024 slots
constexpr int NUM_ITEMS = 100'000;
constexpr int BUSY_ITERS = 100;
auto bench = ankerl::nanobench::Bench()
.title("Batch Size Impact")
.unit("item")
.batch(NUM_ITEMS)
.relative(true)
.warmup(100);
for (int batch_size : {1, 4, 16, 64, 256}) {
StaticThreadPipeline<std::latch *, WaitStrategy::WaitIfStageEmpty, 1>
pipeline(LOG_PIPELINE_SIZE);
std::latch done{0};
// Stage 0 consumer thread
std::thread stage0_thread([&pipeline, &done]() {
for (;;) {
auto guard = pipeline.acquire<0, 0>();
for (auto &item : guard.batch) {
for (volatile int i = 0; i < BUSY_ITERS; i = i + 1) {
}
if (item == &done) {
return;
}
if (item) {
item->count_down();
}
}
}
});
bench.run("Batch size " + std::to_string(batch_size), [&] {
// Producer (main thread)
int items_pushed = 0;
while (items_pushed < NUM_ITEMS - 1) {
auto guard = pipeline.push(
std::min(NUM_ITEMS - 1 - items_pushed, batch_size), true);
auto it = guard.batch.begin();
items_pushed += guard.batch.size();
for (size_t i = 0; i < guard.batch.size(); ++i, ++it) {
*it = nullptr;
}
}
std::latch finish{1};
{
auto guard = pipeline.push(1, true);
guard.batch[0] = &finish;
}
finish.wait();
});
{
auto guard = pipeline.push(1, true);
guard.batch[0] = &done;
}
stage0_thread.join();
}
}
// Helper function for wait strategy benchmarks
auto benchmark_wait_strategy =
[]<WaitStrategy strategy>(const std::string &name,
ankerl::nanobench::Bench &bench) {
constexpr int LOG_PIPELINE_SIZE =
8; // Smaller buffer to increase contention
constexpr int NUM_ITEMS = 50'000;
constexpr int BATCH_SIZE = 4; // Small batches to increase coordination
constexpr int BUSY_ITERS =
10; // Light work to emphasize coordination overhead
StaticThreadPipeline<std::latch *, strategy, 1, 1> pipeline(
LOG_PIPELINE_SIZE);
std::latch done{0};
// Stage 0 worker
std::thread stage0_thread([&pipeline, &done]() {
for (;;) {
auto guard = pipeline.template acquire<0, 0>();
for (auto &item : guard.batch) {
for (volatile int i = 0; i < BUSY_ITERS; i = i + 1) {
}
if (item == &done)
return;
}
}
});
// Stage 1 worker (final stage - always calls futex wake)
std::thread stage1_thread([&pipeline, &done]() {
for (;;) {
auto guard = pipeline.template acquire<1, 0>();
for (auto &item : guard.batch) {
for (volatile int i = 0; i < BUSY_ITERS; i = i + 1) {
}
if (item == &done)
return;
if (item)
item->count_down();
}
}
});
bench.run(name, [&] {
int items_pushed = 0;
while (items_pushed < NUM_ITEMS - 1) {
auto guard = pipeline.push(
std::min(NUM_ITEMS - 1 - items_pushed, BATCH_SIZE), true);
auto it = guard.batch.begin();
items_pushed += guard.batch.size();
for (size_t i = 0; i < guard.batch.size(); ++i, ++it) {
*it = nullptr;
}
}
std::latch finish{1};
{
auto guard = pipeline.push(1, true);
guard.batch[0] = &finish;
}
finish.wait();
});
// Shutdown
{
auto guard = pipeline.push(1, true);
guard.batch[0] = &done;
}
stage0_thread.join();
stage1_thread.join();
};
// Wait strategy comparison benchmark - multiple stages to trigger futex wakes
{
auto bench = ankerl::nanobench::Bench()
.title("Wait Strategy Comparison")
.unit("item")
.batch(50'000)
.relative(true)
.warmup(50);
benchmark_wait_strategy.template operator()<WaitStrategy::WaitIfStageEmpty>(
"WaitIfStageEmpty", bench);
benchmark_wait_strategy.template
operator()<WaitStrategy::WaitIfUpstreamIdle>("WaitIfUpstreamIdle", bench);
benchmark_wait_strategy.template operator()<WaitStrategy::Never>("Never",
bench);
}
// TODO: Add more benchmarks for:
// - Multi-stage pipelines (3+ stages)
// - Multiple threads per stage
// - Different batch sizes
// - Pipeline contention under load
// - Memory usage patterns
// - Latency measurements
// - Different wait strategies
return 0;
}

View File

@@ -1,12 +1,14 @@
#include <nanobench.h>
#include "../src/loop_iterations.h"
int main() {
constexpr int loopIterations = 1200;
ankerl::nanobench::Bench().run(
"volatile loop to " + std::to_string(loopIterations), [&] {
for (volatile int i = 0; i < 1200; i = i + 1)
;
});
ankerl::nanobench::Bench bench;
bench.minEpochIterations(100000);
bench.run("volatile loop to " + std::to_string(loopIterations), [&] {
for (volatile int i = 0; i < loopIterations; i = i + 1)
;
});
return 0;
}

View File

@@ -8,6 +8,7 @@
#include "connection.hpp"
#include "connection_handler.hpp"
#include "loop_iterations.h"
#include "perfetto_categories.hpp"
#include "server.hpp"
#include "thread_pipeline.hpp"
@@ -64,37 +65,51 @@ struct HttpConnectionState {
*/
struct HttpHandler : ConnectionHandler {
HttpHandler() {
for (int threadId = 0; threadId < kFinalStageThreads; ++threadId) {
finalStageThreads.emplace_back([this, threadId]() {
pthread_setname_np(pthread_self(),
("stage-1-" + std::to_string(threadId)).c_str());
for (;;) {
auto guard = pipeline.acquire(1, threadId);
for (auto it = guard.batch.begin(); it != guard.batch.end(); ++it) {
if ((it.index() % kFinalStageThreads) == threadId) {
auto &c = *it;
if (!c) {
return;
}
auto *state = static_cast<HttpConnectionState *>(c->user_data);
TRACE_EVENT("http", "pipeline thread",
perfetto::Flow::Global(state->request_id));
Server::release_back_to_server(std::move(c));
finalStageThreads.emplace_back([this]() {
pthread_setname_np(pthread_self(), "stage-1-0");
for (;;) {
auto guard = pipeline.acquire<1, 0>();
for (auto it = guard.batch.begin(); it != guard.batch.end(); ++it) {
if ((it.index() % 2) == 0) { // Thread 0 handles even indices
auto &c = *it;
if (!c) {
return;
}
auto *state = static_cast<HttpConnectionState *>(c->user_data);
TRACE_EVENT("http", "release",
perfetto::Flow::Global(state->request_id));
Server::release_back_to_server(std::move(c));
}
}
});
}
}
});
finalStageThreads.emplace_back([this]() {
pthread_setname_np(pthread_self(), "stage-1-1");
for (;;) {
auto guard = pipeline.acquire<1, 1>();
for (auto it = guard.batch.begin(); it != guard.batch.end(); ++it) {
if ((it.index() % 2) == 1) { // Thread 1 handles odd indices
auto &c = *it;
if (!c) {
return;
}
auto *state = static_cast<HttpConnectionState *>(c->user_data);
TRACE_EVENT("http", "release",
perfetto::Flow::Global(state->request_id));
Server::release_back_to_server(std::move(c));
}
}
}
});
stage0Thread = std::thread{[this]() {
pthread_setname_np(pthread_self(), "stage-0");
for (;;) {
auto guard = pipeline.acquire(0, 0, 0, false);
for (auto it = guard.batch.begin(); it != guard.batch.end(); ++it) {
auto &c = *it;
auto guard = pipeline.acquire<0, 0>();
for (auto &c : guard.batch) {
if (!c) {
return;
}
for (volatile int i = 0; i < 1200; i = i + 1)
for (volatile int i = 0; i < loopIterations; i = i + 1)
;
}
}
@@ -102,7 +117,7 @@ struct HttpHandler : ConnectionHandler {
}
~HttpHandler() {
{
auto guard = pipeline.push(kFinalStageThreads, true);
auto guard = pipeline.push(2, true);
for (auto &c : guard.batch) {
c = {};
}
@@ -137,8 +152,9 @@ struct HttpHandler : ConnectionHandler {
private:
static constexpr int kFinalStageThreads = 2;
static constexpr int kLogSize = 12;
ThreadPipeline<std::unique_ptr<Connection>> pipeline{
kLogSize, {/*noop serial thread*/ 1, kFinalStageThreads}};
StaticThreadPipeline<std::unique_ptr<Connection>,
WaitStrategy::WaitIfUpstreamIdle, 1, 2>
pipeline{kLogSize};
std::thread stage0Thread;
std::vector<std::thread> finalStageThreads;

3
src/loop_iterations.h Normal file
View File

@@ -0,0 +1,3 @@
#pragma once
constexpr int loopIterations = 1550;

View File

@@ -1,7 +1,9 @@
#pragma once
#include <array>
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
@@ -9,27 +11,252 @@
#include <utility>
#include <vector>
// Multi-stage lock-free pipeline for high-throughput inter-thread
// communication.
// Wait strategies for controlling thread blocking behavior when no work is
// available
enum class WaitStrategy {
// Never block - threads busy-wait (spin) when no work available.
// Stage threads will always use 100% CPU even when idle.
// Requires dedicated CPU cores to avoid scheduler thrashing.
// Use when: latency is critical and you have spare cores.
Never,
// Block only when all upstream stages are idle (no new work entering
// pipeline).
// Downstream threads busy-wait if upstream has work but not for their stage.
// Eliminates futex notifications between stages, reduces to 0% CPU when idle.
// Requires dedicated cores to avoid priority inversion when pipeline has
// work.
// Use when: high throughput with spare cores and sustained workloads.
WaitIfUpstreamIdle,
// Block when individual stages are empty (original behavior).
// Each stage waits independently on its input sources.
// Safe for shared CPU environments, works well with variable workloads.
// Use when: general purpose, shared cores, or unpredictable workloads.
WaitIfStageEmpty,
};
// Core thread state
struct ThreadState {
alignas(128) std::atomic<uint32_t> pops{0};
uint32_t local_pops{0};
std::vector<uint32_t> last_push_read;
bool last_stage;
};
// Compile-time topology configuration for static pipelines
//
// This template defines a pipeline topology at compile-time:
// - Stage and thread calculations done at compile-time
// - Type-safe indexing: Stage and thread indices validated at compile-time
// - Fixed-size arrays with known bounds
// - Code specialization for each topology
//
// Example: StaticPipelineTopology<1, 4, 2> creates:
// - Stage 0: 1 thread (index 0)
// - Stage 1: 4 threads (indices 1-4)
// - Stage 2: 2 threads (indices 5-6)
// - Total: 7 threads across 3 stages
template <int... ThreadsPerStage> struct StaticPipelineTopology {
static_assert(sizeof...(ThreadsPerStage) > 0,
"Must specify at least one stage");
static_assert(((ThreadsPerStage > 0) && ...),
"All stages must have at least one thread");
static constexpr int num_stages = sizeof...(ThreadsPerStage);
static constexpr std::array<int, num_stages> threads_per_stage = {
ThreadsPerStage...};
static constexpr int total_threads = (ThreadsPerStage + ...);
// Compile-time stage offset calculation
template <int Stage> static constexpr int stage_offset() {
static_assert(Stage >= 0 && Stage < num_stages,
"Stage index out of bounds");
if constexpr (Stage == 0) {
return 0;
} else {
return stage_offset<Stage - 1>() + threads_per_stage[Stage - 1];
}
}
// Compile-time thread index calculation
template <int Stage, int Thread> static constexpr int thread_index() {
static_assert(Stage >= 0 && Stage < num_stages,
"Stage index out of bounds");
static_assert(Thread >= 0 && Thread < threads_per_stage[Stage],
"Thread index out of bounds");
return stage_offset<Stage>() + Thread;
}
// Compile-time previous stage thread count
template <int Stage> static constexpr int prev_stage_thread_count() {
static_assert(Stage >= 0 && Stage < num_stages,
"Stage index out of bounds");
if constexpr (Stage == 0) {
return 1;
} else {
return threads_per_stage[Stage - 1];
}
}
};
// Static pipeline algorithms - compile-time specialized versions
namespace StaticPipelineAlgorithms {
template <WaitStrategy wait_strategy, typename Topology, int Stage,
int ThreadInStage>
uint32_t calculate_safe_len(
std::array<ThreadState, Topology::total_threads> &all_threads,
std::atomic<uint32_t> &pushes, bool may_block) {
constexpr int thread_idx =
Topology::template thread_index<Stage, ThreadInStage>();
auto &thread = all_threads[thread_idx];
uint32_t safe_len = UINT32_MAX;
constexpr int prev_stage_threads =
Topology::template prev_stage_thread_count<Stage>();
// Compile-time loop over previous stage threads
[&]<std::size_t... Is>(std::index_sequence<Is...>) {
(
[&] {
auto &last_push = [&]() -> std::atomic<uint32_t> & {
if constexpr (Stage == 0) {
return pushes;
} else {
constexpr int prev_thread_idx =
Topology::template thread_index<Stage - 1, Is>();
return all_threads[prev_thread_idx].pops;
}
}();
if (thread.last_push_read[Is] == thread.local_pops) {
thread.last_push_read[Is] =
last_push.load(std::memory_order_acquire);
if (thread.last_push_read[Is] == thread.local_pops) {
if (!may_block) {
safe_len = 0;
return;
}
if constexpr (wait_strategy == WaitStrategy::Never) {
// Empty - busy wait
} else if constexpr (wait_strategy ==
WaitStrategy::WaitIfUpstreamIdle) {
// We're allowed to spin as long as we eventually go to 0% cpu
// usage on idle
uint32_t push;
for (int i = 0; i < 100000; ++i) {
push = pushes.load(std::memory_order_relaxed);
if (push != thread.local_pops) {
goto dont_wait;
}
}
pushes.wait(push, std::memory_order_relaxed);
dont_wait:;
} else {
static_assert(wait_strategy == WaitStrategy::WaitIfStageEmpty);
last_push.wait(thread.last_push_read[Is],
std::memory_order_relaxed);
}
thread.last_push_read[Is] =
last_push.load(std::memory_order_acquire);
}
}
safe_len =
std::min(safe_len, thread.last_push_read[Is] - thread.local_pops);
}(),
...);
}(std::make_index_sequence<prev_stage_threads>{});
return safe_len;
}
template <WaitStrategy wait_strategy, typename Topology, int Stage,
int ThreadInStage>
void update_thread_pops(
std::array<ThreadState, Topology::total_threads> &all_threads,
uint32_t local_pops) {
constexpr int thread_idx =
Topology::template thread_index<Stage, ThreadInStage>();
auto &thread_state = all_threads[thread_idx];
if constexpr (wait_strategy == WaitStrategy::WaitIfStageEmpty) {
thread_state.pops.store(local_pops, std::memory_order_seq_cst);
thread_state.pops.notify_all();
} else if constexpr (Stage == Topology::num_stages - 1) { // last stage
thread_state.pops.store(local_pops, std::memory_order_seq_cst);
thread_state.pops.notify_all();
} else {
thread_state.pops.store(local_pops, std::memory_order_release);
}
}
template <typename Topology>
int check_producer_capacity(
std::array<ThreadState, Topology::total_threads> &all_threads,
uint32_t slot, uint32_t size, uint32_t slot_count, bool block) {
constexpr int last_stage = Topology::num_stages - 1;
constexpr int last_stage_offset =
Topology::template stage_offset<last_stage>();
constexpr int last_stage_thread_count =
Topology::threads_per_stage[last_stage];
for (int i = 0; i < last_stage_thread_count; ++i) {
auto &thread = all_threads[last_stage_offset + i];
uint32_t pops = thread.pops.load(std::memory_order_acquire);
if (slot + size - pops > slot_count) {
if (!block) {
return 2; // Cannot proceed
}
thread.pops.wait(pops, std::memory_order_relaxed);
return 1; // Should retry
}
}
return 0; // Can proceed
}
} // namespace StaticPipelineAlgorithms
// Static multi-stage lock-free pipeline for inter-thread communication
// with compile-time topology specification.
//
// Overview:
// - Items flow through multiple processing stages (stage 0 -> stage 1 -> ... ->
// final stage)
// - Items flow from producers through multiple processing stages (stage 0 ->
// stage 1 -> ... -> final stage)
// - Each stage can have multiple worker threads processing items in parallel
// - Uses a shared ring buffer with atomic counters for lock-free coordination
// - Supports batch processing for efficiency
// - Compile-time topology specification via template parameters
//
// Architecture:
// - Producers: External threads that add items to the pipeline via push()
// - Stages: Processing stages numbered 0, 1, 2, ... that consume items via
// acquire<Stage, Thread>()
// - Items flow: Producers -> Stage 0 -> Stage 1 -> ... -> Final Stage
//
// Differences from Dynamic Version:
// - Template parameters specify topology at compile-time (e.g., <Item,
// WaitStrategy::Never, 1, 4, 2>)
// - Stage and thread indices are template parameters, validated at compile-time
// - Fixed-size arrays replace dynamic vectors
// - Specialized algorithms for each stage/thread combination
// - Type-safe guards prevent runtime indexing errors
//
// Usage Pattern:
// // Producer threads (add items to stage 0):
// using Pipeline = StaticThreadPipeline<Item, WaitStrategy::WaitIfStageEmpty,
// 1, 4, 2>; Pipeline pipeline(lgSlotCount);
//
// // Producer threads (add items for stage 0 to consume):
// auto guard = pipeline.push(batchSize, /*block=*/true);
// for (auto& item : guard.batch) {
// // Initialize item data
// }
// // Guard destructor publishes batch to consumers
// // Guard destructor publishes batch to stage 0 consumers
//
// // Consumer threads (process items from any stage):
// auto guard = pipeline.acquire(stageNum, threadId, maxBatch,
// /*mayBlock=*/true); for (auto& item : guard.batch) {
// // Stage worker threads (process items and pass to next stage):
// auto guard = pipeline.acquire<Stage, Thread>(maxBatch, /*may_block=*/true);
// for (auto& item : guard.batch) {
// // Process item
// }
// // Guard destructor marks items as consumed and available to next stage
@@ -44,35 +271,29 @@
// ordering
// - Uses C++20 atomic wait/notify for efficient blocking when no work available
// - RAII guards ensure proper cleanup even with exceptions
template <class T> struct ThreadPipeline {
template <class T, WaitStrategy wait_strategy, int... ThreadsPerStage>
struct StaticThreadPipeline {
using Topology = StaticPipelineTopology<ThreadsPerStage...>;
// Constructor
// lgSlotCount: log2 of ring buffer size (e.g., 10 -> 1024 slots)
// threadsPerStage: number of threads for each stage (e.g., {1, 4, 2} = 1
// stage-0 worker, 4 stage-1 workers, 2 stage-2 workers)
ThreadPipeline(int lgSlotCount, const std::vector<int> &threadsPerStage)
// Template parameters specify pipeline topology (e.g., <Item, Never, 1, 4,
// 2>) Note: Producer threads are external to the pipeline and not counted in
// ThreadsPerStage
explicit StaticThreadPipeline(int lgSlotCount)
: slot_count(1 << lgSlotCount), slot_count_mask(slot_count - 1),
threadState(threadsPerStage.size()), ring(slot_count) {
ring(slot_count) {
// Otherwise we can't tell the difference between full and empty.
assert(!(slot_count_mask & 0x80000000));
for (size_t i = 0; i < threadsPerStage.size(); ++i) {
threadState[i] = std::vector<ThreadState>(threadsPerStage[i]);
for (auto &t : threadState[i]) {
if (i == 0) {
t.last_push_read = std::vector<uint32_t>(1);
} else {
t.last_push_read = std::vector<uint32_t>(threadsPerStage[i - 1]);
}
}
}
initialize_all_threads();
}
ThreadPipeline(ThreadPipeline const &) = delete;
ThreadPipeline &operator=(ThreadPipeline const &) = delete;
ThreadPipeline(ThreadPipeline &&) = delete;
ThreadPipeline &operator=(ThreadPipeline &&) = delete;
StaticThreadPipeline(StaticThreadPipeline const &) = delete;
StaticThreadPipeline &operator=(StaticThreadPipeline const &) = delete;
StaticThreadPipeline(StaticThreadPipeline &&) = delete;
StaticThreadPipeline &operator=(StaticThreadPipeline &&) = delete;
struct Batch {
Batch() : ring(), begin_(), end_() {}
struct Iterator {
@@ -125,9 +346,6 @@ template <class T> struct ThreadPipeline {
return static_cast<difference_type>(index_) -
static_cast<difference_type>(rhs.index_);
}
reference operator[](difference_type n) const {
return (*ring)[(index_ + n) & (ring->size() - 1)];
}
friend Iterator operator+(difference_type n, const Iterator &iter) {
return iter + n;
}
@@ -141,7 +359,6 @@ template <class T> struct ThreadPipeline {
}
friend bool operator<(const Iterator &lhs, const Iterator &rhs) {
assert(lhs.ring == rhs.ring);
// Handle potential uint32_t wraparound by using signed difference
return static_cast<int32_t>(lhs.index_ - rhs.index_) < 0;
}
friend bool operator<=(const Iterator &lhs, const Iterator &rhs) {
@@ -157,9 +374,6 @@ template <class T> struct ThreadPipeline {
return static_cast<int32_t>(lhs.index_ - rhs.index_) >= 0;
}
/// Returns the ring buffer index (0 to ring->size()-1) for this iterator
/// position. Useful for distributing work across multiple threads by
/// using modulo operations.
uint32_t index() const { return index_ & (ring->size() - 1); }
private:
@@ -175,9 +389,12 @@ template <class T> struct ThreadPipeline {
[[nodiscard]] size_t size() const { return end_ - begin_; }
[[nodiscard]] bool empty() const { return end_ == begin_; }
T &operator[](uint32_t n) {
return (*ring)[(begin_ + n) & (ring->size() - 1)];
}
private:
friend struct ThreadPipeline<T>;
friend struct StaticThreadPipeline;
Batch(std::vector<T> *const ring, uint32_t begin_, uint32_t end_)
: ring(ring), begin_(begin_), end_(end_) {}
std::vector<T> *const ring;
@@ -185,97 +402,90 @@ template <class T> struct ThreadPipeline {
uint32_t end_;
};
// Static thread storage - fixed size array
std::array<ThreadState, Topology::total_threads> all_threads;
private:
Batch acquireHelper(int stage, int thread, uint32_t maxBatch, bool mayBlock) {
uint32_t begin = threadState[stage][thread].local_pops & slot_count_mask;
uint32_t len = getSafeLen(stage, thread, mayBlock);
alignas(128) std::atomic<uint32_t> slots{0};
alignas(128) std::atomic<uint32_t> pushes{0};
const uint32_t slot_count;
const uint32_t slot_count_mask;
std::vector<T> ring;
void initialize_all_threads() {
[&]<std::size_t... StageIndices>(std::index_sequence<StageIndices...>) {
(init_stage_threads<StageIndices>(), ...);
}(std::make_index_sequence<Topology::num_stages>{});
}
template <int Stage> void init_stage_threads() {
constexpr int stage_offset = Topology::template stage_offset<Stage>();
constexpr int stage_thread_count = Topology::threads_per_stage[Stage];
constexpr int prev_stage_threads =
Topology::template prev_stage_thread_count<Stage>();
constexpr bool is_last_stage = (Stage == Topology::num_stages - 1);
for (int thread = 0; thread < stage_thread_count; ++thread) {
auto &thread_state = all_threads[stage_offset + thread];
thread_state.last_stage = is_last_stage;
thread_state.last_push_read = std::vector<uint32_t>(prev_stage_threads);
}
}
template <int Stage, int Thread>
Batch acquire_helper(uint32_t maxBatch, bool mayBlock) {
constexpr int thread_idx = Topology::template thread_index<Stage, Thread>();
auto &thread_state = all_threads[thread_idx];
uint32_t begin = thread_state.local_pops & slot_count_mask;
uint32_t len = StaticPipelineAlgorithms::calculate_safe_len<
wait_strategy, Topology, Stage, Thread>(all_threads, pushes, mayBlock);
if (maxBatch != 0) {
len = std::min(len, maxBatch);
}
if (len == 0) {
return Batch{};
}
auto result = Batch{&ring, begin, begin + len};
threadState[stage][thread].local_pops += len;
thread_state.local_pops += len;
return result;
}
// Used by producer threads to reserve slots in the ring buffer
alignas(128) std::atomic<uint32_t> slots{0};
// Used for producers to publish
alignas(128) std::atomic<uint32_t> pushes{0};
const uint32_t slot_count;
const uint32_t slot_count_mask;
// We can safely acquire this many items
uint32_t getSafeLen(int stage, int threadIndex, bool mayBlock) {
uint32_t safeLen = UINT32_MAX;
auto &thread = threadState[stage][threadIndex];
// See if we can determine that there are entries we can acquire entirely
// from state local to the thread
for (int i = 0; i < int(thread.last_push_read.size()); ++i) {
auto &lastPush = stage == 0 ? pushes : threadState[stage - 1][i].pops;
if (thread.last_push_read[i] == thread.local_pops) {
// Re-read lastPush with memory order and try again
thread.last_push_read[i] = lastPush.load(std::memory_order_acquire);
if (thread.last_push_read[i] == thread.local_pops) {
if (!mayBlock) {
return 0;
}
// Wait for lastPush to change and try again
lastPush.wait(thread.last_push_read[i], std::memory_order_relaxed);
thread.last_push_read[i] = lastPush.load(std::memory_order_acquire);
}
}
safeLen = std::min(safeLen, thread.last_push_read[i] - thread.local_pops);
}
return safeLen;
}
struct ThreadState {
// Where this thread has published up to
alignas(128) std::atomic<uint32_t> pops{0};
// Where this thread will publish to the next time it publishes
uint32_t local_pops{0};
// Where the previous stage's threads have published up to last we checked
std::vector<uint32_t> last_push_read;
};
// threadState[i][j] is the state for thread j in stage i
std::vector<std::vector<ThreadState>> threadState;
// Shared ring buffer
std::vector<T> ring;
public:
struct StageGuard {
template <int Stage, int Thread> struct StageGuard {
Batch batch;
~StageGuard() {
if (ts != nullptr) {
// seq_cst so that the notify can't be ordered before the store
ts->pops.store(local_pops, std::memory_order_seq_cst);
ts->pops.notify_all();
if (!batch.empty()) {
StaticPipelineAlgorithms::update_thread_pops<wait_strategy, Topology,
Stage, Thread>(
pipeline->all_threads, local_pops);
}
}
StageGuard(StageGuard const &) = delete;
StageGuard &operator=(StageGuard const &) = delete;
StageGuard(StageGuard &&other)
StageGuard(StageGuard &&other) noexcept
: batch(other.batch), local_pops(other.local_pops),
ts(std::exchange(other.ts, nullptr)) {}
StageGuard &operator=(StageGuard &&other) {
pipeline(std::exchange(other.pipeline, nullptr)) {}
StageGuard &operator=(StageGuard &&other) noexcept {
batch = other.batch;
local_pops = other.local_pops;
ts = std::exchange(other.ts, nullptr);
pipeline = std::exchange(other.pipeline, nullptr);
return *this;
}
private:
friend struct StaticThreadPipeline;
uint32_t local_pops;
friend struct ThreadPipeline;
StageGuard(Batch batch, ThreadState *ts)
: batch(batch), local_pops(ts->local_pops),
ts(batch.empty() ? nullptr : ts) {}
ThreadState *ts;
StaticThreadPipeline *pipeline;
StageGuard(Batch batch, uint32_t local_pops, StaticThreadPipeline *pipeline)
: batch(batch), local_pops(local_pops),
pipeline(batch.empty() ? nullptr : pipeline) {}
};
struct ProducerGuard {
@@ -285,8 +495,6 @@ public:
if (tp == nullptr) {
return;
}
// Wait for earlier slots to finish being published, since publishing
// implies that all previous slots were also published.
for (;;) {
uint32_t p = tp->pushes.load(std::memory_order_acquire);
if (p == old_slot) {
@@ -294,36 +502,42 @@ public:
}
tp->pushes.wait(p, std::memory_order_relaxed);
}
// Publish. seq_cst so that the notify can't be ordered before the store
tp->pushes.store(new_slot, std::memory_order_seq_cst);
// We have to notify every time, since we don't know if this is the last
// push ever
tp->pushes.notify_all();
}
private:
friend struct ThreadPipeline;
friend struct StaticThreadPipeline;
ProducerGuard() : batch(), tp() {}
ProducerGuard(Batch batch, ThreadPipeline<T> *tp, uint32_t old_slot,
ProducerGuard(Batch batch, StaticThreadPipeline *tp, uint32_t old_slot,
uint32_t new_slot)
: batch(batch), tp(tp), old_slot(old_slot), new_slot(new_slot) {}
ThreadPipeline<T> *const tp;
StaticThreadPipeline *const tp;
uint32_t old_slot;
uint32_t new_slot;
};
// Acquire a batch of items for processing by a consumer thread.
// stage: which processing stage (0 = first consumer stage after producers)
// thread: thread ID within the stage (0 to threadsPerStage[stage]-1)
// maxBatch: maximum items to acquire (0 = no limit)
// mayBlock: whether to block waiting for items (false = return empty batch if
// none available) Returns: StageGuard with batch of items to process
[[nodiscard]] StageGuard acquire(int stage, int thread, int maxBatch = 0,
bool mayBlock = true) {
assert(stage < int(threadState.size()));
assert(thread < int(threadState[stage].size()));
auto batch = acquireHelper(stage, thread, maxBatch, mayBlock);
return StageGuard{std::move(batch), &threadState[stage][thread]};
// Stage: which processing stage (0 = first consumer stage after producers) -
// compile-time parameter Thread: thread ID within the stage (0 to
// ThreadsPerStage[Stage]-1) - compile-time parameter maxBatch: maximum items
// to acquire (0 = no limit) may_block: whether to block waiting for items
// (false = return empty batch if none available) Returns: StageGuard<Stage,
// Thread> with batch of items to process and compile-time type safety
template <int Stage, int Thread>
[[nodiscard]] StageGuard<Stage, Thread> acquire(int maxBatch = 0,
bool may_block = true) {
static_assert(Stage >= 0 && Stage < Topology::num_stages,
"Stage index out of bounds");
static_assert(Thread >= 0 && Thread < Topology::threads_per_stage[Stage],
"Thread index out of bounds");
auto batch = acquire_helper<Stage, Thread>(maxBatch, may_block);
constexpr int thread_idx = Topology::template thread_index<Stage, Thread>();
uint32_t local_pops = all_threads[thread_idx].local_pops;
return StageGuard<Stage, Thread>{std::move(batch), local_pops, this};
}
// Reserve slots in the ring buffer for a producer thread to fill with items.
@@ -349,24 +563,23 @@ public:
if (size > slot_count) {
std::abort();
}
// Reserve a slot to construct an item, but don't publish to consumer yet
uint32_t slot;
uint32_t begin;
for (;;) {
begin_loop:
slot = slots.load(std::memory_order_relaxed);
begin = slot & slot_count_mask;
// Make sure we won't stomp the back of the ring buffer
for (auto &thread : threadState.back()) {
uint32_t pops = thread.pops.load(std::memory_order_acquire);
if (slot + size - pops > slot_count) {
if (!block) {
return ProducerGuard{};
}
thread.pops.wait(pops, std::memory_order_relaxed);
goto begin_loop;
}
int capacity_result =
StaticPipelineAlgorithms::check_producer_capacity<Topology>(
all_threads, slot, size, slot_count, block);
if (capacity_result == 1) {
continue;
}
if (capacity_result == 2) {
return ProducerGuard{};
}
if (slots.compare_exchange_weak(slot, slot + size,
std::memory_order_relaxed,
std::memory_order_relaxed)) {

View File

@@ -119,6 +119,7 @@ auto addr = reinterpret_cast<uintptr_t>(ptr); // Pointer to integer conv
### Variables and Functions
- **snake_case** for all variables, functions, and member functions
- **Legacy camelCase exists** - the codebase currently contains mixed naming due to historical development. New code should use snake_case. Existing camelCase should be converted to snake_case during natural refactoring (not mass renaming).
```cpp
int64_t used_bytes() const;
void add_block(int64_t size);

View File

@@ -18,10 +18,12 @@ struct Message {
struct EchoHandler : public ConnectionHandler {
private:
ThreadPipeline<Message> &pipeline;
StaticThreadPipeline<Message, WaitStrategy::WaitIfStageEmpty, 1> &pipeline;
public:
explicit EchoHandler(ThreadPipeline<Message> &pipeline)
explicit EchoHandler(
StaticThreadPipeline<Message, WaitStrategy::WaitIfStageEmpty, 1>
&pipeline)
: pipeline(pipeline) {}
void on_data_arrived(std::string_view data,
@@ -42,11 +44,11 @@ TEST_CASE(
config.server.io_threads = 1;
config.server.epoll_instances = 1;
ThreadPipeline<Message> pipeline{10, {1}};
StaticThreadPipeline<Message, WaitStrategy::WaitIfStageEmpty, 1> pipeline{10};
EchoHandler handler{pipeline};
auto echoThread = std::thread{[&]() {
for (;;) {
auto guard = pipeline.acquire(0, 0);
auto guard = pipeline.acquire<0, 0>();
for (auto &message : guard.batch) {
bool done = message.done;
if (done) {

View File

@@ -2,60 +2,51 @@
## Summary
WeaselDB achieved 1.3M requests/second throughput using a two-stage ThreadPipeline, with 396ns serial CPU time per request. Higher serial CPU time means more CPU budget available for serial processing.
WeaselDB achieved 1.3M requests/second throughput using a two-stage ThreadPipeline with futex wake optimization, delivering 488ns serial CPU time per request while maintaining 0% CPU usage when idle. Higher serial CPU time means more CPU budget available for serial processing.
## Performance Metrics
### Throughput
- Non-blocking: 1.3M requests/second over unix socket
- Blocking: 1.1M requests/second over unix socket
- **1.3M requests/second** over unix socket
- 8 I/O threads with 8 epoll instances
- Load tester used 12 network threads
- Max latency: 4ms out of 90M requests
- **0% CPU usage when idle** (optimized futex wake implementation)
### Threading Architecture
- Two-stage pipeline: Stage-0 (noop) → Stage-1 (connection return)
- Lock-free coordination using atomic ring buffer
- **Optimized futex wake**: Only wake on final pipeline stage
- Each request "processed" serially on single thread
### Non-blocking vs Blocking Acquisition
### Performance Characteristics
**Non-blocking acquisition (`mayBlock=false`)**:
- Throughput: 1.3M requests/second (maintained with up to 1200 loop iterations)
- Stage-0 CPU: 100% (10% futex wake, 90% other)
- Serial CPU time per request: 396ns (1200 iterations, validated with nanobench)
- Theoretical maximum serial CPU time: 769ns (1,000,000,000ns ÷ 1,300,000 req/s)
- Serial efficiency: 51.5% (396ns ÷ 769ns)
- 100% CPU usage when idle
**Optimized Pipeline Mode**:
- **Throughput**: 1.3M requests/second
- **Serial CPU time per request**: 488ns (validated with nanobench)
- **Theoretical maximum serial CPU time**: 769ns (1,000,000,000ns ÷ 1,300,000 req/s)
- **Serial efficiency**: 63.4% (488ns ÷ 769ns)
- **CPU usage when idle**: 0%
**Blocking acquisition (`mayBlock=true`)**:
- Throughput: 1.1M requests/second (800 loop iterations)
- Stage-0 CPU: 100% total (18% sched_yield, 8% futex wait, 7% futex wake, 67% other)
- Serial CPU time per request: 266ns (800 iterations, validated with nanobench)
- Theoretical maximum serial CPU time: 909ns (1,000,000,000ns ÷ 1,100,000 req/s)
- Serial efficiency: 29.3% (266ns ÷ 909ns)
- 0% CPU usage when idle
### Key Optimization: Futex Wake Reduction
- **Previous approach**: Futex wake at every pipeline stage (10% CPU overhead)
- **Optimized approach**: Futex wake only at final stage to wake producers. Stages now do their futex wait on the beginning of the pipeline instead of the previous stage.
- **Result**: 23% increase in serial CPU budget (396ns → 488ns)
- **Benefits**: Higher throughput per CPU cycle + idle efficiency
### Request Flow
```
I/O Threads (8) → HttpHandler::on_batch_complete() → ThreadPipeline
↑ ↓
| Stage 0: Noop thread
| (396ns serial CPU per request)
| (488ns serial CPU per request)
| ↓
| Stage 1: Connection return
| (optimized futex wake)
| ↓
└─────────────────────── Server::release_back_to_server()
```
### Pipeline Configuration
- Stage 0: 1 noop thread
- Stage 1: 2 worker threads for connection return
- Atomic counters with shared ring buffer
### Memory Management
- Transfer ownership of the connection along the pipeline
## Test Configuration
- Server: test_config.toml with 8 io_threads, 8 epoll_instances

166
tools/check_snake_case.py Executable file
View File

@@ -0,0 +1,166 @@
#!/usr/bin/env python3
"""
Pre-commit hook to enforce snake_case naming conventions in C++ code.
Only checks modified lines to avoid breaking existing camelCase code.
"""
import argparse
import re
import subprocess
import sys
from pathlib import Path
def get_modified_lines(filepath):
"""Get line numbers of modified lines using git diff."""
try:
# Get diff for staged changes (what's being committed)
result = subprocess.run(
["git", "diff", "--cached", "--unified=0", filepath],
capture_output=True,
text=True,
check=True,
)
modified_lines = set()
for line in result.stdout.split("\n"):
# Look for diff hunks like "@@ -10,3 +15,4 @@"
if line.startswith("@@"):
# Extract new line numbers from hunk header
match = re.search(r"\+(\d+)(?:,(\d+))?", line)
if match:
start = int(match.group(1))
count = int(match.group(2)) if match.group(2) else 1
modified_lines.update(range(start, start + count))
return modified_lines
except subprocess.CalledProcessError:
# If git diff fails, check all lines (fallback)
return None
def check_snake_case_violations(filepath, check_new_only=True):
"""Check for camelCase violations in C++ code."""
violations = []
# Get modified lines if checking new code only
modified_lines = get_modified_lines(filepath) if check_new_only else None
# Patterns that should use snake_case
patterns = [
# Function definitions and declarations
(r"\b([a-z]+[A-Z][a-zA-Z0-9_]*)\s*\(", "function"),
# Variable declarations (including member variables)
(
r"\b(?:auto|int|int64_t|int32_t|size_t|bool|char|float|double|std::\w+)\s+([a-z]+[A-Z][a-zA-Z0-9_]*)\b",
"variable",
),
# Member variable access (obj.camelCase)
(r"\.([a-z]+[A-Z][a-zA-Z0-9_]*)\b", "member_access"),
# Assignment to camelCase variables
(r"\b([a-z]+[A-Z][a-zA-Z0-9_]*)\s*=", "assignment"),
]
# Exclusions - things that should NOT be flagged
exclusions = [
# C++ standard library and common libraries
r"\b(std::|weaseljson|simdutf|doctest)",
# Template parameters and concepts
r"\b[A-Z][a-zA-Z0-9_]*\b",
# Class/struct names (PascalCase is correct)
r"\bstruct\s+[A-Z][a-zA-Z0-9_]*",
r"\bclass\s+[A-Z][a-zA-Z0-9_]*",
# Enum values (PascalCase is correct per style guide)
r"\b[A-Z][a-zA-Z0-9_]*::[A-Z][a-zA-Z0-9_]*",
# Common HTTP parser callback names (external API)
r"\b(onUrl|onHeaderField|onHeaderFieldComplete|onHeaderValue|onHeaderValueComplete|onHeadersComplete|onBody|onMessageComplete)\b",
# Known legacy APIs we can't easily change
r"\b(user_data|get_arena|append_message)\b",
]
try:
with open(filepath, "r", encoding="utf-8") as f:
lines = f.readlines()
except (IOError, UnicodeDecodeError):
return violations
for line_num, line in enumerate(lines, 1):
# Skip if we're only checking modified lines and this isn't one
if modified_lines is not None and line_num not in modified_lines:
continue
# Skip comments and strings
if re.search(r'^\s*(?://|/\*|\*|")', line.strip()):
continue
# Check if line should be excluded
if any(re.search(exclusion, line) for exclusion in exclusions):
continue
# Check for violations
for pattern, violation_type in patterns:
matches = re.finditer(pattern, line)
for match in matches:
camel_case_name = match.group(1)
# Convert to snake_case suggestion
snake_case = re.sub(
"([a-z0-9])([A-Z])", r"\1_\2", camel_case_name
).lower()
violations.append(
{
"file": filepath,
"line": line_num,
"column": match.start(1) + 1,
"type": violation_type,
"camelCase": camel_case_name,
"snake_case": snake_case,
"context": line.strip(),
}
)
return violations
def main():
parser = argparse.ArgumentParser(description="Check snake_case naming in C++ files")
parser.add_argument("files", nargs="*", help="Files to check")
parser.add_argument(
"--check-new-code-only",
action="store_true",
help="Only check modified lines (git diff)",
)
args = parser.parse_args()
if not args.files:
return 0
total_violations = 0
for filepath in args.files:
path = Path(filepath)
if not path.exists() or path.suffix not in [".hpp", ".cpp"]:
continue
violations = check_snake_case_violations(filepath, args.check_new_code_only)
if violations:
print(f"\n{filepath}:")
for v in violations:
print(
f" Line {v['line']}:{v['column']} - {v['type']} '{v['camelCase']}' should be '{v['snake_case']}'"
)
print(f" Context: {v['context']}")
total_violations += len(violations)
if total_violations > 0:
print(f"\n💡 Found {total_violations} naming violations.")
print(" New code should use snake_case per style.md")
print(" To convert: s/([a-z0-9])([A-Z])/\\1_\\2/g and lowercase")
return 1
return 0
if __name__ == "__main__":
sys.exit(main())

View File

@@ -38,6 +38,7 @@ class PersistencePattern(Enum):
@dataclass
class Transaction:
"""Represents a database transaction/commit"""
txn_id: int
arrival_time: float
size_bytes: int = 1024
@@ -47,6 +48,7 @@ class Transaction:
@dataclass
class PersistenceMetrics:
"""Metrics for a persistence approach"""
pattern: PersistencePattern
total_transactions: int
completed_transactions: int
@@ -81,9 +83,9 @@ class PersistenceMetrics:
class PersistenceSimulator(ABC):
"""Abstract base class for persistence pattern simulators"""
def __init__(self,
simulation_duration: float = 60.0,
arrival_rate_per_sec: float = 1000.0):
def __init__(
self, simulation_duration: float = 60.0, arrival_rate_per_sec: float = 1000.0
):
self.simulation_duration = simulation_duration
self.arrival_rate_per_sec = arrival_rate_per_sec
@@ -115,7 +117,7 @@ class PersistenceSimulator(ABC):
txn_id=self.next_txn_id,
arrival_time=arrival_time,
size_bytes=size,
requires_durability=True
requires_durability=True,
)
@abstractmethod
@@ -138,9 +140,9 @@ class PersistenceSimulator(ABC):
time, event_type, data = heapq.heappop(self.event_queue)
self.current_time = time
if event_type == 'transaction_arrival':
if event_type == "transaction_arrival":
self.process_transaction(data)
elif event_type == 'custom':
elif event_type == "custom":
self._handle_custom_event(data)
return self._calculate_metrics()
@@ -158,7 +160,7 @@ class PersistenceSimulator(ABC):
txn = self.generate_transaction(time)
self.next_txn_id += 1
heapq.heappush(self.event_queue, (time, 'transaction_arrival', txn))
heapq.heappush(self.event_queue, (time, "transaction_arrival", txn))
def _handle_custom_event(self, data):
"""Handle custom events - override in subclasses"""
@@ -173,13 +175,12 @@ class PersistenceSimulator(ABC):
if not self.completed_transactions:
raise ValueError("No transactions completed")
latencies = [txn['latency_ms'] for txn in self.completed_transactions]
latencies = [txn["latency_ms"] for txn in self.completed_transactions]
return PersistenceMetrics(
pattern=self.get_pattern_name(),
total_transactions=self.next_txn_id,
completed_transactions=len(self.completed_transactions),
# Latency metrics
min_latency=min(latencies),
mean_latency=statistics.mean(latencies),
@@ -187,22 +188,22 @@ class PersistenceSimulator(ABC):
p95_latency=np.percentile(latencies, 95),
p99_latency=np.percentile(latencies, 99),
max_latency=max(latencies),
# Throughput metrics
avg_throughput_tps=len(self.completed_transactions) / self.simulation_duration,
avg_throughput_tps=len(self.completed_transactions)
/ self.simulation_duration,
peak_throughput_tps=self._calculate_peak_throughput(),
# Resource metrics - implemented by subclasses
avg_disk_iops=statistics.mean(self.disk_iops) if self.disk_iops else 0,
avg_network_mbps=statistics.mean(self.network_usage) if self.network_usage else 0,
avg_network_mbps=(
statistics.mean(self.network_usage) if self.network_usage else 0
),
storage_efficiency=self._calculate_storage_efficiency(),
# Pattern-specific characteristics
durability_guarantee=self._get_durability_guarantee(),
consistency_model=self._get_consistency_model(),
recovery_time_estimate=self._get_recovery_time(),
operational_complexity=self._get_operational_complexity(),
infrastructure_cost=self._get_infrastructure_cost()
infrastructure_cost=self._get_infrastructure_cost(),
)
def _calculate_peak_throughput(self) -> float:
@@ -213,7 +214,7 @@ class PersistenceSimulator(ABC):
# Group transactions by second
throughput_by_second = defaultdict(int)
for txn in self.completed_transactions:
second = int(txn['completion_time'])
second = int(txn["completion_time"])
throughput_by_second[second] += 1
return max(throughput_by_second.values()) if throughput_by_second else 0
@@ -247,11 +248,13 @@ class PersistenceSimulator(ABC):
class WeaselDBSimulator(PersistenceSimulator):
"""WeaselDB's batched S3 persistence simulation"""
def __init__(self,
batch_timeout_ms: float = 1.0,
batch_size_threshold: int = 800000,
max_in_flight: int = 50,
**kwargs):
def __init__(
self,
batch_timeout_ms: float = 1.0,
batch_size_threshold: int = 800000,
max_in_flight: int = 50,
**kwargs,
):
super().__init__(**kwargs)
self.batch_timeout_ms = batch_timeout_ms
@@ -280,7 +283,10 @@ class WeaselDBSimulator(PersistenceSimulator):
self.current_batch.append(txn)
# Check if we should send batch
if self._should_send_batch() and len(self.in_flight_batches) < self.max_in_flight:
if (
self._should_send_batch()
and len(self.in_flight_batches) < self.max_in_flight
):
self._send_current_batch()
def _should_send_batch(self) -> bool:
@@ -294,7 +300,9 @@ class WeaselDBSimulator(PersistenceSimulator):
return True
# Time trigger
if self.batch_start_time and (self.current_time - self.batch_start_time) >= (self.batch_timeout_ms / 1000.0):
if self.batch_start_time and (self.current_time - self.batch_start_time) >= (
self.batch_timeout_ms / 1000.0
):
return True
return False
@@ -315,14 +323,16 @@ class WeaselDBSimulator(PersistenceSimulator):
# Track batch
self.in_flight_batches[batch_id] = {
'transactions': self.current_batch.copy(),
'sent_time': self.current_time,
'completion_time': completion_time,
'size_bytes': batch_size
"transactions": self.current_batch.copy(),
"sent_time": self.current_time,
"completion_time": completion_time,
"size_bytes": batch_size,
}
# Schedule completion
self.schedule_event(completion_time, 'custom', {'type': 'batch_complete', 'batch_id': batch_id})
self.schedule_event(
completion_time, "custom", {"type": "batch_complete", "batch_id": batch_id}
)
# Track network usage
self.network_usage.append(batch_size / (1024 * 1024)) # MB
@@ -340,21 +350,23 @@ class WeaselDBSimulator(PersistenceSimulator):
def _handle_custom_event(self, data):
"""Handle batch completion events"""
if data['type'] == 'batch_complete':
batch_id = data['batch_id']
if data["type"] == "batch_complete":
batch_id = data["batch_id"]
if batch_id in self.in_flight_batches:
batch = self.in_flight_batches.pop(batch_id)
# Mark transactions complete
for txn in batch['transactions']:
for txn in batch["transactions"]:
latency_ms = (self.current_time - txn.arrival_time) * 1000
self.completed_transactions.append({
'txn_id': txn.txn_id,
'arrival_time': txn.arrival_time,
'completion_time': self.current_time,
'latency_ms': latency_ms,
'size_bytes': txn.size_bytes
})
self.completed_transactions.append(
{
"txn_id": txn.txn_id,
"arrival_time": txn.arrival_time,
"completion_time": self.current_time,
"latency_ms": latency_ms,
"size_bytes": txn.size_bytes,
}
)
def _calculate_storage_efficiency(self) -> float:
return 1.0 # S3 has no storage overhead
@@ -378,10 +390,12 @@ class WeaselDBSimulator(PersistenceSimulator):
class TraditionalWALSimulator(PersistenceSimulator):
"""Traditional Write-Ahead Log with periodic sync"""
def __init__(self,
wal_sync_interval_ms: float = 10.0,
checkpoint_interval_sec: float = 30.0,
**kwargs):
def __init__(
self,
wal_sync_interval_ms: float = 10.0,
checkpoint_interval_sec: float = 30.0,
**kwargs,
):
super().__init__(**kwargs)
self.wal_sync_interval_ms = wal_sync_interval_ms
@@ -415,12 +429,12 @@ class TraditionalWALSimulator(PersistenceSimulator):
"""Schedule periodic WAL sync events"""
sync_time = self.wal_sync_interval_ms / 1000.0
while sync_time < self.simulation_duration:
self.schedule_event(sync_time, 'custom', {'type': 'wal_sync'})
self.schedule_event(sync_time, "custom", {"type": "wal_sync"})
sync_time += self.wal_sync_interval_ms / 1000.0
def _handle_custom_event(self, data):
"""Handle WAL sync events"""
if data['type'] == 'wal_sync':
if data["type"] == "wal_sync":
self._perform_wal_sync()
def _perform_wal_sync(self):
@@ -436,10 +450,11 @@ class TraditionalWALSimulator(PersistenceSimulator):
# Schedule sync completion
syncing_txns = list(self.wal_buffer)
self.schedule_event(completion_time, 'custom', {
'type': 'sync_complete',
'transactions': syncing_txns
})
self.schedule_event(
completion_time,
"custom",
{"type": "sync_complete", "transactions": syncing_txns},
)
# Track IOPS for sync operation
self.disk_iops.append(len(self.wal_buffer))
@@ -474,22 +489,24 @@ class TraditionalWALSimulator(PersistenceSimulator):
def _handle_custom_event(self, data):
"""Handle sync completion"""
if data['type'] == 'wal_sync':
if data["type"] == "wal_sync":
self._perform_wal_sync()
elif data['type'] == 'sync_complete':
elif data["type"] == "sync_complete":
# Mark transactions as durable
for txn in data['transactions']:
for txn in data["transactions"]:
if txn.txn_id in self.pending_transactions:
del self.pending_transactions[txn.txn_id]
latency_ms = (self.current_time - txn.arrival_time) * 1000
self.completed_transactions.append({
'txn_id': txn.txn_id,
'arrival_time': txn.arrival_time,
'completion_time': self.current_time,
'latency_ms': latency_ms,
'size_bytes': txn.size_bytes
})
self.completed_transactions.append(
{
"txn_id": txn.txn_id,
"arrival_time": txn.arrival_time,
"completion_time": self.current_time,
"latency_ms": latency_ms,
"size_bytes": txn.size_bytes,
}
)
def _calculate_storage_efficiency(self) -> float:
return 2.0 # WAL + main storage
@@ -527,10 +544,11 @@ class SynchronousSimulator(PersistenceSimulator):
completion_time = self.current_time + disk_latency
# Schedule completion
self.schedule_event(completion_time, 'custom', {
'type': 'disk_write_complete',
'transaction': txn
})
self.schedule_event(
completion_time,
"custom",
{"type": "disk_write_complete", "transaction": txn},
)
# Track disk IOPS
self.disk_iops.append(1.0)
@@ -559,17 +577,19 @@ class SynchronousSimulator(PersistenceSimulator):
def _handle_custom_event(self, data):
"""Handle disk write completion"""
if data['type'] == 'disk_write_complete':
txn = data['transaction']
if data["type"] == "disk_write_complete":
txn = data["transaction"]
latency_ms = (self.current_time - txn.arrival_time) * 1000
self.completed_transactions.append({
'txn_id': txn.txn_id,
'arrival_time': txn.arrival_time,
'completion_time': self.current_time,
'latency_ms': latency_ms,
'size_bytes': txn.size_bytes
})
self.completed_transactions.append(
{
"txn_id": txn.txn_id,
"arrival_time": txn.arrival_time,
"completion_time": self.current_time,
"latency_ms": latency_ms,
"size_bytes": txn.size_bytes,
}
)
def _calculate_storage_efficiency(self) -> float:
return 1.5 # Some overhead for metadata
@@ -593,11 +613,13 @@ class SynchronousSimulator(PersistenceSimulator):
class WeaselDBDiskSimulator(PersistenceSimulator):
"""WeaselDB's batched disk persistence simulation"""
def __init__(self,
batch_timeout_ms: float = 1.0,
batch_size_threshold: int = 800000,
max_in_flight: int = 50,
**kwargs):
def __init__(
self,
batch_timeout_ms: float = 1.0,
batch_size_threshold: int = 800000,
max_in_flight: int = 50,
**kwargs,
):
super().__init__(**kwargs)
self.batch_timeout_ms = batch_timeout_ms
@@ -626,7 +648,10 @@ class WeaselDBDiskSimulator(PersistenceSimulator):
self.current_batch.append(txn)
# Check if we should send batch
if self._should_send_batch() and len(self.in_flight_batches) < self.max_in_flight:
if (
self._should_send_batch()
and len(self.in_flight_batches) < self.max_in_flight
):
self._send_current_batch()
def _should_send_batch(self) -> bool:
@@ -640,7 +665,9 @@ class WeaselDBDiskSimulator(PersistenceSimulator):
return True
# Time trigger
if self.batch_start_time and (self.current_time - self.batch_start_time) >= (self.batch_timeout_ms / 1000.0):
if self.batch_start_time and (self.current_time - self.batch_start_time) >= (
self.batch_timeout_ms / 1000.0
):
return True
return False
@@ -661,14 +688,16 @@ class WeaselDBDiskSimulator(PersistenceSimulator):
# Track batch
self.in_flight_batches[batch_id] = {
'transactions': self.current_batch.copy(),
'sent_time': self.current_time,
'completion_time': completion_time,
'size_bytes': batch_size
"transactions": self.current_batch.copy(),
"sent_time": self.current_time,
"completion_time": completion_time,
"size_bytes": batch_size,
}
# Schedule completion
self.schedule_event(completion_time, 'custom', {'type': 'batch_complete', 'batch_id': batch_id})
self.schedule_event(
completion_time, "custom", {"type": "batch_complete", "batch_id": batch_id}
)
# Track disk IOPS (one write operation per batch)
self.disk_iops.append(1.0)
@@ -684,7 +713,9 @@ class WeaselDBDiskSimulator(PersistenceSimulator):
# Throughput-based latency for data transfer to page cache
size_mb = batch_size_bytes / (1024 * 1024)
transfer_latency = (size_mb / self.disk_throughput_mbps) * 1000.0 # Convert to ms
transfer_latency = (
size_mb / self.disk_throughput_mbps
) * 1000.0 # Convert to ms
# FSYNC latency for EBS - forces write to replicated storage
# EBS has higher fsync latency due to network replication
@@ -709,21 +740,23 @@ class WeaselDBDiskSimulator(PersistenceSimulator):
def _handle_custom_event(self, data):
"""Handle batch completion events"""
if data['type'] == 'batch_complete':
batch_id = data['batch_id']
if data["type"] == "batch_complete":
batch_id = data["batch_id"]
if batch_id in self.in_flight_batches:
batch = self.in_flight_batches.pop(batch_id)
# Mark transactions complete
for txn in batch['transactions']:
for txn in batch["transactions"]:
latency_ms = (self.current_time - txn.arrival_time) * 1000
self.completed_transactions.append({
'txn_id': txn.txn_id,
'arrival_time': txn.arrival_time,
'completion_time': self.current_time,
'latency_ms': latency_ms,
'size_bytes': txn.size_bytes
})
self.completed_transactions.append(
{
"txn_id": txn.txn_id,
"arrival_time": txn.arrival_time,
"completion_time": self.current_time,
"latency_ms": latency_ms,
"size_bytes": txn.size_bytes,
}
)
def _calculate_storage_efficiency(self) -> float:
return 1.2 # Some overhead for batching metadata
@@ -750,8 +783,9 @@ class DatabaseComparisonFramework:
def __init__(self, simulation_duration: float = 30.0):
self.simulation_duration = simulation_duration
def run_comparison(self,
arrival_rates: List[float] = [100, 500, 1000, 2000]) -> Dict[str, List[PersistenceMetrics]]:
def run_comparison(
self, arrival_rates: List[float] = [100, 500, 1000, 2000]
) -> Dict[str, List[PersistenceMetrics]]:
"""Run comparison across multiple arrival rates"""
results = defaultdict(list)
@@ -765,10 +799,10 @@ class DatabaseComparisonFramework:
batch_size_threshold=800000,
max_in_flight=50,
simulation_duration=self.simulation_duration,
arrival_rate_per_sec=rate
arrival_rate_per_sec=rate,
)
weasel_s3_metrics = weasel_s3.run_simulation()
results['WeaselDB S3'].append(weasel_s3_metrics)
results["WeaselDB S3"].append(weasel_s3_metrics)
print(f" WeaselDB S3 P95: {weasel_s3_metrics.p95_latency:.1f}ms")
# WeaselDB EBS (same batching, EBS storage)
@@ -777,38 +811,37 @@ class DatabaseComparisonFramework:
batch_size_threshold=800000,
max_in_flight=50,
simulation_duration=self.simulation_duration,
arrival_rate_per_sec=rate
arrival_rate_per_sec=rate,
)
weasel_ebs_metrics = weasel_ebs.run_simulation()
results['WeaselDB EBS'].append(weasel_ebs_metrics)
results["WeaselDB EBS"].append(weasel_ebs_metrics)
print(f" WeaselDB EBS P95: {weasel_ebs_metrics.p95_latency:.1f}ms")
# Traditional WAL
wal = TraditionalWALSimulator(
wal_sync_interval_ms=10.0,
simulation_duration=self.simulation_duration,
arrival_rate_per_sec=rate
arrival_rate_per_sec=rate,
)
wal_metrics = wal.run_simulation()
results['Traditional WAL'].append(wal_metrics)
results["Traditional WAL"].append(wal_metrics)
print(f" WAL P95: {wal_metrics.p95_latency:.1f}ms")
# Synchronous
sync = SynchronousSimulator(
simulation_duration=self.simulation_duration,
arrival_rate_per_sec=rate
simulation_duration=self.simulation_duration, arrival_rate_per_sec=rate
)
sync_metrics = sync.run_simulation()
results['Synchronous'].append(sync_metrics)
results["Synchronous"].append(sync_metrics)
print(f" Synchronous P95: {sync_metrics.p95_latency:.1f}ms")
return dict(results)
def print_comparison_report(self, results: Dict[str, List[PersistenceMetrics]]):
"""Print comprehensive comparison report"""
print("\n" + "="*80)
print("\n" + "=" * 80)
print("DATABASE PERSISTENCE PATTERN COMPARISON")
print("="*80)
print("=" * 80)
# Get arrival rates for headers
arrival_rates = [100, 500, 1000, 2000]
@@ -843,14 +876,18 @@ class DatabaseComparisonFramework:
# Characteristics comparison
print(f"\nSYSTEM CHARACTERISTICS")
print(f"{'Pattern':<20} {'Durability':<25} {'Consistency':<20} {'OpComplx':<8} {'Cost':<6}")
print(
f"{'Pattern':<20} {'Durability':<25} {'Consistency':<20} {'OpComplx':<8} {'Cost':<6}"
)
print("-" * 85)
for pattern_name, metrics_list in results.items():
metrics = metrics_list[0] # Use first metrics for characteristics
print(f"{pattern_name:<20} {metrics.durability_guarantee:<25} "
f"{metrics.consistency_model:<20} {metrics.operational_complexity:<8} "
f"{metrics.infrastructure_cost:<6.1f}")
print(
f"{pattern_name:<20} {metrics.durability_guarantee:<25} "
f"{metrics.consistency_model:<20} {metrics.operational_complexity:<8} "
f"{metrics.infrastructure_cost:<6.1f}"
)
# Performance sweet spots
print(f"\nPERFORMANCE SWEET SPOTS")
@@ -858,49 +895,76 @@ class DatabaseComparisonFramework:
for rate in arrival_rates:
print(f"\nAt {rate} TPS:")
rate_results = [(name, metrics_list[arrival_rates.index(rate)])
for name, metrics_list in results.items()]
rate_results = [
(name, metrics_list[arrival_rates.index(rate)])
for name, metrics_list in results.items()
]
# Sort by P95 latency
rate_results.sort(key=lambda x: x[1].p95_latency)
for i, (name, metrics) in enumerate(rate_results):
rank_symbol = "🥇" if i == 0 else "🥈" if i == 1 else "🥉" if i == 2 else " "
print(f" {rank_symbol} {name}: {metrics.p95_latency:.1f}ms P95, "
f"{metrics.avg_throughput_tps:.0f} TPS achieved")
rank_symbol = (
"🥇" if i == 0 else "🥈" if i == 1 else "🥉" if i == 2 else " "
)
print(
f" {rank_symbol} {name}: {metrics.p95_latency:.1f}ms P95, "
f"{metrics.avg_throughput_tps:.0f} TPS achieved"
)
def plot_comparison_results(self, results: Dict[str, List[PersistenceMetrics]],
save_path: Optional[str] = None):
def plot_comparison_results(
self,
results: Dict[str, List[PersistenceMetrics]],
save_path: Optional[str] = None,
):
"""Plot comparison results"""
try:
arrival_rates = [100, 500, 1000, 2000]
fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(15, 12))
fig.suptitle('Database Persistence Pattern Comparison', fontsize=16)
fig.suptitle("Database Persistence Pattern Comparison", fontsize=16)
# Plot 1: P95 Latency vs Load
for pattern_name, metrics_list in results.items():
p95_latencies = [m.p95_latency for m in metrics_list]
ax1.plot(arrival_rates, p95_latencies, marker='o', linewidth=2, label=pattern_name)
ax1.plot(
arrival_rates,
p95_latencies,
marker="o",
linewidth=2,
label=pattern_name,
)
ax1.set_xlabel('Arrival Rate (TPS)')
ax1.set_ylabel('P95 Latency (ms)')
ax1.set_title('P95 Latency vs Load')
ax1.set_xlabel("Arrival Rate (TPS)")
ax1.set_ylabel("P95 Latency (ms)")
ax1.set_title("P95 Latency vs Load")
ax1.legend()
ax1.grid(True, alpha=0.3)
ax1.set_yscale('log')
ax1.set_yscale("log")
# Plot 2: Throughput Achieved
for pattern_name, metrics_list in results.items():
throughputs = [m.avg_throughput_tps for m in metrics_list]
ax2.plot(arrival_rates, throughputs, marker='s', linewidth=2, label=pattern_name)
ax2.plot(
arrival_rates,
throughputs,
marker="s",
linewidth=2,
label=pattern_name,
)
# Perfect throughput line
ax2.plot(arrival_rates, arrival_rates, 'k--', alpha=0.5, label='Perfect (no loss)')
ax2.plot(
arrival_rates,
arrival_rates,
"k--",
alpha=0.5,
label="Perfect (no loss)",
)
ax2.set_xlabel('Target Rate (TPS)')
ax2.set_ylabel('Achieved Throughput (TPS)')
ax2.set_title('Throughput: Target vs Achieved')
ax2.set_xlabel("Target Rate (TPS)")
ax2.set_ylabel("Achieved Throughput (TPS)")
ax2.set_title("Throughput: Target vs Achieved")
ax2.legend()
ax2.grid(True, alpha=0.3)
@@ -910,13 +974,21 @@ class DatabaseComparisonFramework:
metrics = metrics_list[rate_idx]
# Plot latency percentiles
percentiles = [50, 95, 99]
values = [metrics.median_latency, metrics.p95_latency, metrics.p99_latency]
ax3.bar([f"{pattern_name}\nP{p}" for p in percentiles], values,
alpha=0.7, label=pattern_name)
values = [
metrics.median_latency,
metrics.p95_latency,
metrics.p99_latency,
]
ax3.bar(
[f"{pattern_name}\nP{p}" for p in percentiles],
values,
alpha=0.7,
label=pattern_name,
)
ax3.set_ylabel('Latency (ms)')
ax3.set_title('Latency Percentiles at 1000 TPS')
ax3.set_yscale('log')
ax3.set_ylabel("Latency (ms)")
ax3.set_title("Latency Percentiles at 1000 TPS")
ax3.set_yscale("log")
ax3.grid(True, alpha=0.3)
# Plot 4: Cost vs Performance
@@ -925,22 +997,24 @@ class DatabaseComparisonFramework:
p95s = [m.p95_latency for m in metrics_list]
# Use different markers for different patterns
markers = {'WeaselDB': 'o', 'Traditional WAL': 's', 'Synchronous': '^'}
marker = markers.get(pattern_name, 'o')
markers = {"WeaselDB": "o", "Traditional WAL": "s", "Synchronous": "^"}
marker = markers.get(pattern_name, "o")
ax4.scatter(costs, p95s, s=100, marker=marker, alpha=0.7, label=pattern_name)
ax4.scatter(
costs, p95s, s=100, marker=marker, alpha=0.7, label=pattern_name
)
ax4.set_xlabel('Infrastructure Cost (relative)')
ax4.set_ylabel('P95 Latency (ms)')
ax4.set_title('Cost vs Performance Trade-off')
ax4.set_xlabel("Infrastructure Cost (relative)")
ax4.set_ylabel("P95 Latency (ms)")
ax4.set_title("Cost vs Performance Trade-off")
ax4.legend()
ax4.grid(True, alpha=0.3)
ax4.set_yscale('log')
ax4.set_yscale("log")
plt.tight_layout()
if save_path:
plt.savefig(save_path, dpi=300, bbox_inches='tight')
plt.savefig(save_path, dpi=300, bbox_inches="tight")
print(f"Comparison plots saved to {save_path}")
else:
plt.show()
@@ -961,10 +1035,10 @@ def main():
comparison.print_comparison_report(results)
try:
comparison.plot_comparison_results(results, 'database_comparison.png')
comparison.plot_comparison_results(results, "database_comparison.png")
except Exception as e:
print(f"Could not generate plots: {e}")
if __name__ == "__main__":
main()
main()

View File

@@ -25,6 +25,7 @@ try:
from skopt.utils import use_named_args
from skopt.plots import plot_convergence, plot_objective
import matplotlib.pyplot as plt
OPTIMIZE_AVAILABLE = True
except ImportError:
print("scikit-optimize not available. Install with: pip install scikit-optimize")
@@ -40,12 +41,14 @@ class PersistenceOptimizer:
by intelligently exploring the parameter space using Gaussian Process models.
"""
def __init__(self,
optimization_budget: int = 50,
simulation_duration: float = 20.0,
arrival_rate: float = 1000.0,
objective_metric: str = "p95_latency",
random_seed: int = 42):
def __init__(
self,
optimization_budget: int = 50,
simulation_duration: float = 20.0,
arrival_rate: float = 1000.0,
objective_metric: str = "p95_latency",
random_seed: int = 42,
):
self.optimization_budget = optimization_budget
self.simulation_duration = simulation_duration
@@ -56,7 +59,7 @@ class PersistenceOptimizer:
# Track optimization history
self.optimization_history = []
self.best_params = None
self.best_score = float('inf')
self.best_score = float("inf")
# Define parameter search space
self.parameter_space = self._define_search_space()
@@ -71,31 +74,35 @@ class PersistenceOptimizer:
"""
return [
# Core batching parameters
Real(1.0, 50.0, name='batch_timeout_ms',
prior='log-uniform'), # Log scale since small changes matter
Integer(64 * 1024, 4 * 1024 * 1024, name='batch_size_threshold', # 64KB - 4MB
prior='log-uniform'),
Real(
1.0, 50.0, name="batch_timeout_ms", prior="log-uniform"
), # Log scale since small changes matter
Integer(
64 * 1024,
4 * 1024 * 1024,
name="batch_size_threshold", # 64KB - 4MB
prior="log-uniform",
),
# Flow control parameters - likely the most impactful
Integer(1, 50, name='max_in_flight_requests'),
Integer(1, 50, name="max_in_flight_requests"),
]
def _run_simulation_with_params(self, params: Dict[str, float]) -> Dict:
"""Run simulation with given parameters and return results"""
try:
sim = PersistenceSimulation(
batch_timeout_ms=params['batch_timeout_ms'],
batch_size_threshold=int(params['batch_size_threshold']),
max_in_flight_requests=int(params['max_in_flight_requests']),
batch_timeout_ms=params["batch_timeout_ms"],
batch_size_threshold=int(params["batch_size_threshold"]),
max_in_flight_requests=int(params["max_in_flight_requests"]),
# Retry parameters fixed since S3 is 100% reliable
max_retry_attempts=0, # No retries needed
retry_base_delay_ms=100.0, # Irrelevant but needs a value
max_retry_attempts=0, # No retries needed
retry_base_delay_ms=100.0, # Irrelevant but needs a value
# S3 parameters kept fixed - 100% reliable for optimization focus
s3_latency_shape=2.0, # Fixed Gamma shape
s3_latency_scale=15.0, # Fixed Gamma scale (30ms RTT + ~30ms variable = ~60ms mean)
s3_failure_rate=0.0, # 100% reliable S3
s3_latency_shape=2.0, # Fixed Gamma shape
s3_latency_scale=15.0, # Fixed Gamma scale (30ms RTT + ~30ms variable = ~60ms mean)
s3_failure_rate=0.0, # 100% reliable S3
arrival_rate_per_sec=self.arrival_rate,
simulation_duration_sec=self.simulation_duration
simulation_duration_sec=self.simulation_duration,
)
return sim.run_simulation()
@@ -104,34 +111,32 @@ class PersistenceOptimizer:
print(f"Simulation failed with params {params}: {e}")
# Return a high penalty score for failed simulations
return {
'commit_metrics': {
'latency_ms': {
'mean': 10000,
'p95': 10000,
'p99': 10000
}
"commit_metrics": {
"latency_ms": {"mean": 10000, "p95": 10000, "p99": 10000}
},
'error': str(e)
"error": str(e),
}
def _extract_objective_value(self, results: Dict) -> float:
"""Extract the objective value to minimize from simulation results"""
try:
commit_metrics = results['commit_metrics']['latency_ms']
commit_metrics = results["commit_metrics"]["latency_ms"]
if self.objective_metric == "mean_latency":
return commit_metrics['mean']
return commit_metrics["mean"]
elif self.objective_metric == "p95_latency":
return commit_metrics['p95']
return commit_metrics["p95"]
elif self.objective_metric == "p99_latency":
return commit_metrics['p99']
return commit_metrics["p99"]
elif self.objective_metric == "weighted_latency":
# Weighted combination emphasizing tail latencies
return (0.3 * commit_metrics['mean'] +
0.5 * commit_metrics['p95'] +
0.2 * commit_metrics['p99'])
return (
0.3 * commit_metrics["mean"]
+ 0.5 * commit_metrics["p95"]
+ 0.2 * commit_metrics["p99"]
)
else:
return commit_metrics['p95'] # Default to P95
return commit_metrics["p95"] # Default to P95
except KeyError as e:
print(f"Failed to extract objective from results: {e}")
@@ -147,7 +152,9 @@ class PersistenceOptimizer:
if not OPTIMIZE_AVAILABLE:
return self.optimize_with_grid_search()
print(f"Starting Bayesian Optimization with {self.optimization_budget} evaluations")
print(
f"Starting Bayesian Optimization with {self.optimization_budget} evaluations"
)
print(f"Objective: Minimize {self.objective_metric}")
print(f"Parameter space: {len(self.parameter_space)} dimensions")
print()
@@ -165,11 +172,11 @@ class PersistenceOptimizer:
# Track optimization history
history_entry = {
'params': params.copy(),
'objective_value': objective_value,
'results': results,
'eval_time': eval_time,
'iteration': len(self.optimization_history) + 1
"params": params.copy(),
"objective_value": objective_value,
"results": results,
"eval_time": eval_time,
"iteration": len(self.optimization_history) + 1,
}
self.optimization_history.append(history_entry)
@@ -177,7 +184,9 @@ class PersistenceOptimizer:
if objective_value < self.best_score:
self.best_score = objective_value
self.best_params = params.copy()
print(f"✓ NEW BEST: {objective_value:.2f}ms (evaluation {history_entry['iteration']})")
print(
f"✓ NEW BEST: {objective_value:.2f}ms (evaluation {history_entry['iteration']})"
)
else:
print(f" Score: {objective_value:.2f}ms")
@@ -192,8 +201,8 @@ class PersistenceOptimizer:
dimensions=self.parameter_space,
n_calls=self.optimization_budget,
n_initial_points=10, # Random exploration first
acq_func='EI', # Expected Improvement acquisition
random_state=self.random_seed
acq_func="EI", # Expected Improvement acquisition
random_state=self.random_seed,
)
# Extract best parameters
@@ -205,32 +214,34 @@ class PersistenceOptimizer:
def optimize_with_grid_search(self) -> Tuple[Dict, float]:
"""Fallback grid search optimization if scikit-optimize not available"""
print("Using grid search optimization (install scikit-optimize for better results)")
print(
"Using grid search optimization (install scikit-optimize for better results)"
)
print()
# Define a smaller grid for key parameters
grid_configs = [
# Vary max_in_flight and batch_timeout
{'max_in_flight_requests': 5, 'batch_timeout_ms': 5.0},
{'max_in_flight_requests': 10, 'batch_timeout_ms': 5.0},
{'max_in_flight_requests': 20, 'batch_timeout_ms': 5.0},
{'max_in_flight_requests': 10, 'batch_timeout_ms': 2.0},
{'max_in_flight_requests': 10, 'batch_timeout_ms': 10.0},
{'max_in_flight_requests': 15, 'batch_timeout_ms': 3.0},
{'max_in_flight_requests': 25, 'batch_timeout_ms': 7.0},
{"max_in_flight_requests": 5, "batch_timeout_ms": 5.0},
{"max_in_flight_requests": 10, "batch_timeout_ms": 5.0},
{"max_in_flight_requests": 20, "batch_timeout_ms": 5.0},
{"max_in_flight_requests": 10, "batch_timeout_ms": 2.0},
{"max_in_flight_requests": 10, "batch_timeout_ms": 10.0},
{"max_in_flight_requests": 15, "batch_timeout_ms": 3.0},
{"max_in_flight_requests": 25, "batch_timeout_ms": 7.0},
]
best_params = None
best_score = float('inf')
best_score = float("inf")
for i, config in enumerate(grid_configs):
print(f"Evaluating config {i+1}/{len(grid_configs)}: {config}")
# Use default values for unspecified parameters
full_params = {
'batch_timeout_ms': 5.0,
'batch_size_threshold': 1024 * 1024,
'max_in_flight_requests': 5
"batch_timeout_ms": 5.0,
"batch_size_threshold": 1024 * 1024,
"max_in_flight_requests": 5,
}
full_params.update(config)
@@ -261,8 +272,8 @@ class PersistenceOptimizer:
objectives = []
for entry in self.optimization_history:
objectives.append(entry['objective_value'])
for param_name, param_value in entry['params'].items():
objectives.append(entry["objective_value"])
for param_name, param_value in entry["params"].items():
if param_name not in param_data:
param_data[param_name] = []
param_data[param_name].append(param_value)
@@ -289,12 +300,12 @@ class PersistenceOptimizer:
if not OPTIMIZE_AVAILABLE or not self.optimization_history:
return
iterations = [entry['iteration'] for entry in self.optimization_history]
objectives = [entry['objective_value'] for entry in self.optimization_history]
iterations = [entry["iteration"] for entry in self.optimization_history]
objectives = [entry["objective_value"] for entry in self.optimization_history]
# Calculate running minimum (best so far)
running_min = []
current_min = float('inf')
current_min = float("inf")
for obj in objectives:
current_min = min(current_min, obj)
running_min.append(current_min)
@@ -304,34 +315,38 @@ class PersistenceOptimizer:
# Plot 1: Objective value over iterations
plt.subplot(2, 2, 1)
plt.scatter(iterations, objectives, alpha=0.6, s=30)
plt.plot(iterations, running_min, 'r-', linewidth=2, label='Best so far')
plt.xlabel('Iteration')
plt.ylabel(f'{self.objective_metric} (ms)')
plt.title('Optimization Progress')
plt.plot(iterations, running_min, "r-", linewidth=2, label="Best so far")
plt.xlabel("Iteration")
plt.ylabel(f"{self.objective_metric} (ms)")
plt.title("Optimization Progress")
plt.legend()
plt.grid(True, alpha=0.3)
# Plot 2: Parameter evolution for key parameters
plt.subplot(2, 2, 2)
key_params = ['max_in_flight_requests', 'batch_timeout_ms']
key_params = ["max_in_flight_requests", "batch_timeout_ms"]
for param in key_params:
if param in self.optimization_history[0]['params']:
values = [entry['params'][param] for entry in self.optimization_history]
if param in self.optimization_history[0]["params"]:
values = [entry["params"][param] for entry in self.optimization_history]
plt.scatter(iterations, values, alpha=0.6, label=param, s=30)
plt.xlabel('Iteration')
plt.ylabel('Parameter Value')
plt.title('Key Parameter Evolution')
plt.xlabel("Iteration")
plt.ylabel("Parameter Value")
plt.title("Key Parameter Evolution")
plt.legend()
plt.grid(True, alpha=0.3)
# Plot 3: Objective distribution
plt.subplot(2, 2, 3)
plt.hist(objectives, bins=20, alpha=0.7, edgecolor='black')
plt.axvline(self.best_score, color='red', linestyle='--',
label=f'Best: {self.best_score:.1f}ms')
plt.xlabel(f'{self.objective_metric} (ms)')
plt.ylabel('Count')
plt.title('Objective Value Distribution')
plt.hist(objectives, bins=20, alpha=0.7, edgecolor="black")
plt.axvline(
self.best_score,
color="red",
linestyle="--",
label=f"Best: {self.best_score:.1f}ms",
)
plt.xlabel(f"{self.objective_metric} (ms)")
plt.ylabel("Count")
plt.title("Objective Value Distribution")
plt.legend()
plt.grid(True, alpha=0.3)
@@ -342,21 +357,21 @@ class PersistenceOptimizer:
if i == 0:
improvements.append(0)
else:
prev_best = running_min[i-1]
prev_best = running_min[i - 1]
curr_best = running_min[i]
improvement = prev_best - curr_best
improvements.append(improvement)
plt.plot(iterations, improvements, 'g-', marker='o', markersize=3)
plt.xlabel('Iteration')
plt.ylabel('Improvement (ms)')
plt.title('Per-Iteration Improvement')
plt.plot(iterations, improvements, "g-", marker="o", markersize=3)
plt.xlabel("Iteration")
plt.ylabel("Improvement (ms)")
plt.title("Per-Iteration Improvement")
plt.grid(True, alpha=0.3)
plt.tight_layout()
if save_path:
plt.savefig(save_path, dpi=300, bbox_inches='tight')
plt.savefig(save_path, dpi=300, bbox_inches="tight")
print(f"Optimization plots saved to {save_path}")
else:
plt.show()
@@ -379,12 +394,12 @@ class PersistenceOptimizer:
# Prepare optimization summary
optimization_summary = {
'best_parameters': best_params,
'best_objective_value': best_score,
'optimization_time': total_time,
'evaluations_performed': len(self.optimization_history),
'final_simulation_results': final_results,
'optimization_history': self.optimization_history
"best_parameters": best_params,
"best_objective_value": best_score,
"optimization_time": total_time,
"evaluations_performed": len(self.optimization_history),
"final_simulation_results": final_results,
"optimization_history": self.optimization_history,
}
return optimization_summary
@@ -402,9 +417,9 @@ class PersistenceOptimizer:
print("OPTIMAL PARAMETERS:")
print("-" * 40)
for param, value in summary['best_parameters'].items():
for param, value in summary["best_parameters"].items():
if isinstance(value, float):
if param.endswith('_rate'):
if param.endswith("_rate"):
print(f" {param:<25}: {value:.4f}")
else:
print(f" {param:<25}: {value:.2f}")
@@ -413,7 +428,7 @@ class PersistenceOptimizer:
print("\nDETAILED PERFORMANCE WITH OPTIMAL PARAMETERS:")
print("-" * 50)
final_results = summary['final_simulation_results']
final_results = summary["final_simulation_results"]
print_results(final_results)
print("\nPARAMETER IMPACT ANALYSIS:")
@@ -440,7 +455,7 @@ def main():
simulation_duration=15.0, # Shorter sims for faster optimization
arrival_rate=1000.0,
objective_metric=objective,
random_seed=42
random_seed=42,
)
# Run optimization
@@ -449,13 +464,13 @@ def main():
# Generate plots
try:
optimizer.plot_optimization_progress(f'optimization_{objective}.png')
optimizer.plot_optimization_progress(f"optimization_{objective}.png")
except Exception as e:
print(f"Could not generate plots: {e}")
print(f"\nOptimization for {objective} completed!")
print("="*80)
print("=" * 80)
if __name__ == "__main__":
main()
main()

View File

@@ -27,6 +27,7 @@ import time
@dataclass
class Commit:
"""Represents a single commit request"""
commit_id: int
arrival_time: float
size_bytes: int = 1024 # Default 1KB per commit
@@ -35,6 +36,7 @@ class Commit:
@dataclass
class Batch:
"""Represents a batch of commits being processed"""
batch_id: int
commits: List[Commit]
created_time: float
@@ -48,6 +50,7 @@ class Batch:
@dataclass
class InFlightRequest:
"""Tracks an in-flight S3 request"""
batch: Batch
start_time: float
expected_completion: float
@@ -66,24 +69,23 @@ class PersistenceSimulation:
- Shape parameter controls the heaviness of the tail
"""
def __init__(self,
# Configuration from persistence.md defaults
batch_timeout_ms: float = 5.0,
batch_size_threshold: int = 1024 * 1024, # 1MB
max_in_flight_requests: int = 5,
max_retry_attempts: int = 3,
retry_base_delay_ms: float = 100.0,
# S3 latency modeling (Gamma distribution parameters)
s3_latency_shape: float = 2.0, # Shape parameter (α)
s3_latency_scale: float = 25.0, # Scale parameter (β)
s3_failure_rate: float = 0.01, # 1% failure rate
# Arrival rate modeling
arrival_rate_per_sec: float = 1000.0, # Lambda for Poisson
# Simulation parameters
simulation_duration_sec: float = 60.0):
def __init__(
self,
# Configuration from persistence.md defaults
batch_timeout_ms: float = 5.0,
batch_size_threshold: int = 1024 * 1024, # 1MB
max_in_flight_requests: int = 5,
max_retry_attempts: int = 3,
retry_base_delay_ms: float = 100.0,
# S3 latency modeling (Gamma distribution parameters)
s3_latency_shape: float = 2.0, # Shape parameter (α)
s3_latency_scale: float = 25.0, # Scale parameter (β)
s3_failure_rate: float = 0.01, # 1% failure rate
# Arrival rate modeling
arrival_rate_per_sec: float = 1000.0, # Lambda for Poisson
# Simulation parameters
simulation_duration_sec: float = 60.0,
):
# Configuration
self.batch_timeout_ms = batch_timeout_ms
@@ -142,7 +144,9 @@ class PersistenceSimulation:
min_rtt = 30.0 # 30ms minimum round-trip time
# Variable latency component from Gamma distribution
variable_latency = self.np_rng.gamma(self.s3_latency_shape, self.s3_latency_scale)
variable_latency = self.np_rng.gamma(
self.s3_latency_shape, self.s3_latency_scale
)
# Size-dependent scaling: +20ms per MB
size_mb = batch_size_bytes / (1024 * 1024)
@@ -191,7 +195,9 @@ class PersistenceSimulation:
# Time trigger - only if we have a valid batch start time
if self.batch_start_time is not None:
if (self.current_time - self.batch_start_time) >= (self.batch_timeout_ms / 1000.0):
if (self.current_time - self.batch_start_time) >= (
self.batch_timeout_ms / 1000.0
):
return True
return False
@@ -209,7 +215,7 @@ class PersistenceSimulation:
batch = Batch(
batch_id=self.next_batch_id,
commits=self.current_batch.copy(),
created_time=self.current_time
created_time=self.current_time,
)
self.next_batch_id += 1
@@ -223,11 +229,13 @@ class PersistenceSimulation:
"""Send batch to S3 and track as in-flight request"""
if not self.can_start_new_request():
# This shouldn't happen due to flow control, but handle gracefully
self.schedule_event(self.current_time + 0.001, 'retry_batch', batch)
self.schedule_event(self.current_time + 0.001, "retry_batch", batch)
return
# Sample S3 response characteristics (pass batch size for latency modeling)
s3_latency = self.sample_s3_latency(batch.size_bytes) / 1000.0 # Convert ms to seconds
s3_latency = (
self.sample_s3_latency(batch.size_bytes) / 1000.0
) # Convert ms to seconds
will_fail = self.rng.random() < self.s3_failure_rate
if will_fail:
@@ -242,26 +250,28 @@ class PersistenceSimulation:
batch=batch,
start_time=self.current_time,
expected_completion=completion_time,
connection_id=connection_id
connection_id=connection_id,
)
self.in_flight_requests[connection_id] = in_flight
# Schedule completion event
if will_fail:
self.schedule_event(completion_time, 'batch_failed', connection_id)
self.schedule_event(completion_time, "batch_failed", connection_id)
else:
self.schedule_event(completion_time, 'batch_completed', connection_id)
self.schedule_event(completion_time, "batch_completed", connection_id)
# Log event for analysis
self.timeline_events.append({
'time': self.current_time,
'event': 'batch_sent',
'batch_id': batch.batch_id,
'batch_size': batch.size_bytes,
'commit_count': len(batch.commits),
'retry_count': batch.retry_count,
'is_retry': is_retry
})
self.timeline_events.append(
{
"time": self.current_time,
"event": "batch_sent",
"batch_id": batch.batch_id,
"batch_size": batch.size_bytes,
"commit_count": len(batch.commits),
"retry_count": batch.retry_count,
"is_retry": is_retry,
}
)
def handle_batch_completed(self, connection_id: int):
"""Handle successful batch completion"""
@@ -273,33 +283,39 @@ class PersistenceSimulation:
# Calculate metrics
batch_latency = self.current_time - batch.created_time
self.batch_metrics.append({
'batch_id': batch.batch_id,
'latency': batch_latency,
'size_bytes': batch.size_bytes,
'commit_count': len(batch.commits),
'retry_count': batch.retry_count
})
self.batch_metrics.append(
{
"batch_id": batch.batch_id,
"latency": batch_latency,
"size_bytes": batch.size_bytes,
"commit_count": len(batch.commits),
"retry_count": batch.retry_count,
}
)
# Mark commits as completed and calculate end-to-end latency
for commit in batch.commits:
commit_latency = self.current_time - commit.arrival_time
self.completed_commits.append({
'commit_id': commit.commit_id,
'arrival_time': commit.arrival_time,
'completion_time': self.current_time,
'latency': commit_latency,
'batch_id': batch.batch_id,
'retry_count': batch.retry_count
})
self.completed_commits.append(
{
"commit_id": commit.commit_id,
"arrival_time": commit.arrival_time,
"completion_time": self.current_time,
"latency": commit_latency,
"batch_id": batch.batch_id,
"retry_count": batch.retry_count,
}
)
self.timeline_events.append({
'time': self.current_time,
'event': 'batch_completed',
'batch_id': batch.batch_id,
'latency': batch_latency,
'retry_count': batch.retry_count
})
self.timeline_events.append(
{
"time": self.current_time,
"event": "batch_completed",
"batch_id": batch.batch_id,
"latency": batch_latency,
"retry_count": batch.retry_count,
}
)
# Try to process any pending work now that we have capacity
self.process_pending_work()
@@ -312,29 +328,35 @@ class PersistenceSimulation:
in_flight = self.in_flight_requests.pop(connection_id)
batch = in_flight.batch
self.timeline_events.append({
'time': self.current_time,
'event': 'batch_failed',
'batch_id': batch.batch_id,
'retry_count': batch.retry_count
})
self.timeline_events.append(
{
"time": self.current_time,
"event": "batch_failed",
"batch_id": batch.batch_id,
"retry_count": batch.retry_count,
}
)
if batch.retry_count < self.max_retry_attempts:
# Exponential backoff retry
batch.retry_count += 1
backoff_delay = (self.retry_base_delay_ms / 1000.0) * (2 ** (batch.retry_count - 1))
backoff_delay = (self.retry_base_delay_ms / 1000.0) * (
2 ** (batch.retry_count - 1)
)
retry_time = self.current_time + backoff_delay
self.schedule_event(retry_time, 'retry_batch', batch)
self.schedule_event(retry_time, "retry_batch", batch)
self.retry_counts[batch.retry_count] += 1
else:
# Max retries exhausted - this would be a fatal error in real system
self.timeline_events.append({
'time': self.current_time,
'event': 'batch_abandoned',
'batch_id': batch.batch_id,
'retry_count': batch.retry_count
})
self.timeline_events.append(
{
"time": self.current_time,
"event": "batch_abandoned",
"batch_id": batch.batch_id,
"retry_count": batch.retry_count,
}
)
def handle_retry_batch(self, batch: Batch):
"""Handle batch retry"""
@@ -374,7 +396,7 @@ class PersistenceSimulation:
# Schedule timeout event for current batch if it's the first commit
if len(self.current_batch) == 1 and not self.pending_commits:
timeout_time = self.current_time + (self.batch_timeout_ms / 1000.0)
self.schedule_event(timeout_time, 'batch_timeout', None)
self.schedule_event(timeout_time, "batch_timeout", None)
def handle_batch_timeout(self):
"""Handle batch timeout trigger"""
@@ -387,14 +409,19 @@ class PersistenceSimulation:
current_batch_count = len(self.current_batch)
in_flight_count = len(self.in_flight_requests)
self.queue_depth_samples.append({
'time': self.current_time,
'pending_commits': pending_count,
'current_batch_size': current_batch_count,
'in_flight_requests': in_flight_count,
'total_unacknowledged': pending_count + current_batch_count +
sum(len(req.batch.commits) for req in self.in_flight_requests.values())
})
self.queue_depth_samples.append(
{
"time": self.current_time,
"pending_commits": pending_count,
"current_batch_size": current_batch_count,
"in_flight_requests": in_flight_count,
"total_unacknowledged": pending_count
+ current_batch_count
+ sum(
len(req.batch.commits) for req in self.in_flight_requests.values()
),
}
)
def generate_arrivals(self):
"""Generate Poisson arrival events for the simulation"""
@@ -412,17 +439,17 @@ class PersistenceSimulation:
commit = Commit(
commit_id=self.next_commit_id,
arrival_time=current_time,
size_bytes=self.sample_commit_size() # Realistic size distribution
size_bytes=self.sample_commit_size(), # Realistic size distribution
)
self.next_commit_id += 1
# Schedule arrival event
self.schedule_event(current_time, 'commit_arrival', commit)
self.schedule_event(current_time, "commit_arrival", commit)
# Schedule periodic queue depth sampling
sample_time = 0.0
while sample_time < self.simulation_duration_sec:
self.schedule_event(sample_time, 'sample_queue_depth', None)
self.schedule_event(sample_time, "sample_queue_depth", None)
sample_time += 0.1 # Sample every 100ms
def run_simulation(self):
@@ -430,7 +457,9 @@ class PersistenceSimulation:
print(f"Starting persistence simulation...")
print(f"Arrival rate: {self.arrival_rate_per_sec} commits/sec")
print(f"Duration: {self.simulation_duration_sec} seconds")
print(f"Expected commits: ~{int(self.arrival_rate_per_sec * self.simulation_duration_sec)}")
print(
f"Expected commits: ~{int(self.arrival_rate_per_sec * self.simulation_duration_sec)}"
)
print()
# Generate all arrival events
@@ -445,17 +474,17 @@ class PersistenceSimulation:
if time > self.simulation_duration_sec:
break
if event_type == 'commit_arrival':
if event_type == "commit_arrival":
self.handle_commit_arrival(data)
elif event_type == 'batch_completed':
elif event_type == "batch_completed":
self.handle_batch_completed(data)
elif event_type == 'batch_failed':
elif event_type == "batch_failed":
self.handle_batch_failed(data)
elif event_type == 'retry_batch':
elif event_type == "retry_batch":
self.handle_retry_batch(data)
elif event_type == 'batch_timeout':
elif event_type == "batch_timeout":
self.handle_batch_timeout()
elif event_type == 'sample_queue_depth':
elif event_type == "sample_queue_depth":
self.sample_queue_depth()
events_processed += 1
@@ -469,38 +498,46 @@ class PersistenceSimulation:
return {"error": "No commits completed during simulation"}
# Calculate latency statistics
latencies = [c['latency'] * 1000 for c in self.completed_commits] # Convert to ms
latencies = [
c["latency"] * 1000 for c in self.completed_commits
] # Convert to ms
results = {
'simulation_config': {
'duration_sec': self.simulation_duration_sec,
'arrival_rate_per_sec': self.arrival_rate_per_sec,
'batch_timeout_ms': self.batch_timeout_ms,
'batch_size_threshold': self.batch_size_threshold,
'max_in_flight_requests': self.max_in_flight_requests,
's3_latency_params': f"Gamma(shape={self.s3_latency_shape}, scale={self.s3_latency_scale})",
's3_failure_rate': self.s3_failure_rate
"simulation_config": {
"duration_sec": self.simulation_duration_sec,
"arrival_rate_per_sec": self.arrival_rate_per_sec,
"batch_timeout_ms": self.batch_timeout_ms,
"batch_size_threshold": self.batch_size_threshold,
"max_in_flight_requests": self.max_in_flight_requests,
"s3_latency_params": f"Gamma(shape={self.s3_latency_shape}, scale={self.s3_latency_scale})",
"s3_failure_rate": self.s3_failure_rate,
},
'commit_metrics': {
'total_commits': len(self.completed_commits),
'latency_ms': {
'mean': statistics.mean(latencies),
'median': statistics.median(latencies),
'std': statistics.stdev(latencies) if len(latencies) > 1 else 0,
'min': min(latencies),
'max': max(latencies),
'p95': np.percentile(latencies, 95),
'p99': np.percentile(latencies, 99)
}
"commit_metrics": {
"total_commits": len(self.completed_commits),
"latency_ms": {
"mean": statistics.mean(latencies),
"median": statistics.median(latencies),
"std": statistics.stdev(latencies) if len(latencies) > 1 else 0,
"min": min(latencies),
"max": max(latencies),
"p95": np.percentile(latencies, 95),
"p99": np.percentile(latencies, 99),
},
},
'batch_metrics': {
'total_batches': len(self.batch_metrics),
'avg_commits_per_batch': statistics.mean([b['commit_count'] for b in self.batch_metrics]),
'avg_batch_size_bytes': statistics.mean([b['size_bytes'] for b in self.batch_metrics]),
'avg_batch_latency_ms': statistics.mean([b['latency'] * 1000 for b in self.batch_metrics])
"batch_metrics": {
"total_batches": len(self.batch_metrics),
"avg_commits_per_batch": statistics.mean(
[b["commit_count"] for b in self.batch_metrics]
),
"avg_batch_size_bytes": statistics.mean(
[b["size_bytes"] for b in self.batch_metrics]
),
"avg_batch_latency_ms": statistics.mean(
[b["latency"] * 1000 for b in self.batch_metrics]
),
},
'retry_analysis': dict(self.retry_counts),
'queue_depth_analysis': self._analyze_queue_depths()
"retry_analysis": dict(self.retry_counts),
"queue_depth_analysis": self._analyze_queue_depths(),
}
return results
@@ -510,26 +547,26 @@ class PersistenceSimulation:
if not self.queue_depth_samples:
return {}
pending = [s['pending_commits'] for s in self.queue_depth_samples]
in_flight = [s['in_flight_requests'] for s in self.queue_depth_samples]
total_unack = [s['total_unacknowledged'] for s in self.queue_depth_samples]
pending = [s["pending_commits"] for s in self.queue_depth_samples]
in_flight = [s["in_flight_requests"] for s in self.queue_depth_samples]
total_unack = [s["total_unacknowledged"] for s in self.queue_depth_samples]
return {
'pending_commits': {
'mean': statistics.mean(pending),
'max': max(pending),
'p95': np.percentile(pending, 95)
"pending_commits": {
"mean": statistics.mean(pending),
"max": max(pending),
"p95": np.percentile(pending, 95),
},
'in_flight_requests': {
'mean': statistics.mean(in_flight),
'max': max(in_flight),
'p95': np.percentile(in_flight, 95)
"in_flight_requests": {
"mean": statistics.mean(in_flight),
"max": max(in_flight),
"p95": np.percentile(in_flight, 95),
},
"total_unacknowledged": {
"mean": statistics.mean(total_unack),
"max": max(total_unack),
"p95": np.percentile(total_unack, 95),
},
'total_unacknowledged': {
'mean': statistics.mean(total_unack),
'max': max(total_unack),
'p95': np.percentile(total_unack, 95)
}
}
def plot_results(self, results: Dict, save_path: Optional[str] = None):
@@ -539,57 +576,69 @@ class PersistenceSimulation:
return
fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(15, 12))
fig.suptitle('Persistence Thread Simulation Results', fontsize=16)
fig.suptitle("Persistence Thread Simulation Results", fontsize=16)
# Plot 1: Commit latency histogram
latencies_ms = [c['latency'] * 1000 for c in self.completed_commits]
ax1.hist(latencies_ms, bins=50, alpha=0.7, edgecolor='black')
ax1.set_xlabel('Commit Latency (ms)')
ax1.set_ylabel('Count')
ax1.set_title('Commit Latency Distribution')
ax1.axvline(results['commit_metrics']['latency_ms']['mean'], color='red',
linestyle='--', label=f"Mean: {results['commit_metrics']['latency_ms']['mean']:.1f}ms")
ax1.axvline(results['commit_metrics']['latency_ms']['p95'], color='orange',
linestyle='--', label=f"P95: {results['commit_metrics']['latency_ms']['p95']:.1f}ms")
latencies_ms = [c["latency"] * 1000 for c in self.completed_commits]
ax1.hist(latencies_ms, bins=50, alpha=0.7, edgecolor="black")
ax1.set_xlabel("Commit Latency (ms)")
ax1.set_ylabel("Count")
ax1.set_title("Commit Latency Distribution")
ax1.axvline(
results["commit_metrics"]["latency_ms"]["mean"],
color="red",
linestyle="--",
label=f"Mean: {results['commit_metrics']['latency_ms']['mean']:.1f}ms",
)
ax1.axvline(
results["commit_metrics"]["latency_ms"]["p95"],
color="orange",
linestyle="--",
label=f"P95: {results['commit_metrics']['latency_ms']['p95']:.1f}ms",
)
ax1.legend()
# Plot 2: Timeline of commit completions
completion_times = [c['completion_time'] for c in self.completed_commits]
completion_latencies = [c['latency'] * 1000 for c in self.completed_commits]
completion_times = [c["completion_time"] for c in self.completed_commits]
completion_latencies = [c["latency"] * 1000 for c in self.completed_commits]
ax2.scatter(completion_times, completion_latencies, alpha=0.6, s=10)
ax2.set_xlabel('Time (seconds)')
ax2.set_ylabel('Commit Latency (ms)')
ax2.set_title('Latency Over Time')
ax2.set_xlabel("Time (seconds)")
ax2.set_ylabel("Commit Latency (ms)")
ax2.set_title("Latency Over Time")
# Plot 3: Queue depth over time
if self.queue_depth_samples:
times = [s['time'] for s in self.queue_depth_samples]
pending = [s['pending_commits'] for s in self.queue_depth_samples]
in_flight = [s['in_flight_requests'] for s in self.queue_depth_samples]
total_unack = [s['total_unacknowledged'] for s in self.queue_depth_samples]
times = [s["time"] for s in self.queue_depth_samples]
pending = [s["pending_commits"] for s in self.queue_depth_samples]
in_flight = [s["in_flight_requests"] for s in self.queue_depth_samples]
total_unack = [s["total_unacknowledged"] for s in self.queue_depth_samples]
ax3.plot(times, pending, label='Pending Commits', alpha=0.8)
ax3.plot(times, in_flight, label='In-Flight Requests', alpha=0.8)
ax3.plot(times, total_unack, label='Total Unacknowledged', alpha=0.8)
ax3.axhline(self.max_in_flight_requests, color='red', linestyle='--',
label=f'Max In-Flight Limit ({self.max_in_flight_requests})')
ax3.set_xlabel('Time (seconds)')
ax3.set_ylabel('Count')
ax3.set_title('Queue Depths Over Time')
ax3.plot(times, pending, label="Pending Commits", alpha=0.8)
ax3.plot(times, in_flight, label="In-Flight Requests", alpha=0.8)
ax3.plot(times, total_unack, label="Total Unacknowledged", alpha=0.8)
ax3.axhline(
self.max_in_flight_requests,
color="red",
linestyle="--",
label=f"Max In-Flight Limit ({self.max_in_flight_requests})",
)
ax3.set_xlabel("Time (seconds)")
ax3.set_ylabel("Count")
ax3.set_title("Queue Depths Over Time")
ax3.legend()
# Plot 4: Batch size distribution
if self.batch_metrics:
batch_sizes = [b['commit_count'] for b in self.batch_metrics]
ax4.hist(batch_sizes, bins=20, alpha=0.7, edgecolor='black')
ax4.set_xlabel('Commits per Batch')
ax4.set_ylabel('Count')
ax4.set_title('Batch Size Distribution')
batch_sizes = [b["commit_count"] for b in self.batch_metrics]
ax4.hist(batch_sizes, bins=20, alpha=0.7, edgecolor="black")
ax4.set_xlabel("Commits per Batch")
ax4.set_ylabel("Count")
ax4.set_title("Batch Size Distribution")
plt.tight_layout()
if save_path:
plt.savefig(save_path, dpi=300, bbox_inches='tight')
plt.savefig(save_path, dpi=300, bbox_inches="tight")
print(f"Plots saved to {save_path}")
else:
plt.show()
@@ -602,7 +651,7 @@ def print_results(results: Dict):
print("=" * 80)
# Configuration
config = results['simulation_config']
config = results["simulation_config"]
print(f"\nConfiguration:")
print(f" Duration: {config['duration_sec']}s")
print(f" Arrival Rate: {config['arrival_rate_per_sec']} commits/sec")
@@ -613,8 +662,8 @@ def print_results(results: Dict):
print(f" S3 Failure Rate: {config['s3_failure_rate']:.1%}")
# Commit metrics
commit_metrics = results['commit_metrics']
latency = commit_metrics['latency_ms']
commit_metrics = results["commit_metrics"]
latency = commit_metrics["latency_ms"]
print(f"\nCommit Performance:")
print(f" Total Commits: {commit_metrics['total_commits']:,}")
print(f" Latency Mean: {latency['mean']:.2f}ms")
@@ -625,7 +674,7 @@ def print_results(results: Dict):
print(f" Latency Range: {latency['min']:.2f}ms - {latency['max']:.2f}ms")
# Batch metrics
batch_metrics = results['batch_metrics']
batch_metrics = results["batch_metrics"]
print(f"\nBatching Performance:")
print(f" Total Batches: {batch_metrics['total_batches']:,}")
print(f" Avg Commits/Batch: {batch_metrics['avg_commits_per_batch']:.1f}")
@@ -633,24 +682,30 @@ def print_results(results: Dict):
print(f" Avg Batch Latency: {batch_metrics['avg_batch_latency_ms']:.2f}ms")
# Retry analysis
if results['retry_analysis']:
if results["retry_analysis"]:
print(f"\nRetry Analysis:")
for retry_count, occurrences in results['retry_analysis'].items():
for retry_count, occurrences in results["retry_analysis"].items():
print(f" {occurrences:,} batches required {retry_count} retries")
# Queue depth analysis
if results['queue_depth_analysis']:
queue_analysis = results['queue_depth_analysis']
if results["queue_depth_analysis"]:
queue_analysis = results["queue_depth_analysis"]
print(f"\nQueue Depth Analysis:")
if 'pending_commits' in queue_analysis:
pending = queue_analysis['pending_commits']
print(f" Pending Commits - Mean: {pending['mean']:.1f}, Max: {pending['max']}, P95: {pending['p95']:.1f}")
if 'in_flight_requests' in queue_analysis:
in_flight = queue_analysis['in_flight_requests']
print(f" In-Flight Requests - Mean: {in_flight['mean']:.1f}, Max: {in_flight['max']}, P95: {in_flight['p95']:.1f}")
if 'total_unacknowledged' in queue_analysis:
total = queue_analysis['total_unacknowledged']
print(f" Total Unacknowledged - Mean: {total['mean']:.1f}, Max: {total['max']}, P95: {total['p95']:.1f}")
if "pending_commits" in queue_analysis:
pending = queue_analysis["pending_commits"]
print(
f" Pending Commits - Mean: {pending['mean']:.1f}, Max: {pending['max']}, P95: {pending['p95']:.1f}"
)
if "in_flight_requests" in queue_analysis:
in_flight = queue_analysis["in_flight_requests"]
print(
f" In-Flight Requests - Mean: {in_flight['mean']:.1f}, Max: {in_flight['max']}, P95: {in_flight['p95']:.1f}"
)
if "total_unacknowledged" in queue_analysis:
total = queue_analysis["total_unacknowledged"]
print(
f" Total Unacknowledged - Mean: {total['mean']:.1f}, Max: {total['max']}, P95: {total['p95']:.1f}"
)
if __name__ == "__main__":
@@ -664,8 +719,16 @@ if __name__ == "__main__":
{"name": "Baseline (max_in_flight=5)", "max_in_flight_requests": 5},
{"name": "Higher Parallelism (max_in_flight=10)", "max_in_flight_requests": 10},
{"name": "Much Higher (max_in_flight=20)", "max_in_flight_requests": 20},
{"name": "Lower Timeout (max_in_flight=10, timeout=2ms)", "max_in_flight_requests": 10, "batch_timeout_ms": 2.0},
{"name": "Higher Timeout (max_in_flight=10, timeout=10ms)", "max_in_flight_requests": 10, "batch_timeout_ms": 10.0},
{
"name": "Lower Timeout (max_in_flight=10, timeout=2ms)",
"max_in_flight_requests": 10,
"batch_timeout_ms": 2.0,
},
{
"name": "Higher Timeout (max_in_flight=10, timeout=10ms)",
"max_in_flight_requests": 10,
"batch_timeout_ms": 10.0,
},
]
results_comparison = []
@@ -682,7 +745,7 @@ if __name__ == "__main__":
s3_latency_scale=25.0,
s3_failure_rate=0.01,
max_in_flight_requests=config.get("max_in_flight_requests", 5),
batch_timeout_ms=config.get("batch_timeout_ms", 5.0)
batch_timeout_ms=config.get("batch_timeout_ms", 5.0),
)
results = sim.run_simulation()
@@ -690,9 +753,9 @@ if __name__ == "__main__":
results_comparison.append(results)
# Print key metrics for quick comparison
commit_metrics = results['commit_metrics']
batch_metrics = results['batch_metrics']
queue_metrics = results.get('queue_depth_analysis', {})
commit_metrics = results["commit_metrics"]
batch_metrics = results["batch_metrics"]
queue_metrics = results.get("queue_depth_analysis", {})
print(f"\nKey Metrics:")
print(f" Mean Latency: {commit_metrics['latency_ms']['mean']:.1f}ms")
@@ -701,8 +764,12 @@ if __name__ == "__main__":
print(f" Avg Commits/Batch: {batch_metrics['avg_commits_per_batch']:.1f}")
print(f" Avg Batch Size: {batch_metrics['avg_batch_size_bytes']/1024:.1f}KB")
if queue_metrics:
print(f" Avg Queue Depth: {queue_metrics.get('total_unacknowledged', {}).get('mean', 0):.1f}")
print(f" Max Queue Depth: {queue_metrics.get('total_unacknowledged', {}).get('max', 0)}")
print(
f" Avg Queue Depth: {queue_metrics.get('total_unacknowledged', {}).get('mean', 0):.1f}"
)
print(
f" Max Queue Depth: {queue_metrics.get('total_unacknowledged', {}).get('max', 0)}"
)
# Summary comparison
print(f"\n{'='*80}")
@@ -713,25 +780,33 @@ if __name__ == "__main__":
for result in results_comparison:
name = result["config_name"]
commit_metrics = result['commit_metrics']
queue_metrics = result.get('queue_depth_analysis', {})
mean_lat = commit_metrics['latency_ms']['mean']
p95_lat = commit_metrics['latency_ms']['p95']
p99_lat = commit_metrics['latency_ms']['p99']
avg_queue = queue_metrics.get('total_unacknowledged', {}).get('mean', 0)
commit_metrics = result["commit_metrics"]
queue_metrics = result.get("queue_depth_analysis", {})
mean_lat = commit_metrics["latency_ms"]["mean"]
p95_lat = commit_metrics["latency_ms"]["p95"]
p99_lat = commit_metrics["latency_ms"]["p99"]
avg_queue = queue_metrics.get("total_unacknowledged", {}).get("mean", 0)
print(f"{name:<40} {mean_lat:<8.1f} {p95_lat:<8.1f} {p99_lat:<8.1f} {avg_queue:<10.1f}")
print(
f"{name:<40} {mean_lat:<8.1f} {p95_lat:<8.1f} {p99_lat:<8.1f} {avg_queue:<10.1f}"
)
print(f"\nRecommendation: Choose config with lowest P95/P99 latencies")
print(f"Note: Higher in-flight allows more parallelism but may increase queue variability")
print(
f"Note: Higher in-flight allows more parallelism but may increase queue variability"
)
# Generate plots for best configuration
best_config = min(results_comparison, key=lambda r: r['commit_metrics']['latency_ms']['p95'])
best_config = min(
results_comparison, key=lambda r: r["commit_metrics"]["latency_ms"]["p95"]
)
print(f"\nGenerating plots for best configuration: {best_config['config_name']}")
try:
# Re-run best config to get simulation object for plotting
best_params = next(c for c in configs if c['name'] == best_config['config_name'])
best_params = next(
c for c in configs if c["name"] == best_config["config_name"]
)
sim_best = PersistenceSimulation(
arrival_rate_per_sec=1000.0,
simulation_duration_sec=30.0,
@@ -739,10 +814,10 @@ if __name__ == "__main__":
s3_latency_scale=25.0,
s3_failure_rate=0.01,
max_in_flight_requests=best_params.get("max_in_flight_requests", 5),
batch_timeout_ms=best_params.get("batch_timeout_ms", 5.0)
batch_timeout_ms=best_params.get("batch_timeout_ms", 5.0),
)
sim_best.run_simulation()
sim_best.plot_results(best_config, f'persistence_optimization_results.png')
sim_best.plot_results(best_config, f"persistence_optimization_results.png")
except Exception as e:
print(f"\nCould not generate plots: {e}")
print("Install matplotlib and numpy to enable visualization")
print("Install matplotlib and numpy to enable visualization")