From f3c3f77a2496e907ef197ca163a28bb33d498d10 Mon Sep 17 00:00:00 2001
From: Andrew Noyes
Date: Mon, 15 Sep 2025 11:51:01 -0400
Subject: [PATCH] Extract commit pipeline to its own module

---
 CMakeLists.txt          |   1 +
 src/commit_pipeline.cpp | 434 ++++++++++++++++++++++++++++++++++++++++
 src/commit_pipeline.hpp | 133 ++++++++++++
 src/http_handler.cpp    | 397 +----------------------------------
 src/http_handler.hpp    | 104 +---------
 5 files changed, 577 insertions(+), 492 deletions(-)
 create mode 100644 src/commit_pipeline.cpp
 create mode 100644 src/commit_pipeline.hpp

diff --git a/CMakeLists.txt b/CMakeLists.txt
index 95cf837..499ea9b 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -139,6 +139,7 @@ target_link_libraries(nanobench_impl PUBLIC nanobench)
 # Define all source files in one place
 set(WEASELDB_SOURCES
     src/arena.cpp
+    src/commit_pipeline.cpp
     src/cpu_work.cpp
     src/format.cpp
     src/metric.cpp
diff --git a/src/commit_pipeline.cpp b/src/commit_pipeline.cpp
new file mode 100644
index 0000000..5ec0f59
--- /dev/null
+++ b/src/commit_pipeline.cpp
@@ -0,0 +1,434 @@
+#include "commit_pipeline.hpp"
+
+#include <cstring>
+#include <variant>
+
+#include "commit_request.hpp"
+#include "cpu_work.hpp"
+#include "format.hpp"
+#include "metric.hpp"
+
+// Metric for banned request IDs memory usage
+auto banned_request_ids_memory_gauge =
+    metric::create_gauge("weaseldb_banned_request_ids_memory_bytes",
+                         "Memory used by banned request IDs arena")
+        .create({});
+
+CommitPipeline::CommitPipeline(const weaseldb::Config &config)
+    : config_(config), banned_request_ids_(ArenaStlAllocator<std::string_view>(
+                           &banned_request_arena_)),
+      pipeline_(lg_size) {
+
+  // Stage 0: Sequence assignment thread
+  sequence_thread_ = std::thread{[this]() {
+    pthread_setname_np(pthread_self(), "txn-sequence");
+    for (;;) {
+      auto guard = pipeline_.acquire<0, 0>();
+      if (process_sequence_batch(guard.batch)) {
+        return; // Shutdown signal received
+      }
+    }
+  }};
+
+  // Stage 1: Precondition resolution thread
+  resolve_thread_ = std::thread{[this]() {
+    pthread_setname_np(pthread_self(), "txn-resolve");
+    for (;;) {
+      auto guard = pipeline_.acquire<1, 0>(/*maxBatch*/ 1);
+      if (process_resolve_batch(guard.batch)) {
+        return; // Shutdown signal received
+      }
+    }
+  }};
+
+  // Stage 2: Transaction persistence thread
+  persist_thread_ = std::thread{[this]() {
+    pthread_setname_np(pthread_self(), "txn-persist");
+    for (;;) {
+      auto guard = pipeline_.acquire<2, 0>();
+      if (process_persist_batch(guard.batch)) {
+        return; // Shutdown signal received
+      }
+    }
+  }};
+
+  // Stage 3: Connection return to server thread
+  release_thread_ = std::thread{[this]() {
+    pthread_setname_np(pthread_self(), "txn-release");
+    for (;;) {
+      auto guard = pipeline_.acquire<3, 0>();
+      if (process_release_batch(guard.batch)) {
+        return; // Shutdown signal received
+      }
+    }
+  }};
+}
+
+CommitPipeline::~CommitPipeline() {
+  // Send single shutdown signal that flows through all pipeline stages
+  {
+    auto guard = pipeline_.push(1, true);
+    guard.batch[0] = ShutdownEntry{};
+  }
+
+  // Join all pipeline threads
+  sequence_thread_.join();
+  resolve_thread_.join();
+  persist_thread_.join();
+  release_thread_.join();
+}
+
+void CommitPipeline::submit_batch(std::span<PipelineEntry> entries) {
+  if (entries.empty()) {
+    return;
+  }
+
+  // Get pipeline guard for batch size
+  auto guard = pipeline_.push(entries.size(), /*block=*/true);
+
+  // Move entries into pipeline slots
+  std::move(entries.begin(), entries.end(), guard.batch.begin());
+
+  // Guard destructor publishes batch to stage 0
+}
+
+bool CommitPipeline::process_sequence_batch(BatchType &batch) {
+  // Stage 0: Sequence assignment
+  // This stage performs ONLY work that requires serial processing:
+  // - Version/sequence number assignment (must be sequential)
+  // - Request ID banned list management
+
+  for (auto &entry : batch) {
+    // Pattern match on pipeline entry variant
+    bool should_shutdown = std::visit(
+        [&](auto &&e) -> bool {
+          using T = std::decay_t<decltype(e)>;
+
+          if constexpr (std::is_same_v<T, ShutdownEntry>) {
+            return true; // Signal shutdown
+          } else if constexpr (std::is_same_v<T, CommitEntry>) {
+            // Process commit entry: check banned list, assign version
+            auto &commit_entry = e;
+            auto conn_ref = commit_entry.connection.lock();
+            if (!conn_ref) {
+              // Connection is gone, drop the entry silently
+              return false; // Skip this entry and continue processing
+            }
+
+            if (!commit_entry.commit_request) {
+              // Should not happen - basic validation was done on I/O thread
+              conn_ref->send_response(commit_entry.protocol_context,
+                                      R"({"error":"Internal server error"})",
+                                      Arena{});
+              return false;
+            }
+
+            // Check if request_id is banned (for status queries)
+            // Only check CommitRequest request_id, not HTTP header
+            if (commit_entry.commit_request &&
+                commit_entry.commit_request->request_id().has_value()) {
+              auto commit_request_id =
+                  commit_entry.commit_request->request_id().value();
+              if (banned_request_ids_.find(commit_request_id) !=
+                  banned_request_ids_.end()) {
+                // Request ID is banned, this commit should fail
+                conn_ref->send_response(
+                    commit_entry.protocol_context,
+                    R"({"status": "not_committed", "error": "request_id_banned"})",
+                    Arena{});
+                return false;
+              }
+            }
+
+            // Assign sequential version number
+            commit_entry.assigned_version = next_version_++;
+
+            return false; // Continue processing
+          } else if constexpr (std::is_same_v<T, StatusEntry>) {
+            // Process status entry: add request_id to banned list, get version
+            // upper bound
+            auto &status_entry = e;
+            auto conn_ref = status_entry.connection.lock();
+            if (!conn_ref) {
+              // Connection is gone, drop the entry silently
+              return false; // Skip this entry and continue processing
+            }
+
+            if (!status_entry.status_request_id.empty()) {
+              // Add request_id to banned list - store the string in arena and
+              // use string_view
+              std::string_view request_id_view =
+                  banned_request_arena_.copy_string(
+                      status_entry.status_request_id);
+              banned_request_ids_.insert(request_id_view);
+
+              // Update memory usage metric
+              banned_request_ids_memory_gauge.set(
+                  banned_request_arena_.total_allocated());
+
+              // Set version upper bound to current highest assigned version
+              status_entry.version_upper_bound = next_version_ - 1;
+            }
+
+            return false; // Continue processing
+          } else if constexpr (std::is_same_v<T, HealthCheckEntry>) {
+            // Process health check entry: noop in sequence stage
+            auto &health_check_entry = e;
+            auto conn_ref = health_check_entry.connection.lock();
+            if (!conn_ref) {
+              // Connection is gone, drop the entry silently
+              return false; // Skip this entry and continue processing
+            }
+
+            return false; // Continue processing
+          }
+
+          return false; // Unknown type, continue
+        },
+        entry);
+
+    if (should_shutdown) {
+      return true;
+    }
+  }
+  return false; // Continue processing
+}
+
+bool CommitPipeline::process_resolve_batch(BatchType &batch) {
+  // Stage 1: Precondition resolution
+  // This stage must be serialized to maintain consistent database state view
+  // - Validate preconditions against current database state
+  // - Check for conflicts with other transactions
+
+  for (auto &entry : batch) {
+    // Pattern match on pipeline entry variant
+    bool should_shutdown = std::visit(
+        [&](auto &&e) -> bool {
+          using T = std::decay_t<decltype(e)>;
+
+          if constexpr (std::is_same_v<T, ShutdownEntry>) {
+            return true; // Signal shutdown
+          } else if constexpr (std::is_same_v<T, CommitEntry>) {
+            // Process commit entry: accept all commits (simplified
+            // implementation)
+            auto &commit_entry = e;
+            auto conn_ref = commit_entry.connection.lock();
+            if (!conn_ref) {
+              // Connection is gone, drop the entry silently
+              return false; // Skip this entry and continue processing
+            }
+
+            if (!commit_entry.commit_request) {
+              // Skip processing for failed sequence stage
+              return false;
+            }
+
+            // Accept all commits (simplified implementation)
+            commit_entry.resolve_success = true;
+
+            return false; // Continue processing
+          } else if constexpr (std::is_same_v<T, StatusEntry>) {
+            // Status entries are not processed in resolve stage
+            // They were already handled in sequence stage
+            return false;
+          } else if constexpr (std::is_same_v<T, HealthCheckEntry>) {
+            // Process health check entry: perform configurable CPU work
+            auto &health_check_entry = e;
+            auto conn_ref = health_check_entry.connection.lock();
+            if (!conn_ref) {
+              // Connection is gone, drop the entry silently
+              return false; // Skip this entry and continue processing
+            }
+
+            // Perform configurable CPU-intensive work for benchmarking
+            spend_cpu_cycles(config_.benchmark.ok_resolve_iterations);
+
+            return false; // Continue processing
+          }
+
+          return false; // Unknown type, continue
+        },
+        entry);
+
+    if (should_shutdown) {
+      return true;
+    }
+  }
+  return false; // Continue processing
+}
+
+bool CommitPipeline::process_persist_batch(BatchType &batch) {
+  // Stage 2: Transaction persistence
+  // Mark everything as durable immediately (simplified implementation)
+  // In real implementation: batch S3 writes, update subscribers, etc.
+
+  for (auto &entry : batch) {
+    // Pattern match on pipeline entry variant
+    bool should_shutdown = std::visit(
+        [&](auto &&e) -> bool {
+          using T = std::decay_t<decltype(e)>;
+
+          if constexpr (std::is_same_v<T, ShutdownEntry>) {
+            return true; // Signal shutdown
+          } else if constexpr (std::is_same_v<T, CommitEntry>) {
+            // Process commit entry: mark as durable, generate response
+            auto &commit_entry = e;
+            // Check if connection is still alive first
+            auto conn_ref = commit_entry.connection.lock();
+            if (!conn_ref) {
+              // Connection is gone, drop the entry silently
+              return false; // Skip this entry and continue processing
+            }
+
+            // Skip if resolve failed or connection is in error state
+            if (!commit_entry.commit_request || !commit_entry.resolve_success) {
+              return false;
+            }
+
+            // Mark as persisted and update committed version high water mark
+            commit_entry.persist_success = true;
+            committed_version_.store(commit_entry.assigned_version,
+                                     std::memory_order_seq_cst);
+
+            const CommitRequest &commit_request = *commit_entry.commit_request;
+
+            // Generate success JSON response with actual assigned version
+            std::string_view response_json;
+            if (commit_request.request_id().has_value()) {
+              response_json = format(
+                  commit_entry.request_arena,
+                  R"({"request_id":"%.*s","status":"committed","version":%ld,"leader_id":"leader123"})",
+                  static_cast<int>(commit_request.request_id().value().size()),
+                  commit_request.request_id().value().data(),
+                  commit_entry.assigned_version);
+            } else {
+              response_json = format(
+                  commit_entry.request_arena,
+                  R"({"status":"committed","version":%ld,"leader_id":"leader123"})",
+                  commit_entry.assigned_version);
+            }
+
+            // Store JSON response in arena for release stage
+            char *json_buffer =
+                commit_entry.request_arena.template allocate<char>(
+                    response_json.size());
+            std::memcpy(json_buffer, response_json.data(),
+                        response_json.size());
+            commit_entry.response_json =
+                std::string_view(json_buffer, response_json.size());
+
+            return false; // Continue processing
+          } else if constexpr (std::is_same_v<T, StatusEntry>) {
+            // Process status entry: generate not_committed response
+            auto &status_entry = e;
+            auto conn_ref = status_entry.connection.lock();
+            if (!conn_ref) {
+              // Connection is gone, drop the entry silently
+              return false;
+            }
+
+            // Store JSON response for release stage
+            status_entry.response_json = R"({"status": "not_committed"})";
+
+            return false;
+          } else if constexpr (std::is_same_v<T, HealthCheckEntry>) {
+            // Process health check entry: generate OK response
+            auto &health_check_entry = e;
+            auto conn_ref = health_check_entry.connection.lock();
+            if (!conn_ref) {
+              // Connection is gone, drop the entry silently
+              return false; // Skip this entry and continue processing
+            }
+
+            // Store plain text "OK" response for release stage
+            health_check_entry.response_json = "OK";
+
+            return false; // Continue processing
+          }
+
+          return false; // Unknown type, continue
+        },
+        entry);
+
+    if (should_shutdown) {
+      return true;
+    }
+  }
+
+  return false; // Continue processing
+}
+
+bool CommitPipeline::process_release_batch(BatchType &batch) {
+  // Stage 3: Connection release
+  // Return connections to server for response transmission
+
+  for (auto &entry : batch) {
+    // Pattern match on pipeline entry variant
+    bool should_shutdown = std::visit(
+        [&](auto &&e) -> bool {
+          using T = std::decay_t<decltype(e)>;
+
+          if constexpr (std::is_same_v<T, ShutdownEntry>) {
+            return true; // Signal shutdown
+          } else if constexpr (std::is_same_v<T, CommitEntry>) {
+            // Process commit entry: return connection to server
+            auto &commit_entry = e;
+            auto conn_ref = commit_entry.connection.lock();
+            if (!conn_ref) {
+              // Connection is gone, drop the entry silently
+              return false; // Skip this entry and continue processing
+            }
+
+            // Send the JSON response using protocol-agnostic interface
+            // HTTP formatting will happen in on_preprocess_writes()
+            conn_ref->send_response(commit_entry.protocol_context,
+                                    commit_entry.response_json,
+                                    std::move(commit_entry.request_arena));
+
+            return false; // Continue processing
+          } else if constexpr (std::is_same_v<T, StatusEntry>) {
+            // Process status entry: return connection to server
+            auto &status_entry = e;
+            auto conn_ref = status_entry.connection.lock();
+            if (!conn_ref) {
+              // Connection is gone, drop the entry silently
+              return false; // Skip this entry and continue processing
+            }
+
+            // Send the JSON response using protocol-agnostic interface
+            // HTTP formatting will happen in on_preprocess_writes()
+            conn_ref->send_response(status_entry.protocol_context,
+                                    status_entry.response_json,
+                                    std::move(status_entry.request_arena));
+
+            return false; // Continue processing
+          } else if constexpr (std::is_same_v<T, HealthCheckEntry>) {
+            // Process health check entry: return connection to server
+            auto &health_check_entry = e;
+            auto conn_ref = health_check_entry.connection.lock();
+            if (!conn_ref) {
+              // Connection is gone, drop the entry silently
+              return false; // Skip this entry and continue processing
+            }
+
+            // Send the response using protocol-agnostic interface
+            // HTTP formatting will happen in on_preprocess_writes()
+            conn_ref->send_response(
+                health_check_entry.protocol_context,
+                health_check_entry.response_json,
+                std::move(health_check_entry.request_arena));
+
+            return false; // Continue processing
+          }
+
+          return false; // Unknown type, continue
+        },
+        entry);
+
+    if (should_shutdown) {
+      return true;
+    }
+  }
+
+  return false; // Continue processing
+}
diff --git a/src/commit_pipeline.hpp b/src/commit_pipeline.hpp
new file mode 100644
index 0000000..25d7a52
--- /dev/null
+++ b/src/commit_pipeline.hpp
@@ -0,0 +1,133 @@
+#pragma once
+
+#include <atomic>
+#include <span>
+#include <thread>
+#include <unordered_set>
+
+#include "arena.hpp"
+#include "config.hpp"
+#include "pipeline_entry.hpp"
+#include "thread_pipeline.hpp"
+
+/**
+ * High-performance 4-stage commit processing pipeline.
+ *
+ * Provides protocol-agnostic transaction processing through a lock-free
+ * multi-stage pipeline optimized for high throughput and low latency.
+ *
+ * Pipeline Stages:
+ * 1. Sequence: Version assignment and request ID deduplication
+ * 2. Resolve: Precondition validation and conflict detection
+ * 3. Persist: Transaction durability and response generation
+ * 4. Release: Connection return and response transmission
+ *
+ * Thread Safety:
+ * - submit_batch() is thread-safe for concurrent producers
+ * - Internal pipeline uses lock-free algorithms
+ * - Each stage runs on dedicated threads for optimal performance
+ *
+ * Usage:
+ * ```cpp
+ * CommitPipeline pipeline(config);
+ *
+ * // Build pipeline entries
+ * std::vector<PipelineEntry> entries;
+ * entries.emplace_back(CommitEntry(connection, context, request, arena));
+ *
+ * // Submit for processing
+ * pipeline.submit_batch(entries);
+ * ```
+ */
+struct CommitPipeline {
+  /**
+   * Create pipeline with 4 processing stages.
+   *
+   * @param config Server configuration for pipeline tuning
+   */
+  explicit CommitPipeline(const weaseldb::Config &config);
+
+  /**
+   * Destructor ensures clean shutdown and thread join.
+   * Sends shutdown signal through pipeline and waits for all stages to
+   * complete.
+   */
+  ~CommitPipeline();
+
+  /**
+   * Submit batch of pipeline entries for processing.
+   *
+   * Thread-safe method for submitting work to the pipeline. Entries flow
+   * through all 4 stages in order with proper synchronization.
+   *
+   * @param entries Span of pipeline entries to process
+   *
+   * Entry types:
+   * - CommitEntry: Full transaction processing through all stages
+   * - StatusEntry: Request status lookup with sequence stage processing
+   * - HealthCheckEntry: Health check with configurable CPU work
+   * - ShutdownEntry: Coordinated pipeline shutdown signal
+   *
+   * @note Thread Safety: Safe for concurrent calls from multiple threads
+   * @note Performance: Batching reduces pipeline contention - prefer larger
+   * batches
+   * @note Blocking: May block if pipeline is at capacity (backpressure)
+   */
+  void submit_batch(std::span<PipelineEntry> entries);
+
+  /**
+   * Get the highest committed version number.
+   *
+   * @return Current committed version (persist thread writes, other threads
+   * read)
+   * @note Thread Safety: Safe to read from any thread
+   */
+  int64_t get_committed_version() const {
+    return committed_version_.load(std::memory_order_seq_cst);
+  }
+
+private:
+  // Configuration reference
+  const weaseldb::Config &config_;
+
+  // Pipeline state (sequence stage only)
+  int64_t next_version_ = 1; // Next version to assign (sequence thread only)
+
+  // Pipeline state (persist thread writes, other threads read)
+  std::atomic<int64_t> committed_version_{0}; // Highest committed version
+
+  // Request ID deduplication (sequence stage only)
+  Arena banned_request_arena_;
+  using BannedRequestIdSet =
+      std::unordered_set<std::string_view, std::hash<std::string_view>,
+                         std::equal_to<std::string_view>,
+                         ArenaStlAllocator<std::string_view>>;
+  BannedRequestIdSet banned_request_ids_;
+
+  // Lock-free pipeline configuration
+  static constexpr int lg_size = 16; // Ring buffer size (2^16 slots)
+  static constexpr auto wait_strategy = WaitStrategy::WaitIfUpstreamIdle;
+
+  // 4-stage pipeline: sequence -> resolve -> persist -> release
+  StaticThreadPipeline pipeline_;
+
+  // Stage processing threads
+  std::thread sequence_thread_;
+  std::thread resolve_thread_;
+  std::thread persist_thread_;
+  std::thread release_thread_;
+
+  // Pipeline stage processing methods (batch-based)
+  using BatchType =
+      StaticThreadPipeline::Batch;
+  bool process_sequence_batch(BatchType &batch);
+  bool process_resolve_batch(BatchType &batch);
+  bool process_persist_batch(BatchType &batch);
+  bool process_release_batch(BatchType &batch);
+
+  // Make non-copyable and non-movable
+  CommitPipeline(const CommitPipeline &) = delete;
+  CommitPipeline &operator=(const CommitPipeline &) = delete;
+  CommitPipeline(CommitPipeline &&) = delete;
+  CommitPipeline &operator=(CommitPipeline &&) = delete;
+};
diff --git a/src/http_handler.cpp b/src/http_handler.cpp
index e17d43a..18646ee 100644
--- a/src/http_handler.cpp
+++ b/src/http_handler.cpp
@@ -8,7 +8,6 @@
 #include "api_url_parser.hpp"
 #include "arena.hpp"
 #include "connection.hpp"
-#include "cpu_work.hpp"
 #include "format.hpp"
 #include "json_commit_request_parser.hpp"
 #include "metric.hpp"
@@ -30,12 +29,6 @@ thread_local auto version_counter =
 thread_local auto ok_counter =
     requests_counter_family.create({{"path", "/ok"}});
 
-// Metric for banned request IDs memory usage
-auto banned_request_ids_memory_gauge =
-    metric::create_gauge("weaseldb_banned_request_ids_memory_bytes",
-                         "Memory used by banned request IDs arena")
-        .create({});
-
 HttpConnectionState::HttpConnectionState() {
   llhttp_settings_init(&settings);
 
@@ -82,11 +75,6 @@ void HttpHandler::on_preprocess_writes(
 
   for (auto &pending : pending_responses) {
     auto *ctx = static_cast(pending.protocol_context);
-    printf(
-        "Processing response: sequence_id=%ld, request_id=%ld, json='%.*s'\n",
-        ctx->sequence_id, ctx->http_request_id,
-        (int)pending.response_json.size(), pending.response_json.data());
-
     // Determine HTTP status code and content type from response content
     int status_code = 200;
     std::string_view content_type = "application/json";
@@ -109,28 +97,21 @@
         status_code, content_type, pending.response_json, pending.arena,
         ctx->http_request_id, ctx->connection_close);
 
-    printf("Adding response to queue: sequence_id=%ld\n", ctx->sequence_id);
     state->ready_responses[ctx->sequence_id] = ResponseData{
         http_response, std::move(pending.arena), ctx->connection_close};
   }
 
   // Send responses in sequential order
-  printf("Checking for sequential responses, next_sequence_to_send=%ld\n",
-         state->next_sequence_to_send);
   auto iter = state->ready_responses.begin();
   while (iter != state->ready_responses.end() &&
          iter->first == state->next_sequence_to_send) {
     auto &[sequence_id, response_data] = *iter;
-    printf("Sending response: sequence_id=%ld\n", sequence_id);
     conn.append_bytes(response_data.data, std::move(response_data.arena),
                       response_data.connection_close);
     state->next_sequence_to_send++;
     iter = state->ready_responses.erase(iter);
   }
-  printf("After processing, next_sequence_to_send=%ld, "
-         "ready_responses.size()=%zu\n",
-         state->next_sequence_to_send, state->ready_responses.size());
 }
 }
