From 1b220d0d1cbb9f4e2327ef0612a069f70ab96552 Mon Sep 17 00:00:00 2001 From: Andrew Noyes Date: Mon, 15 Sep 2025 10:28:17 -0400 Subject: [PATCH] WIP --- src/connection.cpp | 30 ++- src/connection.hpp | 58 ++--- src/connection_handler.hpp | 31 ++- src/http_handler.cpp | 420 ++++++++++++++++++++++++++----------- src/http_handler.hpp | 48 ++++- src/pipeline_entry.hpp | 58 +++-- src/server.cpp | 20 +- tests/test_server.cpp | 7 +- 8 files changed, 479 insertions(+), 193 deletions(-) diff --git a/src/connection.cpp b/src/connection.cpp index 1be9328..8f5d0fe 100644 --- a/src/connection.cpp +++ b/src/connection.cpp @@ -87,9 +87,9 @@ void Connection::close() { connections_active.dec(); } -// May be called off the io thread! -void Connection::append_message(std::span data_parts, - Arena arena, bool close_after_send) { +// Called from I/O thread only +void Connection::append_bytes(std::span data_parts, + Arena arena, bool close_after_send) { // Calculate total bytes for this message. Don't need to hold the lock yet. size_t total_bytes = 0; for (const auto &part : data_parts) { @@ -124,6 +124,30 @@ void Connection::append_message(std::span data_parts, } } +void Connection::send_response(void *protocol_context, + std::string_view response_json, Arena arena) { + std::unique_lock lock(mutex_); + + // Store response in queue for protocol handler processing + pending_response_queue_.emplace_back( + PendingResponse{protocol_context, response_json, std::move(arena)}); + + // Mark that we have pending responses and trigger epoll interest + if (!has_pending_responses_) { + has_pending_responses_ = true; + + auto server = server_.lock(); + if (fd_ >= 0 && server) { + // Add EPOLLOUT interest to trigger on_preprocess_writes + struct epoll_event event; + event.data.fd = fd_; + event.events = EPOLLIN | EPOLLOUT; + tsan_release(); + epoll_ctl(server->epoll_fds_[epoll_index_], EPOLL_CTL_MOD, fd_, &event); + } + } +} + int Connection::readBytes(char *buf, size_t buffer_size) { int r; for (;;) { diff --git a/src/connection.hpp b/src/connection.hpp index e12f0e4..d288f62 100644 --- a/src/connection.hpp +++ b/src/connection.hpp @@ -30,28 +30,26 @@ struct Server; */ struct MessageSender { /** - * @brief Append message data to connection's outgoing message queue. + * @brief Send response with protocol-specific context for ordering. * - * Thread-safe method that can be called from any thread, including - * pipeline processing threads. The arena is moved into the connection - * to maintain data lifetime until the message is sent. Messages appended - * concurrently may be written in either order, but they will not be - * interleaved. + * Thread-safe method for pipeline threads to send responses back to clients. + * Delegates to the connection's protocol handler for ordering logic. + * The protocol handler may queue the response or send it immediately. 
* - * @param data_parts Span of string_view parts to send (arena-allocated) - * @param arena Arena containing the memory for data_parts string_views - * @param close_after_send Whether to close connection after sending + * @param protocol_context Arena-allocated protocol-specific context + * @param data Response data parts (may be empty for deferred serialization) + * @param arena Arena containing response data and context * * Example usage: * ```cpp - * auto response_parts = arena.allocate_span(2); - * response_parts[0] = "HTTP/1.1 200 OK\r\n\r\n"; - * response_parts[1] = "Hello World"; - * conn.append_message(response_parts, std::move(arena)); + * auto* ctx = arena.allocate(); + * ctx->sequence_id = 42; + * auto response_data = format_response(arena); + * conn.send_response(ctx, response_data, std::move(arena)); * ``` */ - virtual void append_message(std::span data_parts, - Arena arena, bool close_after_send = false) = 0; + virtual void send_response(void *protocol_context, + std::string_view response_json, Arena arena) = 0; virtual ~MessageSender() = default; }; @@ -104,31 +102,33 @@ struct Connection : MessageSender { * @brief Queue an atomic message to be sent to the client. * * Adds a complete message with all associated data to the connection's - * outgoing message queue. The message will be sent asynchronously by a - * server I/O thread using efficient vectored I/O. + * outgoing byte queue with guaranteed ordering. + * + * I/O thread only method for protocol handlers to queue bytes for sending. + * Bytes are queued in order and sent using efficient vectored I/O. * * @param data_parts Span of string_views pointing to arena-allocated data * @param arena Arena that owns all the memory referenced by data_parts - * @param close_after_send Whether to close connection after sending this - * message - * - * @note Thread Safety: This method is thread-safe and can be called - * concurrently from multiple pipeline threads. + * @param close_after_send Whether to close connection after sending * + * @note Thread Safety: Must be called from I/O thread only. + * @note Ordering: Bytes are sent in the order calls are made. * @note The memory referenced by the data_parts span, must outlive @p arena. - * The arena will be moved and kept alive until the message is fully sent. * - * Example usage: + * Example usage (from ConnectionHandler::on_preprocess_writes): * ```cpp * Arena arena; * auto parts = arena.allocate_span(2); * parts[0] = build_header(arena); * parts[1] = build_body(arena); - * conn.append_message({parts, 2}, std::move(arena)); + * conn.append_bytes({parts, 2}, std::move(arena), false); * ``` */ - void append_message(std::span data_parts, Arena arena, - bool close_after_send) override; + void append_bytes(std::span data_parts, Arena arena, + bool close_after_send); + + void send_response(void *protocol_context, std::string_view response_json, + Arena arena) override; /** * @brief Get a WeakRef to this connection for async operations. @@ -342,6 +342,10 @@ private: // mutex_, but if non-empty mutex_ can be // dropped while server accesses existing elements. 
int64_t outgoing_bytes_queued_{0}; // Counter of queued bytes + bool has_pending_responses_{ + false}; // True if protocol handler has responses to process + std::deque + pending_response_queue_; // Responses awaiting protocol processing // Set to a negative number in `close` int fd_; diff --git a/src/connection_handler.hpp b/src/connection_handler.hpp index 3306f2a..28713e2 100644 --- a/src/connection_handler.hpp +++ b/src/connection_handler.hpp @@ -3,9 +3,22 @@ #include #include -// Forward declaration to avoid circular dependency +// Forward declarations to avoid circular dependency struct Connection; +// Include Arena header since PendingResponse uses Arena by value +#include "arena.hpp" + +/** + * Represents a response queued by pipeline threads for protocol processing. + * Contains JSON response data that can be wrapped by any protocol. + */ +struct PendingResponse { + void *protocol_context; // Arena-allocated protocol-specific context + std::string_view response_json; // JSON response body (arena-allocated) + Arena arena; // Arena containing response data and context +}; + /** * Abstract interface for handling connection data processing. * @@ -89,4 +102,20 @@ public: * @note Called from this connection's io thread. */ virtual void on_batch_complete(std::span /*batch*/) {} + + /** + * Called before processing outgoing writes on a connection. + * + * This hook allows protocol handlers to process queued responses + * before actual socket writes occur. Used for response ordering, + * serialization, and other preprocessing. + * + * @param conn Connection about to write data + * @param pending_responses Responses queued by pipeline threads + * @note Called from this connection's io thread. + * @note Called when EPOLLOUT event occurs + */ + virtual void + on_preprocess_writes(Connection &conn, + std::span pending_responses) {} }; diff --git a/src/http_handler.cpp b/src/http_handler.cpp index eb86fe6..e17d43a 100644 --- a/src/http_handler.cpp +++ b/src/http_handler.cpp @@ -71,6 +71,69 @@ void HttpHandler::on_connection_closed(Connection &conn) { conn.user_data = nullptr; } +void HttpHandler::on_preprocess_writes( + Connection &conn, std::span pending_responses) { + auto *state = static_cast(conn.user_data); + + // Process incoming responses and add to reorder queue + { + std::lock_guard lock(state->response_queue_mutex); + + for (auto &pending : pending_responses) { + auto *ctx = static_cast(pending.protocol_context); + + printf( + "Processing response: sequence_id=%ld, request_id=%ld, json='%.*s'\n", + ctx->sequence_id, ctx->http_request_id, + (int)pending.response_json.size(), pending.response_json.data()); + + // Determine HTTP status code and content type from response content + int status_code = 200; + std::string_view content_type = "application/json"; + + // For health checks, detect plain text responses + if (pending.response_json == "OK") { + content_type = "text/plain"; + } + // For metrics, detect Prometheus format (starts with # or contains metric + // names) + else if (pending.response_json.starts_with("#") || + pending.response_json.find("_total") != std::string_view::npos || + pending.response_json.find("_counter") != + std::string_view::npos) { + content_type = "text/plain; version=0.0.4"; + } + + // Format HTTP response from JSON + auto http_response = format_response( + status_code, content_type, pending.response_json, pending.arena, + ctx->http_request_id, ctx->connection_close); + + printf("Adding response to queue: sequence_id=%ld\n", ctx->sequence_id); + 
state->ready_responses[ctx->sequence_id] = ResponseData{ + http_response, std::move(pending.arena), ctx->connection_close}; + } + + // Send responses in sequential order + printf("Checking for sequential responses, next_sequence_to_send=%ld\n", + state->next_sequence_to_send); + auto iter = state->ready_responses.begin(); + while (iter != state->ready_responses.end() && + iter->first == state->next_sequence_to_send) { + auto &[sequence_id, response_data] = *iter; + + printf("Sending response: sequence_id=%ld\n", sequence_id); + conn.append_bytes(response_data.data, std::move(response_data.arena), + response_data.connection_close); + state->next_sequence_to_send++; + iter = state->ready_responses.erase(iter); + } + printf("After processing, next_sequence_to_send=%ld, " + "ready_responses.size()=%zu\n", + state->next_sequence_to_send, state->ready_responses.size()); + } +} + static thread_local std::vector g_batch_entries; void HttpHandler::on_batch_complete(std::span batch) { @@ -80,6 +143,16 @@ void HttpHandler::on_batch_complete(std::span batch) { auto *state = static_cast(conn->user_data); for (auto &req : state->queue) { + // Assign sequence ID for response ordering + int64_t sequence_id = state->next_sequence_id++; + req.sequence_id = sequence_id; + + // Create HttpResponseContext for this request + auto *ctx = req.arena.allocate(1); + ctx->sequence_id = sequence_id; + ctx->http_request_id = req.http_request_id; + ctx->connection_close = req.connection_close; + char *url_buffer = req.arena.allocate(req.url.size()); std::memcpy(url_buffer, req.url.data(), req.url.size()); RouteMatch route_match; @@ -88,8 +161,30 @@ void HttpHandler::on_batch_complete(std::span batch) { static_cast(req.url.size()), route_match); if (parse_result != ParseResult::Success) { // Handle malformed URL encoding - send_error_response(*conn, 400, "Malformed URL encoding", - std::move(req.arena), 0, true); + // Assign sequence ID for this error response + int64_t error_sequence_id = state->next_sequence_id++; + req.sequence_id = error_sequence_id; + + auto json_response = R"({"error":"Malformed URL encoding"})"; + auto http_response = + format_json_response(400, json_response, req.arena, 0, true); + + // Add to reorder queue and process immediately since this is an error + state->ready_responses[error_sequence_id] = + ResponseData{http_response, std::move(req.arena), true}; + + // Process ready responses in order and send via append_bytes + auto iter = state->ready_responses.begin(); + while (iter != state->ready_responses.end() && + iter->first == state->next_sequence_to_send) { + auto &[sequence_id, response_data] = *iter; + + // Send through append_bytes which handles write interest + conn->append_bytes(response_data.data, std::move(response_data.arena), + response_data.connection_close); + state->next_sequence_to_send++; + iter = state->ready_responses.erase(iter); + } break; } req.route = route_match.route; @@ -131,21 +226,20 @@ void HttpHandler::on_batch_complete(std::span batch) { // Create CommitEntry for commit requests if (req.route == HttpRoute::PostCommit && req.commit_request && req.parsing_commit && req.basic_validation_passed) { - g_batch_entries.push_back(CommitEntry{ - conn->get_weak_ref(), req.http_request_id, req.connection_close, - req.commit_request.get(), std::move(req.arena)}); + g_batch_entries.emplace_back(CommitEntry(conn->get_weak_ref(), ctx, + req.commit_request.get(), + std::move(req.arena))); } // Create StatusEntry for status requests else if (req.route == HttpRoute::GetStatus) { - 
g_batch_entries.push_back(StatusEntry{ - conn->get_weak_ref(), req.http_request_id, req.connection_close, - req.status_request_id, std::move(req.arena)}); + g_batch_entries.emplace_back(StatusEntry(conn->get_weak_ref(), ctx, + req.status_request_id, + std::move(req.arena))); } // Create HealthCheckEntry for health check requests else if (req.route == HttpRoute::GetOk) { - g_batch_entries.push_back( - HealthCheckEntry{conn->get_weak_ref(), req.http_request_id, - req.connection_close, std::move(req.arena)}); + g_batch_entries.emplace_back( + HealthCheckEntry(conn->get_weak_ref(), ctx, std::move(req.arena))); } } state->queue.clear(); @@ -155,8 +249,8 @@ void HttpHandler::on_batch_complete(std::span batch) { // contention on the way into the pipeline. if (g_batch_entries.size() > 0) { auto guard = commitPipeline.push(g_batch_entries.size(), true); - auto out_iter = guard.batch.begin(); - std::move(g_batch_entries.begin(), g_batch_entries.end(), out_iter); + std::move(g_batch_entries.begin(), g_batch_entries.end(), + guard.batch.begin()); } g_batch_entries.clear(); } @@ -164,7 +258,13 @@ void HttpHandler::on_batch_complete(std::span batch) { void HttpHandler::on_data_arrived(std::string_view data, Connection &conn) { auto *state = static_cast(conn.user_data); if (!state) { - send_error_response(conn, 500, "Internal server error", Arena{}, 0, true); + // Create a temporary arena and add error response directly to avoid + // sequence issues + Arena error_arena; + auto json_response = R"({"error":"Internal server error"})"; + auto http_response = + format_json_response(500, json_response, error_arena, 0, true); + conn.append_bytes(http_response, std::move(error_arena), true); return; } @@ -189,8 +289,12 @@ void HttpHandler::on_data_arrived(std::string_view data, Connection &conn) { if (err == HPE_OK) { break; } - send_error_response(conn, 400, "Bad request", - std::move(state->pending.arena), 0, true); + // Parse error - send response directly since this is before sequence + // assignment + auto json_response = R"({"error":"Bad request"})"; + auto http_response = + format_json_response(400, json_response, state->pending.arena, 0, true); + conn.append_bytes(http_response, std::move(state->pending.arena), true); return; } } @@ -199,11 +303,19 @@ void HttpHandler::on_data_arrived(std::string_view data, Connection &conn) { void HttpHandler::handle_get_version(Connection &conn, HttpRequestState &state) { version_counter.inc(); - send_json_response( - conn, 200, + + // Generate JSON response + auto json_response = format(state.arena, R"({"version":%ld,"leader":""})", - this->committed_version.load(std::memory_order_seq_cst)), - std::move(state.arena), state.http_request_id, state.connection_close); + this->committed_version.load(std::memory_order_seq_cst)); + + // Format HTTP response + auto http_response = + format_json_response(200, json_response, state.arena, + state.http_request_id, state.connection_close); + + // Send through reorder queue and preprocessing to maintain proper ordering + send_ordered_response(conn, state, http_response, std::move(state.arena)); } void HttpHandler::handle_post_commit(Connection &conn, @@ -211,8 +323,16 @@ void HttpHandler::handle_post_commit(Connection &conn, commit_counter.inc(); // Check if streaming parse was successful if (!state.commit_request || !state.parsing_commit) { - send_error_response(conn, 400, "Parse failed", std::move(state.arena), - state.http_request_id, state.connection_close); + auto json_response = R"({"error":"Parse failed"})"; + auto 
http_response = + format_json_response(400, json_response, state.arena, + state.http_request_id, state.connection_close); + + // Add directly to response queue with proper sequencing (no lock needed - + // same I/O thread) + auto *conn_state = static_cast(conn.user_data); + conn_state->ready_responses[state.sequence_id] = ResponseData{ + http_response, std::move(state.arena), state.connection_close}; return; } @@ -252,8 +372,18 @@ void HttpHandler::handle_post_commit(Connection &conn, } if (!valid) { - send_error_response(conn, 400, error_msg, std::move(state.arena), - state.http_request_id, state.connection_close); + auto json_response = + format(state.arena, R"({"error":"%.*s"})", + static_cast(error_msg.size()), error_msg.data()); + auto http_response = + format_json_response(400, json_response, state.arena, + state.http_request_id, state.connection_close); + + // Add directly to response queue with proper sequencing (no lock needed - + // same I/O thread) + auto *conn_state = static_cast(conn.user_data); + conn_state->ready_responses[state.sequence_id] = ResponseData{ + http_response, std::move(state.arena), state.connection_close}; return; } @@ -265,10 +395,14 @@ void HttpHandler::handle_post_commit(Connection &conn, void HttpHandler::handle_get_subscribe(Connection &conn, HttpRequestState &state) { // TODO: Implement subscription streaming - send_json_response( - conn, 200, - R"({"message":"Subscription endpoint - streaming not yet implemented"})", - std::move(state.arena), state.http_request_id, state.connection_close); + auto json_response = + R"({"message":"Subscription endpoint - streaming not yet implemented"})"; + auto http_response = + format_json_response(200, json_response, state.arena, + state.http_request_id, state.connection_close); + + // Send through reorder queue and preprocessing to maintain proper ordering + send_ordered_response(conn, state, http_response, std::move(state.arena)); } void HttpHandler::handle_get_status(Connection &conn, HttpRequestState &state, @@ -282,16 +416,29 @@ void HttpHandler::handle_get_status(Connection &conn, HttpRequestState &state, const auto &request_id = route_match.params[static_cast(ApiParameterKey::RequestId)]; if (!request_id) { - send_error_response( - conn, 400, "Missing required query parameter: request_id", - std::move(state.arena), state.http_request_id, state.connection_close); + auto json_response = + R"({"error":"Missing required query parameter: request_id"})"; + auto http_response = + format_json_response(400, json_response, state.arena, + state.http_request_id, state.connection_close); + + // Add directly to response queue with proper sequencing + auto *conn_state = static_cast(conn.user_data); + conn_state->ready_responses[state.sequence_id] = ResponseData{ + http_response, std::move(state.arena), state.connection_close}; return; } if (request_id->empty()) { - send_error_response(conn, 400, "Empty request_id parameter", - std::move(state.arena), state.http_request_id, - state.connection_close); + auto json_response = R"({"error":"Empty request_id parameter"})"; + auto http_response = + format_json_response(400, json_response, state.arena, + state.http_request_id, state.connection_close); + + // Add directly to response queue with proper sequencing + auto *conn_state = static_cast(conn.user_data); + conn_state->ready_responses[state.sequence_id] = ResponseData{ + http_response, std::move(state.arena), state.connection_close}; return; } @@ -305,26 +452,39 @@ void HttpHandler::handle_put_retention(Connection &conn, 
HttpRequestState &state, const RouteMatch &) { // TODO: Parse retention policy from body and store - send_json_response(conn, 200, R"({"policy_id":"example","status":"created"})", - std::move(state.arena), state.http_request_id, - state.connection_close); + auto json_response = R"({"policy_id":"example","status":"created"})"; + auto http_response = + format_json_response(200, json_response, state.arena, + state.http_request_id, state.connection_close); + + // Send through reorder queue and preprocessing to maintain proper ordering + send_ordered_response(conn, state, http_response, std::move(state.arena)); } void HttpHandler::handle_get_retention(Connection &conn, HttpRequestState &state, const RouteMatch &) { // TODO: Extract policy_id from URL or return all policies - send_json_response(conn, 200, R"({"policies":[]})", std::move(state.arena), - state.http_request_id, state.connection_close); + auto json_response = R"({"policies":[]})"; + auto http_response = + format_json_response(200, json_response, state.arena, + state.http_request_id, state.connection_close); + + // Send through reorder queue and preprocessing to maintain proper ordering + send_ordered_response(conn, state, http_response, std::move(state.arena)); } void HttpHandler::handle_delete_retention(Connection &conn, HttpRequestState &state, const RouteMatch &) { // TODO: Extract policy_id from URL and delete - send_json_response(conn, 200, R"({"policy_id":"example","status":"deleted"})", - std::move(state.arena), state.http_request_id, - state.connection_close); + auto json_response = R"({"policy_id":"example","status":"deleted"})"; + auto http_response = + format_json_response(200, json_response, state.arena, + state.http_request_id, state.connection_close); + + // Send through reorder queue and preprocessing to maintain proper ordering + send_ordered_response(conn, state, http_response, std::move(state.arena)); } void HttpHandler::handle_get_metrics(Connection &conn, @@ -338,7 +498,12 @@ void HttpHandler::handle_get_metrics(Connection &conn, total_size += sv.size(); } - // Build HTTP response headers using arena + // Build HTTP response with metrics data + auto result = + state.arena.allocate_span(metrics_span.size() + 1); + auto out = result.begin(); + + // Build HTTP headers std::string_view headers; if (state.connection_close) { headers = static_format( @@ -356,31 +521,37 @@ void HttpHandler::handle_get_metrics(Connection &conn, "Connection: keep-alive\r\n", "\r\n"); } - auto result = - state.arena.allocate_span(metrics_span.size() + 1); - auto out = result.begin(); *out++ = headers; for (auto sv : metrics_span) { *out++ = sv; } - conn.append_message(result, std::move(state.arena), state.connection_close); + + // Add directly to response queue with proper sequencing (no lock needed - + // same I/O thread) + auto *conn_state = static_cast(conn.user_data); + conn_state->ready_responses[state.sequence_id] = + ResponseData{result, std::move(state.arena), state.connection_close}; } void HttpHandler::handle_get_ok(Connection &, HttpRequestState &) { ok_counter.inc(); - TRACE_EVENT("http", "GET /ok", perfetto::Flow::Global(state.http_request_id)); // Health check requests are processed through the pipeline // Response will be generated in the release stage after pipeline processing } void HttpHandler::handle_not_found(Connection &conn, HttpRequestState &state) { - send_error_response(conn, 404, "Not found", std::move(state.arena), - state.http_request_id, state.connection_close); + auto json_response = R"({"error":"Not 
found"})"; + auto http_response = + format_json_response(404, json_response, state.arena, + state.http_request_id, state.connection_close); + + // Send through reorder queue and preprocessing to maintain proper ordering + send_ordered_response(conn, state, http_response, std::move(state.arena)); } // HTTP utility methods -void HttpHandler::send_response(MessageSender &conn, int status_code, +void HttpHandler::send_response(Connection &conn, int status_code, std::string_view content_type, std::string_view body, Arena response_arena, int64_t http_request_id, @@ -423,10 +594,10 @@ void HttpHandler::send_response(MessageSender &conn, int status_code, content_type.data(), body.size(), http_request_id, connection_header, static_cast(body.size()), body.data()); - conn.append_message(response, std::move(response_arena), close_connection); + conn.append_bytes(response, std::move(response_arena), close_connection); } -void HttpHandler::send_json_response(MessageSender &conn, int status_code, +void HttpHandler::send_json_response(Connection &conn, int status_code, std::string_view json, Arena response_arena, int64_t http_request_id, @@ -435,7 +606,7 @@ void HttpHandler::send_json_response(MessageSender &conn, int status_code, std::move(response_arena), http_request_id, close_connection); } -void HttpHandler::send_error_response(MessageSender &conn, int status_code, +void HttpHandler::send_error_response(Connection &conn, int status_code, std::string_view message, Arena response_arena, int64_t http_request_id, @@ -448,6 +619,29 @@ void HttpHandler::send_error_response(MessageSender &conn, int status_code, http_request_id, close_connection); } +void HttpHandler::send_ordered_response( + Connection &conn, HttpRequestState &state, + std::span http_response, Arena arena) { + auto *conn_state = static_cast(conn.user_data); + + // Add to reorder queue with proper sequencing + conn_state->ready_responses[state.sequence_id] = + ResponseData{http_response, std::move(arena), state.connection_close}; + + // Process ready responses in order and send via append_bytes + auto iter = conn_state->ready_responses.begin(); + while (iter != conn_state->ready_responses.end() && + iter->first == conn_state->next_sequence_to_send) { + auto &[sequence_id, response_data] = *iter; + + // Send through append_bytes which handles write interest + conn.append_bytes(response_data.data, std::move(response_data.arena), + response_data.connection_close); + conn_state->next_sequence_to_send++; + iter = conn_state->ready_responses.erase(iter); + } +} + std::span HttpHandler::format_response(int status_code, std::string_view content_type, std::string_view body, Arena &response_arena, @@ -644,8 +838,9 @@ bool HttpHandler::process_sequence_batch(BatchType &batch) { if (!commit_entry.commit_request) { // Should not happen - basic validation was done on I/O thread - send_error_response(*conn_ref, 500, "Internal server error", - Arena{}, commit_entry.http_request_id, true); + conn_ref->send_response(commit_entry.protocol_context, + R"({"error":"Internal server error"})", + Arena{}); return false; } @@ -658,11 +853,10 @@ bool HttpHandler::process_sequence_batch(BatchType &batch) { if (banned_request_ids.find(commit_request_id) != banned_request_ids.end()) { // Request ID is banned, this commit should fail - send_json_response( - *conn_ref, 409, + conn_ref->send_response( + commit_entry.protocol_context, R"({"status": "not_committed", "error": "request_id_banned"})", - Arena{}, commit_entry.http_request_id, - commit_entry.connection_close); + 
Arena{}); return false; } } @@ -670,9 +864,6 @@ bool HttpHandler::process_sequence_batch(BatchType &batch) { // Assign sequential version number commit_entry.assigned_version = next_version++; - TRACE_EVENT("http", "sequence_commit", - perfetto::Flow::Global(commit_entry.http_request_id)); - return false; // Continue processing } else if constexpr (std::is_same_v) { // Process status entry: add request_id to banned list, get version @@ -702,14 +893,8 @@ bool HttpHandler::process_sequence_batch(BatchType &batch) { status_entry.version_upper_bound = next_version - 1; } - TRACE_EVENT("http", "sequence_status", - perfetto::Flow::Global(status_entry.http_request_id)); - - // TODO: Transfer to status threadpool - for now just respond - // not_committed - send_json_response(*conn_ref, 200, R"({"status": "not_committed"})", - Arena{}, status_entry.http_request_id, - status_entry.connection_close); + // TODO: Transfer to status threadpool - for now mark as processed + // Response will be generated in persist stage return false; // Continue processing } else if constexpr (std::is_same_v) { @@ -723,10 +908,6 @@ bool HttpHandler::process_sequence_batch(BatchType &batch) { return false; // Skip this entry and continue processing } - TRACE_EVENT( - "http", "sequence_health_check", - perfetto::Flow::Global(health_check_entry.http_request_id)); - return false; // Continue processing } @@ -775,9 +956,6 @@ bool HttpHandler::process_resolve_batch(BatchType &batch) { // Accept all commits (simplified implementation) commit_entry.resolve_success = true; - TRACE_EVENT("http", "resolve_commit", - perfetto::Flow::Global(commit_entry.http_request_id)); - return false; // Continue processing } else if constexpr (std::is_same_v) { // Status entries are not processed in resolve stage @@ -794,10 +972,6 @@ bool HttpHandler::process_resolve_batch(BatchType &batch) { return false; // Skip this entry and continue processing } - TRACE_EVENT( - "http", "resolve_health_check", - perfetto::Flow::Global(health_check_entry.http_request_id)); - // Perform configurable CPU-intensive work for benchmarking spend_cpu_cycles(config_.benchmark.ok_resolve_iterations); @@ -850,37 +1024,46 @@ bool HttpHandler::process_persist_batch(BatchType &batch) { committed_version.store(commit_entry.assigned_version, std::memory_order_seq_cst); - TRACE_EVENT("http", "persist_commit", - perfetto::Flow::Global(commit_entry.http_request_id)); - const CommitRequest &commit_request = *commit_entry.commit_request; - // Generate success response with actual assigned version using - // request arena - std::string_view response; + // Generate success JSON response with actual assigned version + std::string_view response_json; if (commit_request.request_id().has_value()) { - response = format( + response_json = format( commit_entry.request_arena, R"({"request_id":"%.*s","status":"committed","version":%ld,"leader_id":"leader123"})", static_cast(commit_request.request_id().value().size()), commit_request.request_id().value().data(), commit_entry.assigned_version); } else { - response = format( + response_json = format( commit_entry.request_arena, R"({"status":"committed","version":%ld,"leader_id":"leader123"})", commit_entry.assigned_version); } - // Format response but don't send yet - store for release stage - commit_entry.response_message = format_json_response( - 200, response, commit_entry.request_arena, - commit_entry.http_request_id, commit_entry.connection_close); + // Store JSON response in arena for release stage + char *json_buffer = + 
commit_entry.request_arena.template allocate( + response_json.size()); + std::memcpy(json_buffer, response_json.data(), + response_json.size()); + commit_entry.response_json = + std::string_view(json_buffer, response_json.size()); return false; // Continue processing } else if constexpr (std::is_same_v) { - // Status entries are not processed in persist stage - // They were already handled in sequence stage + // Process status entry: generate not_committed response + auto &status_entry = e; + auto conn_ref = status_entry.connection.lock(); + if (!conn_ref) { + // Connection is gone, drop the entry silently + return false; + } + + // Store JSON response for release stage + status_entry.response_json = R"({"status": "not_committed"})"; + return false; } else if constexpr (std::is_same_v) { // Process health check entry: generate OK response @@ -893,15 +1076,8 @@ bool HttpHandler::process_persist_batch(BatchType &batch) { return false; // Skip this entry and continue processing } - TRACE_EVENT( - "http", "persist_health_check", - perfetto::Flow::Global(health_check_entry.http_request_id)); - - // Format OK response but don't send yet - store for release stage - health_check_entry.response_message = format_response( - 200, "text/plain", "OK", health_check_entry.request_arena, - health_check_entry.http_request_id, - health_check_entry.connection_close); + // Store plain text "OK" response for release stage + health_check_entry.response_json = "OK"; return false; // Continue processing } @@ -941,13 +1117,11 @@ bool HttpHandler::process_release_batch(BatchType &batch) { return false; // Skip this entry and continue processing } - TRACE_EVENT("http", "release_commit", - perfetto::Flow::Global(commit_entry.http_request_id)); - - // Send the response that was formatted in persist stage - conn_ref->append_message(commit_entry.response_message, - std::move(commit_entry.request_arena), - commit_entry.connection_close); + // Send the JSON response using protocol-agnostic interface + // HTTP formatting will happen in on_preprocess_writes() + conn_ref->send_response(commit_entry.protocol_context, + commit_entry.response_json, + std::move(commit_entry.request_arena)); return false; // Continue processing } else if constexpr (std::is_same_v) { @@ -961,8 +1135,11 @@ bool HttpHandler::process_release_batch(BatchType &batch) { return false; // Skip this entry and continue processing } - TRACE_EVENT("http", "release_status", - perfetto::Flow::Global(status_entry.http_request_id)); + // Send the JSON response using protocol-agnostic interface + // HTTP formatting will happen in on_preprocess_writes() + conn_ref->send_response(status_entry.protocol_context, + status_entry.response_json, + std::move(status_entry.request_arena)); return false; // Continue processing } else if constexpr (std::is_same_v) { @@ -976,15 +1153,12 @@ bool HttpHandler::process_release_batch(BatchType &batch) { return false; // Skip this entry and continue processing } - TRACE_EVENT( - "http", "release_health_check", - perfetto::Flow::Global(health_check_entry.http_request_id)); - - // Send the response that was formatted in persist stage - conn_ref->append_message( - health_check_entry.response_message, - std::move(health_check_entry.request_arena), - health_check_entry.connection_close); + // Send the response using protocol-agnostic interface + // HTTP formatting will happen in on_preprocess_writes() + conn_ref->send_response( + health_check_entry.protocol_context, + health_check_entry.response_json, + 
std::move(health_check_entry.request_arena)); return false; // Continue processing } diff --git a/src/http_handler.hpp b/src/http_handler.hpp index c35090b..b077775 100644 --- a/src/http_handler.hpp +++ b/src/http_handler.hpp @@ -1,6 +1,10 @@ #pragma once #include +#include +#include +#include +#include #include #include #include @@ -20,6 +24,26 @@ struct CommitRequest; struct JsonCommitRequestParser; struct RouteMatch; +/** + * HTTP-specific response context stored in pipeline entries. + * Arena-allocated and passed through pipeline for response correlation. + */ +struct HttpResponseContext { + int64_t sequence_id; // For response ordering in pipelining + int64_t http_request_id; // For X-Response-ID header + bool connection_close; // Whether to close connection after response +}; + +/** + * Response data ready to send (sequence_id -> response data). + * Absence from map indicates response not ready yet. + */ +struct ResponseData { + std::span data; + Arena arena; + bool connection_close; +}; + /** * HTTP connection state stored in Connection::user_data. * Manages llhttp parser state and request data. @@ -48,7 +72,8 @@ struct HttpRequestState { ArenaString current_header_value_buf; bool header_field_complete = false; int64_t http_request_id = - 0; // X-Request-Id header value (for tracing/logging) + 0; // X-Request-Id header value (for tracing/logging) + int64_t sequence_id = 0; // Assigned for response ordering in pipelining // Streaming parser for POST requests Arena::Ptr commit_parser; @@ -67,6 +92,13 @@ struct HttpConnectionState { HttpRequestState pending; std::deque queue; + // Response ordering for HTTP pipelining + std::mutex response_queue_mutex; + std::map + ready_responses; // sequence_id -> response data + int64_t next_sequence_to_send = 0; + int64_t next_sequence_id = 0; + HttpConnectionState(); }; @@ -140,6 +172,9 @@ struct HttpHandler : ConnectionHandler { void on_connection_established(Connection &conn) override; void on_connection_closed(Connection &conn) override; void on_data_arrived(std::string_view data, Connection &conn) override; + void + on_preprocess_writes(Connection &conn, + std::span pending_responses) override; void on_batch_complete(std::span batch) override; // llhttp callbacks (public for HttpConnectionState access) @@ -212,15 +247,15 @@ private: void handle_not_found(Connection &conn, HttpRequestState &state); // HTTP utilities - static void send_response(MessageSender &conn, int status_code, + static void send_response(Connection &conn, int status_code, std::string_view content_type, std::string_view body, Arena response_arena, int64_t http_request_id, bool close_connection); - static void send_json_response(MessageSender &conn, int status_code, + static void send_json_response(Connection &conn, int status_code, std::string_view json, Arena response_arena, int64_t http_request_id, bool close_connection); - static void send_error_response(MessageSender &conn, int status_code, + static void send_error_response(Connection &conn, int status_code, std::string_view message, Arena response_arena, int64_t http_request_id, bool close_connection); @@ -234,4 +269,9 @@ private: format_json_response(int status_code, std::string_view json, Arena &response_arena, int64_t http_request_id, bool close_connection); + + // Helper function to send response through reorder queue and preprocessing + void send_ordered_response(Connection &conn, HttpRequestState &state, + std::span http_response, + Arena arena); }; diff --git a/src/pipeline_entry.hpp b/src/pipeline_entry.hpp 
index c3307b3..824384d 100644 --- a/src/pipeline_entry.hpp +++ b/src/pipeline_entry.hpp @@ -17,23 +17,20 @@ struct CommitEntry { bool resolve_success = false; // Set by resolve stage bool persist_success = false; // Set by persist stage - // Copied HTTP state (pipeline threads cannot access connection user_data) - int64_t http_request_id = 0; - bool connection_close = false; + // Protocol-agnostic context (arena-allocated, protocol-specific) + void *protocol_context = nullptr; const CommitRequest *commit_request = nullptr; // Points to request_arena data - // Request arena contains parsed request data and formatted response + // Request arena contains parsed request data and response data Arena request_arena; - // Response data (set by persist stage, consumed by release stage) - // Points to response formatted in request_arena - std::span response_message; + // JSON response body (set by persist stage, arena-allocated) + std::string_view response_json; CommitEntry() = default; // Default constructor for variant - explicit CommitEntry(WeakRef conn, int64_t req_id, - bool close_conn, const CommitRequest *req, Arena arena) - : connection(std::move(conn)), http_request_id(req_id), - connection_close(close_conn), commit_request(req), + explicit CommitEntry(WeakRef conn, void *ctx, + const CommitRequest *req, Arena arena) + : connection(std::move(conn)), protocol_context(ctx), commit_request(req), request_arena(std::move(arena)) {} }; @@ -45,21 +42,21 @@ struct StatusEntry { WeakRef connection; int64_t version_upper_bound = 0; // Set by sequence stage - // Copied HTTP state - int64_t http_request_id = 0; - bool connection_close = false; + // Protocol-agnostic context (arena-allocated, protocol-specific) + void *protocol_context = nullptr; std::string_view status_request_id; // Points to request_arena data - // Request arena for HTTP request data + // Request arena for request data Arena request_arena; + // JSON response body (set by persist stage, arena-allocated) + std::string_view response_json; + StatusEntry() = default; // Default constructor for variant - explicit StatusEntry(WeakRef conn, int64_t req_id, - bool close_conn, std::string_view request_id, - Arena arena) - : connection(std::move(conn)), http_request_id(req_id), - connection_close(close_conn), status_request_id(request_id), - request_arena(std::move(arena)) {} + explicit StatusEntry(WeakRef conn, void *ctx, + std::string_view request_id, Arena arena) + : connection(std::move(conn)), protocol_context(ctx), + status_request_id(request_id), request_arena(std::move(arena)) {} }; /** @@ -70,22 +67,19 @@ struct StatusEntry { struct HealthCheckEntry { WeakRef connection; - // Copied HTTP state - int64_t http_request_id = 0; - bool connection_close = false; + // Protocol-agnostic context (arena-allocated, protocol-specific) + void *protocol_context = nullptr; - // Request arena for formatting response + // Request arena for response data Arena request_arena; - // Response data (set by persist stage, consumed by release stage) - // Points to response formatted in request_arena - std::span response_message; + // JSON response body (set by persist stage, arena-allocated) + std::string_view response_json; HealthCheckEntry() = default; // Default constructor for variant - explicit HealthCheckEntry(WeakRef conn, int64_t req_id, - bool close_conn, Arena arena) - : connection(std::move(conn)), http_request_id(req_id), - connection_close(close_conn), request_arena(std::move(arena)) {} + explicit HealthCheckEntry(WeakRef conn, void *ctx, Arena 
arena) + : connection(std::move(conn)), protocol_context(ctx), + request_arena(std::move(arena)) {} }; /** diff --git a/src/server.cpp b/src/server.cpp index 76fcd96..3b37658 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -432,8 +432,26 @@ void Server::process_connection_reads(Ref<Connection> &conn, int events) { } } -void Server::process_connection_writes(Ref<Connection> &conn, int /*events*/) { +void Server::process_connection_writes(Ref<Connection> &conn, int events) { assert(conn); + + // Process pending responses first if this is an EPOLLOUT event + if (events & EPOLLOUT) { + std::unique_lock lock(conn->mutex_); + if (conn->has_pending_responses_) { + std::vector<PendingResponse> pending_vec; + pending_vec.reserve(conn->pending_response_queue_.size()); + for (auto &response : conn->pending_response_queue_) { + pending_vec.push_back(std::move(response)); + } + conn->pending_response_queue_.clear(); + conn->has_pending_responses_ = false; + lock.unlock(); + + handler_.on_preprocess_writes(*conn, std::span{pending_vec}); + } + } + auto result = conn->write_bytes(); if (result & Connection::WriteBytesResult::Error) { diff --git a/tests/test_server.cpp b/tests/test_server.cpp index 7e91baa..f38b9a7 100644 --- a/tests/test_server.cpp +++ b/tests/test_server.cpp @@ -35,8 +35,11 @@ TEST_CASE("Echo test") { handler.done.wait(); if (auto conn = handler.wconn.lock()) { - conn->append_message(std::exchange(handler.reply, {}), - std::move(handler.arena)); + // Cast to Connection* to access append_bytes (not available on + // MessageSender) + auto *conn_ptr = static_cast<Connection *>(conn.get()); + conn_ptr->append_bytes(std::exchange(handler.reply, {}), + std::move(handler.arena), false); } else { REQUIRE(false); }
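
The response path this patch introduces has two halves. First, pipeline threads stop formatting and writing HTTP bytes themselves: they call Connection::send_response() with a JSON body and an arena-allocated HttpResponseContext, which queues a PendingResponse under the connection mutex, sets has_pending_responses_, and arms EPOLLOUT so the connection's I/O thread drains the batch in Server::process_connection_writes() and hands it to ConnectionHandler::on_preprocess_writes(). The following is a minimal C++ sketch of that handoff only; the Fake* types are hypothetical stand-ins for Connection and PendingResponse, and a condition_variable plays the role of the epoll_ctl(EPOLL_CTL_MOD, ..., EPOLLIN | EPOLLOUT) wakeup used in the real code.

#include <condition_variable>
#include <cstdio>
#include <deque>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

// Hypothetical stand-in for the response queued by pipeline threads.
struct FakePending {
  long sequence_id;
  std::string json;
};

// Hypothetical stand-in for Connection's pending-response queue.
struct FakeConnection {
  std::mutex mutex;
  std::condition_variable wakeup;
  std::deque<FakePending> pending;
  bool has_pending = false;
  bool shutting_down = false;

  // Pipeline threads call this (analogous to Connection::send_response()).
  void send_response(FakePending r) {
    std::lock_guard<std::mutex> lock(mutex);
    pending.push_back(std::move(r));
    if (!has_pending) {
      has_pending = true;
      wakeup.notify_one(); // real code: add EPOLLOUT interest instead
    }
  }

  // The "I/O thread": drain under the lock, then process the batch outside
  // the lock (analogous to calling on_preprocess_writes()).
  void io_loop() {
    for (;;) {
      std::vector<FakePending> batch;
      {
        std::unique_lock<std::mutex> lock(mutex);
        wakeup.wait(lock, [&] { return has_pending || shutting_down; });
        if (shutting_down && pending.empty())
          return;
        batch.reserve(pending.size());
        for (auto &p : pending)
          batch.push_back(std::move(p));
        pending.clear();
        has_pending = false;
      }
      for (auto &p : batch)
        std::printf("preprocess seq=%ld json=%s\n", p.sequence_id,
                    p.json.c_str());
    }
  }
};

int main() {
  FakeConnection conn;
  std::thread io([&] { conn.io_loop(); });
  std::thread worker([&] {
    for (long i = 0; i < 3; ++i)
      conn.send_response({i, "{\"status\":\"committed\"}"});
  });
  worker.join();
  {
    std::lock_guard<std::mutex> lock(conn.mutex);
    conn.shutting_down = true;
  }
  conn.wakeup.notify_one();
  io.join();
}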
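
The second half is ordering. on_batch_complete() assigns each parsed request a monotonically increasing sequence_id, and on_preprocess_writes() / send_ordered_response() park finished responses in the per-connection ready_responses map keyed by that id, flushing only the contiguous run starting at next_sequence_to_send into append_bytes(). That is what keeps HTTP/1.1 pipelined responses in request order even when pipeline stages finish out of order. A standalone sketch of the reorder-and-flush step, again with hypothetical stand-in types rather than the real ResponseData and Connection:

#include <cstdint>
#include <cstdio>
#include <map>
#include <string>

// Hypothetical stand-in for ResponseData.
struct FakeResponse {
  std::string bytes;
  bool close_after_send = false;
};

struct ReorderQueue {
  std::map<int64_t, FakeResponse> ready; // sequence_id -> finished response
  int64_t next_to_send = 0;

  // Stash the response under its sequence id, then flush the longest
  // contiguous run starting at next_to_send (mirrors the loop in
  // on_preprocess_writes() / send_ordered_response()).
  void add(int64_t sequence_id, FakeResponse r) {
    ready.emplace(sequence_id, std::move(r));
    auto it = ready.begin();
    while (it != ready.end() && it->first == next_to_send) {
      std::printf("send seq=%lld: %s\n", (long long)it->first,
                  it->second.bytes.c_str()); // stands in for append_bytes()
      ++next_to_send;
      it = ready.erase(it);
    }
  }
};

int main() {
  ReorderQueue q;
  // Responses finish out of order...
  q.add(2, {"HTTP/1.1 200 OK ... #2"});
  q.add(0, {"HTTP/1.1 200 OK ... #0"});
  q.add(1, {"HTTP/1.1 200 OK ... #1"});
}

In the example, responses complete in the order 2, 0, 1 but are emitted in request order 0, 1, 2: nothing is sent until sequence 0 arrives, after which 0, 1, and 2 flush in a single pass.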