diff --git a/src/http_handler.cpp b/src/http_handler.cpp index 90e8147..eb86fe6 100644 --- a/src/http_handler.cpp +++ b/src/http_handler.cpp @@ -36,10 +36,7 @@ auto banned_request_ids_memory_gauge = "Memory used by banned request IDs arena") .create({}); -// HttpConnectionState implementation -HttpConnectionState::HttpConnectionState() - : current_header_field_buf(ArenaStlAllocator(&arena)), - current_header_value_buf(ArenaStlAllocator(&arena)) { +HttpConnectionState::HttpConnectionState() { llhttp_settings_init(&settings); // Set up llhttp callbacks @@ -53,40 +50,13 @@ HttpConnectionState::HttpConnectionState() settings.on_message_complete = HttpHandler::onMessageComplete; llhttp_init(&parser, HTTP_REQUEST, &settings); - parser.data = this; + parser.data = &pending; } -void HttpConnectionState::reset() { - TRACE_EVENT("http", "reply", perfetto::Flow::Global(http_request_id)); - // Reset request-specific state for next HTTP request - method = {}; - url = {}; - headers_complete = false; - message_complete = false; - connection_close = false; - route = HttpRoute::NotFound; - status_request_id = {}; - - // Reset header buffers - need to recreate with arena allocator - current_header_field_buf = ArenaString(ArenaStlAllocator(&arena)); - current_header_value_buf = ArenaString(ArenaStlAllocator(&arena)); - header_field_complete = false; - http_request_id = 0; - - // Reset commit parsing state - safe to reset Arena::Ptr objects here - // since request processing is complete - commit_parser.reset(); - commit_request.reset(); - parsing_commit = false; - basic_validation_passed = false; - - // Reset arena memory for next request to prevent memory growth - arena.reset(); - - // Reset llhttp parser for next request - llhttp_init(&parser, HTTP_REQUEST, &settings); - parser.data = this; -} +// HttpConnectionState implementation +HttpRequestState::HttpRequestState() + : current_header_field_buf(ArenaStlAllocator(&arena)), + current_header_value_buf(ArenaStlAllocator(&arena)) {} // HttpHandler implementation void HttpHandler::on_connection_established(Connection &conn) { @@ -101,68 +71,94 @@ void HttpHandler::on_connection_closed(Connection &conn) { conn.user_data = nullptr; } +static thread_local std::vector g_batch_entries; + void HttpHandler::on_batch_complete(std::span batch) { - // Collect commit, status, and health check requests for pipeline processing - int pipeline_count = 0; // Count commit, status, and health check requests for (auto conn : batch) { auto *state = static_cast(conn->user_data); + for (auto &req : state->queue) { - // Count commit requests that passed basic validation - if (state->route == HttpRoute::PostCommit && state->commit_request && - state->parsing_commit && state->basic_validation_passed) { - pipeline_count++; - } - // Count status requests - else if (state->route == HttpRoute::GetStatus && - // Error message not already queued - conn->outgoing_bytes_queued() == 0) { - pipeline_count++; - } - // Count health check requests - else if (state->route == HttpRoute::GetOk && - // Error message not already queued - conn->outgoing_bytes_queued() == 0) { - pipeline_count++; + char *url_buffer = req.arena.allocate(req.url.size()); + std::memcpy(url_buffer, req.url.data(), req.url.size()); + RouteMatch route_match; + auto parse_result = + ApiUrlParser::parse(req.method, const_cast(req.url.data()), + static_cast(req.url.size()), route_match); + if (parse_result != ParseResult::Success) { + // Handle malformed URL encoding + send_error_response(*conn, 400, "Malformed URL encoding", + std::move(req.arena), 0, true); + break; + } + req.route = route_match.route; + // Route to appropriate handler + switch (req.route) { + case HttpRoute::GetVersion: + handle_get_version(*conn, req); + break; + case HttpRoute::PostCommit: + handle_post_commit(*conn, req); + break; + case HttpRoute::GetSubscribe: + handle_get_subscribe(*conn, req); + break; + case HttpRoute::GetStatus: + handle_get_status(*conn, req, route_match); + break; + case HttpRoute::PutRetention: + handle_put_retention(*conn, req, route_match); + break; + case HttpRoute::GetRetention: + handle_get_retention(*conn, req, route_match); + break; + case HttpRoute::DeleteRetention: + handle_delete_retention(*conn, req, route_match); + break; + case HttpRoute::GetMetrics: + handle_get_metrics(*conn, req); + break; + case HttpRoute::GetOk: + handle_get_ok(*conn, req); + break; + case HttpRoute::NotFound: + default: + handle_not_found(*conn, req); + break; + } + + // Create CommitEntry for commit requests + if (req.route == HttpRoute::PostCommit && req.commit_request && + req.parsing_commit && req.basic_validation_passed) { + g_batch_entries.push_back(CommitEntry{ + conn->get_weak_ref(), req.http_request_id, req.connection_close, + req.commit_request.get(), std::move(req.arena)}); + } + // Create StatusEntry for status requests + else if (req.route == HttpRoute::GetStatus) { + g_batch_entries.push_back(StatusEntry{ + conn->get_weak_ref(), req.http_request_id, req.connection_close, + req.status_request_id, std::move(req.arena)}); + } + // Create HealthCheckEntry for health check requests + else if (req.route == HttpRoute::GetOk) { + g_batch_entries.push_back( + HealthCheckEntry{conn->get_weak_ref(), req.http_request_id, + req.connection_close, std::move(req.arena)}); + } } + state->queue.clear(); } // Send requests to 4-stage pipeline in batch. Batching here reduces // contention on the way into the pipeline. - if (pipeline_count > 0) { - auto guard = commitPipeline.push(pipeline_count, true); + if (g_batch_entries.size() > 0) { + auto guard = commitPipeline.push(g_batch_entries.size(), true); auto out_iter = guard.batch.begin(); - - for (auto conn : batch) { - auto *state = static_cast(conn->user_data); - - // Create CommitEntry for commit requests - if (state->route == HttpRoute::PostCommit && state->commit_request && - state->parsing_commit && state->basic_validation_passed) { - *out_iter++ = - CommitEntry{conn->get_weak_ref(), state->http_request_id, - state->connection_close, state->commit_request.get(), - std::move(state->arena)}; - state->reset(); - } - // Create StatusEntry for status requests - else if (state->route == HttpRoute::GetStatus) { - *out_iter++ = - StatusEntry{conn->get_weak_ref(), state->http_request_id, - state->connection_close, state->status_request_id, - std::move(state->arena)}; - state->reset(); - } - // Create HealthCheckEntry for health check requests - else if (state->route == HttpRoute::GetOk) { - *out_iter++ = - HealthCheckEntry{conn->get_weak_ref(), state->http_request_id, - state->connection_close, std::move(state->arena)}; - state->reset(); - } - } + std::move(g_batch_entries.begin(), g_batch_entries.end(), out_iter); } + g_batch_entries.clear(); } void HttpHandler::on_data_arrived(std::string_view data, Connection &conn) { @@ -178,94 +174,45 @@ void HttpHandler::on_data_arrived(std::string_view data, Connection &conn) { // This prevents DoS attacks via oversized HTTP requests. // Parse HTTP data with llhttp - enum llhttp_errno err = - llhttp_execute(&state->parser, data.data(), data.size()); - - if (err != HPE_OK) { - send_error_response(conn, 400, "Bad request", std::move(state->arena), 0, - true); - state->reset(); + for (;;) { + enum llhttp_errno err = + llhttp_execute(&state->parser, data.data(), data.size()); + if (err == HPE_PAUSED) { + assert(state->pending.message_complete); + state->queue.push_back(std::move(state->pending)); + state->pending = {}; + int consumed = llhttp_get_error_pos(&state->parser) - data.data(); + data = data.substr(consumed, data.size() - consumed); + llhttp_resume(&state->parser); + continue; + } + if (err == HPE_OK) { + break; + } + send_error_response(conn, 400, "Bad request", + std::move(state->pending.arena), 0, true); return; } - - // If message is complete, route and handle the request - if (state->message_complete) { - // Use connection state's arena for all request processing - char *url_buffer = state->arena.allocate(state->url.size()); - std::memcpy(url_buffer, state->url.data(), state->url.size()); - - RouteMatch route_match; - auto parse_result = ApiUrlParser::parse( - state->method, const_cast(state->url.data()), - static_cast(state->url.size()), route_match); - - if (parse_result != ParseResult::Success) { - // Handle malformed URL encoding - send_error_response(conn, 400, "Malformed URL encoding", - std::move(state->arena), 0, true); - state->reset(); - return; - } - - state->route = route_match.route; - - // Route to appropriate handler - switch (state->route) { - case HttpRoute::GetVersion: - handle_get_version(conn, *state); - break; - case HttpRoute::PostCommit: - handle_post_commit(conn, *state); - break; - case HttpRoute::GetSubscribe: - handle_get_subscribe(conn, *state); - break; - case HttpRoute::GetStatus: - handle_get_status(conn, *state, route_match); - break; - case HttpRoute::PutRetention: - handle_put_retention(conn, *state, route_match); - break; - case HttpRoute::GetRetention: - handle_get_retention(conn, *state, route_match); - break; - case HttpRoute::DeleteRetention: - handle_delete_retention(conn, *state, route_match); - break; - case HttpRoute::GetMetrics: - handle_get_metrics(conn, *state); - break; - case HttpRoute::GetOk: - handle_get_ok(conn, *state); - break; - case HttpRoute::NotFound: - default: - handle_not_found(conn, *state); - break; - } - } } // Route handlers (basic implementations) void HttpHandler::handle_get_version(Connection &conn, - HttpConnectionState &state) { + HttpRequestState &state) { version_counter.inc(); send_json_response( conn, 200, format(state.arena, R"({"version":%ld,"leader":""})", this->committed_version.load(std::memory_order_seq_cst)), std::move(state.arena), state.http_request_id, state.connection_close); - state.reset(); } void HttpHandler::handle_post_commit(Connection &conn, - HttpConnectionState &state) { + HttpRequestState &state) { commit_counter.inc(); // Check if streaming parse was successful if (!state.commit_request || !state.parsing_commit) { send_error_response(conn, 400, "Parse failed", std::move(state.arena), state.http_request_id, state.connection_close); - state.reset(); return; } @@ -307,27 +254,24 @@ void HttpHandler::handle_post_commit(Connection &conn, if (!valid) { send_error_response(conn, 400, error_msg, std::move(state.arena), state.http_request_id, state.connection_close); - state.reset(); return; } // Basic validation passed - mark for 4-stage pipeline processing - const_cast(state).basic_validation_passed = true; + const_cast(state).basic_validation_passed = true; // Response will be sent after 4-stage pipeline processing is complete } void HttpHandler::handle_get_subscribe(Connection &conn, - HttpConnectionState &state) { + HttpRequestState &state) { // TODO: Implement subscription streaming send_json_response( conn, 200, R"({"message":"Subscription endpoint - streaming not yet implemented"})", std::move(state.arena), state.http_request_id, state.connection_close); - state.reset(); } -void HttpHandler::handle_get_status(Connection &conn, - HttpConnectionState &state, +void HttpHandler::handle_get_status(Connection &conn, HttpRequestState &state, const RouteMatch &route_match) { status_counter.inc(); // Status requests are processed through the pipeline @@ -341,7 +285,6 @@ void HttpHandler::handle_get_status(Connection &conn, send_error_response( conn, 400, "Missing required query parameter: request_id", std::move(state.arena), state.http_request_id, state.connection_close); - state.reset(); return; } @@ -349,7 +292,6 @@ void HttpHandler::handle_get_status(Connection &conn, send_error_response(conn, 400, "Empty request_id parameter", std::move(state.arena), state.http_request_id, state.connection_close); - state.reset(); return; } @@ -360,36 +302,33 @@ void HttpHandler::handle_get_status(Connection &conn, } void HttpHandler::handle_put_retention(Connection &conn, - HttpConnectionState &state, + HttpRequestState &state, const RouteMatch &) { // TODO: Parse retention policy from body and store send_json_response(conn, 200, R"({"policy_id":"example","status":"created"})", std::move(state.arena), state.http_request_id, state.connection_close); - state.reset(); } void HttpHandler::handle_get_retention(Connection &conn, - HttpConnectionState &state, + HttpRequestState &state, const RouteMatch &) { // TODO: Extract policy_id from URL or return all policies send_json_response(conn, 200, R"({"policies":[]})", std::move(state.arena), state.http_request_id, state.connection_close); - state.reset(); } void HttpHandler::handle_delete_retention(Connection &conn, - HttpConnectionState &state, + HttpRequestState &state, const RouteMatch &) { // TODO: Extract policy_id from URL and delete send_json_response(conn, 200, R"({"policy_id":"example","status":"deleted"})", std::move(state.arena), state.http_request_id, state.connection_close); - state.reset(); } void HttpHandler::handle_get_metrics(Connection &conn, - HttpConnectionState &state) { + HttpRequestState &state) { metrics_counter.inc(); auto metrics_span = metric::render(state.arena); @@ -399,8 +338,6 @@ void HttpHandler::handle_get_metrics(Connection &conn, total_size += sv.size(); } - auto *http_state = static_cast(conn.user_data); - // Build HTTP response headers using arena std::string_view headers; if (state.connection_close) { @@ -408,15 +345,15 @@ void HttpHandler::handle_get_metrics(Connection &conn, state.arena, "HTTP/1.1 200 OK\r\n", "Content-Type: text/plain; version=0.0.4\r\n", "Content-Length: ", static_cast(total_size), "\r\n", - "X-Response-ID: ", static_cast(http_state->http_request_id), - "\r\n", "Connection: close\r\n", "\r\n"); + "X-Response-ID: ", static_cast(state.http_request_id), "\r\n", + "Connection: close\r\n", "\r\n"); } else { headers = static_format( state.arena, "HTTP/1.1 200 OK\r\n", "Content-Type: text/plain; version=0.0.4\r\n", "Content-Length: ", static_cast(total_size), "\r\n", - "X-Response-ID: ", static_cast(http_state->http_request_id), - "\r\n", "Connection: keep-alive\r\n", "\r\n"); + "X-Response-ID: ", static_cast(state.http_request_id), "\r\n", + "Connection: keep-alive\r\n", "\r\n"); } auto result = @@ -427,10 +364,9 @@ void HttpHandler::handle_get_metrics(Connection &conn, *out++ = sv; } conn.append_message(result, std::move(state.arena), state.connection_close); - state.reset(); } -void HttpHandler::handle_get_ok(Connection &, HttpConnectionState &) { +void HttpHandler::handle_get_ok(Connection &, HttpRequestState &) { ok_counter.inc(); TRACE_EVENT("http", "GET /ok", perfetto::Flow::Global(state.http_request_id)); @@ -438,8 +374,7 @@ void HttpHandler::handle_get_ok(Connection &, HttpConnectionState &) { // Response will be generated in the release stage after pipeline processing } -void HttpHandler::handle_not_found(Connection &conn, - HttpConnectionState &state) { +void HttpHandler::handle_not_found(Connection &conn, HttpRequestState &state) { send_error_response(conn, 404, "Not found", std::move(state.arena), state.http_request_id, state.connection_close); } @@ -566,7 +501,7 @@ std::span HttpHandler::format_json_response( // llhttp callbacks int HttpHandler::onUrl(llhttp_t *parser, const char *at, size_t length) { - auto *state = static_cast(parser->data); + auto *state = static_cast(parser->data); // Store URL in arena (simplified - would need to accumulate for streaming) state->url = std::string_view(at, length); return 0; @@ -574,28 +509,28 @@ int HttpHandler::onUrl(llhttp_t *parser, const char *at, size_t length) { int HttpHandler::onHeaderField(llhttp_t *parser, const char *at, size_t length) { - auto *state = static_cast(parser->data); + auto *state = static_cast(parser->data); // Accumulate header field data state->current_header_field_buf.append(at, length); return 0; } int HttpHandler::onHeaderFieldComplete(llhttp_t *parser) { - auto *state = static_cast(parser->data); + auto *state = static_cast(parser->data); state->header_field_complete = true; return 0; } int HttpHandler::onHeaderValue(llhttp_t *parser, const char *at, size_t length) { - auto *state = static_cast(parser->data); + auto *state = static_cast(parser->data); // Accumulate header value data state->current_header_value_buf.append(at, length); return 0; } int HttpHandler::onHeaderValueComplete(llhttp_t *parser) { - auto *state = static_cast(parser->data); + auto *state = static_cast(parser->data); if (!state->header_field_complete) { // Field is not complete yet, wait @@ -634,7 +569,7 @@ int HttpHandler::onHeaderValueComplete(llhttp_t *parser) { } int HttpHandler::onHeadersComplete(llhttp_t *parser) { - auto *state = static_cast(parser->data); + auto *state = static_cast(parser->data); state->headers_complete = true; // Get HTTP method @@ -661,7 +596,7 @@ int HttpHandler::onHeadersComplete(llhttp_t *parser) { } int HttpHandler::onBody(llhttp_t *parser, const char *at, size_t length) { - auto *state = static_cast(parser->data); + auto *state = static_cast(parser->data); if (state->parsing_commit && state->commit_parser) { // Stream data to commit request parser @@ -676,18 +611,9 @@ int HttpHandler::onBody(llhttp_t *parser, const char *at, size_t length) { } int HttpHandler::onMessageComplete(llhttp_t *parser) { - auto *state = static_cast(parser->data); + auto *state = static_cast(parser->data); state->message_complete = true; - - if (state->parsing_commit && state->commit_parser) { - // Finish streaming parse - auto status = state->commit_parser->finish_streaming_parse(); - if (status == CommitRequestParser::ParseStatus::Error) { - return -1; // Signal parsing error to llhttp - } - } - - return 0; + return HPE_PAUSED; } // Pipeline stage implementations (batch-based) diff --git a/src/http_handler.hpp b/src/http_handler.hpp index 0b38b88..c35090b 100644 --- a/src/http_handler.hpp +++ b/src/http_handler.hpp @@ -24,10 +24,8 @@ struct RouteMatch; * HTTP connection state stored in Connection::user_data. * Manages llhttp parser state and request data. */ -struct HttpConnectionState { +struct HttpRequestState { Arena arena{16 << 10}; // Request-scoped arena for parsing state - llhttp_t parser; - llhttp_settings_t settings; // Current request data (arena-allocated) std::string_view method; @@ -59,8 +57,17 @@ struct HttpConnectionState { bool basic_validation_passed = false; // Set to true if basic validation passes + HttpRequestState(); +}; + +struct HttpConnectionState { + llhttp_t parser; + llhttp_settings_t settings; + + HttpRequestState pending; + std::deque queue; + HttpConnectionState(); - void reset(); // Reset state for next HTTP request (keeps arena) }; /** @@ -189,20 +196,20 @@ private: bool process_release_batch(BatchType &batch); // Route handlers - void handle_get_version(Connection &conn, HttpConnectionState &state); - void handle_post_commit(Connection &conn, HttpConnectionState &state); - void handle_get_subscribe(Connection &conn, HttpConnectionState &state); - void handle_get_status(Connection &conn, HttpConnectionState &state, + void handle_get_version(Connection &conn, HttpRequestState &state); + void handle_post_commit(Connection &conn, HttpRequestState &state); + void handle_get_subscribe(Connection &conn, HttpRequestState &state); + void handle_get_status(Connection &conn, HttpRequestState &state, const RouteMatch &route_match); - void handle_put_retention(Connection &conn, HttpConnectionState &state, + void handle_put_retention(Connection &conn, HttpRequestState &state, const RouteMatch &route_match); - void handle_get_retention(Connection &conn, HttpConnectionState &state, + void handle_get_retention(Connection &conn, HttpRequestState &state, const RouteMatch &route_match); - void handle_delete_retention(Connection &conn, HttpConnectionState &state, + void handle_delete_retention(Connection &conn, HttpRequestState &state, const RouteMatch &route_match); - void handle_get_metrics(Connection &conn, HttpConnectionState &state); - void handle_get_ok(Connection &conn, HttpConnectionState &state); - void handle_not_found(Connection &conn, HttpConnectionState &state); + void handle_get_metrics(Connection &conn, HttpRequestState &state); + void handle_get_ok(Connection &conn, HttpRequestState &state); + void handle_not_found(Connection &conn, HttpRequestState &state); // HTTP utilities static void send_response(MessageSender &conn, int status_code,