diff --git a/src/http_handler.cpp b/src/http_handler.cpp index 2115a27..2db1eab 100644 --- a/src/http_handler.cpp +++ b/src/http_handler.cpp @@ -159,29 +159,26 @@ void HttpHandler::on_data_arrived(std::string_view data, Connection &conn) { llhttp_execute(&state->parser, data.data(), data.size()); if (err != HPE_OK) { - send_error_response(conn, 400, "Bad request", Arena{}, 0, true); + send_error_response(conn, 400, "Bad request", std::move(state->arena), 0, + true); return; } // If message is complete, route and handle the request if (state->message_complete) { - // Create request-scoped arena for URL parsing - // FIX: request_arena lifetime ends too soon. Should move arena into - // individual handlers and propagate it all the way through to - // append_message - Arena request_arena; - char *url_buffer = request_arena.allocate(state->url.size()); + // Use connection state's arena for all request processing + char *url_buffer = state->arena.allocate(state->url.size()); std::memcpy(url_buffer, state->url.data(), state->url.size()); RouteMatch route_match; - auto parse_result = - ApiUrlParser::parse(state->method, url_buffer, - static_cast(state->url.size()), route_match); + auto parse_result = ApiUrlParser::parse( + state->method, const_cast(state->url.data()), + static_cast(state->url.size()), route_match); if (parse_result != ParseResult::Success) { // Handle malformed URL encoding - send_error_response(conn, 400, "Malformed URL encoding", Arena{}, 0, - true); + send_error_response(conn, 400, "Malformed URL encoding", + std::move(state->arena), 0, true); return; } @@ -190,36 +187,35 @@ void HttpHandler::on_data_arrived(std::string_view data, Connection &conn) { // Route to appropriate handler switch (state->route) { case HttpRoute::GetVersion: - handle_get_version(conn, *state, std::move(request_arena)); + handle_get_version(conn, *state); break; case HttpRoute::PostCommit: - handle_post_commit(conn, *state, std::move(request_arena)); + handle_post_commit(conn, *state); break; case HttpRoute::GetSubscribe: - handle_get_subscribe(conn, *state, std::move(request_arena)); + handle_get_subscribe(conn, *state); break; case HttpRoute::GetStatus: - handle_get_status(conn, *state, route_match, std::move(request_arena)); + handle_get_status(conn, *state, route_match); break; case HttpRoute::PutRetention: - handle_put_retention(conn, *state, route_match, std::move(request_arena)); + handle_put_retention(conn, *state, route_match); break; case HttpRoute::GetRetention: - handle_get_retention(conn, *state, route_match, std::move(request_arena)); + handle_get_retention(conn, *state, route_match); break; case HttpRoute::DeleteRetention: - handle_delete_retention(conn, *state, route_match, - std::move(request_arena)); + handle_delete_retention(conn, *state, route_match); break; case HttpRoute::GetMetrics: - handle_get_metrics(conn, *state, std::move(request_arena)); + handle_get_metrics(conn, *state); break; case HttpRoute::GetOk: - handle_get_ok(conn, *state, std::move(request_arena)); + handle_get_ok(conn, *state); break; case HttpRoute::NotFound: default: - handle_not_found(conn, *state, std::move(request_arena)); + handle_not_found(conn, *state); break; } } @@ -227,28 +223,21 @@ void HttpHandler::on_data_arrived(std::string_view data, Connection &conn) { // Route handlers (basic implementations) void HttpHandler::handle_get_version(Connection &conn, - const HttpConnectionState &state, - Arena request_arena) { + HttpConnectionState &state) { version_counter.inc(); send_json_response( conn, 200, - format(request_arena, R"({"version":%ld,"leader":""})", + format(state.arena, R"({"version":%ld,"leader":""})", this->committed_version.load(std::memory_order_seq_cst)), - std::move(request_arena), state.http_request_id, state.connection_close); + std::move(state.arena), state.http_request_id, state.connection_close); } void HttpHandler::handle_post_commit(Connection &conn, - const HttpConnectionState &state, - Arena request_arena) { + HttpConnectionState &state) { commit_counter.inc(); // Check if streaming parse was successful if (!state.commit_request || !state.parsing_commit) { - const char *error = state.commit_parser - ? state.commit_parser->get_parse_error() - : "No parser initialized"; - std::string_view error_msg = format(request_arena, "Parse failed: %s", - error ? error : "Unknown error"); - send_error_response(conn, 400, error_msg, std::move(request_arena), + send_error_response(conn, 400, "Parse failed", std::move(state.arena), state.http_request_id, state.connection_close); return; } @@ -289,7 +278,7 @@ void HttpHandler::handle_post_commit(Connection &conn, } if (!valid) { - send_error_response(conn, 400, error_msg, std::move(request_arena), + send_error_response(conn, 400, error_msg, std::move(state.arena), state.http_request_id, state.connection_close); return; } @@ -300,18 +289,17 @@ void HttpHandler::handle_post_commit(Connection &conn, } void HttpHandler::handle_get_subscribe(Connection &conn, - const HttpConnectionState &state, - Arena) { + HttpConnectionState &state) { // TODO: Implement subscription streaming send_json_response( conn, 200, R"({"message":"Subscription endpoint - streaming not yet implemented"})", - Arena{}, state.http_request_id, state.connection_close); + std::move(state.arena), state.http_request_id, state.connection_close); } void HttpHandler::handle_get_status(Connection &conn, HttpConnectionState &state, - const RouteMatch &route_match, Arena) { + const RouteMatch &route_match) { status_counter.inc(); // Status requests are processed through the pipeline // Response will be generated in the sequence stage @@ -321,15 +309,16 @@ void HttpHandler::handle_get_status(Connection &conn, const auto &request_id = route_match.params[static_cast(ApiParameterKey::RequestId)]; if (!request_id) { - send_error_response(conn, 400, - "Missing required query parameter: request_id", Arena{}, - state.http_request_id, state.connection_close); + send_error_response( + conn, 400, "Missing required query parameter: request_id", + std::move(state.arena), state.http_request_id, state.connection_close); return; } if (request_id->empty()) { - send_error_response(conn, 400, "Empty request_id parameter", Arena{}, - state.http_request_id, state.connection_close); + send_error_response(conn, 400, "Empty request_id parameter", + std::move(state.arena), state.http_request_id, + state.connection_close); return; } @@ -340,34 +329,35 @@ void HttpHandler::handle_get_status(Connection &conn, } void HttpHandler::handle_put_retention(Connection &conn, - const HttpConnectionState &state, - const RouteMatch &, Arena) { + HttpConnectionState &state, + const RouteMatch &) { // TODO: Parse retention policy from body and store send_json_response(conn, 200, R"({"policy_id":"example","status":"created"})", - Arena{}, state.http_request_id, state.connection_close); + std::move(state.arena), state.http_request_id, + state.connection_close); } void HttpHandler::handle_get_retention(Connection &conn, - const HttpConnectionState &state, - const RouteMatch &, Arena) { + HttpConnectionState &state, + const RouteMatch &) { // TODO: Extract policy_id from URL or return all policies - send_json_response(conn, 200, R"({"policies":[]})", Arena{}, + send_json_response(conn, 200, R"({"policies":[]})", std::move(state.arena), state.http_request_id, state.connection_close); } void HttpHandler::handle_delete_retention(Connection &conn, - const HttpConnectionState &state, - const RouteMatch &, Arena) { + HttpConnectionState &state, + const RouteMatch &) { // TODO: Extract policy_id from URL and delete send_json_response(conn, 200, R"({"policy_id":"example","status":"deleted"})", - Arena{}, state.http_request_id, state.connection_close); + std::move(state.arena), state.http_request_id, + state.connection_close); } void HttpHandler::handle_get_metrics(Connection &conn, - const HttpConnectionState &state, - Arena request_arena) { + HttpConnectionState &state) { metrics_counter.inc(); - auto metrics_span = metric::render(request_arena); + auto metrics_span = metric::render(state.arena); // Calculate total size for the response body size_t total_size = 0; @@ -381,14 +371,14 @@ void HttpHandler::handle_get_metrics(Connection &conn, std::string_view headers; if (state.connection_close) { headers = static_format( - request_arena, "HTTP/1.1 200 OK\r\n", + state.arena, "HTTP/1.1 200 OK\r\n", "Content-Type: text/plain; version=0.0.4\r\n", "Content-Length: ", static_cast(total_size), "\r\n", "X-Response-ID: ", static_cast(http_state->http_request_id), "\r\n", "Connection: close\r\n", "\r\n"); } else { headers = static_format( - request_arena, "HTTP/1.1 200 OK\r\n", + state.arena, "HTTP/1.1 200 OK\r\n", "Content-Type: text/plain; version=0.0.4\r\n", "Content-Length: ", static_cast(total_size), "\r\n", "X-Response-ID: ", static_cast(http_state->http_request_id), @@ -396,18 +386,17 @@ void HttpHandler::handle_get_metrics(Connection &conn, } auto result = std::span{ - request_arena.allocate(metrics_span.size() + 1), + state.arena.allocate(metrics_span.size() + 1), metrics_span.size() + 1}; auto out = result.begin(); *out++ = headers; for (auto sv : metrics_span) { *out++ = sv; } - conn.append_message(result, std::move(request_arena)); + conn.append_message(result, std::move(state.arena)); } -void HttpHandler::handle_get_ok(Connection &, const HttpConnectionState &state, - Arena) { +void HttpHandler::handle_get_ok(Connection &, HttpConnectionState &state) { ok_counter.inc(); TRACE_EVENT("http", "GET /ok", perfetto::Flow::Global(state.http_request_id)); @@ -416,9 +405,9 @@ void HttpHandler::handle_get_ok(Connection &, const HttpConnectionState &state, } void HttpHandler::handle_not_found(Connection &conn, - const HttpConnectionState &state, Arena) { - send_error_response(conn, 404, "Not found", Arena{}, state.http_request_id, - state.connection_close); + HttpConnectionState &state) { + send_error_response(conn, 404, "Not found", std::move(state.arena), + state.http_request_id, state.connection_close); } // HTTP utility methods diff --git a/src/http_handler.hpp b/src/http_handler.hpp index 09c4b7b..03fd4c4 100644 --- a/src/http_handler.hpp +++ b/src/http_handler.hpp @@ -25,7 +25,7 @@ struct RouteMatch; * Manages llhttp parser state and request data. */ struct HttpConnectionState { - Arena arena; // Request-scoped arena for parsing state + Arena arena{16 << 10}; // Request-scoped arena for parsing state llhttp_t parser; llhttp_settings_t settings; @@ -168,10 +168,11 @@ private: BannedRequestIdSet banned_request_ids; // Request IDs that should not commit // (string_views into arena) + constexpr static auto wait_strategy = WaitStrategy::WaitIfUpstreamIdle; + // Main commit processing pipeline: sequence -> resolve -> persist -> release - StaticThreadPipeline - commitPipeline{lg_size}; + StaticThreadPipeline commitPipeline{ + lg_size}; // Pipeline stage threads std::thread sequenceThread; @@ -181,36 +182,27 @@ private: // Pipeline stage processing methods (batch-based) using BatchType = - StaticThreadPipeline::Batch; + StaticThreadPipeline::Batch; bool process_sequence_batch(BatchType &batch); bool process_resolve_batch(BatchType &batch); bool process_persist_batch(BatchType &batch); bool process_release_batch(BatchType &batch); // Route handlers - void handle_get_version(Connection &conn, const HttpConnectionState &state, - Arena request_arena); - void handle_post_commit(Connection &conn, const HttpConnectionState &state, - Arena request_arena); - void handle_get_subscribe(Connection &conn, const HttpConnectionState &state, - Arena request_arena); + void handle_get_version(Connection &conn, HttpConnectionState &state); + void handle_post_commit(Connection &conn, HttpConnectionState &state); + void handle_get_subscribe(Connection &conn, HttpConnectionState &state); void handle_get_status(Connection &conn, HttpConnectionState &state, - const RouteMatch &route_match, Arena request_arena); - void handle_put_retention(Connection &conn, const HttpConnectionState &state, - const RouteMatch &route_match, Arena request_arena); - void handle_get_retention(Connection &conn, const HttpConnectionState &state, - const RouteMatch &route_match, Arena request_arena); - void handle_delete_retention(Connection &conn, - const HttpConnectionState &state, - const RouteMatch &route_match, - Arena request_arena); - void handle_get_metrics(Connection &conn, const HttpConnectionState &state, - Arena request_arena); - void handle_get_ok(Connection &conn, const HttpConnectionState &state, - Arena request_arena); - void handle_not_found(Connection &conn, const HttpConnectionState &state, - Arena request_arena); + const RouteMatch &route_match); + void handle_put_retention(Connection &conn, HttpConnectionState &state, + const RouteMatch &route_match); + void handle_get_retention(Connection &conn, HttpConnectionState &state, + const RouteMatch &route_match); + void handle_delete_retention(Connection &conn, HttpConnectionState &state, + const RouteMatch &route_match); + void handle_get_metrics(Connection &conn, HttpConnectionState &state); + void handle_get_ok(Connection &conn, HttpConnectionState &state); + void handle_not_found(Connection &conn, HttpConnectionState &state); // HTTP utilities static void send_response(MessageSender &conn, int status_code, diff --git a/test_benchmark_config.toml b/test_benchmark_config.toml index 81ddfe9..55ae2d6 100644 --- a/test_benchmark_config.toml +++ b/test_benchmark_config.toml @@ -10,9 +10,8 @@ interfaces = [ max_request_size_bytes = 1048576 # 1MB # Number of I/O threads for handling connections and network events io_threads = 8 -epoll_instances = 8 # Event batch size for epoll processing -event_batch_size = 64 +event_batch_size = 128 [commit] # Minimum length for request_id to ensure sufficient entropy