@@ -245,12 +226,10 @@ void HttpHandler::on_batch_complete(std::span batch) {
     state->queue.clear();
   }
 
-  // Send requests to 4-stage pipeline in batch. Batching here reduces
+  // Send requests to commit pipeline in batch. Batching here reduces
   // contention on the way into the pipeline.
-  if (g_batch_entries.size() > 0) {
-    auto guard = commitPipeline.push(g_batch_entries.size(), true);
-    std::move(g_batch_entries.begin(), g_batch_entries.end(),
-              guard.batch.begin());
+  if (!g_batch_entries.empty()) {
+    commit_pipeline_.submit_batch(g_batch_entries);
   }
   g_batch_entries.clear();
 }
@@ -305,9 +284,8 @@ void HttpHandler::handle_get_version(Connection &conn,
   version_counter.inc();
 
   // Generate JSON response
-  auto json_response =
-      format(state.arena, R"({"version":%ld,"leader":""})",
-             this->committed_version.load(std::memory_order_seq_cst));
+  auto json_response = format(state.arena, R"({"version":%ld,"leader":""})",
+                              commit_pipeline_.get_committed_version());
 
   // Format HTTP response
   auto http_response =
@@ -809,368 +787,3 @@ int HttpHandler::onMessageComplete(llhttp_t *parser) {
   state->message_complete = true;
   return HPE_PAUSED;
 }
-
-// Pipeline stage implementations (batch-based)
-bool HttpHandler::process_sequence_batch(BatchType &batch) {
-  // Stage 0: Sequence assignment
-  // This stage performs ONLY work that requires serial processing:
-  // - Version/sequence number assignment (must be sequential)
-  // - Request ID banned list management
-
-  for (auto &entry : batch) {
-    // Pattern match on pipeline entry variant
-    bool should_shutdown = std::visit(
-        [&](auto &&e) -> bool {
-          using T = std::decay_t<decltype(e)>;
-
-          if constexpr (std::is_same_v<T, ShutdownEntry>) {
-            return true; // Signal shutdown
-          } else if constexpr (std::is_same_v<T, CommitEntry>) {
-            // Process commit entry: check banned list, assign version
-            auto &commit_entry = e;
-            auto conn_ref = commit_entry.connection.lock();
-            if (!conn_ref) {
-              // Connection is gone, drop the entry silently and increment
-              // metric
-              // TODO: Add dropped_pipeline_entries metric
-              return false; // Skip this entry and continue processing
-            }
-
-            if (!commit_entry.commit_request) {
-              // Should not happen - basic validation was done on I/O thread
-              conn_ref->send_response(commit_entry.protocol_context,
-                                      R"({"error":"Internal server error"})",
-                                      Arena{});
-              return false;
-            }
-
-            // Check if request_id is banned (for status queries)
-            // Only check CommitRequest request_id, not HTTP header
-            if (commit_entry.commit_request &&
-                commit_entry.commit_request->request_id().has_value()) {
-              auto commit_request_id =
-                  commit_entry.commit_request->request_id().value();
-              if (banned_request_ids.find(commit_request_id) !=
-                  banned_request_ids.end()) {
-                // Request ID is banned, this commit should fail
-                conn_ref->send_response(
-                    commit_entry.protocol_context,
-                    R"({"status": "not_committed", "error": "request_id_banned"})",
-                    Arena{});
-                return false;
-              }
-            }
-
-            // Assign sequential version number
-            commit_entry.assigned_version = next_version++;
-
-            return false; // Continue processing
-          } else if constexpr (std::is_same_v<T, StatusEntry>) {
-            // Process status entry: add request_id to banned list, get version
-            // upper bound
-            auto &status_entry = e;
-            auto conn_ref = status_entry.connection.lock();
-            if (!conn_ref) {
-              // Connection is gone, drop the entry silently and increment
-              // metric
-              // TODO: Add dropped_pipeline_entries metric
-              return false; // Skip this entry and continue processing
-            }
-
-            if (!status_entry.status_request_id.empty()) {
-              // Add request_id to banned list - store the string in arena and
-              // use string_view
-              std::string_view request_id_view =
-                  banned_request_arena.copy_string(
-                      status_entry.status_request_id);
-              banned_request_ids.insert(request_id_view);
-
-              // Update memory usage metric
-              banned_request_ids_memory_gauge.set(
-                  banned_request_arena.total_allocated());
-
-              // Set version upper bound to current highest assigned version
-              status_entry.version_upper_bound = next_version - 1;
-            }
-
-            // TODO: Transfer to status threadpool - for now mark as processed
-            // Response will be generated in persist stage
-
-            return false; // Continue processing
-          } else if constexpr (std::is_same_v<T, HealthCheckEntry>) {
-            // Process health check entry: noop in sequence stage
-            auto &health_check_entry = e;
-            auto conn_ref = health_check_entry.connection.lock();
-            if (!conn_ref) {
-              // Connection is gone, drop the entry silently and increment
-              // metric
-              // TODO: Add dropped_pipeline_entries metric
-              return false; // Skip this entry and continue processing
-            }
-
-            return false; // Continue processing
-          }
-
-          return false; // Unknown type, continue
-        },
-        entry);
-
-    if (should_shutdown) {
-      return true;
-    }
-  }
-  return false; // Continue processing
-}
-
-bool HttpHandler::process_resolve_batch(BatchType &batch) {
-  // Stage 1: Precondition resolution
-  // This stage must be serialized to maintain consistent database state view
-  // - Validate preconditions against current database state
-  // - Check for conflicts with other transactions
-
-  for (auto &entry : batch) {
-    // Pattern match on pipeline entry variant
-    bool should_shutdown = std::visit(
-        [&](auto &&e) -> bool {
-          using T = std::decay_t<decltype(e)>;
-
-          if constexpr (std::is_same_v<T, ShutdownEntry>) {
-            return true; // Signal shutdown
-          } else if constexpr (std::is_same_v<T, CommitEntry>) {
-            // Process commit entry: accept all commits (simplified
-            // implementation)
-            auto &commit_entry = e;
-            auto conn_ref = commit_entry.connection.lock();
-            if (!conn_ref) {
-              // Connection is gone, drop the entry silently and increment
-              // metric
-              // TODO: Add dropped_pipeline_entries metric
-              return false; // Skip this entry and continue processing
-            }
-
-            if (!commit_entry.commit_request) {
-              // Skip processing for failed sequence stage
-              return false;
-            }
-
-            // Accept all commits (simplified implementation)
-            commit_entry.resolve_success = true;
-
-            return false; // Continue processing
-          } else if constexpr (std::is_same_v<T, StatusEntry>) {
-            // Status entries are not processed in resolve stage
-            // They were already handled in sequence stage
-            return false;
-          } else if constexpr (std::is_same_v<T, HealthCheckEntry>) {
-            // Process health check entry: perform configurable CPU work
-            auto &health_check_entry = e;
-            auto conn_ref = health_check_entry.connection.lock();
-            if (!conn_ref) {
-              // Connection is gone, drop the entry silently and increment
-              // metric
-              // TODO: Add dropped_pipeline_entries metric
-              return false; // Skip this entry and continue processing
-            }
-
-            // Perform configurable CPU-intensive work for benchmarking
-            spend_cpu_cycles(config_.benchmark.ok_resolve_iterations);
-
-            return false; // Continue processing
-          }
-
-          return false; // Unknown type, continue
-        },
-        entry);
-
-    if (should_shutdown) {
-      return true;
-    }
-  }
-  return false; // Continue processing
-}
-
-bool HttpHandler::process_persist_batch(BatchType &batch) {
-  // Stage 2: Transaction persistence
-  // Mark everything as durable immediately (simplified implementation)
-  // In real implementation: batch S3 writes, update subscribers, etc.
-
-  for (auto &entry : batch) {
-    // Pattern match on pipeline entry variant
-    bool should_shutdown = std::visit(
-        [&](auto &&e) -> bool {
-          using T = std::decay_t<decltype(e)>;
-
-          if constexpr (std::is_same_v<T, ShutdownEntry>) {
-            return true; // Signal shutdown
-          } else if constexpr (std::is_same_v<T, CommitEntry>) {
-            // Process commit entry: mark as durable, generate response
-            auto &commit_entry = e;
-            // Check if connection is still alive first
-            auto conn_ref = commit_entry.connection.lock();
-            if (!conn_ref) {
-              // Connection is gone, drop the entry silently and increment
-              // metric
-              // TODO: Add dropped_pipeline_entries metric
-              return false; // Skip this entry and continue processing
-            }
-
-            // Skip if resolve failed or connection is in error state
-            if (!commit_entry.commit_request || !commit_entry.resolve_success) {
-              return false;
-            }
-
-            // Mark as persisted and update committed version high water mark
-            commit_entry.persist_success = true;
-            committed_version.store(commit_entry.assigned_version,
-                                    std::memory_order_seq_cst);
-
-            const CommitRequest &commit_request = *commit_entry.commit_request;
-
-            // Generate success JSON response with actual assigned version
-            std::string_view response_json;
-            if (commit_request.request_id().has_value()) {
-              response_json = format(
-                  commit_entry.request_arena,
-                  R"({"request_id":"%.*s","status":"committed","version":%ld,"leader_id":"leader123"})",
-                  static_cast<int>(commit_request.request_id().value().size()),
-                  commit_request.request_id().value().data(),
-                  commit_entry.assigned_version);
-            } else {
-              response_json = format(
-                  commit_entry.request_arena,
-                  R"({"status":"committed","version":%ld,"leader_id":"leader123"})",
-                  commit_entry.assigned_version);
-            }
-
-            // Store JSON response in arena for release stage
-            char *json_buffer =
-                commit_entry.request_arena.template allocate<char>(
-                    response_json.size());
-            std::memcpy(json_buffer, response_json.data(),
-                        response_json.size());
-            commit_entry.response_json =
-                std::string_view(json_buffer, response_json.size());
-
-            return false; // Continue processing
-          } else if constexpr (std::is_same_v<T, StatusEntry>) {
-            // Process status entry: generate not_committed response
-            auto &status_entry = e;
-            auto conn_ref = status_entry.connection.lock();
-            if (!conn_ref) {
-              // Connection is gone, drop the entry silently
-              return false;
-            }
-
-            // Store JSON response for release stage
-            status_entry.response_json = R"({"status": "not_committed"})";
-
-            return false;
-          } else if constexpr (std::is_same_v<T, HealthCheckEntry>) {
-            // Process health check entry: generate OK response
-            auto &health_check_entry = e;
-            auto conn_ref = health_check_entry.connection.lock();
-            if (!conn_ref) {
-              // Connection is gone, drop the entry silently and increment
-              // metric
-              // TODO: Add dropped_pipeline_entries metric
-              return false; // Skip this entry and continue processing
-            }
-
-            // Store plain text "OK" response for release stage
-            health_check_entry.response_json = "OK";
-
-            return false; // Continue processing
-          }
-
-          return false; // Unknown type, continue
-        },
-        entry);
-
-    if (should_shutdown) {
-      return true;
-    }
-  }
-
-  return false; // Continue processing
-}
-
-bool HttpHandler::process_release_batch(BatchType &batch) {
-  // Stage 3: Connection release
-  // Return connections to server for response transmission
-
-  for (auto &entry : batch) {
-    // Pattern match on pipeline entry variant
-    bool should_shutdown = std::visit(
-        [&](auto &&e) -> bool {
-          using T = std::decay_t<decltype(e)>;
-
-          if constexpr (std::is_same_v<T, ShutdownEntry>) {
-            return true; // Signal shutdown
-          } else if constexpr (std::is_same_v<T, CommitEntry>) {
-            // Process commit entry: return connection to server
-            auto &commit_entry = e;
-            auto conn_ref = commit_entry.connection.lock();
-            if (!conn_ref) {
-              // Connection is gone, drop the entry silently and increment
-              // metric
-              // TODO: Add dropped_pipeline_entries metric
-              return false; // Skip this entry and continue processing
-            }
-
-            // Send the JSON response using protocol-agnostic interface
-            // HTTP formatting will happen in on_preprocess_writes()
-            conn_ref->send_response(commit_entry.protocol_context,
-                                    commit_entry.response_json,
-                                    std::move(commit_entry.request_arena));
-
-            return false; // Continue processing
-          } else if constexpr (std::is_same_v<T, StatusEntry>) {
-            // Process status entry: return connection to server
-            auto &status_entry = e;
-            auto conn_ref = status_entry.connection.lock();
-            if (!conn_ref) {
-              // Connection is gone, drop the entry silently and increment
-              // metric
-              // TODO: Add dropped_pipeline_entries metric
-              return false; // Skip this entry and continue processing
-            }
-
-            // Send the JSON response using protocol-agnostic interface
-            // HTTP formatting will happen in on_preprocess_writes()
-            conn_ref->send_response(status_entry.protocol_context,
-                                    status_entry.response_json,
-                                    std::move(status_entry.request_arena));
-
-            return false; // Continue processing
-          } else if constexpr (std::is_same_v<T, HealthCheckEntry>) {
-            // Process health check entry: return connection to server
-            auto &health_check_entry = e;
-            auto conn_ref = health_check_entry.connection.lock();
-            if (!conn_ref) {
-              // Connection is gone, drop the entry silently and increment
-              // metric
-              // TODO: Add dropped_pipeline_entries metric
-              return false; // Skip this entry and continue processing
-            }
-
-            // Send the response using protocol-agnostic interface
-            // HTTP formatting will happen in on_preprocess_writes()
-            conn_ref->send_response(
-                health_check_entry.protocol_context,
-                health_check_entry.response_json,
-                std::move(health_check_entry.request_arena));
-
-            return false; // Continue processing
-          }
-
-          return false; // Unknown type, continue
-        },
-        entry);
-
-    if (should_shutdown) {
-      return true;
-    }
-  }
-
-  return false; // Continue processing
-}
diff --git a/src/http_handler.hpp b/src/http_handler.hpp
index b077775..ad3f063 100644
--- a/src/http_handler.hpp
+++ b/src/http_handler.hpp
@@ -13,11 +13,10 @@
 
 #include "api_url_parser.hpp"
 #include "arena.hpp"
+#include "commit_pipeline.hpp"
 #include "config.hpp"
 #include "connection.hpp"
 #include "connection_handler.hpp"
-#include "pipeline_entry.hpp"
-#include "thread_pipeline.hpp"
 
 // Forward declarations
 struct CommitRequest;
@@ -108,66 +107,7 @@ struct HttpConnectionState {
  */
 struct HttpHandler : ConnectionHandler {
   explicit HttpHandler(const weaseldb::Config &config)
-      : config_(config), banned_request_ids(ArenaStlAllocator<std::string_view>(
-                             &banned_request_arena)) {
-    // Stage 0: Sequence assignment thread
-    sequenceThread = std::thread{[this]() {
-      pthread_setname_np(pthread_self(), "txn-sequence");
-      for (;;) {
-        auto guard = commitPipeline.acquire<0, 0>();
-        if (process_sequence_batch(guard.batch)) {
-          return; // Shutdown signal received
-        }
-      }
-    }};
-
-    // Stage 1: Precondition resolution thread
-    resolveThread = std::thread{[this]() {
-      pthread_setname_np(pthread_self(), "txn-resolve");
-      for (;;) {
-        auto guard = commitPipeline.acquire<1, 0>(/*maxBatch*/ 1);
-        if (process_resolve_batch(guard.batch)) {
-          return; // Shutdown signal received
-        }
-      }
-    }};
-
-    // Stage 2: Transaction persistence thread
-    persistThread = std::thread{[this]() {
-      pthread_setname_np(pthread_self(), "txn-persist");
-      for (;;) {
-        auto guard = commitPipeline.acquire<2, 0>();
-        if (process_persist_batch(guard.batch)) {
-          return; // Shutdown signal received
-        }
-      }
-    }};
-
-    // Stage 3: Connection return to server thread
-    releaseThread = std::thread{[this]() {
-      pthread_setname_np(pthread_self(), "txn-release");
-      for (;;) {
-        auto guard = commitPipeline.acquire<3, 0>();
-        if (process_release_batch(guard.batch)) {
-          return; // Shutdown signal received
-        }
-      }
-    }};
-  }
-  ~HttpHandler() {
-    // Send single shutdown signal that flows through all pipeline stages
-    {
-      auto guard = commitPipeline.push(1, true);
-      guard.batch[0] =
-          ShutdownEntry{}; // Single ShutdownEntry flows through all stages
-    }
-
-    // Join all pipeline threads
-    sequenceThread.join();
-    resolveThread.join();
-    persistThread.join();
-    releaseThread.join();
-  }
+      : config_(config), commit_pipeline_(config) {}
 
   void on_connection_established(Connection &conn) override;
   void on_connection_closed(Connection &conn) override;
@@ -188,47 +128,11 @@ struct HttpHandler : ConnectionHandler {
   static int onMessageComplete(llhttp_t *parser);
 
 private:
-  static constexpr int lg_size = 16;
-
   // Configuration reference
   const weaseldb::Config &config_;
 
-  // Pipeline state (sequence thread only)
-  int64_t next_version = 1; // Next version to assign (sequence thread only)
-
-  // Pipeline state (persist thread writes, I/O threads read)
-  std::atomic<int64_t> committed_version{
-      0}; // Highest committed version (persist thread writes, I/O threads read)
-
-  // Arena for banned request IDs and related data structures (sequence thread
-  // only)
-  Arena banned_request_arena;
-  using BannedRequestIdSet =
-      std::unordered_set<std::string_view, std::hash<std::string_view>,
-                         std::equal_to<std::string_view>,
-                         ArenaStlAllocator<std::string_view>>;
-  BannedRequestIdSet banned_request_ids; // Request IDs that should not commit
-                                         // (string_views into arena)
-
-  constexpr static auto wait_strategy = WaitStrategy::WaitIfUpstreamIdle;
-
-  // Main commit processing pipeline: sequence -> resolve -> persist -> release
-  StaticThreadPipeline commitPipeline{
-      lg_size};
-
-  // Pipeline stage threads
-  std::thread sequenceThread;
-  std::thread resolveThread;
-  std::thread persistThread;
-  std::thread releaseThread;
-
-  // Pipeline stage processing methods (batch-based)
-  using BatchType =
-      StaticThreadPipeline::Batch;
-  bool process_sequence_batch(BatchType &batch);
-  bool process_resolve_batch(BatchType &batch);
-  bool process_persist_batch(BatchType &batch);
-  bool process_release_batch(BatchType &batch);
+  // Commit processing pipeline
+  CommitPipeline commit_pipeline_;
 
   // Route handlers
   void handle_get_version(Connection &conn, HttpRequestState &state);