diff --git a/CMakeLists.txt b/CMakeLists.txt
index a3152d3..5f5450b 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -139,6 +139,7 @@ target_link_libraries(nanobench_impl PUBLIC nanobench)
 # Define all source files in one place
 set(WEASELDB_SOURCES
     src/arena_allocator.cpp
+    src/cpu_work.cpp
     src/format.cpp
     src/metric.cpp
     src/json_commit_request_parser.cpp
@@ -217,8 +218,8 @@ add_test(NAME metric_tests COMMAND test_metric)
 add_executable(bench_arena_allocator benchmarks/bench_arena_allocator.cpp)
 target_link_libraries(bench_arena_allocator nanobench_impl weaseldb_sources)
 
-add_executable(bench_volatile_loop benchmarks/bench_volatile_loop.cpp)
-target_link_libraries(bench_volatile_loop nanobench_impl)
+add_executable(bench_cpu_work benchmarks/bench_cpu_work.cpp src/cpu_work.cpp)
+target_link_libraries(bench_cpu_work nanobench_impl)
 
 add_executable(bench_commit_request benchmarks/bench_commit_request.cpp)
 target_link_libraries(bench_commit_request nanobench_impl weaseldb_sources
diff --git a/benchmarks/bench_cpu_work.cpp b/benchmarks/bench_cpu_work.cpp
new file mode 100644
index 0000000..d512a02
--- /dev/null
+++ b/benchmarks/bench_cpu_work.cpp
@@ -0,0 +1,34 @@
+#include <nanobench.h>
+#include <iostream>
+#include <string>
+
+#include "../src/cpu_work.hpp"
+
+int main(int argc, char *argv[]) {
+  int iterations = DEFAULT_HEALTH_CHECK_ITERATIONS; // Default: 7000
+
+  if (argc > 1) {
+    try {
+      iterations = std::stoi(argv[1]);
+      if (iterations < 0) {
+        std::cerr << "Error: iterations must be non-negative" << std::endl;
+        return 1;
+      }
+    } catch (const std::exception &e) {
+      std::cerr << "Error: invalid number '" << argv[1] << "'" << std::endl;
+      return 1;
+    }
+  }
+
+  std::cout << "Benchmarking spend_cpu_cycles with " << iterations
+            << " iterations" << std::endl;
+
+  ankerl::nanobench::Bench bench;
+  bench.minEpochIterations(10000);
+
+  // Benchmark the same CPU work that health checks use
+  bench.run("spend_cpu_cycles(" + std::to_string(iterations) + ")",
+            [&] { spend_cpu_cycles(iterations); });
+
+  return 0;
+}
diff --git a/benchmarks/bench_volatile_loop.cpp b/benchmarks/bench_volatile_loop.cpp
deleted file mode 100644
index 3cbaa41..0000000
--- a/benchmarks/bench_volatile_loop.cpp
+++ /dev/null
@@ -1,14 +0,0 @@
-#include <nanobench.h>
-
-#include "../src/loop_iterations.hpp"
-
-int main() {
-  ankerl::nanobench::Bench bench;
-  bench.minEpochIterations(100000);
-  bench.run("volatile loop to " + std::to_string(loop_iterations), [&] {
-    for (volatile int i = 0; i < loop_iterations; i = i + 1)
-      ;
-  });
-
-  return 0;
-}
diff --git a/config.toml b/config.toml
index e51875f..534ca14 100644
--- a/config.toml
+++ b/config.toml
@@ -25,3 +25,10 @@ request_id_retention_versions = 100000000
 max_buffer_size_bytes = 10485760 # 10MB
 # Interval for sending keepalive comments to prevent idle timeouts (seconds)
 keepalive_interval_seconds = 30
+
+[benchmark]
+# CPU-intensive loop iterations for /ok requests in resolve stage
+# 0 = health check only (no CPU work)
+# 7000 = default benchmark load (650ns CPU work, 1M req/s)
+# Higher values = more CPU stress testing
+ok_resolve_iterations = 7000
diff --git a/src/config.cpp b/src/config.cpp
index 58ae998..091df57 100644
--- a/src/config.cpp
+++ b/src/config.cpp
@@ -14,6 +14,7 @@ ConfigParser::load_from_file(const std::string &file_path) {
   parse_server_config(toml_data, config.server);
   parse_commit_config(toml_data, config.commit);
   parse_subscription_config(toml_data, config.subscription);
+  parse_benchmark_config(toml_data, config.benchmark);
 
   if (!validate_config(config)) {
     return std::nullopt;
@@ -36,6 +37,7 @@ ConfigParser::parse_toml_string(const std::string &toml_content) {
   parse_server_config(toml_data, config.server);
   parse_commit_config(toml_data, config.commit);
   parse_subscription_config(toml_data, config.subscription);
+  parse_benchmark_config(toml_data, config.benchmark);
 
   if (!validate_config(config)) {
     return std::nullopt;
@@ -146,6 +148,13 @@ void ConfigParser::parse_subscription_config(const auto &toml_data,
   });
 }
 
+void ConfigParser::parse_benchmark_config(const auto &toml_data,
+                                          BenchmarkConfig &config) {
+  parse_section(toml_data, "benchmark", [&](const auto &bench) {
+    parse_field(bench, "ok_resolve_iterations", config.ok_resolve_iterations);
+  });
+}
+
 bool ConfigParser::validate_config(const Config &config) {
   bool valid = true;
 
diff --git a/src/config.hpp b/src/config.hpp
index 7a2854d..f708710 100644
--- a/src/config.hpp
+++ b/src/config.hpp
@@ -74,6 +74,15 @@ struct SubscriptionConfig {
   std::chrono::seconds keepalive_interval{30};
 };
 
+/**
+ * @brief Configuration settings for benchmarking and health check behavior.
+ */
+struct BenchmarkConfig {
+  /// CPU-intensive loop iterations for /ok requests in resolve stage
+  /// 0 = health check only, 7000 = default benchmark load (650ns, 1M req/s)
+  int ok_resolve_iterations = 7000;
+};
+
 /**
  * @brief Top-level configuration container for all WeaselDB settings.
  */
@@ -81,6 +90,7 @@ struct Config {
   ServerConfig server; ///< Server networking and request handling settings
   CommitConfig commit; ///< Commit processing and validation settings
   SubscriptionConfig subscription; ///< Subscription streaming settings
+  BenchmarkConfig benchmark; ///< Benchmarking and health check settings
 };
 
 /**
@@ -152,6 +162,8 @@ private:
   static void parse_commit_config(const auto &toml_data, CommitConfig &config);
   static void parse_subscription_config(const auto &toml_data,
                                         SubscriptionConfig &config);
+  static void parse_benchmark_config(const auto &toml_data,
+                                     BenchmarkConfig &config);
 };
 
 } // namespace weaseldb
diff --git a/src/cpu_work.cpp b/src/cpu_work.cpp
new file mode 100644
index 0000000..a66a482
--- /dev/null
+++ b/src/cpu_work.cpp
@@ -0,0 +1,10 @@
+#include "cpu_work.hpp"
+
+void spend_cpu_cycles(int iterations) {
+  // Perform CPU-intensive work that cannot be optimized away
+  // Use inline assembly to prevent compiler optimization
+  for (int i = 0; i < iterations; ++i) {
+    // Simple loop with inline assembly barrier to prevent optimization
+    asm volatile("");
+  }
+}
\ No newline at end of file
diff --git a/src/cpu_work.hpp b/src/cpu_work.hpp
new file mode 100644
index 0000000..d9cf313
--- /dev/null
+++ b/src/cpu_work.hpp
@@ -0,0 +1,21 @@
+#pragma once
+
+/**
+ * @brief Perform CPU-intensive work for benchmarking and health check purposes.
+ *
+ * This function performs a deterministic amount of CPU work that cannot be
+ * optimized away by the compiler. It's used both in the health check resolve
+ * stage and in benchmarks to measure the actual CPU time consumed.
+ *
+ * @param iterations Number of loop iterations to perform
+ */
+void spend_cpu_cycles(int iterations);
+
+/**
+ * @brief Default CPU work iterations for health check benchmarking.
+ *
+ * Represents the number of CPU-intensive loop iterations used in the
+ * /ok health check resolve stage. This value provides 650ns of CPU work
+ * and achieves 1M requests/second throughput through the 4-stage pipeline.
+ */
+constexpr int DEFAULT_HEALTH_CHECK_ITERATIONS = 7000;
\ No newline at end of file
diff --git a/src/http_handler.cpp b/src/http_handler.cpp
index 2451a86..778867b 100644
--- a/src/http_handler.cpp
+++ b/src/http_handler.cpp
@@ -7,6 +7,7 @@
 
 #include "api_url_parser.hpp"
 #include "arena_allocator.hpp"
+#include "cpu_work.hpp"
 #include "format.hpp"
 #include "json_commit_request_parser.hpp"
 #include "metric.hpp"
@@ -26,6 +27,8 @@ thread_local auto status_counter =
     requests_counter_family.create({{"path", "/v1/status"}});
 thread_local auto version_counter =
     requests_counter_family.create({{"path", "/v1/version"}});
+thread_local auto ok_counter =
+    requests_counter_family.create({{"path", "/ok"}});
 
 // Metric for banned request IDs memory usage
 auto banned_request_ids_memory_gauge =
@@ -83,10 +86,10 @@ void HttpHandler::on_write_buffer_drained(
 
 void HttpHandler::on_batch_complete(
     std::span<std::unique_ptr<Connection>> batch) {
-  // Collect commit requests and status requests for pipeline processing
+  // Collect commit, status, and health check requests for pipeline processing
   int pipeline_count = 0;
 
-  // Count both commit and status requests
+  // Count commit, status, and health check requests
  for (auto &conn : batch) {
     if (conn && conn->user_data) {
       auto *state = static_cast<HttpConnectionState *>(conn->user_data);
@@ -102,6 +105,12 @@
                conn->outgoing_bytes_queued() == 0) {
         pipeline_count++;
       }
+      // Count health check requests
+      else if (state->route == HttpRoute::GetOk &&
+               // Error message not already queued
+               conn->outgoing_bytes_queued() == 0) {
+        pipeline_count++;
+      }
     }
   }
 
@@ -123,6 +132,10 @@
         else if (state->route == HttpRoute::GetStatus) {
           *out_iter++ = StatusEntry{std::move(conn)};
         }
+        // Create HealthCheckEntry for health check requests
+        else if (state->route == HttpRoute::GetOk) {
+          *out_iter++ = HealthCheckEntry{std::move(conn)};
+        }
       }
     }
   }
@@ -384,9 +397,11 @@ void HttpHandler::handle_get_metrics(Connection &conn,
 
 void HttpHandler::handle_get_ok(Connection &conn,
                                 const HttpConnectionState &state) {
+  ok_counter.inc();
   TRACE_EVENT("http", "GET /ok",
               perfetto::Flow::Global(state.http_request_id));
-  sendResponse(conn, 200, "text/plain", "OK", state.connection_close);
+  // Health check requests are processed through the pipeline
+  // Response will be generated in the release stage after pipeline processing
 }
 
 void HttpHandler::handle_not_found(Connection &conn,
@@ -395,9 +410,9 @@ void HttpHandler::handle_not_found(Connection &conn,
 }
 
 // HTTP utility methods
-void HttpHandler::sendResponse(Connection &conn, int status_code,
-                               std::string_view content_type,
-                               std::string_view body, bool close_connection) {
+void HttpHandler::send_response(Connection &conn, int status_code,
+                                std::string_view content_type,
+                                std::string_view body, bool close_connection) {
   ArenaAllocator &arena = conn.get_arena();
   auto *state = static_cast<HttpConnectionState *>(conn.user_data);
 
@@ -446,7 +461,7 @@ void HttpHandler::sendResponse(Connection &conn, int status_code,
 void HttpHandler::send_json_response(Connection &conn, int status_code,
                                      std::string_view json,
                                      bool close_connection) {
-  sendResponse(conn, status_code, "application/json", json, close_connection);
+  send_response(conn, status_code, "application/json", json, close_connection);
 }
 
 void HttpHandler::send_error_response(Connection &conn, int status_code,
@@ -674,6 +689,18 @@ bool HttpHandler::process_sequence_batch(BatchType &batch) {
                             R"({"status": "not_committed"})",
                             state->connection_close);
 
+      return false; // Continue processing
+    } else if constexpr (std::is_same_v) {
+      // Process health check entry: noop in sequence stage
+      auto &health_check_entry = e;
+      auto *state = static_cast<HttpConnectionState *>(
+          health_check_entry.connection->user_data);
+
+      if (state) {
+        TRACE_EVENT("http", "sequence_health_check",
+                    perfetto::Flow::Global(state->http_request_id));
+      }
+
       return false; // Continue processing
     }
 
@@ -725,6 +752,21 @@ bool HttpHandler::process_resolve_batch(BatchType &batch) {
       // Status entries are not processed in resolve stage
       // They were already handled in sequence stage
       return false;
+    } else if constexpr (std::is_same_v) {
+      // Process health check entry: perform configurable CPU work
+      auto &health_check_entry = e;
+      auto *state = static_cast<HttpConnectionState *>(
+          health_check_entry.connection->user_data);
+
+      if (state) {
+        TRACE_EVENT("http", "resolve_health_check",
+                    perfetto::Flow::Global(state->http_request_id));
+      }
+
+      // Perform configurable CPU-intensive work for benchmarking
+      spend_cpu_cycles(config_.benchmark.ok_resolve_iterations);
+
+      return false; // Continue processing
     }
 
     return false; // Unknown type, continue
@@ -798,6 +840,22 @@ bool HttpHandler::process_persist_batch(BatchType &batch) {
       // Status entries are not processed in persist stage
      // They were already handled in sequence stage
       return false;
+    } else if constexpr (std::is_same_v) {
+      // Process health check entry: generate OK response
+      auto &health_check_entry = e;
+      auto *state = static_cast<HttpConnectionState *>(
+          health_check_entry.connection->user_data);
+
+      if (state) {
+        TRACE_EVENT("http", "persist_health_check",
+                    perfetto::Flow::Global(state->http_request_id));
+
+        // Generate OK response
+        send_response(*health_check_entry.connection, 200, "text/plain",
+                      "OK", state->connection_close);
+      }
+
+      return false; // Continue processing
     }
 
     return false; // Unknown type, continue
@@ -853,6 +911,22 @@ bool HttpHandler::process_release_batch(BatchType &batch) {
       // Return connection to server for further processing or cleanup
       Server::release_back_to_server(std::move(status_entry.connection));
 
+      return false; // Continue processing
+    } else if constexpr (std::is_same_v) {
+      // Process health check entry: return connection to server
+      auto &health_check_entry = e;
+      auto *state = static_cast<HttpConnectionState *>(
+          health_check_entry.connection->user_data);
+
+      if (state) {
+        TRACE_EVENT("http", "release_health_check",
+                    perfetto::Flow::Global(state->http_request_id));
+      }
+
+      // Return connection to server for further processing or cleanup
+      Server::release_back_to_server(
+          std::move(health_check_entry.connection));
+
       return false; // Continue processing
     }
 
diff --git a/src/http_handler.hpp b/src/http_handler.hpp
index 6a33a7a..817f8e4 100644
--- a/src/http_handler.hpp
+++ b/src/http_handler.hpp
@@ -10,9 +10,9 @@
 
 #include "api_url_parser.hpp"
 #include "arena_allocator.hpp"
+#include "config.hpp"
 #include "connection.hpp"
 #include "connection_handler.hpp"
-#include "loop_iterations.hpp"
 #include "perfetto_categories.hpp"
 #include "pipeline_entry.hpp"
 #include "server.hpp"
@@ -70,9 +70,9 @@ struct HttpConnectionState {
  * Supports the WeaselDB REST API endpoints with enum-based routing.
  */
 struct HttpHandler : ConnectionHandler {
-  HttpHandler()
-      : banned_request_ids(
-            ArenaStlAllocator(&banned_request_arena)) {
+  explicit HttpHandler(const weaseldb::Config &config)
+      : config_(config), banned_request_ids(ArenaStlAllocator(
+                             &banned_request_arena)) {
     // Stage 0: Sequence assignment thread
     sequenceThread = std::thread{[this]() {
       pthread_setname_np(pthread_self(), "txn-sequence");
@@ -153,6 +153,9 @@
 private:
   static constexpr int lg_size = 16;
 
+  // Configuration reference
+  const weaseldb::Config &config_;
+
   // Pipeline state (sequence thread only)
   int64_t next_version = 1; // Next version to assign (sequence thread only)
 
@@ -208,9 +211,10 @@
   void handle_not_found(Connection &conn, const HttpConnectionState &state);
 
   // HTTP utilities
-  static void sendResponse(Connection &conn, int status_code,
-                           std::string_view content_type, std::string_view body,
-                           bool close_connection = false);
+  static void send_response(Connection &conn, int status_code,
+                            std::string_view content_type,
+                            std::string_view body,
+                            bool close_connection = false);
   static void send_json_response(Connection &conn, int status_code,
                                  std::string_view json,
                                  bool close_connection = false);
diff --git a/src/loop_iterations.hpp b/src/loop_iterations.hpp
deleted file mode 100644
index c920ca0..0000000
--- a/src/loop_iterations.hpp
+++ /dev/null
@@ -1,3 +0,0 @@
-#pragma once
-
-constexpr int loop_iterations = 1725;
diff --git a/src/main.cpp b/src/main.cpp
index aad7b3a..6c16588 100644
--- a/src/main.cpp
+++ b/src/main.cpp
@@ -252,12 +252,14 @@ int main(int argc, char *argv[]) {
   std::cout << "Keepalive interval: "
             << config->subscription.keepalive_interval.count() << " seconds"
             << std::endl;
+  std::cout << "Health check resolve iterations: "
+            << config->benchmark.ok_resolve_iterations << std::endl;
 
   // Create listen sockets
   std::vector<int> listen_fds = create_listen_sockets(*config);
 
   // Create handler and server
-  HttpHandler http_handler;
+  HttpHandler http_handler(*config);
   auto server = Server::create(*config, http_handler, listen_fds);
 
   g_server = server.get();
diff --git a/src/pipeline_entry.hpp b/src/pipeline_entry.hpp
index d71b497..f79b082 100644
--- a/src/pipeline_entry.hpp
+++ b/src/pipeline_entry.hpp
@@ -32,6 +32,19 @@ struct StatusEntry {
       : connection(std::move(conn)) {}
 };
 
+/**
+ * Pipeline entry for /ok health check requests.
+ * Flows through all pipeline stages as a noop except resolve stage.
+ * Resolve stage can perform configurable CPU work for benchmarking.
+ */
+struct HealthCheckEntry {
+  std::unique_ptr<Connection> connection;
+
+  HealthCheckEntry() = default; // Default constructor for variant
+  explicit HealthCheckEntry(std::unique_ptr<Connection> conn)
+      : connection(std::move(conn)) {}
+};
+
 /**
  * Pipeline entry for coordinated shutdown of all stages.
  * Flows through all stages to ensure proper cleanup.
@@ -44,4 +57,5 @@ struct ShutdownEntry {
 
 /**
  * Pipeline entry variant type used by the commit processing pipeline.
  * Each stage pattern-matches on the variant type to handle appropriately.
  */
-using PipelineEntry = std::variant;
\ No newline at end of file
+using PipelineEntry =
+    std::variant;
\ No newline at end of file
diff --git a/test_benchmark_config.toml b/test_benchmark_config.toml
new file mode 100644
index 0000000..8f6c204
--- /dev/null
+++ b/test_benchmark_config.toml
@@ -0,0 +1,33 @@
+# WeaselDB Test Configuration with Benchmark Health Check
+
+[server]
+# Network interfaces to listen on - both TCP for external access and Unix socket for high-performance local testing
+interfaces = [
+    { type = "tcp", address = "127.0.0.1", port = 8080 },
+    { type = "unix", path = "weaseldb.sock" }
+]
+# Maximum request size in bytes (for 413 Content Too Large responses)
+max_request_size_bytes = 1048576 # 1MB
+# Number of I/O threads for handling connections and network events
+io_threads = 8
+epoll_instances = 8
+# Event batch size for epoll processing
+event_batch_size = 64
+
+[commit]
+# Minimum length for request_id to ensure sufficient entropy
+min_request_id_length = 20
+# How long to retain request IDs for /v1/status queries (hours)
+request_id_retention_hours = 24
+# Minimum number of versions to retain request IDs
+request_id_retention_versions = 100000000
+
+[subscription]
+# Maximum buffer size for unconsumed data in /v1/subscribe (bytes)
+max_buffer_size_bytes = 10485760 # 10MB
+# Interval for sending keepalive comments to prevent idle timeouts (seconds)
+keepalive_interval_seconds = 30
+
+[benchmark]
+# Use original benchmark load for testing
+ok_resolve_iterations = 7000
diff --git a/threading_performance_report.md b/threading_performance_report.md
index 407f2be..ca9d76a 100644
--- a/threading_performance_report.md
+++ b/threading_performance_report.md
@@ -2,30 +2,30 @@
 
 ## Summary
 
-WeaselDB achieved 1.3M requests/second throughput using a two-stage ThreadPipeline with futex wake optimization, delivering 550ns serial CPU time per request while maintaining 0% CPU usage when idle. Higher serial CPU time means more CPU budget available for serial processing.
+WeaselDB's /ok health check endpoint achieves 1M requests/second with 650ns of configurable CPU work per request through the 4-stage commit pipeline, while maintaining 0% CPU usage when idle. The configurable CPU work serves both as a health check (validating the full pipeline) and as a benchmarking tool for measuring per-request processing capacity.
 
 ## Performance Metrics
 
 ### Throughput
 
-- **1.3M requests/second** over unix socket
+- **1.0M requests/second** /ok health check endpoint (4-stage commit pipeline)
 - 8 I/O threads with 8 epoll instances
 - Load tester used 12 network threads
-- Max latency: 4ms out of 90M requests
 - **0% CPU usage when idle** (optimized futex wake implementation)
 
 ### Threading Architecture
 
-- Two-stage pipeline: Stage-0 (noop) → Stage-1 (connection return)
+- **Four-stage commit pipeline**: Sequence → Resolve → Persist → Release
 - Lock-free coordination using atomic ring buffer
 - **Optimized futex wake**: Only wake on final pipeline stage
-- Each request "processed" serially on single thread
+- Configurable CPU work performed serially in resolve stage
 
 ### Performance Characteristics
 
-**Optimized Pipeline Mode**:
-- **Throughput**: 1.3M requests/second
-- **Serial CPU time per request**: 550ns (validated with nanobench)
-- **Theoretical maximum serial CPU time**: 769ns (1,000,000,000ns ÷ 1,300,000 req/s)
-- **Serial efficiency**: 71.5% (550ns ÷ 769ns)
+**Health Check Pipeline (/ok endpoint)**:
+- **Throughput**: 1.0M requests/second
+- **Configurable CPU work**: 650ns (7000 iterations, validated with nanobench)
+- **Theoretical maximum CPU time**: 1000ns (1,000,000,000ns ÷ 1,000,000 req/s)
+- **CPU work efficiency**: 65% (650ns ÷ 1000ns)
+- **Pipeline stages**: Sequence (noop) → Resolve (CPU work) → Persist (response) → Release (cleanup)
 - **CPU usage when idle**: 0%
 
 ### Key Optimizations
@@ -49,14 +49,20 @@
 - **Overall improvement**: 38.9% increase from baseline (396ns → 550ns)
 
 ### Request Flow
+
+**Health Check Pipeline** (/ok endpoint):
 ```
-I/O Threads (8) → HttpHandler::on_batch_complete() → ThreadPipeline
+I/O Threads (8) → HttpHandler::on_batch_complete() → Commit Pipeline
      ↑                                                    ↓
-     |                            Stage 0: Noop thread
-     |                            (550ns serial CPU per request)
-     |                            (batch size: 1)
+     |                            Stage 0: Sequence (noop)
      |                            ↓
-     |                            Stage 1: Connection return
+     |                            Stage 1: Resolve (650ns CPU work)
+     |                            (spend_cpu_cycles(7000))
+     |                            ↓
+     |                            Stage 2: Persist (generate response)
+     |                            (send "OK" response)
+     |                            ↓
+     |                            Stage 3: Release (connection return)
      |                            (optimized futex wake)
      |                            ↓
      └─────────────────────── Server::release_back_to_server()
@@ -64,7 +70,9 @@
 
 ## Test Configuration
 
-- Server: test_config.toml with 8 io_threads, 8 epoll_instances
-- Load tester: ./load_tester --network-threads 12
+- Server: test_benchmark_config.toml with 8 io_threads, 8 epoll_instances
+- Configuration: `ok_resolve_iterations = 7000` (650ns CPU work)
+- Load tester: targeting /ok endpoint
+- Benchmark validation: ./bench_cpu_work 7000
 - Build: ninja
-- Command: ./weaseldb --config test_config.toml
+- Command: ./weaseldb --config test_benchmark_config.toml