
#include "http_handler.hpp"
#include <atomic>
#include <cstring>
#include <string>
#include <strings.h>
#include "api_url_parser.hpp"
#include "arena.hpp"
#include "connection.hpp"
#include "cpu_work.hpp"
#include "format.hpp"
#include "json_commit_request_parser.hpp"
#include "metric.hpp"
#include "perfetto_categories.hpp"
#include "pipeline_entry.hpp"
auto requests_counter_family = metric::create_counter(
"weaseldb_http_requests_total", "Total http requests");
thread_local auto metrics_counter =
requests_counter_family.create({{"path", "/metrics"}});
// API endpoint request counters
thread_local auto commit_counter =
requests_counter_family.create({{"path", "/v1/commit"}});
thread_local auto status_counter =
requests_counter_family.create({{"path", "/v1/status"}});
thread_local auto version_counter =
requests_counter_family.create({{"path", "/v1/version"}});
thread_local auto ok_counter =
requests_counter_family.create({{"path", "/ok"}});
// Metric for banned request IDs memory usage
auto banned_request_ids_memory_gauge =
metric::create_gauge("weaseldb_banned_request_ids_memory_bytes",
"Memory used by banned request IDs arena")
.create({});
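// The pipeline stages below carry several "TODO: Add dropped_pipeline_entries
// metric" notes. A minimal sketch of such a counter, assuming an unlabeled
// counter is enough (the name below is illustrative, not an existing metric):
//
//   auto dropped_pipeline_entries_counter =
//       metric::create_counter("weaseldb_dropped_pipeline_entries_total",
//                              "Pipeline entries dropped because the "
//                              "connection closed before processing")
//           .create({});
//
// Each drop site would then call dropped_pipeline_entries_counter.inc().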
// HttpConnectionState implementation
HttpConnectionState::HttpConnectionState()
: current_header_field_buf(ArenaStlAllocator<char>(&arena)),
current_header_value_buf(ArenaStlAllocator<char>(&arena)) {
llhttp_settings_init(&settings);
// Set up llhttp callbacks
settings.on_url = HttpHandler::onUrl;
settings.on_header_field = HttpHandler::onHeaderField;
settings.on_header_field_complete = HttpHandler::onHeaderFieldComplete;
settings.on_header_value = HttpHandler::onHeaderValue;
settings.on_header_value_complete = HttpHandler::onHeaderValueComplete;
settings.on_headers_complete = HttpHandler::onHeadersComplete;
settings.on_body = HttpHandler::onBody;
settings.on_message_complete = HttpHandler::onMessageComplete;
llhttp_init(&parser, HTTP_REQUEST, &settings);
parser.data = this;
}
void HttpConnectionState::reset() {
TRACE_EVENT("http", "reply", perfetto::Flow::Global(http_request_id));
// Reset request-specific state for next HTTP request
method = {};
url = {};
headers_complete = false;
message_complete = false;
connection_close = false;
route = HttpRoute::NotFound;
status_request_id = {};
// Reset header buffers - need to recreate with arena allocator
current_header_field_buf = ArenaString(ArenaStlAllocator<char>(&arena));
current_header_value_buf = ArenaString(ArenaStlAllocator<char>(&arena));
header_field_complete = false;
http_request_id = 0;
// Reset commit parsing state - safe to reset Arena::Ptr objects here
// since request processing is complete
commit_parser.reset();
commit_request.reset();
parsing_commit = false;
basic_validation_passed = false;
// Reset arena memory for next request to prevent memory growth
arena.reset();
// Reset llhttp parser for next request
llhttp_init(&parser, HTTP_REQUEST, &settings);
parser.data = this;
}
// HttpHandler implementation
void HttpHandler::on_connection_established(Connection &conn) {
  // Allocate per-connection HTTP state on the heap; it is owned via
  // conn.user_data and freed in on_connection_closed
auto *state = new HttpConnectionState();
conn.user_data = state;
}
void HttpHandler::on_connection_closed(Connection &conn) {
auto *state = static_cast<HttpConnectionState *>(conn.user_data);
delete state;
conn.user_data = nullptr;
}
void HttpHandler::on_batch_complete(std::span<Connection *const> batch) {
// Collect commit, status, and health check requests for pipeline processing
int pipeline_count = 0;
// Count commit, status, and health check requests
for (auto conn : batch) {
auto *state = static_cast<HttpConnectionState *>(conn->user_data);
// Count commit requests that passed basic validation
if (state->route == HttpRoute::PostCommit && state->commit_request &&
state->parsing_commit && state->basic_validation_passed) {
pipeline_count++;
}
// Count status requests
else if (state->route == HttpRoute::GetStatus &&
// Error message not already queued
conn->outgoing_bytes_queued() == 0) {
pipeline_count++;
}
// Count health check requests
else if (state->route == HttpRoute::GetOk &&
// Error message not already queued
conn->outgoing_bytes_queued() == 0) {
pipeline_count++;
}
}
// Send requests to 4-stage pipeline in batch. Batching here reduces
// contention on the way into the pipeline.
if (pipeline_count > 0) {
auto guard = commitPipeline.push(pipeline_count, true);
auto out_iter = guard.batch.begin();
for (auto conn : batch) {
auto *state = static_cast<HttpConnectionState *>(conn->user_data);
// Create CommitEntry for commit requests
if (state->route == HttpRoute::PostCommit && state->commit_request &&
state->parsing_commit && state->basic_validation_passed) {
*out_iter++ =
CommitEntry{conn->get_weak_ref(), state->http_request_id,
state->connection_close, state->commit_request.get(),
std::move(state->arena)};
state->reset();
}
      // Create StatusEntry for status requests. Keep this condition in sync
      // with the counting loop above; a mismatch would advance out_iter past
      // the reserved batch.
      else if (state->route == HttpRoute::GetStatus &&
               conn->outgoing_bytes_queued() == 0) {
*out_iter++ =
StatusEntry{conn->get_weak_ref(), state->http_request_id,
state->connection_close, state->status_request_id,
std::move(state->arena)};
state->reset();
}
      // Create HealthCheckEntry for health check requests (same sync
      // requirement as above)
      else if (state->route == HttpRoute::GetOk &&
               conn->outgoing_bytes_queued() == 0) {
*out_iter++ =
HealthCheckEntry{conn->get_weak_ref(), state->http_request_id,
state->connection_close, std::move(state->arena)};
state->reset();
}
}
}
}
void HttpHandler::on_data_arrived(std::string_view data, Connection &conn) {
auto *state = static_cast<HttpConnectionState *>(conn.user_data);
if (!state) {
send_error_response(conn, 500, "Internal server error", Arena{}, 0, true);
return;
}
// TODO: Enforce the configured max_request_size_bytes limit here.
// Should track cumulative bytes received for the current HTTP request
// and send 413 Request Entity Too Large if limit is exceeded.
// This prevents DoS attacks via oversized HTTP requests.
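  // A minimal sketch of that check, assuming HttpConnectionState grows a
  // cumulative `bytes_received` counter and the limit is reachable as
  // `config_.limits.max_request_size_bytes` (both members are assumptions,
  // not part of the current code):
  //
  //   state->bytes_received += data.size();
  //   if (state->bytes_received > config_.limits.max_request_size_bytes) {
  //     send_error_response(conn, 413, "Request entity too large",
  //                         std::move(state->arena), state->http_request_id,
  //                         /*close_connection=*/true);
  //     state->reset();
  //     return;
  //   }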
// Parse HTTP data with llhttp
enum llhttp_errno err =
llhttp_execute(&state->parser, data.data(), data.size());
if (err != HPE_OK) {
send_error_response(conn, 400, "Bad request", std::move(state->arena), 0,
true);
state->reset();
return;
}
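  // Known limitation: llhttp_execute runs every complete request contained in
  // `data` through the callbacks, so if a client pipelines several requests
  // into one read, only the last request's state survives to be routed below.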
// If message is complete, route and handle the request
if (state->message_complete) {
    // Copy the URL into the connection's arena so the parser can decode it in
    // place without mutating the socket read buffer that state->url points at.
    char *url_buffer = state->arena.allocate<char>(state->url.size());
    std::memcpy(url_buffer, state->url.data(), state->url.size());
    RouteMatch route_match;
    auto parse_result =
        ApiUrlParser::parse(state->method, url_buffer,
                            static_cast<int>(state->url.size()), route_match);
if (parse_result != ParseResult::Success) {
// Handle malformed URL encoding
send_error_response(conn, 400, "Malformed URL encoding",
std::move(state->arena), 0, true);
state->reset();
return;
}
state->route = route_match.route;
// Route to appropriate handler
switch (state->route) {
case HttpRoute::GetVersion:
handle_get_version(conn, *state);
break;
case HttpRoute::PostCommit:
handle_post_commit(conn, *state);
break;
case HttpRoute::GetSubscribe:
handle_get_subscribe(conn, *state);
break;
case HttpRoute::GetStatus:
handle_get_status(conn, *state, route_match);
break;
case HttpRoute::PutRetention:
handle_put_retention(conn, *state, route_match);
break;
case HttpRoute::GetRetention:
handle_get_retention(conn, *state, route_match);
break;
case HttpRoute::DeleteRetention:
handle_delete_retention(conn, *state, route_match);
break;
case HttpRoute::GetMetrics:
handle_get_metrics(conn, *state);
break;
case HttpRoute::GetOk:
handle_get_ok(conn, *state);
break;
case HttpRoute::NotFound:
default:
handle_not_found(conn, *state);
break;
}
}
}
// Route handlers (basic implementations)
void HttpHandler::handle_get_version(Connection &conn,
HttpConnectionState &state) {
version_counter.inc();
send_json_response(
conn, 200,
format(state.arena, R"({"version":%ld,"leader":""})",
this->committed_version.load(std::memory_order_seq_cst)),
std::move(state.arena), state.http_request_id, state.connection_close);
state.reset();
}
void HttpHandler::handle_post_commit(Connection &conn,
HttpConnectionState &state) {
commit_counter.inc();
// Check if streaming parse was successful
if (!state.commit_request || !state.parsing_commit) {
send_error_response(conn, 400, "Parse failed", std::move(state.arena),
state.http_request_id, state.connection_close);
state.reset();
return;
}
const CommitRequest &commit_request = *state.commit_request;
// Perform basic validation that doesn't require serialization (done on I/O
// threads)
bool valid = true;
std::string_view error_msg;
// Check that we have at least one operation
if (commit_request.operations().empty()) {
valid = false;
error_msg = "Commit request must contain at least one operation";
}
// Check leader_id is not empty
if (valid && commit_request.leader_id().empty()) {
valid = false;
error_msg = "Commit request must specify a leader_id";
}
// Check operations are well-formed
if (valid) {
for (const auto &op : commit_request.operations()) {
if (op.param1.empty()) {
valid = false;
error_msg = "Operation key cannot be empty";
break;
}
if (op.type == Operation::Type::Write && op.param2.empty()) {
valid = false;
error_msg = "Write operation value cannot be empty";
break;
}
}
}
if (!valid) {
send_error_response(conn, 400, error_msg, std::move(state.arena),
state.http_request_id, state.connection_close);
state.reset();
return;
}
// Basic validation passed - mark for 4-stage pipeline processing
  state.basic_validation_passed = true;
// Response will be sent after 4-stage pipeline processing is complete
}
void HttpHandler::handle_get_subscribe(Connection &conn,
HttpConnectionState &state) {
// TODO: Implement subscription streaming
send_json_response(
conn, 200,
R"({"message":"Subscription endpoint - streaming not yet implemented"})",
std::move(state.arena), state.http_request_id, state.connection_close);
state.reset();
}
void HttpHandler::handle_get_status(Connection &conn,
HttpConnectionState &state,
const RouteMatch &route_match) {
status_counter.inc();
// Status requests are processed through the pipeline
// Response will be generated in the sequence stage
// This handler extracts request_id from query parameters and prepares for
// pipeline processing
const auto &request_id =
route_match.params[static_cast<int>(ApiParameterKey::RequestId)];
if (!request_id) {
send_error_response(
conn, 400, "Missing required query parameter: request_id",
std::move(state.arena), state.http_request_id, state.connection_close);
state.reset();
return;
}
if (request_id->empty()) {
send_error_response(conn, 400, "Empty request_id parameter",
std::move(state.arena), state.http_request_id,
state.connection_close);
state.reset();
return;
}
// Store the request_id in the state for the pipeline
state.status_request_id = *request_id;
// Ready for pipeline processing
}
void HttpHandler::handle_put_retention(Connection &conn,
HttpConnectionState &state,
const RouteMatch &) {
// TODO: Parse retention policy from body and store
send_json_response(conn, 200, R"({"policy_id":"example","status":"created"})",
std::move(state.arena), state.http_request_id,
state.connection_close);
state.reset();
}
void HttpHandler::handle_get_retention(Connection &conn,
HttpConnectionState &state,
const RouteMatch &) {
// TODO: Extract policy_id from URL or return all policies
send_json_response(conn, 200, R"({"policies":[]})", std::move(state.arena),
state.http_request_id, state.connection_close);
state.reset();
}
void HttpHandler::handle_delete_retention(Connection &conn,
HttpConnectionState &state,
const RouteMatch &) {
// TODO: Extract policy_id from URL and delete
send_json_response(conn, 200, R"({"policy_id":"example","status":"deleted"})",
std::move(state.arena), state.http_request_id,
state.connection_close);
state.reset();
}
void HttpHandler::handle_get_metrics(Connection &conn,
HttpConnectionState &state) {
metrics_counter.inc();
auto metrics_span = metric::render(state.arena);
// Calculate total size for the response body
size_t total_size = 0;
for (const auto &sv : metrics_span) {
total_size += sv.size();
}
  // Build HTTP response headers using arena
  std::string_view headers;
  if (state.connection_close) {
    headers = static_format(
        state.arena, "HTTP/1.1 200 OK\r\n",
        "Content-Type: text/plain; version=0.0.4\r\n",
        "Content-Length: ", static_cast<uint64_t>(total_size), "\r\n",
        "X-Response-ID: ", static_cast<int64_t>(state.http_request_id),
        "\r\n", "Connection: close\r\n", "\r\n");
  } else {
    headers = static_format(
        state.arena, "HTTP/1.1 200 OK\r\n",
        "Content-Type: text/plain; version=0.0.4\r\n",
        "Content-Length: ", static_cast<uint64_t>(total_size), "\r\n",
        "X-Response-ID: ", static_cast<int64_t>(state.http_request_id),
        "\r\n", "Connection: keep-alive\r\n", "\r\n");
  }
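  // Assemble the response as [headers, chunk0, chunk1, ...] so the rendered
  // metric text is referenced directly rather than copied into one buffer.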
auto result =
state.arena.allocate_span<std::string_view>(metrics_span.size() + 1);
auto out = result.begin();
*out++ = headers;
for (auto sv : metrics_span) {
*out++ = sv;
}
conn.append_message(result, std::move(state.arena), state.connection_close);
state.reset();
}
void HttpHandler::handle_get_ok(Connection &, HttpConnectionState &state) {
  ok_counter.inc();
  TRACE_EVENT("http", "GET /ok",
              perfetto::Flow::Global(state.http_request_id));
// Health check requests are processed through the pipeline
// Response will be generated in the release stage after pipeline processing
}
void HttpHandler::handle_not_found(Connection &conn,
HttpConnectionState &state) {
  send_error_response(conn, 404, "Not found", std::move(state.arena),
                      state.http_request_id, state.connection_close);
  state.reset();
}
// HTTP utility methods
void HttpHandler::send_response(MessageSender &conn, int status_code,
std::string_view content_type,
std::string_view body, Arena response_arena,
int64_t http_request_id,
bool close_connection) {
// Status text
std::string_view status_text;
switch (status_code) {
case 200:
status_text = "OK";
break;
case 400:
status_text = "Bad Request";
break;
case 404:
status_text = "Not Found";
break;
case 500:
status_text = "Internal Server Error";
break;
default:
status_text = "Unknown";
break;
}
const char *connection_header = close_connection ? "close" : "keep-alive";
auto response = response_arena.allocate_span<std::string_view>(1);
response[0] =
format(response_arena,
"HTTP/1.1 %d %.*s\r\n"
"Content-Type: %.*s\r\n"
"Content-Length: %zu\r\n"
"X-Response-ID: %ld\r\n"
"Connection: %s\r\n"
"\r\n%.*s",
status_code, static_cast<int>(status_text.size()),
status_text.data(), static_cast<int>(content_type.size()),
content_type.data(), body.size(), http_request_id,
connection_header, static_cast<int>(body.size()), body.data());
conn.append_message(response, std::move(response_arena), close_connection);
}
void HttpHandler::send_json_response(MessageSender &conn, int status_code,
std::string_view json,
Arena response_arena,
int64_t http_request_id,
bool close_connection) {
send_response(conn, status_code, "application/json", json,
std::move(response_arena), http_request_id, close_connection);
}
void HttpHandler::send_error_response(MessageSender &conn, int status_code,
std::string_view message,
Arena response_arena,
int64_t http_request_id,
bool close_connection) {
std::string_view json =
format(response_arena, R"({"error":"%.*s"})",
static_cast<int>(message.size()), message.data());
send_json_response(conn, status_code, json, std::move(response_arena),
http_request_id, close_connection);
}
std::span<std::string_view>
HttpHandler::format_response(int status_code, std::string_view content_type,
std::string_view body, Arena &response_arena,
int64_t http_request_id, bool close_connection) {
// Status text
std::string_view status_text;
switch (status_code) {
case 200:
status_text = "OK";
break;
case 400:
status_text = "Bad Request";
break;
case 404:
status_text = "Not Found";
break;
case 500:
status_text = "Internal Server Error";
break;
default:
status_text = "Unknown";
break;
}
const char *connection_header = close_connection ? "close" : "keep-alive";
auto response = response_arena.allocate_span<std::string_view>(1);
response[0] =
format(response_arena,
"HTTP/1.1 %d %.*s\r\n"
"Content-Type: %.*s\r\n"
"Content-Length: %zu\r\n"
"X-Response-ID: %ld\r\n"
"Connection: %s\r\n"
"\r\n%.*s",
status_code, static_cast<int>(status_text.size()),
status_text.data(), static_cast<int>(content_type.size()),
content_type.data(), body.size(), http_request_id,
connection_header, static_cast<int>(body.size()), body.data());
return response;
}
std::span<std::string_view> HttpHandler::format_json_response(
int status_code, std::string_view json, Arena &response_arena,
int64_t http_request_id, bool close_connection) {
return format_response(status_code, "application/json", json, response_arena,
http_request_id, close_connection);
}
// llhttp callbacks
int HttpHandler::onUrl(llhttp_t *parser, const char *at, size_t length) {
auto *state = static_cast<HttpConnectionState *>(parser->data);
  // Keep a view of the URL bytes from the current input chunk. This is
  // simplified: it neither copies into the arena nor accumulates across
  // chunks, so a URL split across calls keeps only the last piece.
  state->url = std::string_view(at, length);
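  // A sketch of an accumulating variant, assuming HttpConnectionState gains an
  // arena-backed `url_buf` analogous to current_header_field_buf (that member
  // does not exist yet):
  //
  //   state->url_buf.append(at, length);
  //   state->url = std::string_view(state->url_buf.data(),
  //                                 state->url_buf.size());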
return 0;
}
int HttpHandler::onHeaderField(llhttp_t *parser, const char *at,
size_t length) {
auto *state = static_cast<HttpConnectionState *>(parser->data);
// Accumulate header field data
state->current_header_field_buf.append(at, length);
return 0;
}
int HttpHandler::onHeaderFieldComplete(llhttp_t *parser) {
auto *state = static_cast<HttpConnectionState *>(parser->data);
state->header_field_complete = true;
return 0;
}
int HttpHandler::onHeaderValue(llhttp_t *parser, const char *at,
size_t length) {
auto *state = static_cast<HttpConnectionState *>(parser->data);
// Accumulate header value data
state->current_header_value_buf.append(at, length);
return 0;
}
int HttpHandler::onHeaderValueComplete(llhttp_t *parser) {
auto *state = static_cast<HttpConnectionState *>(parser->data);
if (!state->header_field_complete) {
// Field is not complete yet, wait
return 0;
}
// Now we have complete header field and value
const auto &field = state->current_header_field_buf;
const auto &value = state->current_header_value_buf;
// Check for Connection header
if (field.size() == 10 && strncasecmp(field.data(), "connection", 10) == 0) {
if (value.size() == 5 && strncasecmp(value.data(), "close", 5) == 0) {
state->connection_close = true;
}
}
// Check for X-Request-Id header
if (field.size() == 12 &&
strncasecmp(field.data(), "x-request-id", 12) == 0) {
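    // Parse the decimal id; non-digit characters are skipped and overflow is
    // not checked.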
int64_t id = 0;
for (char c : value) {
if (c >= '0' && c <= '9') {
id = id * 10 + (c - '0');
}
}
state->http_request_id = id;
}
// Clear buffers for next header
state->current_header_field_buf.clear();
state->current_header_value_buf.clear();
state->header_field_complete = false;
return 0;
}
int HttpHandler::onHeadersComplete(llhttp_t *parser) {
auto *state = static_cast<HttpConnectionState *>(parser->data);
state->headers_complete = true;
// Get HTTP method
const char *method_str =
llhttp_method_name(static_cast<llhttp_method_t>(parser->method));
state->method = std::string_view(method_str);
// Check if this looks like a POST to /v1/commit to initialize streaming
// parser
if (state->method == "POST" && state->url.find("/v1/commit") == 0) {
// Initialize streaming commit request parsing
state->commit_parser = state->arena.construct<JsonCommitRequestParser>();
state->commit_request = state->arena.construct<CommitRequest>();
state->parsing_commit =
state->commit_parser->begin_streaming_parse(*state->commit_request);
if (!state->parsing_commit) {
return -1; // Signal parsing error to llhttp
}
}
return 0;
}
int HttpHandler::onBody(llhttp_t *parser, const char *at, size_t length) {
auto *state = static_cast<HttpConnectionState *>(parser->data);
if (state->parsing_commit && state->commit_parser) {
// Stream data to commit request parser
auto status =
state->commit_parser->parse_chunk(const_cast<char *>(at), length);
if (status == CommitRequestParser::ParseStatus::Error) {
return -1; // Signal parsing error to llhttp
}
}
return 0;
}
int HttpHandler::onMessageComplete(llhttp_t *parser) {
auto *state = static_cast<HttpConnectionState *>(parser->data);
state->message_complete = true;
if (state->parsing_commit && state->commit_parser) {
// Finish streaming parse
auto status = state->commit_parser->finish_streaming_parse();
if (status == CommitRequestParser::ParseStatus::Error) {
return -1; // Signal parsing error to llhttp
}
}
return 0;
}
// Pipeline stage implementations (batch-based)
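// The four stages, in order: sequence (serial version assignment and banned
// request-id bookkeeping), resolve (precondition checks), persist (durability
// and response formatting), and release (handing responses back to
// connections).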
bool HttpHandler::process_sequence_batch(BatchType &batch) {
// Stage 0: Sequence assignment
// This stage performs ONLY work that requires serial processing:
// - Version/sequence number assignment (must be sequential)
// - Request ID banned list management
for (auto &entry : batch) {
// Pattern match on pipeline entry variant
bool should_shutdown = std::visit(
[&](auto &&e) -> bool {
using T = std::decay_t<decltype(e)>;
if constexpr (std::is_same_v<T, ShutdownEntry>) {
return true; // Signal shutdown
} else if constexpr (std::is_same_v<T, CommitEntry>) {
// Process commit entry: check banned list, assign version
auto &commit_entry = e;
auto conn_ref = commit_entry.connection.lock();
if (!conn_ref) {
// Connection is gone, drop the entry silently and increment
// metric
// TODO: Add dropped_pipeline_entries metric
return false; // Skip this entry and continue processing
}
if (!commit_entry.commit_request) {
// Should not happen - basic validation was done on I/O thread
send_error_response(*conn_ref, 500, "Internal server error",
Arena{}, commit_entry.http_request_id, true);
return false;
}
// Check if request_id is banned (for status queries)
// Only check CommitRequest request_id, not HTTP header
if (commit_entry.commit_request &&
commit_entry.commit_request->request_id().has_value()) {
auto commit_request_id =
commit_entry.commit_request->request_id().value();
if (banned_request_ids.find(commit_request_id) !=
banned_request_ids.end()) {
// Request ID is banned, this commit should fail
send_json_response(
*conn_ref, 409,
R"({"status": "not_committed", "error": "request_id_banned"})",
Arena{}, commit_entry.http_request_id,
commit_entry.connection_close);
return false;
}
}
// Assign sequential version number
commit_entry.assigned_version = next_version++;
TRACE_EVENT("http", "sequence_commit",
perfetto::Flow::Global(commit_entry.http_request_id));
return false; // Continue processing
} else if constexpr (std::is_same_v<T, StatusEntry>) {
// Process status entry: add request_id to banned list, get version
// upper bound
auto &status_entry = e;
auto conn_ref = status_entry.connection.lock();
if (!conn_ref) {
// Connection is gone, drop the entry silently and increment
// metric
// TODO: Add dropped_pipeline_entries metric
return false; // Skip this entry and continue processing
}
if (!status_entry.status_request_id.empty()) {
// Add request_id to banned list - store the string in arena and
// use string_view
std::string_view request_id_view =
banned_request_arena.copy_string(
status_entry.status_request_id);
banned_request_ids.insert(request_id_view);
// Update memory usage metric
banned_request_ids_memory_gauge.set(
banned_request_arena.total_allocated());
// Set version upper bound to current highest assigned version
status_entry.version_upper_bound = next_version - 1;
}
TRACE_EVENT("http", "sequence_status",
perfetto::Flow::Global(status_entry.http_request_id));
// TODO: Transfer to status threadpool - for now just respond
// not_committed
send_json_response(*conn_ref, 200, R"({"status": "not_committed"})",
Arena{}, status_entry.http_request_id,
status_entry.connection_close);
return false; // Continue processing
} else if constexpr (std::is_same_v<T, HealthCheckEntry>) {
// Process health check entry: noop in sequence stage
auto &health_check_entry = e;
auto conn_ref = health_check_entry.connection.lock();
if (!conn_ref) {
// Connection is gone, drop the entry silently and increment
// metric
// TODO: Add dropped_pipeline_entries metric
return false; // Skip this entry and continue processing
}
TRACE_EVENT(
"http", "sequence_health_check",
perfetto::Flow::Global(health_check_entry.http_request_id));
return false; // Continue processing
}
return false; // Unknown type, continue
},
entry);
if (should_shutdown) {
return true;
}
}
return false; // Continue processing
}
bool HttpHandler::process_resolve_batch(BatchType &batch) {
// Stage 1: Precondition resolution
// This stage must be serialized to maintain consistent database state view
// - Validate preconditions against current database state
// - Check for conflicts with other transactions
for (auto &entry : batch) {
// Pattern match on pipeline entry variant
bool should_shutdown = std::visit(
[&](auto &&e) -> bool {
using T = std::decay_t<decltype(e)>;
if constexpr (std::is_same_v<T, ShutdownEntry>) {
return true; // Signal shutdown
} else if constexpr (std::is_same_v<T, CommitEntry>) {
// Process commit entry: accept all commits (simplified
// implementation)
auto &commit_entry = e;
auto conn_ref = commit_entry.connection.lock();
if (!conn_ref) {
// Connection is gone, drop the entry silently and increment
// metric
// TODO: Add dropped_pipeline_entries metric
return false; // Skip this entry and continue processing
}
if (!commit_entry.commit_request) {
// Skip processing for failed sequence stage
return false;
}
// Accept all commits (simplified implementation)
commit_entry.resolve_success = true;
TRACE_EVENT("http", "resolve_commit",
perfetto::Flow::Global(commit_entry.http_request_id));
return false; // Continue processing
} else if constexpr (std::is_same_v<T, StatusEntry>) {
// Status entries are not processed in resolve stage
// They were already handled in sequence stage
return false;
} else if constexpr (std::is_same_v<T, HealthCheckEntry>) {
// Process health check entry: perform configurable CPU work
auto &health_check_entry = e;
auto conn_ref = health_check_entry.connection.lock();
if (!conn_ref) {
// Connection is gone, drop the entry silently and increment
// metric
// TODO: Add dropped_pipeline_entries metric
return false; // Skip this entry and continue processing
}
TRACE_EVENT(
"http", "resolve_health_check",
perfetto::Flow::Global(health_check_entry.http_request_id));
// Perform configurable CPU-intensive work for benchmarking
spend_cpu_cycles(config_.benchmark.ok_resolve_iterations);
return false; // Continue processing
}
return false; // Unknown type, continue
},
entry);
if (should_shutdown) {
return true;
}
}
return false; // Continue processing
}
bool HttpHandler::process_persist_batch(BatchType &batch) {
// Stage 2: Transaction persistence
// Mark everything as durable immediately (simplified implementation)
// In real implementation: batch S3 writes, update subscribers, etc.
for (auto &entry : batch) {
// Pattern match on pipeline entry variant
bool should_shutdown = std::visit(
[&](auto &&e) -> bool {
using T = std::decay_t<decltype(e)>;
if constexpr (std::is_same_v<T, ShutdownEntry>) {
return true; // Signal shutdown
} else if constexpr (std::is_same_v<T, CommitEntry>) {
// Process commit entry: mark as durable, generate response
auto &commit_entry = e;
// Check if connection is still alive first
auto conn_ref = commit_entry.connection.lock();
if (!conn_ref) {
// Connection is gone, drop the entry silently and increment
// metric
// TODO: Add dropped_pipeline_entries metric
return false; // Skip this entry and continue processing
}
// Skip if resolve failed or connection is in error state
if (!commit_entry.commit_request || !commit_entry.resolve_success) {
return false;
}
// Mark as persisted and update committed version high water mark
commit_entry.persist_success = true;
committed_version.store(commit_entry.assigned_version,
std::memory_order_seq_cst);
TRACE_EVENT("http", "persist_commit",
perfetto::Flow::Global(commit_entry.http_request_id));
const CommitRequest &commit_request = *commit_entry.commit_request;
// Generate success response with actual assigned version using
// request arena
std::string_view response;
if (commit_request.request_id().has_value()) {
response = format(
commit_entry.request_arena,
R"({"request_id":"%.*s","status":"committed","version":%ld,"leader_id":"leader123"})",
static_cast<int>(commit_request.request_id().value().size()),
commit_request.request_id().value().data(),
commit_entry.assigned_version);
} else {
response = format(
commit_entry.request_arena,
R"({"status":"committed","version":%ld,"leader_id":"leader123"})",
commit_entry.assigned_version);
}
// Format response but don't send yet - store for release stage
commit_entry.response_message = format_json_response(
200, response, commit_entry.request_arena,
commit_entry.http_request_id, commit_entry.connection_close);
return false; // Continue processing
} else if constexpr (std::is_same_v<T, StatusEntry>) {
// Status entries are not processed in persist stage
// They were already handled in sequence stage
return false;
} else if constexpr (std::is_same_v<T, HealthCheckEntry>) {
// Process health check entry: generate OK response
auto &health_check_entry = e;
auto conn_ref = health_check_entry.connection.lock();
if (!conn_ref) {
// Connection is gone, drop the entry silently and increment
// metric
// TODO: Add dropped_pipeline_entries metric
return false; // Skip this entry and continue processing
}
TRACE_EVENT(
"http", "persist_health_check",
perfetto::Flow::Global(health_check_entry.http_request_id));
// Format OK response but don't send yet - store for release stage
health_check_entry.response_message = format_response(
200, "text/plain", "OK", health_check_entry.request_arena,
health_check_entry.http_request_id,
health_check_entry.connection_close);
return false; // Continue processing
}
return false; // Unknown type, continue
},
entry);
if (should_shutdown) {
return true;
}
}
return false; // Continue processing
}
bool HttpHandler::process_release_batch(BatchType &batch) {
// Stage 3: Connection release
// Return connections to server for response transmission
for (auto &entry : batch) {
// Pattern match on pipeline entry variant
bool should_shutdown = std::visit(
[&](auto &&e) -> bool {
using T = std::decay_t<decltype(e)>;
if constexpr (std::is_same_v<T, ShutdownEntry>) {
return true; // Signal shutdown
} else if constexpr (std::is_same_v<T, CommitEntry>) {
// Process commit entry: return connection to server
auto &commit_entry = e;
auto conn_ref = commit_entry.connection.lock();
if (!conn_ref) {
// Connection is gone, drop the entry silently and increment
// metric
// TODO: Add dropped_pipeline_entries metric
return false; // Skip this entry and continue processing
}
TRACE_EVENT("http", "release_commit",
perfetto::Flow::Global(commit_entry.http_request_id));
// Send the response that was formatted in persist stage
conn_ref->append_message(commit_entry.response_message,
std::move(commit_entry.request_arena),
commit_entry.connection_close);
return false; // Continue processing
} else if constexpr (std::is_same_v<T, StatusEntry>) {
// Process status entry: return connection to server
auto &status_entry = e;
auto conn_ref = status_entry.connection.lock();
if (!conn_ref) {
// Connection is gone, drop the entry silently and increment
// metric
// TODO: Add dropped_pipeline_entries metric
return false; // Skip this entry and continue processing
}
TRACE_EVENT("http", "release_status",
perfetto::Flow::Global(status_entry.http_request_id));
return false; // Continue processing
} else if constexpr (std::is_same_v<T, HealthCheckEntry>) {
// Process health check entry: return connection to server
auto &health_check_entry = e;
auto conn_ref = health_check_entry.connection.lock();
if (!conn_ref) {
// Connection is gone, drop the entry silently and increment
// metric
// TODO: Add dropped_pipeline_entries metric
return false; // Skip this entry and continue processing
}
TRACE_EVENT(
"http", "release_health_check",
perfetto::Flow::Global(health_check_entry.http_request_id));
// Send the response that was formatted in persist stage
conn_ref->append_message(
health_check_entry.response_message,
std::move(health_check_entry.request_arena),
health_check_entry.connection_close);
return false; // Continue processing
}
return false; // Unknown type, continue
},
entry);
if (should_shutdown) {
return true;
}
}
return false; // Continue processing
}