From 96aae52853b6e5ca67ea1da481f5e57970dcd2bb Mon Sep 17 00:00:00 2001 From: Andrew Noyes Date: Thu, 4 Sep 2025 15:40:17 -0400 Subject: [PATCH] Basic implementation of /commit, /version, and /status No precondition checking, persistence, or log scanning yet. --- CMakeLists.txt | 3 + src/connection.hpp | 2 +- src/http_handler.cpp | 472 ++++++++++++++++++++++++++++++----------- src/http_handler.hpp | 99 ++++++--- src/pipeline_entry.hpp | 47 ++++ 5 files changed, 475 insertions(+), 148 deletions(-) create mode 100644 src/pipeline_entry.hpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 76f0df5..59c1223 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -156,6 +156,8 @@ add_executable( test_http_handler tests/test_http_handler.cpp src/http_handler.cpp + src/server.cpp + src/config.cpp src/json_commit_request_parser.cpp src/arena_allocator.cpp src/format.cpp @@ -169,6 +171,7 @@ target_link_libraries( doctest::doctest llhttp_static Threads::Threads + toml11::toml11 perfetto simdutf::simdutf weaseljson) diff --git a/src/connection.hpp b/src/connection.hpp index 3cd9285..8e32e0e 100644 --- a/src/connection.hpp +++ b/src/connection.hpp @@ -209,7 +209,7 @@ struct Connection { * metrics.recordQueueDepth(conn->get_id(), conn->outgoingBytesQueued()); * ``` */ - int64_t outgoingBytesQueued() const { + int64_t outgoing_bytes_queued() const { #ifndef NDEBUG // Debug build: validate counter accuracy int64_t computed_total = 0; diff --git a/src/http_handler.cpp b/src/http_handler.cpp index 4600cc9..b62ac60 100644 --- a/src/http_handler.cpp +++ b/src/http_handler.cpp @@ -1,5 +1,6 @@ #include "http_handler.hpp" +#include #include #include #include @@ -9,6 +10,7 @@ #include "json_commit_request_parser.hpp" #include "metric.hpp" #include "perfetto_categories.hpp" +#include "pipeline_entry.hpp" #include "server.hpp" auto requests_counter_family = metric::create_counter( @@ -16,6 +18,12 @@ auto requests_counter_family = metric::create_counter( thread_local auto metrics_counter = requests_counter_family.create({{"path", "/metrics"}}); +// Metric for banned request IDs memory usage +auto banned_request_ids_memory_gauge = + metric::create_gauge("weaseldb_banned_request_ids_memory_bytes", + "Memory used by banned request IDs arena") + .create({}); + // HttpConnectionState implementation HttpConnectionState::HttpConnectionState(ArenaAllocator &arena) : current_header_field_buf(ArenaStlAllocator(&arena)), @@ -63,30 +71,45 @@ void HttpHandler::on_write_buffer_drained( void HttpHandler::on_batch_complete( std::span> batch) { - // Collect commit requests that passed basic validation for pipeline - // processing - int commit_count = 0; + // Collect commit requests and status requests for pipeline processing + int pipeline_count = 0; + + // Count both commit and status requests for (auto &conn : batch) { if (conn && conn->user_data) { auto *state = static_cast(conn->user_data); + + // Count commit requests that passed basic validation if (state->route == HttpRoute::POST_commit && state->commit_request && state->parsing_commit && state->basic_validation_passed) { - commit_count++; + pipeline_count++; + } + // Count status requests + else if (state->route == HttpRoute::GET_status && + // Error message not already queued + conn->outgoing_bytes_queued() == 0) { + pipeline_count++; } } } - // Send commit requests to pipeline in batch - if (commit_count > 0) { - auto guard = commitPipeline.push(commit_count, true); + // Send requests to 4-stage pipeline in batch + if (pipeline_count > 0) { + auto guard = commitPipeline.push(pipeline_count, true); auto out_iter = guard.batch.begin(); for (auto &conn : batch) { if (conn && conn->user_data) { auto *state = static_cast(conn->user_data); + + // Create CommitEntry for commit requests if (state->route == HttpRoute::POST_commit && state->commit_request && state->parsing_commit && state->basic_validation_passed) { - *out_iter++ = std::move(conn); + *out_iter++ = CommitEntry{std::move(conn)}; + } + // Create StatusEntry for status requests + else if (state->route == HttpRoute::GET_status) { + *out_iter++ = StatusEntry{std::move(conn)}; } } } @@ -127,9 +150,6 @@ void HttpHandler::on_data_arrived(std::string_view data, break; case HttpRoute::POST_commit: handlePostCommit(*conn_ptr, *state); - // Commit requests will be sent to pipeline in batch via on_batch_complete - // If basic validation failed, connection stays in batch but won't be sent - // to pipeline break; case HttpRoute::GET_subscribe: handleGetSubscribe(*conn_ptr, *state); @@ -203,7 +223,8 @@ void HttpHandler::handleGetVersion(Connection &conn, const HttpConnectionState &state) { send_json_response( conn, 200, - R"({"version":"0.0.1","leader":"node-1","committed_version":42})", + format(conn.get_arena(), R"({"version":%ld,"leader":""})", + this->committed_version.load(std::memory_order_seq_cst)), state.connection_close); } @@ -240,12 +261,6 @@ void HttpHandler::handlePostCommit(Connection &conn, error_msg = "Commit request must specify a leader_id"; } - // Check leader_id matches current leader (static for process lifetime) - if (valid && commit_request.leader_id() != "test-leader") { - valid = false; - error_msg = "Leader ID mismatch - not current leader"; - } - // Check operations are well-formed if (valid) { for (const auto &op : commit_request.operations()) { @@ -267,9 +282,9 @@ void HttpHandler::handlePostCommit(Connection &conn, return; } - // Basic validation passed - mark for pipeline processing + // Basic validation passed - mark for 4-stage pipeline processing const_cast(state).basic_validation_passed = true; - // Response will be sent after pipeline processing is complete + // Response will be sent after 4-stage pipeline processing is complete } void HttpHandler::handleGetSubscribe(Connection &conn, @@ -282,12 +297,62 @@ void HttpHandler::handleGetSubscribe(Connection &conn, } void HttpHandler::handleGetStatus(Connection &conn, - const HttpConnectionState &state) { - // TODO: Extract request_id from URL and check status - send_json_response( - conn, 200, - R"({"request_id":"example","status":"committed","version":43})", - state.connection_close); + HttpConnectionState &state) { + // Status requests are processed through the pipeline + // Response will be generated in the sequence stage + // This handler extracts request_id from query parameters and prepares for + // pipeline processing + + // Extract request_id from query parameters: + // /v1/status?request_id=&min_version= + std::string_view url = state.url; + + // Find query parameters + size_t query_pos = url.find('?'); + if (query_pos == std::string_view::npos) { + // No query parameters + send_error_response(conn, 400, + "Missing required query parameter: request_id", + state.connection_close); + return; + } + + std::string_view query_string = url.substr(query_pos + 1); + + // Simple query parameter parsing for request_id + // Look for "request_id=" in the query string + size_t request_id_pos = query_string.find("request_id="); + if (request_id_pos == std::string_view::npos) { + send_error_response(conn, 400, + "Missing required query parameter: request_id", + state.connection_close); + return; + } + + // Extract the request_id value + size_t value_start = request_id_pos + 11; // length of "request_id=" + if (value_start >= query_string.length()) { + send_error_response(conn, 400, "Empty request_id parameter", + state.connection_close); + return; + } + + // Find the end of the request_id value (next & or end of string) + size_t value_end = query_string.find('&', value_start); + if (value_end == std::string_view::npos) { + value_end = query_string.length(); + } + + state.status_request_id = + query_string.substr(value_start, value_end - value_start); + + if (state.status_request_id.empty()) { + send_error_response(conn, 400, "Empty request_id parameter", + state.connection_close); + return; + } + + // Ready for pipeline processing } void HttpHandler::handlePutRetention(Connection &conn, @@ -331,16 +396,16 @@ void HttpHandler::handleGetMetrics(Connection &conn, arena, "HTTP/1.1 200 OK\r\n", "Content-Type: text/plain; version=0.0.4\r\n", "Content-Length: ", static_cast(total_size), "\r\n", - "X-Response-ID: ", static_cast(http_state->request_id), "\r\n", - "Connection: close\r\n", "\r\n"); + "X-Response-ID: ", static_cast(http_state->http_request_id), + "\r\n", "Connection: close\r\n", "\r\n"); conn.close_after_send(); } else { headers = static_format( arena, "HTTP/1.1 200 OK\r\n", "Content-Type: text/plain; version=0.0.4\r\n", "Content-Length: ", static_cast(total_size), "\r\n", - "X-Response-ID: ", static_cast(http_state->request_id), "\r\n", - "Connection: keep-alive\r\n", "\r\n"); + "X-Response-ID: ", static_cast(http_state->http_request_id), + "\r\n", "Connection: keep-alive\r\n", "\r\n"); } // Send headers @@ -354,7 +419,7 @@ void HttpHandler::handleGetMetrics(Connection &conn, void HttpHandler::handleGetOk(Connection &conn, const HttpConnectionState &state) { - TRACE_EVENT("http", "GET /ok", perfetto::Flow::Global(state.request_id)); + TRACE_EVENT("http", "GET /ok", perfetto::Flow::Global(state.http_request_id)); sendResponse(conn, 200, "text/plain", "OK", state.connection_close); } @@ -403,7 +468,7 @@ void HttpHandler::sendResponse(Connection &conn, int status_code, "\r\n%.*s", status_code, static_cast(status_text.size()), status_text.data(), static_cast(content_type.size()), - content_type.data(), body.size(), state->request_id, + content_type.data(), body.size(), state->http_request_id, connection_header, static_cast(body.size()), body.data()); if (close_connection) { @@ -489,7 +554,7 @@ int HttpHandler::onHeaderValueComplete(llhttp_t *parser) { id = id * 10 + (c - '0'); } } - state->request_id = id; + state->http_request_id = id; } // Clear buffers for next header @@ -557,110 +622,281 @@ int HttpHandler::onMessageComplete(llhttp_t *parser) { } // Pipeline stage implementations (batch-based) -bool HttpHandler::process_validation_batch(BatchType &batch) { +bool HttpHandler::process_sequence_batch(BatchType &batch) { + // Stage 0: Sequence assignment // This stage performs ONLY work that requires serial processing: - // - Version assignment (must be sequential) - // - Precondition validation (requires consistent database state view) + // - Version/sequence number assignment (must be sequential) + // - Request ID banned list management - for (auto &conn : batch) { - if (!conn) { - return true; // Shutdown signal + for (auto &entry : batch) { + // Pattern match on pipeline entry variant + bool should_shutdown = std::visit( + [&](auto &&e) -> bool { + using T = std::decay_t; + + if constexpr (std::is_same_v) { + return true; // Signal shutdown + } else if constexpr (std::is_same_v) { + // Process commit entry: check banned list, assign version + auto &commit_entry = e; + auto *state = static_cast( + commit_entry.connection->user_data); + + if (!state || !state->commit_request) { + // Should not happen - basic validation was done on I/O thread + send_error_response(*commit_entry.connection, 500, + "Internal server error", true); + return false; + } + + // Check if request_id is banned (for status queries) + // Only check CommitRequest request_id, not HTTP header + if (state->commit_request && + state->commit_request->request_id().has_value()) { + auto commit_request_id = + state->commit_request->request_id().value(); + if (banned_request_ids.find(commit_request_id) != + banned_request_ids.end()) { + // Request ID is banned, this commit should fail + send_json_response( + *commit_entry.connection, 409, + R"({"status": "not_committed", "error": "request_id_banned"})", + state->connection_close); + return false; + } + } + + // Assign sequential version number + commit_entry.assigned_version = next_version++; + + TRACE_EVENT("http", "sequence_commit", + perfetto::Flow::Global(state->http_request_id)); + + return false; // Continue processing + } else if constexpr (std::is_same_v) { + // Process status entry: add request_id to banned list, get version + // upper bound + auto &status_entry = e; + auto *state = static_cast( + status_entry.connection->user_data); + + if (state && !state->status_request_id.empty()) { + // Add request_id to banned list - store the string in arena and + // use string_view + char *arena_chars = banned_request_arena.allocate( + state->status_request_id.size()); + std::memcpy(arena_chars, state->status_request_id.data(), + state->status_request_id.size()); + std::string_view request_id_view(arena_chars, + state->status_request_id.size()); + banned_request_ids.insert(request_id_view); + + // Update memory usage metric + banned_request_ids_memory_gauge.set( + banned_request_arena.total_allocated()); + + // Set version upper bound to current highest assigned version + status_entry.version_upper_bound = next_version - 1; + } + + TRACE_EVENT("http", "sequence_status", + perfetto::Flow::Global(state->http_request_id)); + + // TODO: Transfer to status threadpool - for now just respond + // not_committed + send_json_response(*status_entry.connection, 200, + R"({"status": "not_committed"})", + state->connection_close); + + return false; // Continue processing + } + + return false; // Unknown type, continue + }, + entry); + + if (should_shutdown) { + return true; } - - TRACE_EVENT( - "http", "validation", - perfetto::Flow::Global( - static_cast(conn->user_data)->request_id)); - - auto *state = static_cast(conn->user_data); - if (!state || !state->commit_request) { - // Should not happen - basic validation was done on I/O thread - send_error_response(*conn, 500, "Internal server error", true); - state = nullptr; // Mark as error - continue; - } - - // TODO: Implement serialized validation logic: - // 1. Assign version number to this commit (must be sequential) - // 2. Validate preconditions against current database state: - // - Check point reads match expected values/existence - // - Check range reads haven't been modified since read_version - // - Ensure no conflicting writes from other transactions - - // For now, just simulate version assignment - // All basic validation (including leader check) was done on I/O thread - - // TODO: Actual precondition validation would go here - // This is where we'd check each precondition in - // commit_request.preconditions() against the current database state to - // ensure no conflicts - - // For now, assume all preconditions pass since basic validation was done - - // Precondition validation passed - continue to persistence stage } return false; // Continue processing } -bool HttpHandler::process_persistence_batch(BatchType &batch) { - // TODO: This is where we can implement efficient batching for S3 writes - // - Collect all valid commits in the batch - // - Write them as a single S3 batch operation - // - Generate responses for all commits in the batch +bool HttpHandler::process_resolve_batch(BatchType &batch) { + // Stage 1: Precondition resolution + // This stage must be serialized to maintain consistent database state view + // - Validate preconditions against current database state + // - Check for conflicts with other transactions - for (auto &conn : batch) { - if (!conn) { - return true; // Shutdown signal + for (auto &entry : batch) { + // Pattern match on pipeline entry variant + bool should_shutdown = std::visit( + [&](auto &&e) -> bool { + using T = std::decay_t; + + if constexpr (std::is_same_v) { + return true; // Signal shutdown + } else if constexpr (std::is_same_v) { + // Process commit entry: accept all commits (simplified + // implementation) + auto &commit_entry = e; + auto *state = static_cast( + commit_entry.connection->user_data); + + if (!state || !state->commit_request) { + // Skip processing for failed sequence stage + return false; + } + + // Accept all commits (simplified implementation) + commit_entry.resolve_success = true; + + TRACE_EVENT("http", "resolve_commit", + perfetto::Flow::Global(state->http_request_id)); + + return false; // Continue processing + } else if constexpr (std::is_same_v) { + // Status entries are not processed in resolve stage + // They were already handled in sequence stage + return false; + } + + return false; // Unknown type, continue + }, + entry); + + if (should_shutdown) { + return true; } + } + return false; // Continue processing +} - auto *state = static_cast(conn->user_data); +bool HttpHandler::process_persist_batch(BatchType &batch) { + // Stage 2: Transaction persistence + // Mark everything as durable immediately (simplified implementation) + // In real implementation: batch S3 writes, update subscribers, etc. - // Check if validation stage failed or connection is in error state - if (!state || !state->commit_request || !state->message_complete) { - // Skip persistence processing for failed validation - continue; + for (auto &entry : batch) { + // Pattern match on pipeline entry variant + bool should_shutdown = std::visit( + [&](auto &&e) -> bool { + using T = std::decay_t; + + if constexpr (std::is_same_v) { + return true; // Signal shutdown + } else if constexpr (std::is_same_v) { + // Process commit entry: mark as durable, generate response + auto &commit_entry = e; + auto *state = static_cast( + commit_entry.connection->user_data); + + // Skip if resolve failed or connection is in error state + if (!state || !state->commit_request || + !commit_entry.resolve_success) { + return false; + } + + // Mark as persisted and update committed version high water mark + commit_entry.persist_success = true; + committed_version.store(commit_entry.assigned_version, + std::memory_order_seq_cst); + + TRACE_EVENT("http", "persist_commit", + perfetto::Flow::Global(state->http_request_id)); + + const CommitRequest &commit_request = *state->commit_request; + ArenaAllocator &arena = commit_entry.connection->get_arena(); + std::string_view response; + + // Generate success response with actual assigned version + if (commit_request.request_id().has_value()) { + response = format( + arena, + R"({"request_id":"%.*s","status":"committed","version":%ld,"leader_id":"leader123"})", + static_cast(commit_request.request_id().value().size()), + commit_request.request_id().value().data(), + commit_entry.assigned_version); + } else { + response = format( + arena, + R"({"status":"committed","version":%ld,"leader_id":"leader123"})", + commit_entry.assigned_version); + } + + send_json_response(*commit_entry.connection, 200, response, + state->connection_close); + + return false; // Continue processing + } else if constexpr (std::is_same_v) { + // Status entries are not processed in persist stage + // They were already handled in sequence stage + return false; + } + + return false; // Unknown type, continue + }, + entry); + + if (should_shutdown) { + return true; } - - TRACE_EVENT("http", "persistence", - perfetto::Flow::Global(state->request_id)); - - const CommitRequest &commit_request = *state->commit_request; - - // For now, process individually but this could be batched - ArenaAllocator &arena = conn->get_arena(); - std::string_view response; - - if (commit_request.request_id().has_value()) { - response = format( - arena, - R"({"request_id":"%.*s","status":"committed","version":43,"leader_id":"leader123"})", - static_cast(commit_request.request_id().value().size()), - commit_request.request_id().value().data()); - } else { - response = static_format( - arena, - R"({"status":"committed","version":43,"leader_id":"leader123"})"); - } - - send_json_response(*conn, 200, response, state->connection_close); } return false; // Continue processing } bool HttpHandler::process_release_batch(BatchType &batch) { - for (auto &conn : batch) { - if (!conn) { - return true; // Shutdown signal + // Stage 3: Connection release + // Return connections to server for response transmission + + for (auto &entry : batch) { + // Pattern match on pipeline entry variant + bool should_shutdown = std::visit( + [&](auto &&e) -> bool { + using T = std::decay_t; + + if constexpr (std::is_same_v) { + return true; // Signal shutdown + } else if constexpr (std::is_same_v) { + // Process commit entry: return connection to server + auto &commit_entry = e; + auto *state = static_cast( + commit_entry.connection->user_data); + + if (state) { + TRACE_EVENT("http", "release_commit", + perfetto::Flow::Global(state->http_request_id)); + } + + // Return connection to server for further processing or cleanup + Server::release_back_to_server(std::move(commit_entry.connection)); + + return false; // Continue processing + } else if constexpr (std::is_same_v) { + // Process status entry: return connection to server + auto &status_entry = e; + auto *state = static_cast( + status_entry.connection->user_data); + + if (state) { + TRACE_EVENT("http", "release_status", + perfetto::Flow::Global(state->http_request_id)); + } + + // Return connection to server for further processing or cleanup + Server::release_back_to_server(std::move(status_entry.connection)); + + return false; // Continue processing + } + + return false; // Unknown type, continue + }, + entry); + + if (should_shutdown) { + return true; } - - TRACE_EVENT( - "http", "release", - perfetto::Flow::Global( - static_cast(conn->user_data)->request_id)); - - // Return connection to server for further processing or cleanup - Server::release_back_to_server(std::move(conn)); } return false; // Continue processing diff --git a/src/http_handler.hpp b/src/http_handler.hpp index 1b0846c..b2ac75b 100644 --- a/src/http_handler.hpp +++ b/src/http_handler.hpp @@ -1,15 +1,19 @@ #pragma once +#include #include #include #include +#include #include +#include "arena_allocator.hpp" #include "connection.hpp" #include "connection_handler.hpp" #include "loop_iterations.hpp" #include "perfetto_categories.hpp" +#include "pipeline_entry.hpp" #include "server.hpp" #include "thread_pipeline.hpp" @@ -52,13 +56,18 @@ struct HttpConnectionState { bool connection_close = false; // Client requested connection close HttpRoute route = HttpRoute::NotFound; + // Status request data + std::string_view + status_request_id; // Request ID extracted from /v1/status/{id} URL + // Header accumulation buffers (arena-allocated) using ArenaString = std::basic_string, ArenaStlAllocator>; ArenaString current_header_field_buf; ArenaString current_header_value_buf; bool header_field_complete = false; - int64_t request_id = 0; // X-Request-Id header value + int64_t http_request_id = + 0; // X-Request-Id header value (for tracing/logging) // Streaming parser for POST requests std::unique_ptr commit_parser; @@ -75,34 +84,47 @@ struct HttpConnectionState { * Supports the WeaselDB REST API endpoints with enum-based routing. */ struct HttpHandler : ConnectionHandler { - HttpHandler() { - // Stage 0: Version assignment and precondition validation thread - validationThread = std::thread{[this]() { - pthread_setname_np(pthread_self(), "txn-validate"); + HttpHandler() + : banned_request_ids( + ArenaStlAllocator(&banned_request_arena)) { + // Stage 0: Sequence assignment thread + sequenceThread = std::thread{[this]() { + pthread_setname_np(pthread_self(), "txn-sequence"); for (;;) { auto guard = commitPipeline.acquire<0, 0>(); - if (process_validation_batch(guard.batch)) { + if (process_sequence_batch(guard.batch)) { return; // Shutdown signal received } } }}; - // Stage 1: Transaction persistence and subscriber streaming thread - persistenceThread = std::thread{[this]() { - pthread_setname_np(pthread_self(), "txn-persist"); + // Stage 1: Precondition resolution thread + resolveThread = std::thread{[this]() { + pthread_setname_np(pthread_self(), "txn-resolve"); for (;;) { auto guard = commitPipeline.acquire<1, 0>(); - if (process_persistence_batch(guard.batch)) { + if (process_resolve_batch(guard.batch)) { return; // Shutdown signal received } } }}; - // Stage 2: Connection return to server thread + // Stage 2: Transaction persistence thread + persistThread = std::thread{[this]() { + pthread_setname_np(pthread_self(), "txn-persist"); + for (;;) { + auto guard = commitPipeline.acquire<2, 0>(); + if (process_persist_batch(guard.batch)) { + return; // Shutdown signal received + } + } + }}; + + // Stage 3: Connection return to server thread releaseThread = std::thread{[this]() { pthread_setname_np(pthread_self(), "txn-release"); for (;;) { - auto guard = commitPipeline.acquire<2, 0>(); + auto guard = commitPipeline.acquire<3, 0>(); if (process_release_batch(guard.batch)) { return; // Shutdown signal received } @@ -110,17 +132,17 @@ struct HttpHandler : ConnectionHandler { }}; } ~HttpHandler() { - // Send shutdown signals to all pipeline stages + // Send single shutdown signal that flows through all pipeline stages { - auto guard = commitPipeline.push(3, true); - for (auto &c : guard.batch) { - c = {}; // null connection signals shutdown - } + auto guard = commitPipeline.push(1, true); + guard.batch[0] = + ShutdownEntry{}; // Single ShutdownEntry flows through all stages } // Join all pipeline threads - validationThread.join(); - persistenceThread.join(); + sequenceThread.join(); + resolveThread.join(); + persistThread.join(); releaseThread.join(); } @@ -148,29 +170,48 @@ struct HttpHandler : ConnectionHandler { private: static constexpr int lg_size = 16; - // Main commit processing pipeline: validation -> persistence -> release - StaticThreadPipeline, - WaitStrategy::WaitIfUpstreamIdle, 1, 1, 1> + // Pipeline state (sequence thread only) + int64_t next_version = 1; // Next version to assign (sequence thread only) + + // Pipeline state (persist thread writes, I/O threads read) + std::atomic committed_version{ + 0}; // Highest committed version (persist thread writes, I/O threads read) + + // Arena for banned request IDs and related data structures (sequence thread + // only) + ArenaAllocator banned_request_arena; + using BannedRequestIdSet = + std::unordered_set, + std::equal_to, + ArenaStlAllocator>; + BannedRequestIdSet banned_request_ids; // Request IDs that should not commit + // (string_views into arena) + + // Main commit processing pipeline: sequence -> resolve -> persist -> release + StaticThreadPipeline commitPipeline{lg_size}; // Pipeline stage threads - std::thread validationThread; - std::thread persistenceThread; + std::thread sequenceThread; + std::thread resolveThread; + std::thread persistThread; std::thread releaseThread; // Pipeline stage processing methods (batch-based) using BatchType = - StaticThreadPipeline, - WaitStrategy::WaitIfUpstreamIdle, 1, 1, 1>::Batch; - bool process_validation_batch(BatchType &batch); - bool process_persistence_batch(BatchType &batch); + StaticThreadPipeline::Batch; + bool process_sequence_batch(BatchType &batch); + bool process_resolve_batch(BatchType &batch); + bool process_persist_batch(BatchType &batch); bool process_release_batch(BatchType &batch); // Route handlers void handleGetVersion(Connection &conn, const HttpConnectionState &state); void handlePostCommit(Connection &conn, const HttpConnectionState &state); void handleGetSubscribe(Connection &conn, const HttpConnectionState &state); - void handleGetStatus(Connection &conn, const HttpConnectionState &state); + void handleGetStatus(Connection &conn, HttpConnectionState &state); void handlePutRetention(Connection &conn, const HttpConnectionState &state); void handleGetRetention(Connection &conn, const HttpConnectionState &state); void handleDeleteRetention(Connection &conn, diff --git a/src/pipeline_entry.hpp b/src/pipeline_entry.hpp new file mode 100644 index 0000000..d71b497 --- /dev/null +++ b/src/pipeline_entry.hpp @@ -0,0 +1,47 @@ +#pragma once + +#include "connection.hpp" +#include +#include + +/** + * Pipeline entry for commit requests that need full 4-stage processing. + * Contains connection with parsed CommitRequest. + */ +struct CommitEntry { + std::unique_ptr connection; + int64_t assigned_version = 0; // Set by sequence stage + bool resolve_success = false; // Set by resolve stage + bool persist_success = false; // Set by persist stage + + CommitEntry() = default; // Default constructor for variant + explicit CommitEntry(std::unique_ptr conn) + : connection(std::move(conn)) {} +}; + +/** + * Pipeline entry for status requests that need sequence stage processing + * then transfer to status threadpool. + */ +struct StatusEntry { + std::unique_ptr connection; + int64_t version_upper_bound = 0; // Set by sequence stage + + StatusEntry() = default; // Default constructor for variant + explicit StatusEntry(std::unique_ptr conn) + : connection(std::move(conn)) {} +}; + +/** + * Pipeline entry for coordinated shutdown of all stages. + * Flows through all stages to ensure proper cleanup. + */ +struct ShutdownEntry { + // Empty struct - presence indicates shutdown +}; + +/** + * Pipeline entry variant type used by the commit processing pipeline. + * Each stage pattern-matches on the variant type to handle appropriately. + */ +using PipelineEntry = std::variant; \ No newline at end of file