From 34accb9d8093d31bb01f9b5444c86d70afe924dd Mon Sep 17 00:00:00 2001 From: Andrew Noyes Date: Mon, 15 Sep 2025 12:30:02 -0400 Subject: [PATCH] Move GetVersion to commit pipeline --- src/commit_pipeline.cpp | 28 ++++++++++++++++++++++++++++ src/http_handler.cpp | 24 ++++++++---------------- src/pipeline_entry.hpp | 30 ++++++++++++++++++++++++++++-- 3 files changed, 64 insertions(+), 18 deletions(-) diff --git a/src/commit_pipeline.cpp b/src/commit_pipeline.cpp index 5ec0f59..a911593 100644 --- a/src/commit_pipeline.cpp +++ b/src/commit_pipeline.cpp @@ -7,6 +7,7 @@ #include "cpu_work.hpp" #include "format.hpp" #include "metric.hpp" +#include "pipeline_entry.hpp" // Metric for banned request IDs memory usage auto banned_request_ids_memory_gauge = @@ -344,6 +345,20 @@ bool CommitPipeline::process_persist_batch(BatchType &batch) { health_check_entry.response_json = "OK"; return false; // Continue processing + } else if constexpr (std::is_same_v) { + auto &get_version_entry = e; + auto conn_ref = get_version_entry.connection.lock(); + if (!conn_ref) { + // Connection is gone, drop the entry silently + return false; // Skip this entry and continue processing + } + + // TODO validate we're still the leader at some version > the + // proposed version for external consistency. + // TODO include leader in response + get_version_entry.response_json = format( + get_version_entry.request_arena, + R"({"version":%ld,"leader":""})", get_version_entry.version); } return false; // Unknown type, continue @@ -419,6 +434,19 @@ bool CommitPipeline::process_release_batch(BatchType &batch) { std::move(health_check_entry.request_arena)); return false; // Continue processing + } else if constexpr (std::is_same_v) { + auto &get_version_entry = e; + auto conn_ref = get_version_entry.connection.lock(); + if (!conn_ref) { + // Connection is gone, drop the entry silently + return false; // Skip this entry and continue processing + } + + // Send the response using protocol-agnostic interface + // HTTP formatting will happen in on_preprocess_writes() + conn_ref->send_response(get_version_entry.protocol_context, + get_version_entry.response_json, + std::move(get_version_entry.request_arena)); } return false; // Unknown type, continue diff --git a/src/http_handler.cpp b/src/http_handler.cpp index 18646ee..ed02a70 100644 --- a/src/http_handler.cpp +++ b/src/http_handler.cpp @@ -1,6 +1,5 @@ #include "http_handler.hpp" -#include #include #include #include @@ -11,7 +10,6 @@ #include "format.hpp" #include "json_commit_request_parser.hpp" #include "metric.hpp" -#include "perfetto_categories.hpp" #include "pipeline_entry.hpp" auto requests_counter_family = metric::create_counter( @@ -222,6 +220,12 @@ void HttpHandler::on_batch_complete(std::span batch) { g_batch_entries.emplace_back( HealthCheckEntry(conn->get_weak_ref(), ctx, std::move(req.arena))); } + // Create GetVersionEntry for version requests + else if (req.route == HttpRoute::GetVersion) { + g_batch_entries.emplace_back( + GetVersionEntry(conn->get_weak_ref(), ctx, std::move(req.arena), + commit_pipeline_.get_committed_version())); + } } state->queue.clear(); } @@ -279,21 +283,9 @@ void HttpHandler::on_data_arrived(std::string_view data, Connection &conn) { } // Route handlers (basic implementations) -void HttpHandler::handle_get_version(Connection &conn, - HttpRequestState &state) { +void HttpHandler::handle_get_version(Connection &, HttpRequestState &) { version_counter.inc(); - - // Generate JSON response - auto json_response = format(state.arena, R"({"version":%ld,"leader":""})", - commit_pipeline_.get_committed_version()); - - // Format HTTP response - auto http_response = - format_json_response(200, json_response, state.arena, - state.http_request_id, state.connection_close); - - // Send through reorder queue and preprocessing to maintain proper ordering - send_ordered_response(conn, state, http_response, std::move(state.arena)); + // Sent to commit pipeline } void HttpHandler::handle_post_commit(Connection &conn, diff --git a/src/pipeline_entry.hpp b/src/pipeline_entry.hpp index 824384d..66982b8 100644 --- a/src/pipeline_entry.hpp +++ b/src/pipeline_entry.hpp @@ -82,6 +82,32 @@ struct HealthCheckEntry { request_arena(std::move(arena)) {} }; +/** + * Pipeline entry for /v1/version requests. + * Needs to integrate with the pipeline because for external consistency. + */ +struct GetVersionEntry { + WeakRef connection; + + // Protocol-agnostic context (arena-allocated, protocol-specific) + void *protocol_context = nullptr; + + // Request arena for response data + Arena request_arena; + + // JSON response body (set by persist stage, arena-allocated) + std::string_view response_json; + + // Proposed response version + int64_t version; + + GetVersionEntry() = default; // Default constructor for variant + explicit GetVersionEntry(WeakRef conn, void *ctx, Arena arena, + int64_t version) + : connection(std::move(conn)), protocol_context(ctx), + request_arena(std::move(arena)), version(version) {} +}; + /** * Pipeline entry for coordinated shutdown of all stages. * Flows through all stages to ensure proper cleanup. @@ -94,5 +120,5 @@ struct ShutdownEntry { * Pipeline entry variant type used by the commit processing pipeline. * Each stage pattern-matches on the variant type to handle appropriately. */ -using PipelineEntry = - std::variant; +using PipelineEntry = std::variant;