Separate HttpRequestState and HttpConnectionState

Now HttpConnectionState has a queue of HttpRequestState
This commit is contained in:
2025-09-14 23:49:32 -04:00
parent fac6b8de88
commit 022a79bf5b
2 changed files with 143 additions and 210 deletions

View File

@@ -36,10 +36,7 @@ auto banned_request_ids_memory_gauge =
"Memory used by banned request IDs arena")
.create({});
// HttpConnectionState implementation
HttpConnectionState::HttpConnectionState()
: current_header_field_buf(ArenaStlAllocator<char>(&arena)),
current_header_value_buf(ArenaStlAllocator<char>(&arena)) {
HttpConnectionState::HttpConnectionState() {
llhttp_settings_init(&settings);
// Set up llhttp callbacks
@@ -53,40 +50,13 @@ HttpConnectionState::HttpConnectionState()
settings.on_message_complete = HttpHandler::onMessageComplete;
llhttp_init(&parser, HTTP_REQUEST, &settings);
parser.data = this;
parser.data = &pending;
}
void HttpConnectionState::reset() {
TRACE_EVENT("http", "reply", perfetto::Flow::Global(http_request_id));
// Reset request-specific state for next HTTP request
method = {};
url = {};
headers_complete = false;
message_complete = false;
connection_close = false;
route = HttpRoute::NotFound;
status_request_id = {};
// Reset header buffers - need to recreate with arena allocator
current_header_field_buf = ArenaString(ArenaStlAllocator<char>(&arena));
current_header_value_buf = ArenaString(ArenaStlAllocator<char>(&arena));
header_field_complete = false;
http_request_id = 0;
// Reset commit parsing state - safe to reset Arena::Ptr objects here
// since request processing is complete
commit_parser.reset();
commit_request.reset();
parsing_commit = false;
basic_validation_passed = false;
// Reset arena memory for next request to prevent memory growth
arena.reset();
// Reset llhttp parser for next request
llhttp_init(&parser, HTTP_REQUEST, &settings);
parser.data = this;
}
// HttpConnectionState implementation
HttpRequestState::HttpRequestState()
: current_header_field_buf(ArenaStlAllocator<char>(&arena)),
current_header_value_buf(ArenaStlAllocator<char>(&arena)) {}
// HttpHandler implementation
void HttpHandler::on_connection_established(Connection &conn) {
@@ -101,68 +71,94 @@ void HttpHandler::on_connection_closed(Connection &conn) {
conn.user_data = nullptr;
}
static thread_local std::vector<PipelineEntry> g_batch_entries;
void HttpHandler::on_batch_complete(std::span<Connection *const> batch) {
// Collect commit, status, and health check requests for pipeline processing
int pipeline_count = 0;
// Count commit, status, and health check requests
for (auto conn : batch) {
auto *state = static_cast<HttpConnectionState *>(conn->user_data);
for (auto &req : state->queue) {
// Count commit requests that passed basic validation
if (state->route == HttpRoute::PostCommit && state->commit_request &&
state->parsing_commit && state->basic_validation_passed) {
pipeline_count++;
char *url_buffer = req.arena.allocate<char>(req.url.size());
std::memcpy(url_buffer, req.url.data(), req.url.size());
RouteMatch route_match;
auto parse_result =
ApiUrlParser::parse(req.method, const_cast<char *>(req.url.data()),
static_cast<int>(req.url.size()), route_match);
if (parse_result != ParseResult::Success) {
// Handle malformed URL encoding
send_error_response(*conn, 400, "Malformed URL encoding",
std::move(req.arena), 0, true);
break;
}
// Count status requests
else if (state->route == HttpRoute::GetStatus &&
// Error message not already queued
conn->outgoing_bytes_queued() == 0) {
pipeline_count++;
req.route = route_match.route;
// Route to appropriate handler
switch (req.route) {
case HttpRoute::GetVersion:
handle_get_version(*conn, req);
break;
case HttpRoute::PostCommit:
handle_post_commit(*conn, req);
break;
case HttpRoute::GetSubscribe:
handle_get_subscribe(*conn, req);
break;
case HttpRoute::GetStatus:
handle_get_status(*conn, req, route_match);
break;
case HttpRoute::PutRetention:
handle_put_retention(*conn, req, route_match);
break;
case HttpRoute::GetRetention:
handle_get_retention(*conn, req, route_match);
break;
case HttpRoute::DeleteRetention:
handle_delete_retention(*conn, req, route_match);
break;
case HttpRoute::GetMetrics:
handle_get_metrics(*conn, req);
break;
case HttpRoute::GetOk:
handle_get_ok(*conn, req);
break;
case HttpRoute::NotFound:
default:
handle_not_found(*conn, req);
break;
}
// Count health check requests
else if (state->route == HttpRoute::GetOk &&
// Error message not already queued
conn->outgoing_bytes_queued() == 0) {
pipeline_count++;
// Create CommitEntry for commit requests
if (req.route == HttpRoute::PostCommit && req.commit_request &&
req.parsing_commit && req.basic_validation_passed) {
g_batch_entries.push_back(CommitEntry{
conn->get_weak_ref(), req.http_request_id, req.connection_close,
req.commit_request.get(), std::move(req.arena)});
}
// Create StatusEntry for status requests
else if (req.route == HttpRoute::GetStatus) {
g_batch_entries.push_back(StatusEntry{
conn->get_weak_ref(), req.http_request_id, req.connection_close,
req.status_request_id, std::move(req.arena)});
}
// Create HealthCheckEntry for health check requests
else if (req.route == HttpRoute::GetOk) {
g_batch_entries.push_back(
HealthCheckEntry{conn->get_weak_ref(), req.http_request_id,
req.connection_close, std::move(req.arena)});
}
}
state->queue.clear();
}
// Send requests to 4-stage pipeline in batch. Batching here reduces
// contention on the way into the pipeline.
if (pipeline_count > 0) {
auto guard = commitPipeline.push(pipeline_count, true);
if (g_batch_entries.size() > 0) {
auto guard = commitPipeline.push(g_batch_entries.size(), true);
auto out_iter = guard.batch.begin();
for (auto conn : batch) {
auto *state = static_cast<HttpConnectionState *>(conn->user_data);
// Create CommitEntry for commit requests
if (state->route == HttpRoute::PostCommit && state->commit_request &&
state->parsing_commit && state->basic_validation_passed) {
*out_iter++ =
CommitEntry{conn->get_weak_ref(), state->http_request_id,
state->connection_close, state->commit_request.get(),
std::move(state->arena)};
state->reset();
}
// Create StatusEntry for status requests
else if (state->route == HttpRoute::GetStatus) {
*out_iter++ =
StatusEntry{conn->get_weak_ref(), state->http_request_id,
state->connection_close, state->status_request_id,
std::move(state->arena)};
state->reset();
}
// Create HealthCheckEntry for health check requests
else if (state->route == HttpRoute::GetOk) {
*out_iter++ =
HealthCheckEntry{conn->get_weak_ref(), state->http_request_id,
state->connection_close, std::move(state->arena)};
state->reset();
}
}
std::move(g_batch_entries.begin(), g_batch_entries.end(), out_iter);
}
g_batch_entries.clear();
}
void HttpHandler::on_data_arrived(std::string_view data, Connection &conn) {
@@ -178,94 +174,45 @@ void HttpHandler::on_data_arrived(std::string_view data, Connection &conn) {
// This prevents DoS attacks via oversized HTTP requests.
// Parse HTTP data with llhttp
for (;;) {
enum llhttp_errno err =
llhttp_execute(&state->parser, data.data(), data.size());
if (err != HPE_OK) {
send_error_response(conn, 400, "Bad request", std::move(state->arena), 0,
true);
state->reset();
if (err == HPE_PAUSED) {
assert(state->pending.message_complete);
state->queue.push_back(std::move(state->pending));
state->pending = {};
int consumed = llhttp_get_error_pos(&state->parser) - data.data();
data = data.substr(consumed, data.size() - consumed);
llhttp_resume(&state->parser);
continue;
}
if (err == HPE_OK) {
break;
}
send_error_response(conn, 400, "Bad request",
std::move(state->pending.arena), 0, true);
return;
}
// If message is complete, route and handle the request
if (state->message_complete) {
// Use connection state's arena for all request processing
char *url_buffer = state->arena.allocate<char>(state->url.size());
std::memcpy(url_buffer, state->url.data(), state->url.size());
RouteMatch route_match;
auto parse_result = ApiUrlParser::parse(
state->method, const_cast<char *>(state->url.data()),
static_cast<int>(state->url.size()), route_match);
if (parse_result != ParseResult::Success) {
// Handle malformed URL encoding
send_error_response(conn, 400, "Malformed URL encoding",
std::move(state->arena), 0, true);
state->reset();
return;
}
state->route = route_match.route;
// Route to appropriate handler
switch (state->route) {
case HttpRoute::GetVersion:
handle_get_version(conn, *state);
break;
case HttpRoute::PostCommit:
handle_post_commit(conn, *state);
break;
case HttpRoute::GetSubscribe:
handle_get_subscribe(conn, *state);
break;
case HttpRoute::GetStatus:
handle_get_status(conn, *state, route_match);
break;
case HttpRoute::PutRetention:
handle_put_retention(conn, *state, route_match);
break;
case HttpRoute::GetRetention:
handle_get_retention(conn, *state, route_match);
break;
case HttpRoute::DeleteRetention:
handle_delete_retention(conn, *state, route_match);
break;
case HttpRoute::GetMetrics:
handle_get_metrics(conn, *state);
break;
case HttpRoute::GetOk:
handle_get_ok(conn, *state);
break;
case HttpRoute::NotFound:
default:
handle_not_found(conn, *state);
break;
}
}
}
// Route handlers (basic implementations)
void HttpHandler::handle_get_version(Connection &conn,
HttpConnectionState &state) {
HttpRequestState &state) {
version_counter.inc();
send_json_response(
conn, 200,
format(state.arena, R"({"version":%ld,"leader":""})",
this->committed_version.load(std::memory_order_seq_cst)),
std::move(state.arena), state.http_request_id, state.connection_close);
state.reset();
}
void HttpHandler::handle_post_commit(Connection &conn,
HttpConnectionState &state) {
HttpRequestState &state) {
commit_counter.inc();
// Check if streaming parse was successful
if (!state.commit_request || !state.parsing_commit) {
send_error_response(conn, 400, "Parse failed", std::move(state.arena),
state.http_request_id, state.connection_close);
state.reset();
return;
}
@@ -307,27 +254,24 @@ void HttpHandler::handle_post_commit(Connection &conn,
if (!valid) {
send_error_response(conn, 400, error_msg, std::move(state.arena),
state.http_request_id, state.connection_close);
state.reset();
return;
}
// Basic validation passed - mark for 4-stage pipeline processing
const_cast<HttpConnectionState &>(state).basic_validation_passed = true;
const_cast<HttpRequestState &>(state).basic_validation_passed = true;
// Response will be sent after 4-stage pipeline processing is complete
}
void HttpHandler::handle_get_subscribe(Connection &conn,
HttpConnectionState &state) {
HttpRequestState &state) {
// TODO: Implement subscription streaming
send_json_response(
conn, 200,
R"({"message":"Subscription endpoint - streaming not yet implemented"})",
std::move(state.arena), state.http_request_id, state.connection_close);
state.reset();
}
void HttpHandler::handle_get_status(Connection &conn,
HttpConnectionState &state,
void HttpHandler::handle_get_status(Connection &conn, HttpRequestState &state,
const RouteMatch &route_match) {
status_counter.inc();
// Status requests are processed through the pipeline
@@ -341,7 +285,6 @@ void HttpHandler::handle_get_status(Connection &conn,
send_error_response(
conn, 400, "Missing required query parameter: request_id",
std::move(state.arena), state.http_request_id, state.connection_close);
state.reset();
return;
}
@@ -349,7 +292,6 @@ void HttpHandler::handle_get_status(Connection &conn,
send_error_response(conn, 400, "Empty request_id parameter",
std::move(state.arena), state.http_request_id,
state.connection_close);
state.reset();
return;
}
@@ -360,36 +302,33 @@ void HttpHandler::handle_get_status(Connection &conn,
}
void HttpHandler::handle_put_retention(Connection &conn,
HttpConnectionState &state,
HttpRequestState &state,
const RouteMatch &) {
// TODO: Parse retention policy from body and store
send_json_response(conn, 200, R"({"policy_id":"example","status":"created"})",
std::move(state.arena), state.http_request_id,
state.connection_close);
state.reset();
}
void HttpHandler::handle_get_retention(Connection &conn,
HttpConnectionState &state,
HttpRequestState &state,
const RouteMatch &) {
// TODO: Extract policy_id from URL or return all policies
send_json_response(conn, 200, R"({"policies":[]})", std::move(state.arena),
state.http_request_id, state.connection_close);
state.reset();
}
void HttpHandler::handle_delete_retention(Connection &conn,
HttpConnectionState &state,
HttpRequestState &state,
const RouteMatch &) {
// TODO: Extract policy_id from URL and delete
send_json_response(conn, 200, R"({"policy_id":"example","status":"deleted"})",
std::move(state.arena), state.http_request_id,
state.connection_close);
state.reset();
}
void HttpHandler::handle_get_metrics(Connection &conn,
HttpConnectionState &state) {
HttpRequestState &state) {
metrics_counter.inc();
auto metrics_span = metric::render(state.arena);
@@ -399,8 +338,6 @@ void HttpHandler::handle_get_metrics(Connection &conn,
total_size += sv.size();
}
auto *http_state = static_cast<HttpConnectionState *>(conn.user_data);
// Build HTTP response headers using arena
std::string_view headers;
if (state.connection_close) {
@@ -408,15 +345,15 @@ void HttpHandler::handle_get_metrics(Connection &conn,
state.arena, "HTTP/1.1 200 OK\r\n",
"Content-Type: text/plain; version=0.0.4\r\n",
"Content-Length: ", static_cast<uint64_t>(total_size), "\r\n",
"X-Response-ID: ", static_cast<int64_t>(http_state->http_request_id),
"\r\n", "Connection: close\r\n", "\r\n");
"X-Response-ID: ", static_cast<int64_t>(state.http_request_id), "\r\n",
"Connection: close\r\n", "\r\n");
} else {
headers = static_format(
state.arena, "HTTP/1.1 200 OK\r\n",
"Content-Type: text/plain; version=0.0.4\r\n",
"Content-Length: ", static_cast<uint64_t>(total_size), "\r\n",
"X-Response-ID: ", static_cast<int64_t>(http_state->http_request_id),
"\r\n", "Connection: keep-alive\r\n", "\r\n");
"X-Response-ID: ", static_cast<int64_t>(state.http_request_id), "\r\n",
"Connection: keep-alive\r\n", "\r\n");
}
auto result =
@@ -427,10 +364,9 @@ void HttpHandler::handle_get_metrics(Connection &conn,
*out++ = sv;
}
conn.append_message(result, std::move(state.arena), state.connection_close);
state.reset();
}
void HttpHandler::handle_get_ok(Connection &, HttpConnectionState &) {
void HttpHandler::handle_get_ok(Connection &, HttpRequestState &) {
ok_counter.inc();
TRACE_EVENT("http", "GET /ok", perfetto::Flow::Global(state.http_request_id));
@@ -438,8 +374,7 @@ void HttpHandler::handle_get_ok(Connection &, HttpConnectionState &) {
// Response will be generated in the release stage after pipeline processing
}
void HttpHandler::handle_not_found(Connection &conn,
HttpConnectionState &state) {
void HttpHandler::handle_not_found(Connection &conn, HttpRequestState &state) {
send_error_response(conn, 404, "Not found", std::move(state.arena),
state.http_request_id, state.connection_close);
}
@@ -566,7 +501,7 @@ std::span<std::string_view> HttpHandler::format_json_response(
// llhttp callbacks
int HttpHandler::onUrl(llhttp_t *parser, const char *at, size_t length) {
auto *state = static_cast<HttpConnectionState *>(parser->data);
auto *state = static_cast<HttpRequestState *>(parser->data);
// Store URL in arena (simplified - would need to accumulate for streaming)
state->url = std::string_view(at, length);
return 0;
@@ -574,28 +509,28 @@ int HttpHandler::onUrl(llhttp_t *parser, const char *at, size_t length) {
int HttpHandler::onHeaderField(llhttp_t *parser, const char *at,
size_t length) {
auto *state = static_cast<HttpConnectionState *>(parser->data);
auto *state = static_cast<HttpRequestState *>(parser->data);
// Accumulate header field data
state->current_header_field_buf.append(at, length);
return 0;
}
int HttpHandler::onHeaderFieldComplete(llhttp_t *parser) {
auto *state = static_cast<HttpConnectionState *>(parser->data);
auto *state = static_cast<HttpRequestState *>(parser->data);
state->header_field_complete = true;
return 0;
}
int HttpHandler::onHeaderValue(llhttp_t *parser, const char *at,
size_t length) {
auto *state = static_cast<HttpConnectionState *>(parser->data);
auto *state = static_cast<HttpRequestState *>(parser->data);
// Accumulate header value data
state->current_header_value_buf.append(at, length);
return 0;
}
int HttpHandler::onHeaderValueComplete(llhttp_t *parser) {
auto *state = static_cast<HttpConnectionState *>(parser->data);
auto *state = static_cast<HttpRequestState *>(parser->data);
if (!state->header_field_complete) {
// Field is not complete yet, wait
@@ -634,7 +569,7 @@ int HttpHandler::onHeaderValueComplete(llhttp_t *parser) {
}
int HttpHandler::onHeadersComplete(llhttp_t *parser) {
auto *state = static_cast<HttpConnectionState *>(parser->data);
auto *state = static_cast<HttpRequestState *>(parser->data);
state->headers_complete = true;
// Get HTTP method
@@ -661,7 +596,7 @@ int HttpHandler::onHeadersComplete(llhttp_t *parser) {
}
int HttpHandler::onBody(llhttp_t *parser, const char *at, size_t length) {
auto *state = static_cast<HttpConnectionState *>(parser->data);
auto *state = static_cast<HttpRequestState *>(parser->data);
if (state->parsing_commit && state->commit_parser) {
// Stream data to commit request parser
@@ -676,18 +611,9 @@ int HttpHandler::onBody(llhttp_t *parser, const char *at, size_t length) {
}
int HttpHandler::onMessageComplete(llhttp_t *parser) {
auto *state = static_cast<HttpConnectionState *>(parser->data);
auto *state = static_cast<HttpRequestState *>(parser->data);
state->message_complete = true;
if (state->parsing_commit && state->commit_parser) {
// Finish streaming parse
auto status = state->commit_parser->finish_streaming_parse();
if (status == CommitRequestParser::ParseStatus::Error) {
return -1; // Signal parsing error to llhttp
}
}
return 0;
return HPE_PAUSED;
}
// Pipeline stage implementations (batch-based)

View File

@@ -24,10 +24,8 @@ struct RouteMatch;
* HTTP connection state stored in Connection::user_data.
* Manages llhttp parser state and request data.
*/
struct HttpConnectionState {
struct HttpRequestState {
Arena arena{16 << 10}; // Request-scoped arena for parsing state
llhttp_t parser;
llhttp_settings_t settings;
// Current request data (arena-allocated)
std::string_view method;
@@ -59,8 +57,17 @@ struct HttpConnectionState {
bool basic_validation_passed =
false; // Set to true if basic validation passes
HttpRequestState();
};
struct HttpConnectionState {
llhttp_t parser;
llhttp_settings_t settings;
HttpRequestState pending;
std::deque<HttpRequestState> queue;
HttpConnectionState();
void reset(); // Reset state for next HTTP request (keeps arena)
};
/**
@@ -189,20 +196,20 @@ private:
bool process_release_batch(BatchType &batch);
// Route handlers
void handle_get_version(Connection &conn, HttpConnectionState &state);
void handle_post_commit(Connection &conn, HttpConnectionState &state);
void handle_get_subscribe(Connection &conn, HttpConnectionState &state);
void handle_get_status(Connection &conn, HttpConnectionState &state,
void handle_get_version(Connection &conn, HttpRequestState &state);
void handle_post_commit(Connection &conn, HttpRequestState &state);
void handle_get_subscribe(Connection &conn, HttpRequestState &state);
void handle_get_status(Connection &conn, HttpRequestState &state,
const RouteMatch &route_match);
void handle_put_retention(Connection &conn, HttpConnectionState &state,
void handle_put_retention(Connection &conn, HttpRequestState &state,
const RouteMatch &route_match);
void handle_get_retention(Connection &conn, HttpConnectionState &state,
void handle_get_retention(Connection &conn, HttpRequestState &state,
const RouteMatch &route_match);
void handle_delete_retention(Connection &conn, HttpConnectionState &state,
void handle_delete_retention(Connection &conn, HttpRequestState &state,
const RouteMatch &route_match);
void handle_get_metrics(Connection &conn, HttpConnectionState &state);
void handle_get_ok(Connection &conn, HttpConnectionState &state);
void handle_not_found(Connection &conn, HttpConnectionState &state);
void handle_get_metrics(Connection &conn, HttpRequestState &state);
void handle_get_ok(Connection &conn, HttpRequestState &state);
void handle_not_found(Connection &conn, HttpRequestState &state);
// HTTP utilities
static void send_response(MessageSender &conn, int status_code,