commit 1b220d0d1c
parent ec2ad27e33
2025-09-15 10:28:17 -04:00
8 changed files with 479 additions and 193 deletions

View File

@@ -87,8 +87,8 @@ void Connection::close() {
connections_active.dec();
}
-// May be called off the io thread!
-void Connection::append_message(std::span<std::string_view> data_parts,
+// Called from I/O thread only
+void Connection::append_bytes(std::span<std::string_view> data_parts,
Arena arena, bool close_after_send) {
// Calculate total bytes for this message. Don't need to hold the lock yet.
size_t total_bytes = 0;
@@ -124,6 +124,30 @@ void Connection::append_message(std::span<std::string_view> data_parts,
}
}
void Connection::send_response(void *protocol_context,
std::string_view response_json, Arena arena) {
std::unique_lock lock(mutex_);
// Store response in queue for protocol handler processing
pending_response_queue_.emplace_back(
PendingResponse{protocol_context, response_json, std::move(arena)});
// Mark that we have pending responses and trigger epoll interest
if (!has_pending_responses_) {
has_pending_responses_ = true;
auto server = server_.lock();
if (fd_ >= 0 && server) {
// Add EPOLLOUT interest to trigger on_preprocess_writes
struct epoll_event event;
event.data.fd = fd_;
event.events = EPOLLIN | EPOLLOUT;
tsan_release();
epoll_ctl(server->epoll_fds_[epoll_index_], EPOLL_CTL_MOD, fd_, &event);
}
}
}
int Connection::readBytes(char *buf, size_t buffer_size) {
int r;
for (;;) {

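The new Connection::send_response hands a response from a pipeline thread to the connection's I/O thread: it queues the response under mutex_ and re-arms the socket with EPOLLIN | EPOLLOUT so the write path runs. Below is a minimal standalone sketch of just that epoll re-arm, using a socketpair and local names rather than the project's classes, so it is an illustration of the mechanism and not the server's actual code.

```cpp
// Sketch: switching a registered fd from EPOLLIN to EPOLLIN | EPOLLOUT makes
// the next epoll_wait report EPOLLOUT on a writable socket, which is what
// drives the write-preprocessing path after a response is queued.
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>
#include <cstdio>

int main() {
  int sv[2];
  socketpair(AF_UNIX, SOCK_STREAM, 0, sv); // stand-in for an accepted client socket

  int ep = epoll_create1(0);
  epoll_event ev{};
  ev.events = EPOLLIN; // read interest only, like an idle connection
  ev.data.fd = sv[0];
  epoll_ctl(ep, EPOLL_CTL_ADD, sv[0], &ev);

  // A response was queued: add write interest (EPOLL_CTL_MOD, as in send_response).
  ev.events = EPOLLIN | EPOLLOUT;
  epoll_ctl(ep, EPOLL_CTL_MOD, sv[0], &ev);

  epoll_event out{};
  if (epoll_wait(ep, &out, 1, 1000) == 1 && (out.events & EPOLLOUT))
    printf("EPOLLOUT fired: the I/O thread would now preprocess pending responses\n");

  close(sv[0]);
  close(sv[1]);
  close(ep);
}
```

Because the default epoll mode is level-triggered, EPOLLOUT fires immediately on an idle, writable socket, so a queued response is picked up on the next epoll_wait even when no client data arrives.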
View File

@@ -30,28 +30,26 @@ struct Server;
*/
struct MessageSender {
/**
- * @brief Append message data to connection's outgoing message queue.
+ * @brief Send response with protocol-specific context for ordering.
*
- * Thread-safe method that can be called from any thread, including
- * pipeline processing threads. The arena is moved into the connection
- * to maintain data lifetime until the message is sent. Messages appended
- * concurrently may be written in either order, but they will not be
- * interleaved.
+ * Thread-safe method for pipeline threads to send responses back to clients.
+ * Delegates to the connection's protocol handler for ordering logic.
+ * The protocol handler may queue the response or send it immediately.
*
- * @param data_parts Span of string_view parts to send (arena-allocated)
- * @param arena Arena containing the memory for data_parts string_views
- * @param close_after_send Whether to close connection after sending
+ * @param protocol_context Arena-allocated protocol-specific context
+ * @param response_json Response data (may be empty for deferred serialization)
+ * @param arena Arena containing response data and context
*
* Example usage:
* ```cpp
- * auto response_parts = arena.allocate_span<std::string_view>(2);
- * response_parts[0] = "HTTP/1.1 200 OK\r\n\r\n";
- * response_parts[1] = "Hello World";
- * conn.append_message(response_parts, std::move(arena));
+ * auto* ctx = arena.allocate<HttpResponseContext>();
+ * ctx->sequence_id = 42;
+ * auto response_data = format_response(arena);
+ * conn.send_response(ctx, response_data, std::move(arena));
* ```
*/
-virtual void append_message(std::span<std::string_view> data_parts,
-Arena arena, bool close_after_send = false) = 0;
+virtual void send_response(void *protocol_context,
+std::string_view response_json, Arena arena) = 0;
virtual ~MessageSender() = default;
};
@@ -104,31 +102,33 @@ struct Connection : MessageSender {
* @brief Queue an atomic message to be sent to the client.
*
* Adds a complete message with all associated data to the connection's
- * outgoing message queue. The message will be sent asynchronously by a
- * server I/O thread using efficient vectored I/O.
+ * outgoing byte queue with guaranteed ordering.
+ *
+ * I/O thread only method for protocol handlers to queue bytes for sending.
+ * Bytes are queued in order and sent using efficient vectored I/O.
*
* @param data_parts Span of string_views pointing to arena-allocated data
* @param arena Arena that owns all the memory referenced by data_parts
- * @param close_after_send Whether to close connection after sending this
- * message
- *
- * @note Thread Safety: This method is thread-safe and can be called
- * concurrently from multiple pipeline threads.
+ * @param close_after_send Whether to close connection after sending
*
+ * @note Thread Safety: Must be called from I/O thread only.
+ * @note Ordering: Bytes are sent in the order calls are made.
* @note The memory referenced by the data_parts span must outlive @p arena.
- * The arena will be moved and kept alive until the message is fully sent.
*
- * Example usage:
+ * Example usage (from ConnectionHandler::on_preprocess_writes):
* ```cpp
* Arena arena;
* auto parts = arena.allocate_span<std::string_view>(2);
* parts[0] = build_header(arena);
* parts[1] = build_body(arena);
- * conn.append_message({parts, 2}, std::move(arena));
+ * conn.append_bytes({parts, 2}, std::move(arena), false);
* ```
*/
-void append_message(std::span<std::string_view> data_parts, Arena arena,
-bool close_after_send) override;
+void append_bytes(std::span<std::string_view> data_parts, Arena arena,
+bool close_after_send);
+void send_response(void *protocol_context, std::string_view response_json,
+Arena arena) override;
/**
* @brief Get a WeakRef to this connection for async operations.
@@ -342,6 +342,10 @@ private:
// mutex_, but if non-empty mutex_ can be
// dropped while server accesses existing elements.
int64_t outgoing_bytes_queued_{0}; // Counter of queued bytes
+bool has_pending_responses_{
+false}; // True if protocol handler has responses to process
+std::deque<PendingResponse>
+pending_response_queue_; // Responses awaiting protocol processing
// Set to a negative number in `close`
int fd_;

View File

@@ -3,9 +3,22 @@
#include <span> #include <span>
#include <string_view> #include <string_view>
-// Forward declaration to avoid circular dependency
+// Forward declarations to avoid circular dependency
struct Connection;
// Include Arena header since PendingResponse uses Arena by value
#include "arena.hpp"
/**
* Represents a response queued by pipeline threads for protocol processing.
* Contains JSON response data that can be wrapped by any protocol.
*/
struct PendingResponse {
void *protocol_context; // Arena-allocated protocol-specific context
std::string_view response_json; // JSON response body (arena-allocated)
Arena arena; // Arena containing response data and context
};
/**
* Abstract interface for handling connection data processing.
*
@@ -89,4 +102,20 @@ public:
* @note Called from this connection's io thread.
*/
virtual void on_batch_complete(std::span<Connection *const> /*batch*/) {}
/**
* Called before processing outgoing writes on a connection.
*
* This hook allows protocol handlers to process queued responses
* before actual socket writes occur. Used for response ordering,
* serialization, and other preprocessing.
*
* @param conn Connection about to write data
* @param pending_responses Responses queued by pipeline threads
* @note Called from this connection's io thread.
* @note Called when an EPOLLOUT event occurs.
*/
virtual void
on_preprocess_writes(Connection &conn,
std::span<PendingResponse> pending_responses) {}
};

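To make the shape of the new hook concrete, here is a compilable sketch of a handler that drains a batch of pending responses and turns each one into bytes for the connection. Pending, ByteSink, and Handler are stand-in types invented for the example; the real interface uses PendingResponse, Connection, and ConnectionHandler, and the HTTP handler additionally reorders by sequence ID before appending.

```cpp
// Sketch of an on_preprocess_writes-style hook with stand-in types.
#include <cstdio>
#include <span>
#include <string>
#include <string_view>
#include <vector>

struct Pending {                 // stand-in for PendingResponse
  void *protocol_context;        // protocol-specific, arena-allocated in the real code
  std::string_view response_json;
};

struct ByteSink {                // stand-in for Connection::append_bytes
  void append_bytes(std::string_view bytes) {
    printf("%.*s", static_cast<int>(bytes.size()), bytes.data());
  }
};

struct Handler {                 // stand-in for ConnectionHandler
  virtual void on_preprocess_writes(ByteSink &, std::span<Pending>) {}
  virtual ~Handler() = default;
};

struct HttpishHandler : Handler {
  void on_preprocess_writes(ByteSink &conn, std::span<Pending> pending) override {
    for (auto &p : pending) {
      // Wrap the protocol-agnostic JSON body in protocol-specific framing.
      std::string out = "HTTP/1.1 200 OK\r\nContent-Length: " +
                        std::to_string(p.response_json.size()) + "\r\n\r\n";
      out.append(p.response_json);
      conn.append_bytes(out);
    }
  }
};

int main() {
  ByteSink sink;
  std::vector<Pending> queued{{nullptr, R"({"status":"committed"})"}};
  HttpishHandler handler;
  handler.on_preprocess_writes(sink, queued);
}
```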
View File

@@ -71,6 +71,69 @@ void HttpHandler::on_connection_closed(Connection &conn) {
conn.user_data = nullptr;
}
void HttpHandler::on_preprocess_writes(
Connection &conn, std::span<PendingResponse> pending_responses) {
auto *state = static_cast<HttpConnectionState *>(conn.user_data);
// Process incoming responses and add to reorder queue
{
std::lock_guard lock(state->response_queue_mutex);
for (auto &pending : pending_responses) {
auto *ctx = static_cast<HttpResponseContext *>(pending.protocol_context);
printf(
"Processing response: sequence_id=%ld, request_id=%ld, json='%.*s'\n",
ctx->sequence_id, ctx->http_request_id,
(int)pending.response_json.size(), pending.response_json.data());
// Determine HTTP status code and content type from response content
int status_code = 200;
std::string_view content_type = "application/json";
// For health checks, detect plain text responses
if (pending.response_json == "OK") {
content_type = "text/plain";
}
// For metrics, detect Prometheus format (starts with # or contains metric
// names)
else if (pending.response_json.starts_with("#") ||
pending.response_json.find("_total") != std::string_view::npos ||
pending.response_json.find("_counter") !=
std::string_view::npos) {
content_type = "text/plain; version=0.0.4";
}
// Format HTTP response from JSON
auto http_response = format_response(
status_code, content_type, pending.response_json, pending.arena,
ctx->http_request_id, ctx->connection_close);
printf("Adding response to queue: sequence_id=%ld\n", ctx->sequence_id);
state->ready_responses[ctx->sequence_id] = ResponseData{
http_response, std::move(pending.arena), ctx->connection_close};
}
// Send responses in sequential order
printf("Checking for sequential responses, next_sequence_to_send=%ld\n",
state->next_sequence_to_send);
auto iter = state->ready_responses.begin();
while (iter != state->ready_responses.end() &&
iter->first == state->next_sequence_to_send) {
auto &[sequence_id, response_data] = *iter;
printf("Sending response: sequence_id=%ld\n", sequence_id);
conn.append_bytes(response_data.data, std::move(response_data.arena),
response_data.connection_close);
state->next_sequence_to_send++;
iter = state->ready_responses.erase(iter);
}
printf("After processing, next_sequence_to_send=%ld, "
"ready_responses.size()=%zu\n",
state->next_sequence_to_send, state->ready_responses.size());
}
}
static thread_local std::vector<PipelineEntry> g_batch_entries; static thread_local std::vector<PipelineEntry> g_batch_entries;
void HttpHandler::on_batch_complete(std::span<Connection *const> batch) { void HttpHandler::on_batch_complete(std::span<Connection *const> batch) {
@@ -80,6 +143,16 @@ void HttpHandler::on_batch_complete(std::span<Connection *const> batch) {
auto *state = static_cast<HttpConnectionState *>(conn->user_data); auto *state = static_cast<HttpConnectionState *>(conn->user_data);
for (auto &req : state->queue) { for (auto &req : state->queue) {
// Assign sequence ID for response ordering
int64_t sequence_id = state->next_sequence_id++;
req.sequence_id = sequence_id;
// Create HttpResponseContext for this request
auto *ctx = req.arena.allocate<HttpResponseContext>(1);
ctx->sequence_id = sequence_id;
ctx->http_request_id = req.http_request_id;
ctx->connection_close = req.connection_close;
char *url_buffer = req.arena.allocate<char>(req.url.size()); char *url_buffer = req.arena.allocate<char>(req.url.size());
std::memcpy(url_buffer, req.url.data(), req.url.size()); std::memcpy(url_buffer, req.url.data(), req.url.size());
RouteMatch route_match; RouteMatch route_match;
@@ -88,8 +161,30 @@ void HttpHandler::on_batch_complete(std::span<Connection *const> batch) {
static_cast<int>(req.url.size()), route_match); static_cast<int>(req.url.size()), route_match);
if (parse_result != ParseResult::Success) { if (parse_result != ParseResult::Success) {
// Handle malformed URL encoding // Handle malformed URL encoding
-send_error_response(*conn, 400, "Malformed URL encoding",
-std::move(req.arena), 0, true);
+// Assign sequence ID for this error response
+int64_t error_sequence_id = state->next_sequence_id++;
req.sequence_id = error_sequence_id;
auto json_response = R"({"error":"Malformed URL encoding"})";
auto http_response =
format_json_response(400, json_response, req.arena, 0, true);
// Add to reorder queue and process immediately since this is an error
state->ready_responses[error_sequence_id] =
ResponseData{http_response, std::move(req.arena), true};
// Process ready responses in order and send via append_bytes
auto iter = state->ready_responses.begin();
while (iter != state->ready_responses.end() &&
iter->first == state->next_sequence_to_send) {
auto &[sequence_id, response_data] = *iter;
// Send through append_bytes which handles write interest
conn->append_bytes(response_data.data, std::move(response_data.arena),
response_data.connection_close);
state->next_sequence_to_send++;
iter = state->ready_responses.erase(iter);
}
break; break;
} }
req.route = route_match.route; req.route = route_match.route;
@@ -131,21 +226,20 @@ void HttpHandler::on_batch_complete(std::span<Connection *const> batch) {
// Create CommitEntry for commit requests
if (req.route == HttpRoute::PostCommit && req.commit_request &&
req.parsing_commit && req.basic_validation_passed) {
-g_batch_entries.push_back(CommitEntry{
-conn->get_weak_ref(), req.http_request_id, req.connection_close,
-req.commit_request.get(), std::move(req.arena)});
+g_batch_entries.emplace_back(CommitEntry(conn->get_weak_ref(), ctx,
+req.commit_request.get(),
+std::move(req.arena)));
}
// Create StatusEntry for status requests
else if (req.route == HttpRoute::GetStatus) {
-g_batch_entries.push_back(StatusEntry{
-conn->get_weak_ref(), req.http_request_id, req.connection_close,
-req.status_request_id, std::move(req.arena)});
+g_batch_entries.emplace_back(StatusEntry(conn->get_weak_ref(), ctx,
+req.status_request_id,
+std::move(req.arena)));
}
// Create HealthCheckEntry for health check requests
else if (req.route == HttpRoute::GetOk) {
-g_batch_entries.push_back(
-HealthCheckEntry{conn->get_weak_ref(), req.http_request_id,
-req.connection_close, std::move(req.arena)});
+g_batch_entries.emplace_back(
+HealthCheckEntry(conn->get_weak_ref(), ctx, std::move(req.arena)));
}
}
state->queue.clear();
@@ -155,8 +249,8 @@ void HttpHandler::on_batch_complete(std::span<Connection *const> batch) {
// contention on the way into the pipeline. // contention on the way into the pipeline.
if (g_batch_entries.size() > 0) { if (g_batch_entries.size() > 0) {
auto guard = commitPipeline.push(g_batch_entries.size(), true); auto guard = commitPipeline.push(g_batch_entries.size(), true);
-auto out_iter = guard.batch.begin();
-std::move(g_batch_entries.begin(), g_batch_entries.end(), out_iter);
+std::move(g_batch_entries.begin(), g_batch_entries.end(),
+guard.batch.begin());
} }
g_batch_entries.clear(); g_batch_entries.clear();
} }
@@ -164,7 +258,13 @@ void HttpHandler::on_batch_complete(std::span<Connection *const> batch) {
void HttpHandler::on_data_arrived(std::string_view data, Connection &conn) {
auto *state = static_cast<HttpConnectionState *>(conn.user_data);
if (!state) {
-send_error_response(conn, 500, "Internal server error", Arena{}, 0, true);
+// Create a temporary arena and add error response directly to avoid
+// sequence issues
+Arena error_arena;
+auto json_response = R"({"error":"Internal server error"})";
+auto http_response =
+format_json_response(500, json_response, error_arena, 0, true);
+conn.append_bytes(http_response, std::move(error_arena), true);
return;
}
@@ -189,8 +289,12 @@ void HttpHandler::on_data_arrived(std::string_view data, Connection &conn) {
if (err == HPE_OK) {
break;
}
-send_error_response(conn, 400, "Bad request",
-std::move(state->pending.arena), 0, true);
+// Parse error - send response directly since this is before sequence
+// assignment
+auto json_response = R"({"error":"Bad request"})";
+auto http_response =
+format_json_response(400, json_response, state->pending.arena, 0, true);
+conn.append_bytes(http_response, std::move(state->pending.arena), true);
return;
}
}
@@ -199,11 +303,19 @@ void HttpHandler::on_data_arrived(std::string_view data, Connection &conn) {
void HttpHandler::handle_get_version(Connection &conn, void HttpHandler::handle_get_version(Connection &conn,
HttpRequestState &state) { HttpRequestState &state) {
version_counter.inc(); version_counter.inc();
send_json_response(
conn, 200, // Generate JSON response
auto json_response =
format(state.arena, R"({"version":%ld,"leader":""})", format(state.arena, R"({"version":%ld,"leader":""})",
this->committed_version.load(std::memory_order_seq_cst)), this->committed_version.load(std::memory_order_seq_cst));
std::move(state.arena), state.http_request_id, state.connection_close);
// Format HTTP response
auto http_response =
format_json_response(200, json_response, state.arena,
state.http_request_id, state.connection_close);
// Send through reorder queue and preprocessing to maintain proper ordering
send_ordered_response(conn, state, http_response, std::move(state.arena));
} }
void HttpHandler::handle_post_commit(Connection &conn, void HttpHandler::handle_post_commit(Connection &conn,
@@ -211,8 +323,16 @@ void HttpHandler::handle_post_commit(Connection &conn,
commit_counter.inc(); commit_counter.inc();
// Check if streaming parse was successful // Check if streaming parse was successful
if (!state.commit_request || !state.parsing_commit) { if (!state.commit_request || !state.parsing_commit) {
send_error_response(conn, 400, "Parse failed", std::move(state.arena), auto json_response = R"({"error":"Parse failed"})";
auto http_response =
format_json_response(400, json_response, state.arena,
state.http_request_id, state.connection_close); state.http_request_id, state.connection_close);
// Add directly to response queue with proper sequencing (no lock needed -
// same I/O thread)
auto *conn_state = static_cast<HttpConnectionState *>(conn.user_data);
conn_state->ready_responses[state.sequence_id] = ResponseData{
http_response, std::move(state.arena), state.connection_close};
return; return;
} }
@@ -252,8 +372,18 @@ void HttpHandler::handle_post_commit(Connection &conn,
} }
if (!valid) { if (!valid) {
send_error_response(conn, 400, error_msg, std::move(state.arena), auto json_response =
format(state.arena, R"({"error":"%.*s"})",
static_cast<int>(error_msg.size()), error_msg.data());
auto http_response =
format_json_response(400, json_response, state.arena,
state.http_request_id, state.connection_close); state.http_request_id, state.connection_close);
// Add directly to response queue with proper sequencing (no lock needed -
// same I/O thread)
auto *conn_state = static_cast<HttpConnectionState *>(conn.user_data);
conn_state->ready_responses[state.sequence_id] = ResponseData{
http_response, std::move(state.arena), state.connection_close};
return; return;
} }
@@ -265,10 +395,14 @@ void HttpHandler::handle_post_commit(Connection &conn,
void HttpHandler::handle_get_subscribe(Connection &conn, void HttpHandler::handle_get_subscribe(Connection &conn,
HttpRequestState &state) { HttpRequestState &state) {
// TODO: Implement subscription streaming // TODO: Implement subscription streaming
send_json_response( auto json_response =
conn, 200, R"({"message":"Subscription endpoint - streaming not yet implemented"})";
R"({"message":"Subscription endpoint - streaming not yet implemented"})", auto http_response =
std::move(state.arena), state.http_request_id, state.connection_close); format_json_response(200, json_response, state.arena,
state.http_request_id, state.connection_close);
// Send through reorder queue and preprocessing to maintain proper ordering
send_ordered_response(conn, state, http_response, std::move(state.arena));
} }
void HttpHandler::handle_get_status(Connection &conn, HttpRequestState &state, void HttpHandler::handle_get_status(Connection &conn, HttpRequestState &state,
@@ -282,16 +416,29 @@ void HttpHandler::handle_get_status(Connection &conn, HttpRequestState &state,
const auto &request_id = const auto &request_id =
route_match.params[static_cast<int>(ApiParameterKey::RequestId)]; route_match.params[static_cast<int>(ApiParameterKey::RequestId)];
if (!request_id) { if (!request_id) {
send_error_response( auto json_response =
conn, 400, "Missing required query parameter: request_id", R"({"error":"Missing required query parameter: request_id"})";
std::move(state.arena), state.http_request_id, state.connection_close); auto http_response =
format_json_response(400, json_response, state.arena,
state.http_request_id, state.connection_close);
// Add directly to response queue with proper sequencing
auto *conn_state = static_cast<HttpConnectionState *>(conn.user_data);
conn_state->ready_responses[state.sequence_id] = ResponseData{
http_response, std::move(state.arena), state.connection_close};
return; return;
} }
if (request_id->empty()) { if (request_id->empty()) {
send_error_response(conn, 400, "Empty request_id parameter", auto json_response = R"({"error":"Empty request_id parameter"})";
std::move(state.arena), state.http_request_id, auto http_response =
state.connection_close); format_json_response(400, json_response, state.arena,
state.http_request_id, state.connection_close);
// Add directly to response queue with proper sequencing
auto *conn_state = static_cast<HttpConnectionState *>(conn.user_data);
conn_state->ready_responses[state.sequence_id] = ResponseData{
http_response, std::move(state.arena), state.connection_close};
return; return;
} }
@@ -305,26 +452,39 @@ void HttpHandler::handle_put_retention(Connection &conn,
HttpRequestState &state, HttpRequestState &state,
const RouteMatch &) { const RouteMatch &) {
// TODO: Parse retention policy from body and store // TODO: Parse retention policy from body and store
send_json_response(conn, 200, R"({"policy_id":"example","status":"created"})", auto json_response = R"({"policy_id":"example","status":"created"})";
std::move(state.arena), state.http_request_id, auto http_response =
state.connection_close); format_json_response(200, json_response, state.arena,
state.http_request_id, state.connection_close);
// Send through reorder queue and preprocessing to maintain proper ordering
send_ordered_response(conn, state, http_response, std::move(state.arena));
} }
void HttpHandler::handle_get_retention(Connection &conn, void HttpHandler::handle_get_retention(Connection &conn,
HttpRequestState &state, HttpRequestState &state,
const RouteMatch &) { const RouteMatch &) {
// TODO: Extract policy_id from URL or return all policies // TODO: Extract policy_id from URL or return all policies
send_json_response(conn, 200, R"({"policies":[]})", std::move(state.arena), auto json_response = R"({"policies":[]})";
auto http_response =
format_json_response(200, json_response, state.arena,
state.http_request_id, state.connection_close); state.http_request_id, state.connection_close);
// Send through reorder queue and preprocessing to maintain proper ordering
send_ordered_response(conn, state, http_response, std::move(state.arena));
} }
void HttpHandler::handle_delete_retention(Connection &conn, void HttpHandler::handle_delete_retention(Connection &conn,
HttpRequestState &state, HttpRequestState &state,
const RouteMatch &) { const RouteMatch &) {
// TODO: Extract policy_id from URL and delete // TODO: Extract policy_id from URL and delete
send_json_response(conn, 200, R"({"policy_id":"example","status":"deleted"})", auto json_response = R"({"policy_id":"example","status":"deleted"})";
std::move(state.arena), state.http_request_id, auto http_response =
state.connection_close); format_json_response(200, json_response, state.arena,
state.http_request_id, state.connection_close);
// Send through reorder queue and preprocessing to maintain proper ordering
send_ordered_response(conn, state, http_response, std::move(state.arena));
} }
void HttpHandler::handle_get_metrics(Connection &conn, void HttpHandler::handle_get_metrics(Connection &conn,
@@ -338,7 +498,12 @@ void HttpHandler::handle_get_metrics(Connection &conn,
total_size += sv.size(); total_size += sv.size();
} }
// Build HTTP response headers using arena // Build HTTP response with metrics data
auto result =
state.arena.allocate_span<std::string_view>(metrics_span.size() + 1);
auto out = result.begin();
// Build HTTP headers
std::string_view headers; std::string_view headers;
if (state.connection_close) { if (state.connection_close) {
headers = static_format( headers = static_format(
@@ -356,31 +521,37 @@ void HttpHandler::handle_get_metrics(Connection &conn,
"Connection: keep-alive\r\n", "\r\n"); "Connection: keep-alive\r\n", "\r\n");
} }
auto result =
state.arena.allocate_span<std::string_view>(metrics_span.size() + 1);
auto out = result.begin();
*out++ = headers; *out++ = headers;
for (auto sv : metrics_span) { for (auto sv : metrics_span) {
*out++ = sv; *out++ = sv;
} }
conn.append_message(result, std::move(state.arena), state.connection_close);
// Add directly to response queue with proper sequencing (no lock needed -
// same I/O thread)
auto *conn_state = static_cast<HttpConnectionState *>(conn.user_data);
conn_state->ready_responses[state.sequence_id] =
ResponseData{result, std::move(state.arena), state.connection_close};
} }
void HttpHandler::handle_get_ok(Connection &, HttpRequestState &) { void HttpHandler::handle_get_ok(Connection &, HttpRequestState &) {
ok_counter.inc(); ok_counter.inc();
TRACE_EVENT("http", "GET /ok", perfetto::Flow::Global(state.http_request_id));
// Health check requests are processed through the pipeline // Health check requests are processed through the pipeline
// Response will be generated in the release stage after pipeline processing // Response will be generated in the release stage after pipeline processing
} }
void HttpHandler::handle_not_found(Connection &conn, HttpRequestState &state) { void HttpHandler::handle_not_found(Connection &conn, HttpRequestState &state) {
send_error_response(conn, 404, "Not found", std::move(state.arena), auto json_response = R"({"error":"Not found"})";
auto http_response =
format_json_response(404, json_response, state.arena,
state.http_request_id, state.connection_close); state.http_request_id, state.connection_close);
// Send through reorder queue and preprocessing to maintain proper ordering
send_ordered_response(conn, state, http_response, std::move(state.arena));
} }
// HTTP utility methods // HTTP utility methods
-void HttpHandler::send_response(MessageSender &conn, int status_code,
+void HttpHandler::send_response(Connection &conn, int status_code,
std::string_view content_type, std::string_view content_type,
std::string_view body, Arena response_arena, std::string_view body, Arena response_arena,
int64_t http_request_id, int64_t http_request_id,
@@ -423,10 +594,10 @@ void HttpHandler::send_response(MessageSender &conn, int status_code,
content_type.data(), body.size(), http_request_id, content_type.data(), body.size(), http_request_id,
connection_header, static_cast<int>(body.size()), body.data()); connection_header, static_cast<int>(body.size()), body.data());
-conn.append_message(response, std::move(response_arena), close_connection);
+conn.append_bytes(response, std::move(response_arena), close_connection);
} }
-void HttpHandler::send_json_response(MessageSender &conn, int status_code,
+void HttpHandler::send_json_response(Connection &conn, int status_code,
std::string_view json, std::string_view json,
Arena response_arena, Arena response_arena,
int64_t http_request_id, int64_t http_request_id,
@@ -435,7 +606,7 @@ void HttpHandler::send_json_response(MessageSender &conn, int status_code,
std::move(response_arena), http_request_id, close_connection); std::move(response_arena), http_request_id, close_connection);
} }
-void HttpHandler::send_error_response(MessageSender &conn, int status_code,
+void HttpHandler::send_error_response(Connection &conn, int status_code,
std::string_view message, std::string_view message,
Arena response_arena, Arena response_arena,
int64_t http_request_id, int64_t http_request_id,
@@ -448,6 +619,29 @@ void HttpHandler::send_error_response(MessageSender &conn, int status_code,
http_request_id, close_connection); http_request_id, close_connection);
} }
void HttpHandler::send_ordered_response(
Connection &conn, HttpRequestState &state,
std::span<std::string_view> http_response, Arena arena) {
auto *conn_state = static_cast<HttpConnectionState *>(conn.user_data);
// Add to reorder queue with proper sequencing
conn_state->ready_responses[state.sequence_id] =
ResponseData{http_response, std::move(arena), state.connection_close};
// Process ready responses in order and send via append_bytes
auto iter = conn_state->ready_responses.begin();
while (iter != conn_state->ready_responses.end() &&
iter->first == conn_state->next_sequence_to_send) {
auto &[sequence_id, response_data] = *iter;
// Send through append_bytes which handles write interest
conn.append_bytes(response_data.data, std::move(response_data.arena),
response_data.connection_close);
conn_state->next_sequence_to_send++;
iter = conn_state->ready_responses.erase(iter);
}
}
std::span<std::string_view> std::span<std::string_view>
HttpHandler::format_response(int status_code, std::string_view content_type, HttpHandler::format_response(int status_code, std::string_view content_type,
std::string_view body, Arena &response_arena, std::string_view body, Arena &response_arena,
@@ -644,8 +838,9 @@ bool HttpHandler::process_sequence_batch(BatchType &batch) {
if (!commit_entry.commit_request) { if (!commit_entry.commit_request) {
// Should not happen - basic validation was done on I/O thread // Should not happen - basic validation was done on I/O thread
-send_error_response(*conn_ref, 500, "Internal server error",
-Arena{}, commit_entry.http_request_id, true);
+conn_ref->send_response(commit_entry.protocol_context,
+R"({"error":"Internal server error"})",
+Arena{});
return false;
}
@@ -658,11 +853,10 @@ bool HttpHandler::process_sequence_batch(BatchType &batch) {
if (banned_request_ids.find(commit_request_id) != if (banned_request_ids.find(commit_request_id) !=
banned_request_ids.end()) { banned_request_ids.end()) {
// Request ID is banned, this commit should fail // Request ID is banned, this commit should fail
-send_json_response(
-*conn_ref, 409,
+conn_ref->send_response(
+commit_entry.protocol_context,
R"({"status": "not_committed", "error": "request_id_banned"})",
-Arena{}, commit_entry.http_request_id,
-commit_entry.connection_close);
+Arena{});
return false; return false;
} }
} }
@@ -670,9 +864,6 @@ bool HttpHandler::process_sequence_batch(BatchType &batch) {
// Assign sequential version number // Assign sequential version number
commit_entry.assigned_version = next_version++; commit_entry.assigned_version = next_version++;
TRACE_EVENT("http", "sequence_commit",
perfetto::Flow::Global(commit_entry.http_request_id));
return false; // Continue processing return false; // Continue processing
} else if constexpr (std::is_same_v<T, StatusEntry>) { } else if constexpr (std::is_same_v<T, StatusEntry>) {
// Process status entry: add request_id to banned list, get version // Process status entry: add request_id to banned list, get version
@@ -702,14 +893,8 @@ bool HttpHandler::process_sequence_batch(BatchType &batch) {
status_entry.version_upper_bound = next_version - 1; status_entry.version_upper_bound = next_version - 1;
} }
TRACE_EVENT("http", "sequence_status", // TODO: Transfer to status threadpool - for now mark as processed
perfetto::Flow::Global(status_entry.http_request_id)); // Response will be generated in persist stage
// TODO: Transfer to status threadpool - for now just respond
// not_committed
send_json_response(*conn_ref, 200, R"({"status": "not_committed"})",
Arena{}, status_entry.http_request_id,
status_entry.connection_close);
return false; // Continue processing return false; // Continue processing
} else if constexpr (std::is_same_v<T, HealthCheckEntry>) { } else if constexpr (std::is_same_v<T, HealthCheckEntry>) {
@@ -723,10 +908,6 @@ bool HttpHandler::process_sequence_batch(BatchType &batch) {
return false; // Skip this entry and continue processing return false; // Skip this entry and continue processing
} }
TRACE_EVENT(
"http", "sequence_health_check",
perfetto::Flow::Global(health_check_entry.http_request_id));
return false; // Continue processing return false; // Continue processing
} }
@@ -775,9 +956,6 @@ bool HttpHandler::process_resolve_batch(BatchType &batch) {
// Accept all commits (simplified implementation) // Accept all commits (simplified implementation)
commit_entry.resolve_success = true; commit_entry.resolve_success = true;
TRACE_EVENT("http", "resolve_commit",
perfetto::Flow::Global(commit_entry.http_request_id));
return false; // Continue processing return false; // Continue processing
} else if constexpr (std::is_same_v<T, StatusEntry>) { } else if constexpr (std::is_same_v<T, StatusEntry>) {
// Status entries are not processed in resolve stage // Status entries are not processed in resolve stage
@@ -794,10 +972,6 @@ bool HttpHandler::process_resolve_batch(BatchType &batch) {
return false; // Skip this entry and continue processing return false; // Skip this entry and continue processing
} }
TRACE_EVENT(
"http", "resolve_health_check",
perfetto::Flow::Global(health_check_entry.http_request_id));
// Perform configurable CPU-intensive work for benchmarking // Perform configurable CPU-intensive work for benchmarking
spend_cpu_cycles(config_.benchmark.ok_resolve_iterations); spend_cpu_cycles(config_.benchmark.ok_resolve_iterations);
@@ -850,37 +1024,46 @@ bool HttpHandler::process_persist_batch(BatchType &batch) {
committed_version.store(commit_entry.assigned_version, committed_version.store(commit_entry.assigned_version,
std::memory_order_seq_cst); std::memory_order_seq_cst);
TRACE_EVENT("http", "persist_commit",
perfetto::Flow::Global(commit_entry.http_request_id));
const CommitRequest &commit_request = *commit_entry.commit_request;
-// Generate success response with actual assigned version using
-// request arena
-std::string_view response;
+// Generate success JSON response with actual assigned version
+std::string_view response_json;
if (commit_request.request_id().has_value()) {
-response = format(
+response_json = format(
commit_entry.request_arena,
R"({"request_id":"%.*s","status":"committed","version":%ld,"leader_id":"leader123"})",
static_cast<int>(commit_request.request_id().value().size()),
commit_request.request_id().value().data(),
commit_entry.assigned_version);
} else {
-response = format(
+response_json = format(
commit_entry.request_arena,
R"({"status":"committed","version":%ld,"leader_id":"leader123"})",
commit_entry.assigned_version);
}
-// Format response but don't send yet - store for release stage
-commit_entry.response_message = format_json_response(
-200, response, commit_entry.request_arena,
-commit_entry.http_request_id, commit_entry.connection_close);
+// Store JSON response in arena for release stage
+char *json_buffer =
+commit_entry.request_arena.template allocate<char>(
+response_json.size());
+std::memcpy(json_buffer, response_json.data(),
+response_json.size());
+commit_entry.response_json =
+std::string_view(json_buffer, response_json.size());
return false; // Continue processing
} else if constexpr (std::is_same_v<T, StatusEntry>) {
-// Status entries are not processed in persist stage
-// They were already handled in sequence stage
+// Process status entry: generate not_committed response
+auto &status_entry = e;
+auto conn_ref = status_entry.connection.lock();
+if (!conn_ref) {
+// Connection is gone, drop the entry silently
+return false;
+}
+// Store JSON response for release stage
+status_entry.response_json = R"({"status": "not_committed"})";
return false;
} else if constexpr (std::is_same_v<T, HealthCheckEntry>) { } else if constexpr (std::is_same_v<T, HealthCheckEntry>) {
// Process health check entry: generate OK response // Process health check entry: generate OK response
@@ -893,15 +1076,8 @@ bool HttpHandler::process_persist_batch(BatchType &batch) {
return false; // Skip this entry and continue processing return false; // Skip this entry and continue processing
} }
-TRACE_EVENT(
-"http", "persist_health_check",
-perfetto::Flow::Global(health_check_entry.http_request_id));
-// Format OK response but don't send yet - store for release stage
-health_check_entry.response_message = format_response(
-200, "text/plain", "OK", health_check_entry.request_arena,
-health_check_entry.http_request_id,
-health_check_entry.connection_close);
+// Store plain text "OK" response for release stage
+health_check_entry.response_json = "OK";
return false; // Continue processing return false; // Continue processing
} }
@@ -941,13 +1117,11 @@ bool HttpHandler::process_release_batch(BatchType &batch) {
return false; // Skip this entry and continue processing return false; // Skip this entry and continue processing
} }
TRACE_EVENT("http", "release_commit", // Send the JSON response using protocol-agnostic interface
perfetto::Flow::Global(commit_entry.http_request_id)); // HTTP formatting will happen in on_preprocess_writes()
conn_ref->send_response(commit_entry.protocol_context,
// Send the response that was formatted in persist stage commit_entry.response_json,
conn_ref->append_message(commit_entry.response_message, std::move(commit_entry.request_arena));
std::move(commit_entry.request_arena),
commit_entry.connection_close);
return false; // Continue processing return false; // Continue processing
} else if constexpr (std::is_same_v<T, StatusEntry>) { } else if constexpr (std::is_same_v<T, StatusEntry>) {
@@ -961,8 +1135,11 @@ bool HttpHandler::process_release_batch(BatchType &batch) {
return false; // Skip this entry and continue processing return false; // Skip this entry and continue processing
} }
TRACE_EVENT("http", "release_status", // Send the JSON response using protocol-agnostic interface
perfetto::Flow::Global(status_entry.http_request_id)); // HTTP formatting will happen in on_preprocess_writes()
conn_ref->send_response(status_entry.protocol_context,
status_entry.response_json,
std::move(status_entry.request_arena));
return false; // Continue processing return false; // Continue processing
} else if constexpr (std::is_same_v<T, HealthCheckEntry>) { } else if constexpr (std::is_same_v<T, HealthCheckEntry>) {
@@ -976,15 +1153,12 @@ bool HttpHandler::process_release_batch(BatchType &batch) {
return false; // Skip this entry and continue processing return false; // Skip this entry and continue processing
} }
-TRACE_EVENT(
-"http", "release_health_check",
-perfetto::Flow::Global(health_check_entry.http_request_id));
-// Send the response that was formatted in persist stage
-conn_ref->append_message(
-health_check_entry.response_message,
-std::move(health_check_entry.request_arena),
-health_check_entry.connection_close);
+// Send the response using protocol-agnostic interface
+// HTTP formatting will happen in on_preprocess_writes()
+conn_ref->send_response(
+health_check_entry.protocol_context,
+health_check_entry.response_json,
+std::move(health_check_entry.request_arena));
return false; // Continue processing return false; // Continue processing
} }

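The ordering scheme this handler uses (the ready_responses map plus next_sequence_to_send) is a plain reorder buffer: each request is assigned a sequence ID on the I/O thread, responses become ready in any order, and only the in-order prefix is released to the socket. A standalone sketch of just that mechanism, with illustrative names rather than the project's types:

```cpp
// Reorder buffer: hold out-of-order results, emit strictly by sequence ID.
#include <cstdint>
#include <cstdio>
#include <map>
#include <string>

struct ReorderBuffer {
  std::map<int64_t, std::string> ready; // sequence_id -> formatted response
  int64_t next_to_send = 0;

  // Called whenever a response becomes ready, in any order.
  void add(int64_t sequence_id, std::string response) {
    ready.emplace(sequence_id, std::move(response));
    flush();
  }

  // Emit the longest in-order prefix, mirroring the while-loops above.
  void flush() {
    auto it = ready.begin();
    while (it != ready.end() && it->first == next_to_send) {
      printf("send #%lld: %s\n", static_cast<long long>(it->first),
             it->second.c_str());
      ++next_to_send;
      it = ready.erase(it);
    }
  }
};

int main() {
  ReorderBuffer buf;
  buf.add(1, "response B"); // arrives early, held back
  buf.add(0, "response A"); // releases 0 and then 1, in order
  buf.add(2, "response C");
}
```

std::map keeps the keys sorted, so the in-order prefix is always at the front and each response is inserted and erased in logarithmic time.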
View File

@@ -1,6 +1,10 @@
#pragma once #pragma once
#include <atomic> #include <atomic>
#include <map>
#include <mutex>
#include <optional>
#include <set>
#include <string_view> #include <string_view>
#include <thread> #include <thread>
#include <unordered_set> #include <unordered_set>
@@ -20,6 +24,26 @@ struct CommitRequest;
struct JsonCommitRequestParser; struct JsonCommitRequestParser;
struct RouteMatch; struct RouteMatch;
/**
* HTTP-specific response context stored in pipeline entries.
* Arena-allocated and passed through pipeline for response correlation.
*/
struct HttpResponseContext {
int64_t sequence_id; // For response ordering in pipelining
int64_t http_request_id; // For X-Response-ID header
bool connection_close; // Whether to close connection after response
};
/**
* Response data ready to send (sequence_id -> response data).
* Absence from map indicates response not ready yet.
*/
struct ResponseData {
std::span<std::string_view> data;
Arena arena;
bool connection_close;
};
/** /**
* HTTP connection state stored in Connection::user_data. * HTTP connection state stored in Connection::user_data.
* Manages llhttp parser state and request data. * Manages llhttp parser state and request data.
@@ -49,6 +73,7 @@ struct HttpRequestState {
bool header_field_complete = false; bool header_field_complete = false;
int64_t http_request_id = int64_t http_request_id =
0; // X-Request-Id header value (for tracing/logging) 0; // X-Request-Id header value (for tracing/logging)
int64_t sequence_id = 0; // Assigned for response ordering in pipelining
// Streaming parser for POST requests // Streaming parser for POST requests
Arena::Ptr<JsonCommitRequestParser> commit_parser; Arena::Ptr<JsonCommitRequestParser> commit_parser;
@@ -67,6 +92,13 @@ struct HttpConnectionState {
HttpRequestState pending; HttpRequestState pending;
std::deque<HttpRequestState> queue; std::deque<HttpRequestState> queue;
// Response ordering for HTTP pipelining
std::mutex response_queue_mutex;
std::map<int64_t, ResponseData>
ready_responses; // sequence_id -> response data
int64_t next_sequence_to_send = 0;
int64_t next_sequence_id = 0;
HttpConnectionState(); HttpConnectionState();
}; };
@@ -140,6 +172,9 @@ struct HttpHandler : ConnectionHandler {
void on_connection_established(Connection &conn) override; void on_connection_established(Connection &conn) override;
void on_connection_closed(Connection &conn) override; void on_connection_closed(Connection &conn) override;
void on_data_arrived(std::string_view data, Connection &conn) override; void on_data_arrived(std::string_view data, Connection &conn) override;
void
on_preprocess_writes(Connection &conn,
std::span<PendingResponse> pending_responses) override;
void on_batch_complete(std::span<Connection *const> batch) override; void on_batch_complete(std::span<Connection *const> batch) override;
// llhttp callbacks (public for HttpConnectionState access) // llhttp callbacks (public for HttpConnectionState access)
@@ -212,15 +247,15 @@ private:
void handle_not_found(Connection &conn, HttpRequestState &state); void handle_not_found(Connection &conn, HttpRequestState &state);
// HTTP utilities // HTTP utilities
-static void send_response(MessageSender &conn, int status_code,
+static void send_response(Connection &conn, int status_code,
std::string_view content_type, std::string_view content_type,
std::string_view body, Arena response_arena, std::string_view body, Arena response_arena,
int64_t http_request_id, bool close_connection); int64_t http_request_id, bool close_connection);
-static void send_json_response(MessageSender &conn, int status_code,
+static void send_json_response(Connection &conn, int status_code,
std::string_view json, Arena response_arena, std::string_view json, Arena response_arena,
int64_t http_request_id, int64_t http_request_id,
bool close_connection); bool close_connection);
-static void send_error_response(MessageSender &conn, int status_code,
+static void send_error_response(Connection &conn, int status_code,
std::string_view message, std::string_view message,
Arena response_arena, int64_t http_request_id, Arena response_arena, int64_t http_request_id,
bool close_connection); bool close_connection);
@@ -234,4 +269,9 @@ private:
format_json_response(int status_code, std::string_view json, format_json_response(int status_code, std::string_view json,
Arena &response_arena, int64_t http_request_id, Arena &response_arena, int64_t http_request_id,
bool close_connection); bool close_connection);
// Helper function to send response through reorder queue and preprocessing
void send_ordered_response(Connection &conn, HttpRequestState &state,
std::span<std::string_view> http_response,
Arena arena);
}; };

View File

@@ -17,23 +17,20 @@ struct CommitEntry {
bool resolve_success = false; // Set by resolve stage bool resolve_success = false; // Set by resolve stage
bool persist_success = false; // Set by persist stage bool persist_success = false; // Set by persist stage
-// Copied HTTP state (pipeline threads cannot access connection user_data)
-int64_t http_request_id = 0;
-bool connection_close = false;
+// Protocol-agnostic context (arena-allocated, protocol-specific)
+void *protocol_context = nullptr;
const CommitRequest *commit_request = nullptr; // Points to request_arena data
-// Request arena contains parsed request data and formatted response
+// Request arena contains parsed request data and response data
Arena request_arena;
-// Response data (set by persist stage, consumed by release stage)
-// Points to response formatted in request_arena
-std::span<std::string_view> response_message;
+// JSON response body (set by persist stage, arena-allocated)
+std::string_view response_json;
CommitEntry() = default; // Default constructor for variant
-explicit CommitEntry(WeakRef<MessageSender> conn, int64_t req_id,
-bool close_conn, const CommitRequest *req, Arena arena)
-: connection(std::move(conn)), http_request_id(req_id),
-connection_close(close_conn), commit_request(req),
+explicit CommitEntry(WeakRef<MessageSender> conn, void *ctx,
+const CommitRequest *req, Arena arena)
+: connection(std::move(conn)), protocol_context(ctx), commit_request(req),
request_arena(std::move(arena)) {}
};
@@ -45,21 +42,21 @@ struct StatusEntry {
WeakRef<MessageSender> connection;
int64_t version_upper_bound = 0; // Set by sequence stage
-// Copied HTTP state
-int64_t http_request_id = 0;
-bool connection_close = false;
+// Protocol-agnostic context (arena-allocated, protocol-specific)
+void *protocol_context = nullptr;
std::string_view status_request_id; // Points to request_arena data
-// Request arena for HTTP request data
+// Request arena for request data
Arena request_arena;
+// JSON response body (set by persist stage, arena-allocated)
+std::string_view response_json;
StatusEntry() = default; // Default constructor for variant
-explicit StatusEntry(WeakRef<MessageSender> conn, int64_t req_id,
-bool close_conn, std::string_view request_id,
-Arena arena)
-: connection(std::move(conn)), http_request_id(req_id),
-connection_close(close_conn), status_request_id(request_id),
-request_arena(std::move(arena)) {}
+explicit StatusEntry(WeakRef<MessageSender> conn, void *ctx,
+std::string_view request_id, Arena arena)
+: connection(std::move(conn)), protocol_context(ctx),
+status_request_id(request_id), request_arena(std::move(arena)) {}
};
/** /**
@@ -70,22 +67,19 @@ struct StatusEntry {
struct HealthCheckEntry {
WeakRef<MessageSender> connection;
-// Copied HTTP state
-int64_t http_request_id = 0;
-bool connection_close = false;
+// Protocol-agnostic context (arena-allocated, protocol-specific)
+void *protocol_context = nullptr;
-// Request arena for formatting response
+// Request arena for response data
Arena request_arena;
-// Response data (set by persist stage, consumed by release stage)
-// Points to response formatted in request_arena
-std::span<std::string_view> response_message;
+// JSON response body (set by persist stage, arena-allocated)
+std::string_view response_json;
HealthCheckEntry() = default; // Default constructor for variant
-explicit HealthCheckEntry(WeakRef<MessageSender> conn, int64_t req_id,
-bool close_conn, Arena arena)
-: connection(std::move(conn)), http_request_id(req_id),
-connection_close(close_conn), request_arena(std::move(arena)) {}
+explicit HealthCheckEntry(WeakRef<MessageSender> conn, void *ctx, Arena arena)
+: connection(std::move(conn)), protocol_context(ctx),
+request_arena(std::move(arena)) {}
};
/** /**

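The entries now carry protocol-specific state as a type-erased void *protocol_context that only the protocol handler casts back to its concrete type. A small self-contained sketch of that round trip; plain new/delete stands in for the arena allocation used in the real code, and PipelineEntry here is a simplified stand-in for CommitEntry/StatusEntry/HealthCheckEntry.

```cpp
// Type-erased protocol context: attach on the I/O thread, recover in the handler.
#include <cstdint>
#include <cstdio>

struct HttpResponseContext { // mirrors the struct added in http.hpp
  int64_t sequence_id;
  int64_t http_request_id;
  bool connection_close;
};

struct PipelineEntry {       // simplified stand-in for the pipeline entry types
  void *protocol_context = nullptr;
};

int main() {
  // I/O thread: attach protocol-specific state before pushing into the pipeline.
  auto *ctx = new HttpResponseContext{/*sequence_id=*/7, /*http_request_id=*/42,
                                      /*connection_close=*/false};
  PipelineEntry entry{ctx};

  // Protocol handler (e.g. in on_preprocess_writes): recover the concrete type.
  auto *back = static_cast<HttpResponseContext *>(entry.protocol_context);
  printf("sequence_id=%lld request_id=%lld\n",
         static_cast<long long>(back->sequence_id),
         static_cast<long long>(back->http_request_id));
  delete back; // the real code frees this by dropping the arena instead
}
```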
View File

@@ -432,8 +432,26 @@ void Server::process_connection_reads(Ref<Connection> &conn, int events) {
}
}
-void Server::process_connection_writes(Ref<Connection> &conn, int /*events*/) {
+void Server::process_connection_writes(Ref<Connection> &conn, int events) {
assert(conn);
// Process pending responses first if this is an EPOLLOUT event
if (events & EPOLLOUT) {
std::unique_lock lock(conn->mutex_);
if (conn->has_pending_responses_) {
std::vector<PendingResponse> pending_vec;
pending_vec.reserve(conn->pending_response_queue_.size());
for (auto &response : conn->pending_response_queue_) {
pending_vec.push_back(std::move(response));
}
conn->pending_response_queue_.clear();
conn->has_pending_responses_ = false;
lock.unlock();
handler_.on_preprocess_writes(*conn, std::span{pending_vec});
}
}
auto result = conn->write_bytes();
if (result & Connection::WriteBytesResult::Error) {

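The EPOLLOUT branch above moves the queued responses out while conn->mutex_ is held, then unlocks before calling on_preprocess_writes, presumably because the handler calls back into the connection (append_bytes), which takes the same lock. A standalone sketch of that drain-then-unlock-then-callback pattern with stand-in types:

```cpp
// Drain a shared queue under the lock, release the lock, then run the callback.
#include <cstdio>
#include <deque>
#include <iterator>
#include <mutex>
#include <span>
#include <string>
#include <vector>

struct Conn {
  std::mutex mutex_;
  std::deque<std::string> pending_;
  bool has_pending_ = false;

  void queue(std::string r) { // like send_response, callable from any thread
    std::lock_guard lock(mutex_);
    pending_.push_back(std::move(r));
    has_pending_ = true;
  }

  template <typename Handler>
  void preprocess_writes(Handler &&handler) { // like the EPOLLOUT branch
    std::unique_lock lock(mutex_);
    if (!has_pending_)
      return;
    std::vector<std::string> drained(std::make_move_iterator(pending_.begin()),
                                     std::make_move_iterator(pending_.end()));
    pending_.clear();
    has_pending_ = false;
    lock.unlock(); // drop the lock before the callback re-enters the connection
    handler(std::span<std::string>(drained));
  }
};

int main() {
  Conn c;
  c.queue(R"({"status":"committed"})");
  c.preprocess_writes([](std::span<std::string> batch) {
    for (auto &r : batch)
      printf("handler formats and appends: %s\n", r.c_str());
  });
}
```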
View File

@@ -35,8 +35,11 @@ TEST_CASE("Echo test") {
handler.done.wait();
if (auto conn = handler.wconn.lock()) {
-conn->append_message(std::exchange(handler.reply, {}),
-std::move(handler.arena));
+// Cast to Connection* to access append_bytes (not available on
+// MessageSender)
+auto *conn_ptr = static_cast<Connection *>(conn.get());
+conn_ptr->append_bytes(std::exchange(handler.reply, {}),
+std::move(handler.arena), false);
} else {
REQUIRE(false);
}