Move GetVersion to commit pipeline

This commit is contained in:
2025-09-15 12:30:02 -04:00
parent f3c3f77a24
commit 34accb9d80
3 changed files with 64 additions and 18 deletions

View File

@@ -7,6 +7,7 @@
#include "cpu_work.hpp" #include "cpu_work.hpp"
#include "format.hpp" #include "format.hpp"
#include "metric.hpp" #include "metric.hpp"
#include "pipeline_entry.hpp"
// Metric for banned request IDs memory usage // Metric for banned request IDs memory usage
auto banned_request_ids_memory_gauge = auto banned_request_ids_memory_gauge =
@@ -344,6 +345,20 @@ bool CommitPipeline::process_persist_batch(BatchType &batch) {
health_check_entry.response_json = "OK"; health_check_entry.response_json = "OK";
return false; // Continue processing return false; // Continue processing
} else if constexpr (std::is_same_v<T, GetVersionEntry>) {
auto &get_version_entry = e;
auto conn_ref = get_version_entry.connection.lock();
if (!conn_ref) {
// Connection is gone, drop the entry silently
return false; // Skip this entry and continue processing
}
// TODO validate we're still the leader at some version > the
// proposed version for external consistency.
// TODO include leader in response
get_version_entry.response_json = format(
get_version_entry.request_arena,
R"({"version":%ld,"leader":""})", get_version_entry.version);
} }
return false; // Unknown type, continue return false; // Unknown type, continue
@@ -419,6 +434,19 @@ bool CommitPipeline::process_release_batch(BatchType &batch) {
std::move(health_check_entry.request_arena)); std::move(health_check_entry.request_arena));
return false; // Continue processing return false; // Continue processing
} else if constexpr (std::is_same_v<T, GetVersionEntry>) {
auto &get_version_entry = e;
auto conn_ref = get_version_entry.connection.lock();
if (!conn_ref) {
// Connection is gone, drop the entry silently
return false; // Skip this entry and continue processing
}
// Send the response using protocol-agnostic interface
// HTTP formatting will happen in on_preprocess_writes()
conn_ref->send_response(get_version_entry.protocol_context,
get_version_entry.response_json,
std::move(get_version_entry.request_arena));
} }
return false; // Unknown type, continue return false; // Unknown type, continue

View File

@@ -1,6 +1,5 @@
#include "http_handler.hpp" #include "http_handler.hpp"
#include <atomic>
#include <cstring> #include <cstring>
#include <string> #include <string>
#include <strings.h> #include <strings.h>
@@ -11,7 +10,6 @@
#include "format.hpp" #include "format.hpp"
#include "json_commit_request_parser.hpp" #include "json_commit_request_parser.hpp"
#include "metric.hpp" #include "metric.hpp"
#include "perfetto_categories.hpp"
#include "pipeline_entry.hpp" #include "pipeline_entry.hpp"
auto requests_counter_family = metric::create_counter( auto requests_counter_family = metric::create_counter(
@@ -222,6 +220,12 @@ void HttpHandler::on_batch_complete(std::span<Connection *const> batch) {
g_batch_entries.emplace_back( g_batch_entries.emplace_back(
HealthCheckEntry(conn->get_weak_ref(), ctx, std::move(req.arena))); HealthCheckEntry(conn->get_weak_ref(), ctx, std::move(req.arena)));
} }
// Create GetVersionEntry for version requests
else if (req.route == HttpRoute::GetVersion) {
g_batch_entries.emplace_back(
GetVersionEntry(conn->get_weak_ref(), ctx, std::move(req.arena),
commit_pipeline_.get_committed_version()));
}
} }
state->queue.clear(); state->queue.clear();
} }
@@ -279,21 +283,9 @@ void HttpHandler::on_data_arrived(std::string_view data, Connection &conn) {
} }
// Route handlers (basic implementations) // Route handlers (basic implementations)
void HttpHandler::handle_get_version(Connection &conn, void HttpHandler::handle_get_version(Connection &, HttpRequestState &) {
HttpRequestState &state) {
version_counter.inc(); version_counter.inc();
// Sent to commit pipeline
// Generate JSON response
auto json_response = format(state.arena, R"({"version":%ld,"leader":""})",
commit_pipeline_.get_committed_version());
// Format HTTP response
auto http_response =
format_json_response(200, json_response, state.arena,
state.http_request_id, state.connection_close);
// Send through reorder queue and preprocessing to maintain proper ordering
send_ordered_response(conn, state, http_response, std::move(state.arena));
} }
void HttpHandler::handle_post_commit(Connection &conn, void HttpHandler::handle_post_commit(Connection &conn,

View File

@@ -82,6 +82,32 @@ struct HealthCheckEntry {
request_arena(std::move(arena)) {} request_arena(std::move(arena)) {}
}; };
/**
* Pipeline entry for /v1/version requests.
* Needs to integrate with the pipeline because for external consistency.
*/
struct GetVersionEntry {
WeakRef<MessageSender> connection;
// Protocol-agnostic context (arena-allocated, protocol-specific)
void *protocol_context = nullptr;
// Request arena for response data
Arena request_arena;
// JSON response body (set by persist stage, arena-allocated)
std::string_view response_json;
// Proposed response version
int64_t version;
GetVersionEntry() = default; // Default constructor for variant
explicit GetVersionEntry(WeakRef<MessageSender> conn, void *ctx, Arena arena,
int64_t version)
: connection(std::move(conn)), protocol_context(ctx),
request_arena(std::move(arena)), version(version) {}
};
/** /**
* Pipeline entry for coordinated shutdown of all stages. * Pipeline entry for coordinated shutdown of all stages.
* Flows through all stages to ensure proper cleanup. * Flows through all stages to ensure proper cleanup.
@@ -94,5 +120,5 @@ struct ShutdownEntry {
* Pipeline entry variant type used by the commit processing pipeline. * Pipeline entry variant type used by the commit processing pipeline.
* Each stage pattern-matches on the variant type to handle appropriately. * Each stage pattern-matches on the variant type to handle appropriately.
*/ */
using PipelineEntry = using PipelineEntry = std::variant<CommitEntry, StatusEntry, HealthCheckEntry,
std::variant<CommitEntry, StatusEntry, HealthCheckEntry, ShutdownEntry>; ShutdownEntry, GetVersionEntry>;