Consolidate into one send_ordered_response

This commit is contained in:
2025-09-15 13:01:56 -04:00
parent 6717b70772
commit a67d7a8531
3 changed files with 66 additions and 86 deletions

View File

@@ -115,7 +115,5 @@ public:
* @note Called from this connection's io thread. * @note Called from this connection's io thread.
* @note Called when EPOLLOUT event occurs * @note Called when EPOLLOUT event occurs
*/ */
virtual void virtual void on_preprocess_writes(Connection &, std::span<PendingResponse>) {}
on_preprocess_writes(Connection &conn,
std::span<PendingResponse> pending_responses) {}
}; };

View File

@@ -93,20 +93,9 @@ void HttpHandler::on_preprocess_writes(
status_code, content_type, pending.response_json, pending.arena, status_code, content_type, pending.response_json, pending.arena,
ctx->http_request_id, ctx->connection_close); ctx->http_request_id, ctx->connection_close);
state->ready_responses[ctx->sequence_id] = ResponseData{ state->send_ordered_response(conn, ctx->sequence_id, http_response,
http_response, std::move(pending.arena), ctx->connection_close}; std::move(pending.arena),
} ctx->connection_close);
// Send responses in sequential order
auto iter = state->ready_responses.begin();
while (iter != state->ready_responses.end() &&
iter->first == state->next_sequence_to_send) {
auto &[sequence_id, response_data] = *iter;
conn.append_bytes(response_data.data, std::move(response_data.arena),
response_data.connection_close);
state->next_sequence_to_send++;
iter = state->ready_responses.erase(iter);
} }
} }
} }
@@ -121,7 +110,7 @@ void HttpHandler::on_batch_complete(std::span<Connection *const> batch) {
for (auto &req : state->queue) { for (auto &req : state->queue) {
// Assign sequence ID for response ordering // Assign sequence ID for response ordering
int64_t sequence_id = state->next_sequence_id++; int64_t sequence_id = state->get_next_sequence_id();
req.sequence_id = sequence_id; req.sequence_id = sequence_id;
// Create HttpResponseContext for this request // Create HttpResponseContext for this request
@@ -138,30 +127,13 @@ void HttpHandler::on_batch_complete(std::span<Connection *const> batch) {
static_cast<int>(req.url.size()), route_match); static_cast<int>(req.url.size()), route_match);
if (parse_result != ParseResult::Success) { if (parse_result != ParseResult::Success) {
// Handle malformed URL encoding // Handle malformed URL encoding
// Assign sequence ID for this error response
int64_t error_sequence_id = state->next_sequence_id++;
req.sequence_id = error_sequence_id;
auto json_response = R"({"error":"Malformed URL encoding"})"; auto json_response = R"({"error":"Malformed URL encoding"})";
auto http_response = auto http_response =
format_json_response(400, json_response, req.arena, 0, true); format_json_response(400, json_response, req.arena, 0, true);
state->send_ordered_response(*conn, ctx->sequence_id, http_response,
// Add to reorder queue and process immediately since this is an error std::move(req.arena),
state->ready_responses[error_sequence_id] = ctx->connection_close);
ResponseData{http_response, std::move(req.arena), true};
// Process ready responses in order and send via append_bytes
auto iter = state->ready_responses.begin();
while (iter != state->ready_responses.end() &&
iter->first == state->next_sequence_to_send) {
auto &[sequence_id, response_data] = *iter;
// Send through append_bytes which handles write interest
conn->append_bytes(response_data.data, std::move(response_data.arena),
response_data.connection_close);
state->next_sequence_to_send++;
iter = state->ready_responses.erase(iter);
}
break; break;
} }
req.route = route_match.route; req.route = route_match.route;
@@ -296,11 +268,10 @@ void HttpHandler::handle_post_commit(Connection &conn,
format_json_response(400, json_response, state.arena, format_json_response(400, json_response, state.arena,
state.http_request_id, state.connection_close); state.http_request_id, state.connection_close);
// Add directly to response queue with proper sequencing (no lock needed -
// same I/O thread)
auto *conn_state = static_cast<HttpConnectionState *>(conn.user_data); auto *conn_state = static_cast<HttpConnectionState *>(conn.user_data);
conn_state->ready_responses[state.sequence_id] = ResponseData{ conn_state->send_ordered_response(conn, state.sequence_id, http_response,
http_response, std::move(state.arena), state.connection_close}; std::move(state.arena),
state.connection_close);
return; return;
} }
@@ -347,11 +318,10 @@ void HttpHandler::handle_post_commit(Connection &conn,
format_json_response(400, json_response, state.arena, format_json_response(400, json_response, state.arena,
state.http_request_id, state.connection_close); state.http_request_id, state.connection_close);
// Add directly to response queue with proper sequencing (no lock needed -
// same I/O thread)
auto *conn_state = static_cast<HttpConnectionState *>(conn.user_data); auto *conn_state = static_cast<HttpConnectionState *>(conn.user_data);
conn_state->ready_responses[state.sequence_id] = ResponseData{ conn_state->send_ordered_response(conn, state.sequence_id, http_response,
http_response, std::move(state.arena), state.connection_close}; std::move(state.arena),
state.connection_close);
return; return;
} }
@@ -369,8 +339,10 @@ void HttpHandler::handle_get_subscribe(Connection &conn,
format_json_response(200, json_response, state.arena, format_json_response(200, json_response, state.arena,
state.http_request_id, state.connection_close); state.http_request_id, state.connection_close);
// Send through reorder queue and preprocessing to maintain proper ordering auto *conn_state = static_cast<HttpConnectionState *>(conn.user_data);
send_ordered_response(conn, state, http_response, std::move(state.arena)); conn_state->send_ordered_response(conn, state.sequence_id, http_response,
std::move(state.arena),
state.connection_close);
} }
void HttpHandler::handle_get_status(Connection &conn, HttpRequestState &state, void HttpHandler::handle_get_status(Connection &conn, HttpRequestState &state,
@@ -392,8 +364,9 @@ void HttpHandler::handle_get_status(Connection &conn, HttpRequestState &state,
// Add directly to response queue with proper sequencing // Add directly to response queue with proper sequencing
auto *conn_state = static_cast<HttpConnectionState *>(conn.user_data); auto *conn_state = static_cast<HttpConnectionState *>(conn.user_data);
conn_state->ready_responses[state.sequence_id] = ResponseData{ conn_state->send_ordered_response(conn, state.sequence_id, http_response,
http_response, std::move(state.arena), state.connection_close}; std::move(state.arena),
state.connection_close);
return; return;
} }
@@ -405,8 +378,9 @@ void HttpHandler::handle_get_status(Connection &conn, HttpRequestState &state,
// Add directly to response queue with proper sequencing // Add directly to response queue with proper sequencing
auto *conn_state = static_cast<HttpConnectionState *>(conn.user_data); auto *conn_state = static_cast<HttpConnectionState *>(conn.user_data);
conn_state->ready_responses[state.sequence_id] = ResponseData{ conn_state->send_ordered_response(conn, state.sequence_id, http_response,
http_response, std::move(state.arena), state.connection_close}; std::move(state.arena),
state.connection_close);
return; return;
} }
@@ -426,7 +400,10 @@ void HttpHandler::handle_put_retention(Connection &conn,
state.http_request_id, state.connection_close); state.http_request_id, state.connection_close);
// Send through reorder queue and preprocessing to maintain proper ordering // Send through reorder queue and preprocessing to maintain proper ordering
send_ordered_response(conn, state, http_response, std::move(state.arena)); auto *conn_state = static_cast<HttpConnectionState *>(conn.user_data);
conn_state->send_ordered_response(conn, state.sequence_id, http_response,
std::move(state.arena),
state.connection_close);
} }
void HttpHandler::handle_get_retention(Connection &conn, void HttpHandler::handle_get_retention(Connection &conn,
@@ -439,7 +416,10 @@ void HttpHandler::handle_get_retention(Connection &conn,
state.http_request_id, state.connection_close); state.http_request_id, state.connection_close);
// Send through reorder queue and preprocessing to maintain proper ordering // Send through reorder queue and preprocessing to maintain proper ordering
send_ordered_response(conn, state, http_response, std::move(state.arena)); auto *conn_state = static_cast<HttpConnectionState *>(conn.user_data);
conn_state->send_ordered_response(conn, state.sequence_id, http_response,
std::move(state.arena),
state.connection_close);
} }
void HttpHandler::handle_delete_retention(Connection &conn, void HttpHandler::handle_delete_retention(Connection &conn,
@@ -452,7 +432,10 @@ void HttpHandler::handle_delete_retention(Connection &conn,
state.http_request_id, state.connection_close); state.http_request_id, state.connection_close);
// Send through reorder queue and preprocessing to maintain proper ordering // Send through reorder queue and preprocessing to maintain proper ordering
send_ordered_response(conn, state, http_response, std::move(state.arena)); auto *conn_state = static_cast<HttpConnectionState *>(conn.user_data);
conn_state->send_ordered_response(conn, state.sequence_id, http_response,
std::move(state.arena),
state.connection_close);
} }
void HttpHandler::handle_get_metrics(Connection &conn, void HttpHandler::handle_get_metrics(Connection &conn,
@@ -494,11 +477,10 @@ void HttpHandler::handle_get_metrics(Connection &conn,
*out++ = sv; *out++ = sv;
} }
// Add directly to response queue with proper sequencing (no lock needed -
// same I/O thread)
auto *conn_state = static_cast<HttpConnectionState *>(conn.user_data); auto *conn_state = static_cast<HttpConnectionState *>(conn.user_data);
conn_state->ready_responses[state.sequence_id] = conn_state->send_ordered_response(conn, state.sequence_id, result,
ResponseData{result, std::move(state.arena), state.connection_close}; std::move(state.arena),
state.connection_close);
} }
void HttpHandler::handle_get_ok(Connection &, HttpRequestState &) { void HttpHandler::handle_get_ok(Connection &, HttpRequestState &) {
@@ -514,8 +496,10 @@ void HttpHandler::handle_not_found(Connection &conn, HttpRequestState &state) {
format_json_response(404, json_response, state.arena, format_json_response(404, json_response, state.arena,
state.http_request_id, state.connection_close); state.http_request_id, state.connection_close);
// Send through reorder queue and preprocessing to maintain proper ordering auto *conn_state = static_cast<HttpConnectionState *>(conn.user_data);
send_ordered_response(conn, state, http_response, std::move(state.arena)); conn_state->send_ordered_response(conn, state.sequence_id, http_response,
std::move(state.arena),
state.connection_close);
} }
// HTTP utility methods // HTTP utility methods
@@ -587,26 +571,26 @@ void HttpHandler::send_error_response(Connection &conn, int status_code,
http_request_id, close_connection); http_request_id, close_connection);
} }
void HttpHandler::send_ordered_response( void HttpConnectionState::send_ordered_response(
Connection &conn, HttpRequestState &state, Connection &conn, int64_t sequence_id,
std::span<std::string_view> http_response, Arena arena) { std::span<std::string_view> http_response, Arena arena,
auto *conn_state = static_cast<HttpConnectionState *>(conn.user_data); bool close_connection) {
// Add to reorder queue with proper sequencing // Add to reorder queue with proper sequencing
conn_state->ready_responses[state.sequence_id] = ready_responses[sequence_id] =
ResponseData{http_response, std::move(arena), state.connection_close}; ResponseData{http_response, std::move(arena), close_connection};
// Process ready responses in order and send via append_bytes // Process ready responses in order and send via append_bytes
auto iter = conn_state->ready_responses.begin(); auto iter = ready_responses.begin();
while (iter != conn_state->ready_responses.end() && while (iter != ready_responses.end() &&
iter->first == conn_state->next_sequence_to_send) { iter->first == next_sequence_to_send) {
auto &[sequence_id, response_data] = *iter; auto &[sequence_id, response_data] = *iter;
// Send through append_bytes which handles write interest // Send through append_bytes which handles write interest
conn.append_bytes(response_data.data, std::move(response_data.arena), conn.append_bytes(response_data.data, std::move(response_data.arena),
response_data.connection_close); response_data.connection_close);
conn_state->next_sequence_to_send++; next_sequence_to_send++;
iter = conn_state->ready_responses.erase(iter); iter = ready_responses.erase(iter);
} }
} }

