Consistently use state->arena for http handling

This commit is contained in:
2025-09-14 17:16:05 -04:00
parent 7ef54a2d08
commit 0389fd2c9f
3 changed files with 75 additions and 95 deletions

View File

@@ -159,29 +159,26 @@ void HttpHandler::on_data_arrived(std::string_view data, Connection &conn) {
llhttp_execute(&state->parser, data.data(), data.size()); llhttp_execute(&state->parser, data.data(), data.size());
if (err != HPE_OK) { if (err != HPE_OK) {
send_error_response(conn, 400, "Bad request", Arena{}, 0, true); send_error_response(conn, 400, "Bad request", std::move(state->arena), 0,
true);
return; return;
} }
// If message is complete, route and handle the request // If message is complete, route and handle the request
if (state->message_complete) { if (state->message_complete) {
// Create request-scoped arena for URL parsing // Use connection state's arena for all request processing
// FIX: request_arena lifetime ends too soon. Should move arena into char *url_buffer = state->arena.allocate<char>(state->url.size());
// individual handlers and propagate it all the way through to
// append_message
Arena request_arena;
char *url_buffer = request_arena.allocate<char>(state->url.size());
std::memcpy(url_buffer, state->url.data(), state->url.size()); std::memcpy(url_buffer, state->url.data(), state->url.size());
RouteMatch route_match; RouteMatch route_match;
auto parse_result = auto parse_result = ApiUrlParser::parse(
ApiUrlParser::parse(state->method, url_buffer, state->method, const_cast<char *>(state->url.data()),
static_cast<int>(state->url.size()), route_match); static_cast<int>(state->url.size()), route_match);
if (parse_result != ParseResult::Success) { if (parse_result != ParseResult::Success) {
// Handle malformed URL encoding // Handle malformed URL encoding
send_error_response(conn, 400, "Malformed URL encoding", Arena{}, 0, send_error_response(conn, 400, "Malformed URL encoding",
true); std::move(state->arena), 0, true);
return; return;
} }
@@ -190,36 +187,35 @@ void HttpHandler::on_data_arrived(std::string_view data, Connection &conn) {
// Route to appropriate handler // Route to appropriate handler
switch (state->route) { switch (state->route) {
case HttpRoute::GetVersion: case HttpRoute::GetVersion:
handle_get_version(conn, *state, std::move(request_arena)); handle_get_version(conn, *state);
break; break;
case HttpRoute::PostCommit: case HttpRoute::PostCommit:
handle_post_commit(conn, *state, std::move(request_arena)); handle_post_commit(conn, *state);
break; break;
case HttpRoute::GetSubscribe: case HttpRoute::GetSubscribe:
handle_get_subscribe(conn, *state, std::move(request_arena)); handle_get_subscribe(conn, *state);
break; break;
case HttpRoute::GetStatus: case HttpRoute::GetStatus:
handle_get_status(conn, *state, route_match, std::move(request_arena)); handle_get_status(conn, *state, route_match);
break; break;
case HttpRoute::PutRetention: case HttpRoute::PutRetention:
handle_put_retention(conn, *state, route_match, std::move(request_arena)); handle_put_retention(conn, *state, route_match);
break; break;
case HttpRoute::GetRetention: case HttpRoute::GetRetention:
handle_get_retention(conn, *state, route_match, std::move(request_arena)); handle_get_retention(conn, *state, route_match);
break; break;
case HttpRoute::DeleteRetention: case HttpRoute::DeleteRetention:
handle_delete_retention(conn, *state, route_match, handle_delete_retention(conn, *state, route_match);
std::move(request_arena));
break; break;
case HttpRoute::GetMetrics: case HttpRoute::GetMetrics:
handle_get_metrics(conn, *state, std::move(request_arena)); handle_get_metrics(conn, *state);
break; break;
case HttpRoute::GetOk: case HttpRoute::GetOk:
handle_get_ok(conn, *state, std::move(request_arena)); handle_get_ok(conn, *state);
break; break;
case HttpRoute::NotFound: case HttpRoute::NotFound:
default: default:
handle_not_found(conn, *state, std::move(request_arena)); handle_not_found(conn, *state);
break; break;
} }
} }
@@ -227,28 +223,21 @@ void HttpHandler::on_data_arrived(std::string_view data, Connection &conn) {
// Route handlers (basic implementations) // Route handlers (basic implementations)
void HttpHandler::handle_get_version(Connection &conn, void HttpHandler::handle_get_version(Connection &conn,
const HttpConnectionState &state, HttpConnectionState &state) {
Arena request_arena) {
version_counter.inc(); version_counter.inc();
send_json_response( send_json_response(
conn, 200, conn, 200,
format(request_arena, R"({"version":%ld,"leader":""})", format(state.arena, R"({"version":%ld,"leader":""})",
this->committed_version.load(std::memory_order_seq_cst)), this->committed_version.load(std::memory_order_seq_cst)),
std::move(request_arena), state.http_request_id, state.connection_close); std::move(state.arena), state.http_request_id, state.connection_close);
} }
void HttpHandler::handle_post_commit(Connection &conn, void HttpHandler::handle_post_commit(Connection &conn,
const HttpConnectionState &state, HttpConnectionState &state) {
Arena request_arena) {
commit_counter.inc(); commit_counter.inc();
// Check if streaming parse was successful // Check if streaming parse was successful
if (!state.commit_request || !state.parsing_commit) { if (!state.commit_request || !state.parsing_commit) {
const char *error = state.commit_parser send_error_response(conn, 400, "Parse failed", std::move(state.arena),
? state.commit_parser->get_parse_error()
: "No parser initialized";
std::string_view error_msg = format(request_arena, "Parse failed: %s",
error ? error : "Unknown error");
send_error_response(conn, 400, error_msg, std::move(request_arena),
state.http_request_id, state.connection_close); state.http_request_id, state.connection_close);
return; return;
} }
@@ -289,7 +278,7 @@ void HttpHandler::handle_post_commit(Connection &conn,
} }
if (!valid) { if (!valid) {
send_error_response(conn, 400, error_msg, std::move(request_arena), send_error_response(conn, 400, error_msg, std::move(state.arena),
state.http_request_id, state.connection_close); state.http_request_id, state.connection_close);
return; return;
} }
@@ -300,18 +289,17 @@ void HttpHandler::handle_post_commit(Connection &conn,
} }
void HttpHandler::handle_get_subscribe(Connection &conn, void HttpHandler::handle_get_subscribe(Connection &conn,
const HttpConnectionState &state, HttpConnectionState &state) {
Arena) {
// TODO: Implement subscription streaming // TODO: Implement subscription streaming
send_json_response( send_json_response(
conn, 200, conn, 200,
R"({"message":"Subscription endpoint - streaming not yet implemented"})", R"({"message":"Subscription endpoint - streaming not yet implemented"})",
Arena{}, state.http_request_id, state.connection_close); std::move(state.arena), state.http_request_id, state.connection_close);
} }
void HttpHandler::handle_get_status(Connection &conn, void HttpHandler::handle_get_status(Connection &conn,
HttpConnectionState &state, HttpConnectionState &state,
const RouteMatch &route_match, Arena) { const RouteMatch &route_match) {
status_counter.inc(); status_counter.inc();
// Status requests are processed through the pipeline // Status requests are processed through the pipeline
// Response will be generated in the sequence stage // Response will be generated in the sequence stage
@@ -321,15 +309,16 @@ void HttpHandler::handle_get_status(Connection &conn,
const auto &request_id = const auto &request_id =
route_match.params[static_cast<int>(ApiParameterKey::RequestId)]; route_match.params[static_cast<int>(ApiParameterKey::RequestId)];
if (!request_id) { if (!request_id) {
send_error_response(conn, 400, send_error_response(
"Missing required query parameter: request_id", Arena{}, conn, 400, "Missing required query parameter: request_id",
state.http_request_id, state.connection_close); std::move(state.arena), state.http_request_id, state.connection_close);
return; return;
} }
if (request_id->empty()) { if (request_id->empty()) {
send_error_response(conn, 400, "Empty request_id parameter", Arena{}, send_error_response(conn, 400, "Empty request_id parameter",
state.http_request_id, state.connection_close); std::move(state.arena), state.http_request_id,
state.connection_close);
return; return;
} }
@@ -340,34 +329,35 @@ void HttpHandler::handle_get_status(Connection &conn,
} }
void HttpHandler::handle_put_retention(Connection &conn, void HttpHandler::handle_put_retention(Connection &conn,
const HttpConnectionState &state, HttpConnectionState &state,
const RouteMatch &, Arena) { const RouteMatch &) {
// TODO: Parse retention policy from body and store // TODO: Parse retention policy from body and store
send_json_response(conn, 200, R"({"policy_id":"example","status":"created"})", send_json_response(conn, 200, R"({"policy_id":"example","status":"created"})",
Arena{}, state.http_request_id, state.connection_close); std::move(state.arena), state.http_request_id,
state.connection_close);
} }
void HttpHandler::handle_get_retention(Connection &conn, void HttpHandler::handle_get_retention(Connection &conn,
const HttpConnectionState &state, HttpConnectionState &state,
const RouteMatch &, Arena) { const RouteMatch &) {
// TODO: Extract policy_id from URL or return all policies // TODO: Extract policy_id from URL or return all policies
send_json_response(conn, 200, R"({"policies":[]})", Arena{}, send_json_response(conn, 200, R"({"policies":[]})", std::move(state.arena),
state.http_request_id, state.connection_close); state.http_request_id, state.connection_close);
} }
void HttpHandler::handle_delete_retention(Connection &conn, void HttpHandler::handle_delete_retention(Connection &conn,
const HttpConnectionState &state, HttpConnectionState &state,
const RouteMatch &, Arena) { const RouteMatch &) {
// TODO: Extract policy_id from URL and delete // TODO: Extract policy_id from URL and delete
send_json_response(conn, 200, R"({"policy_id":"example","status":"deleted"})", send_json_response(conn, 200, R"({"policy_id":"example","status":"deleted"})",
Arena{}, state.http_request_id, state.connection_close); std::move(state.arena), state.http_request_id,
state.connection_close);
} }
void HttpHandler::handle_get_metrics(Connection &conn, void HttpHandler::handle_get_metrics(Connection &conn,
const HttpConnectionState &state, HttpConnectionState &state) {
Arena request_arena) {
metrics_counter.inc(); metrics_counter.inc();
auto metrics_span = metric::render(request_arena); auto metrics_span = metric::render(state.arena);
// Calculate total size for the response body // Calculate total size for the response body
size_t total_size = 0; size_t total_size = 0;
@@ -381,14 +371,14 @@ void HttpHandler::handle_get_metrics(Connection &conn,
std::string_view headers; std::string_view headers;
if (state.connection_close) { if (state.connection_close) {
headers = static_format( headers = static_format(
request_arena, "HTTP/1.1 200 OK\r\n", state.arena, "HTTP/1.1 200 OK\r\n",
"Content-Type: text/plain; version=0.0.4\r\n", "Content-Type: text/plain; version=0.0.4\r\n",
"Content-Length: ", static_cast<uint64_t>(total_size), "\r\n", "Content-Length: ", static_cast<uint64_t>(total_size), "\r\n",
"X-Response-ID: ", static_cast<int64_t>(http_state->http_request_id), "X-Response-ID: ", static_cast<int64_t>(http_state->http_request_id),
"\r\n", "Connection: close\r\n", "\r\n"); "\r\n", "Connection: close\r\n", "\r\n");
} else { } else {
headers = static_format( headers = static_format(
request_arena, "HTTP/1.1 200 OK\r\n", state.arena, "HTTP/1.1 200 OK\r\n",
"Content-Type: text/plain; version=0.0.4\r\n", "Content-Type: text/plain; version=0.0.4\r\n",
"Content-Length: ", static_cast<uint64_t>(total_size), "\r\n", "Content-Length: ", static_cast<uint64_t>(total_size), "\r\n",
"X-Response-ID: ", static_cast<int64_t>(http_state->http_request_id), "X-Response-ID: ", static_cast<int64_t>(http_state->http_request_id),
@@ -396,18 +386,17 @@ void HttpHandler::handle_get_metrics(Connection &conn,
} }
auto result = std::span<std::string_view>{ auto result = std::span<std::string_view>{
request_arena.allocate<std::string_view>(metrics_span.size() + 1), state.arena.allocate<std::string_view>(metrics_span.size() + 1),
metrics_span.size() + 1}; metrics_span.size() + 1};
auto out = result.begin(); auto out = result.begin();
*out++ = headers; *out++ = headers;
for (auto sv : metrics_span) { for (auto sv : metrics_span) {
*out++ = sv; *out++ = sv;
} }
conn.append_message(result, std::move(request_arena)); conn.append_message(result, std::move(state.arena));
} }
void HttpHandler::handle_get_ok(Connection &, const HttpConnectionState &state, void HttpHandler::handle_get_ok(Connection &, HttpConnectionState &state) {
Arena) {
ok_counter.inc(); ok_counter.inc();
TRACE_EVENT("http", "GET /ok", perfetto::Flow::Global(state.http_request_id)); TRACE_EVENT("http", "GET /ok", perfetto::Flow::Global(state.http_request_id));
@@ -416,9 +405,9 @@ void HttpHandler::handle_get_ok(Connection &, const HttpConnectionState &state,
} }
void HttpHandler::handle_not_found(Connection &conn, void HttpHandler::handle_not_found(Connection &conn,
const HttpConnectionState &state, Arena) { HttpConnectionState &state) {
send_error_response(conn, 404, "Not found", Arena{}, state.http_request_id, send_error_response(conn, 404, "Not found", std::move(state.arena),
state.connection_close); state.http_request_id, state.connection_close);
} }
// HTTP utility methods // HTTP utility methods

View File

@@ -25,7 +25,7 @@ struct RouteMatch;
* Manages llhttp parser state and request data. * Manages llhttp parser state and request data.
*/ */
struct HttpConnectionState { struct HttpConnectionState {
Arena arena; // Request-scoped arena for parsing state Arena arena{16 << 10}; // Request-scoped arena for parsing state
llhttp_t parser; llhttp_t parser;
llhttp_settings_t settings; llhttp_settings_t settings;
@@ -168,10 +168,11 @@ private:
BannedRequestIdSet banned_request_ids; // Request IDs that should not commit BannedRequestIdSet banned_request_ids; // Request IDs that should not commit
// (string_views into arena) // (string_views into arena)
constexpr static auto wait_strategy = WaitStrategy::WaitIfUpstreamIdle;
// Main commit processing pipeline: sequence -> resolve -> persist -> release // Main commit processing pipeline: sequence -> resolve -> persist -> release
StaticThreadPipeline<PipelineEntry, WaitStrategy::WaitIfStageEmpty, 1, 1, 1, StaticThreadPipeline<PipelineEntry, wait_strategy, 1, 1, 1, 1> commitPipeline{
1> lg_size};
commitPipeline{lg_size};
// Pipeline stage threads // Pipeline stage threads
std::thread sequenceThread; std::thread sequenceThread;
@@ -181,36 +182,27 @@ private:
// Pipeline stage processing methods (batch-based) // Pipeline stage processing methods (batch-based)
using BatchType = using BatchType =
StaticThreadPipeline<PipelineEntry, WaitStrategy::WaitIfStageEmpty, 1, 1, StaticThreadPipeline<PipelineEntry, wait_strategy, 1, 1, 1, 1>::Batch;
1, 1>::Batch;
bool process_sequence_batch(BatchType &batch); bool process_sequence_batch(BatchType &batch);
bool process_resolve_batch(BatchType &batch); bool process_resolve_batch(BatchType &batch);
bool process_persist_batch(BatchType &batch); bool process_persist_batch(BatchType &batch);
bool process_release_batch(BatchType &batch); bool process_release_batch(BatchType &batch);
// Route handlers // Route handlers
void handle_get_version(Connection &conn, const HttpConnectionState &state, void handle_get_version(Connection &conn, HttpConnectionState &state);
Arena request_arena); void handle_post_commit(Connection &conn, HttpConnectionState &state);
void handle_post_commit(Connection &conn, const HttpConnectionState &state, void handle_get_subscribe(Connection &conn, HttpConnectionState &state);
Arena request_arena);
void handle_get_subscribe(Connection &conn, const HttpConnectionState &state,
Arena request_arena);
void handle_get_status(Connection &conn, HttpConnectionState &state, void handle_get_status(Connection &conn, HttpConnectionState &state,
const RouteMatch &route_match, Arena request_arena); const RouteMatch &route_match);
void handle_put_retention(Connection &conn, const HttpConnectionState &state, void handle_put_retention(Connection &conn, HttpConnectionState &state,
const RouteMatch &route_match, Arena request_arena); const RouteMatch &route_match);
void handle_get_retention(Connection &conn, const HttpConnectionState &state, void handle_get_retention(Connection &conn, HttpConnectionState &state,
const RouteMatch &route_match, Arena request_arena); const RouteMatch &route_match);
void handle_delete_retention(Connection &conn, void handle_delete_retention(Connection &conn, HttpConnectionState &state,
const HttpConnectionState &state, const RouteMatch &route_match);
const RouteMatch &route_match, void handle_get_metrics(Connection &conn, HttpConnectionState &state);
Arena request_arena); void handle_get_ok(Connection &conn, HttpConnectionState &state);
void handle_get_metrics(Connection &conn, const HttpConnectionState &state, void handle_not_found(Connection &conn, HttpConnectionState &state);
Arena request_arena);
void handle_get_ok(Connection &conn, const HttpConnectionState &state,
Arena request_arena);
void handle_not_found(Connection &conn, const HttpConnectionState &state,
Arena request_arena);
// HTTP utilities // HTTP utilities
static void send_response(MessageSender &conn, int status_code, static void send_response(MessageSender &conn, int status_code,

View File

@@ -10,9 +10,8 @@ interfaces = [
max_request_size_bytes = 1048576 # 1MB max_request_size_bytes = 1048576 # 1MB
# Number of I/O threads for handling connections and network events # Number of I/O threads for handling connections and network events
io_threads = 8 io_threads = 8
epoll_instances = 8
# Event batch size for epoll processing # Event batch size for epoll processing
event_batch_size = 64 event_batch_size = 128
[commit] [commit]
# Minimum length for request_id to ensure sufficient entropy # Minimum length for request_id to ensure sufficient entropy