From a67d7a853100015fa256674c815867ac4f41e79f Mon Sep 17 00:00:00 2001 From: Andrew Noyes Date: Mon, 15 Sep 2025 13:01:56 -0400 Subject: [PATCH] Consolidate into one send_ordered_response --- src/connection_handler.hpp | 4 +- src/http_handler.cpp | 122 ++++++++++++++++--------------------- src/http_handler.hpp | 26 ++++---- 3 files changed, 66 insertions(+), 86 deletions(-) diff --git a/src/connection_handler.hpp b/src/connection_handler.hpp index 28713e2..bbcfefd 100644 --- a/src/connection_handler.hpp +++ b/src/connection_handler.hpp @@ -115,7 +115,5 @@ public: * @note Called from this connection's io thread. * @note Called when EPOLLOUT event occurs */ - virtual void - on_preprocess_writes(Connection &conn, - std::span pending_responses) {} + virtual void on_preprocess_writes(Connection &, std::span) {} }; diff --git a/src/http_handler.cpp b/src/http_handler.cpp index ed18d03..181560d 100644 --- a/src/http_handler.cpp +++ b/src/http_handler.cpp @@ -93,20 +93,9 @@ void HttpHandler::on_preprocess_writes( status_code, content_type, pending.response_json, pending.arena, ctx->http_request_id, ctx->connection_close); - state->ready_responses[ctx->sequence_id] = ResponseData{ - http_response, std::move(pending.arena), ctx->connection_close}; - } - - // Send responses in sequential order - auto iter = state->ready_responses.begin(); - while (iter != state->ready_responses.end() && - iter->first == state->next_sequence_to_send) { - auto &[sequence_id, response_data] = *iter; - - conn.append_bytes(response_data.data, std::move(response_data.arena), - response_data.connection_close); - state->next_sequence_to_send++; - iter = state->ready_responses.erase(iter); + state->send_ordered_response(conn, ctx->sequence_id, http_response, + std::move(pending.arena), + ctx->connection_close); } } } @@ -121,7 +110,7 @@ void HttpHandler::on_batch_complete(std::span batch) { for (auto &req : state->queue) { // Assign sequence ID for response ordering - int64_t sequence_id = state->next_sequence_id++; + int64_t sequence_id = state->get_next_sequence_id(); req.sequence_id = sequence_id; // Create HttpResponseContext for this request @@ -138,30 +127,13 @@ void HttpHandler::on_batch_complete(std::span batch) { static_cast(req.url.size()), route_match); if (parse_result != ParseResult::Success) { // Handle malformed URL encoding - // Assign sequence ID for this error response - int64_t error_sequence_id = state->next_sequence_id++; - req.sequence_id = error_sequence_id; auto json_response = R"({"error":"Malformed URL encoding"})"; auto http_response = format_json_response(400, json_response, req.arena, 0, true); - - // Add to reorder queue and process immediately since this is an error - state->ready_responses[error_sequence_id] = - ResponseData{http_response, std::move(req.arena), true}; - - // Process ready responses in order and send via append_bytes - auto iter = state->ready_responses.begin(); - while (iter != state->ready_responses.end() && - iter->first == state->next_sequence_to_send) { - auto &[sequence_id, response_data] = *iter; - - // Send through append_bytes which handles write interest - conn->append_bytes(response_data.data, std::move(response_data.arena), - response_data.connection_close); - state->next_sequence_to_send++; - iter = state->ready_responses.erase(iter); - } + state->send_ordered_response(*conn, ctx->sequence_id, http_response, + std::move(req.arena), + ctx->connection_close); break; } req.route = route_match.route; @@ -296,11 +268,10 @@ void HttpHandler::handle_post_commit(Connection &conn, format_json_response(400, json_response, state.arena, state.http_request_id, state.connection_close); - // Add directly to response queue with proper sequencing (no lock needed - - // same I/O thread) auto *conn_state = static_cast(conn.user_data); - conn_state->ready_responses[state.sequence_id] = ResponseData{ - http_response, std::move(state.arena), state.connection_close}; + conn_state->send_ordered_response(conn, state.sequence_id, http_response, + std::move(state.arena), + state.connection_close); return; } @@ -347,11 +318,10 @@ void HttpHandler::handle_post_commit(Connection &conn, format_json_response(400, json_response, state.arena, state.http_request_id, state.connection_close); - // Add directly to response queue with proper sequencing (no lock needed - - // same I/O thread) auto *conn_state = static_cast(conn.user_data); - conn_state->ready_responses[state.sequence_id] = ResponseData{ - http_response, std::move(state.arena), state.connection_close}; + conn_state->send_ordered_response(conn, state.sequence_id, http_response, + std::move(state.arena), + state.connection_close); return; } @@ -369,8 +339,10 @@ void HttpHandler::handle_get_subscribe(Connection &conn, format_json_response(200, json_response, state.arena, state.http_request_id, state.connection_close); - // Send through reorder queue and preprocessing to maintain proper ordering - send_ordered_response(conn, state, http_response, std::move(state.arena)); + auto *conn_state = static_cast(conn.user_data); + conn_state->send_ordered_response(conn, state.sequence_id, http_response, + std::move(state.arena), + state.connection_close); } void HttpHandler::handle_get_status(Connection &conn, HttpRequestState &state, @@ -392,8 +364,9 @@ void HttpHandler::handle_get_status(Connection &conn, HttpRequestState &state, // Add directly to response queue with proper sequencing auto *conn_state = static_cast(conn.user_data); - conn_state->ready_responses[state.sequence_id] = ResponseData{ - http_response, std::move(state.arena), state.connection_close}; + conn_state->send_ordered_response(conn, state.sequence_id, http_response, + std::move(state.arena), + state.connection_close); return; } @@ -405,8 +378,9 @@ void HttpHandler::handle_get_status(Connection &conn, HttpRequestState &state, // Add directly to response queue with proper sequencing auto *conn_state = static_cast(conn.user_data); - conn_state->ready_responses[state.sequence_id] = ResponseData{ - http_response, std::move(state.arena), state.connection_close}; + conn_state->send_ordered_response(conn, state.sequence_id, http_response, + std::move(state.arena), + state.connection_close); return; } @@ -426,7 +400,10 @@ void HttpHandler::handle_put_retention(Connection &conn, state.http_request_id, state.connection_close); // Send through reorder queue and preprocessing to maintain proper ordering - send_ordered_response(conn, state, http_response, std::move(state.arena)); + auto *conn_state = static_cast(conn.user_data); + conn_state->send_ordered_response(conn, state.sequence_id, http_response, + std::move(state.arena), + state.connection_close); } void HttpHandler::handle_get_retention(Connection &conn, @@ -439,7 +416,10 @@ void HttpHandler::handle_get_retention(Connection &conn, state.http_request_id, state.connection_close); // Send through reorder queue and preprocessing to maintain proper ordering - send_ordered_response(conn, state, http_response, std::move(state.arena)); + auto *conn_state = static_cast(conn.user_data); + conn_state->send_ordered_response(conn, state.sequence_id, http_response, + std::move(state.arena), + state.connection_close); } void HttpHandler::handle_delete_retention(Connection &conn, @@ -452,7 +432,10 @@ void HttpHandler::handle_delete_retention(Connection &conn, state.http_request_id, state.connection_close); // Send through reorder queue and preprocessing to maintain proper ordering - send_ordered_response(conn, state, http_response, std::move(state.arena)); + auto *conn_state = static_cast(conn.user_data); + conn_state->send_ordered_response(conn, state.sequence_id, http_response, + std::move(state.arena), + state.connection_close); } void HttpHandler::handle_get_metrics(Connection &conn, @@ -494,11 +477,10 @@ void HttpHandler::handle_get_metrics(Connection &conn, *out++ = sv; } - // Add directly to response queue with proper sequencing (no lock needed - - // same I/O thread) auto *conn_state = static_cast(conn.user_data); - conn_state->ready_responses[state.sequence_id] = - ResponseData{result, std::move(state.arena), state.connection_close}; + conn_state->send_ordered_response(conn, state.sequence_id, result, + std::move(state.arena), + state.connection_close); } void HttpHandler::handle_get_ok(Connection &, HttpRequestState &) { @@ -514,8 +496,10 @@ void HttpHandler::handle_not_found(Connection &conn, HttpRequestState &state) { format_json_response(404, json_response, state.arena, state.http_request_id, state.connection_close); - // Send through reorder queue and preprocessing to maintain proper ordering - send_ordered_response(conn, state, http_response, std::move(state.arena)); + auto *conn_state = static_cast(conn.user_data); + conn_state->send_ordered_response(conn, state.sequence_id, http_response, + std::move(state.arena), + state.connection_close); } // HTTP utility methods @@ -587,26 +571,26 @@ void HttpHandler::send_error_response(Connection &conn, int status_code, http_request_id, close_connection); } -void HttpHandler::send_ordered_response( - Connection &conn, HttpRequestState &state, - std::span http_response, Arena arena) { - auto *conn_state = static_cast(conn.user_data); +void HttpConnectionState::send_ordered_response( + Connection &conn, int64_t sequence_id, + std::span http_response, Arena arena, + bool close_connection) { // Add to reorder queue with proper sequencing - conn_state->ready_responses[state.sequence_id] = - ResponseData{http_response, std::move(arena), state.connection_close}; + ready_responses[sequence_id] = + ResponseData{http_response, std::move(arena), close_connection}; // Process ready responses in order and send via append_bytes - auto iter = conn_state->ready_responses.begin(); - while (iter != conn_state->ready_responses.end() && - iter->first == conn_state->next_sequence_to_send) { + auto iter = ready_responses.begin(); + while (iter != ready_responses.end() && + iter->first == next_sequence_to_send) { auto &[sequence_id, response_data] = *iter; // Send through append_bytes which handles write interest conn.append_bytes(response_data.data, std::move(response_data.arena), response_data.connection_close); - conn_state->next_sequence_to_send++; - iter = conn_state->ready_responses.erase(iter); + next_sequence_to_send++; + iter = ready_responses.erase(iter); } } diff --git a/src/http_handler.hpp b/src/http_handler.hpp index 7144dff..2c4532f 100644 --- a/src/http_handler.hpp +++ b/src/http_handler.hpp @@ -1,13 +1,7 @@ #pragma once -#include #include -#include -#include -#include #include -#include -#include #include @@ -30,7 +24,9 @@ struct RouteMatch; struct HttpResponseContext { int64_t sequence_id; // For response ordering in pipelining int64_t http_request_id; // For X-Response-ID header - bool connection_close; // Whether to close connection after response + std::string_view content_type; + int status_code; + bool connection_close; // Whether to close connection after response }; /** @@ -91,13 +87,20 @@ struct HttpConnectionState { HttpRequestState pending; std::deque queue; + int64_t get_next_sequence_id() { return next_sequence_id++; } + + HttpConnectionState(); + + void send_ordered_response(Connection &conn, int64_t sequence_id, + std::span http_response, + Arena arena, bool close_connection); + +private: // Response ordering for HTTP pipelining std::map ready_responses; // sequence_id -> response data int64_t next_sequence_to_send = 0; int64_t next_sequence_id = 0; - - HttpConnectionState(); }; /** @@ -172,9 +175,4 @@ private: format_json_response(int status_code, std::string_view json, Arena &response_arena, int64_t http_request_id, bool close_connection); - - // Helper function to send response through reorder queue and preprocessing - void send_ordered_response(Connection &conn, HttpRequestState &state, - std::span http_response, - Arena arena); };