View File

@@ -1,13 +1,7 @@
#pragma once #pragma once
#include <atomic>
#include <map> #include <map>
#include <mutex>
#include <optional>
#include <set>
#include <string_view> #include <string_view>
#include <thread>
#include <unordered_set>
#include <llhttp.h> #include <llhttp.h>
@@ -30,6 +24,8 @@ struct RouteMatch;
struct HttpResponseContext { struct HttpResponseContext {
int64_t sequence_id; // For response ordering in pipelining int64_t sequence_id; // For response ordering in pipelining
int64_t http_request_id; // For X-Response-ID header int64_t http_request_id; // For X-Response-ID header
std::string_view content_type;
int status_code;
bool connection_close; // Whether to close connection after response bool connection_close; // Whether to close connection after response
}; };
@@ -91,13 +87,20 @@ struct HttpConnectionState {
HttpRequestState pending; HttpRequestState pending;
std::deque<HttpRequestState> queue; std::deque<HttpRequestState> queue;
int64_t get_next_sequence_id() { return next_sequence_id++; }
HttpConnectionState();
void send_ordered_response(Connection &conn, int64_t sequence_id,
std::span<std::string_view> http_response,
Arena arena, bool close_connection);
private:
// Response ordering for HTTP pipelining // Response ordering for HTTP pipelining
std::map<int64_t, ResponseData> std::map<int64_t, ResponseData>
ready_responses; // sequence_id -> response data ready_responses; // sequence_id -> response data
int64_t next_sequence_to_send = 0; int64_t next_sequence_to_send = 0;
int64_t next_sequence_id = 0; int64_t next_sequence_id = 0;
HttpConnectionState();
}; };
/** /**
@@ -172,9 +175,4 @@ private:
format_json_response(int status_code, std::string_view json, format_json_response(int status_code, std::string_view json,
Arena &response_arena, int64_t http_request_id, Arena &response_arena, int64_t http_request_id,
bool close_connection); bool close_connection);
// Helper function to send response through reorder queue and preprocessing
void send_ordered_response(Connection &conn, HttpRequestState &state,
std::span<std::string_view> http_response,
Arena arena);
}; };