diff --git a/src/http_handler.cpp b/src/http_handler.cpp index 4f11f63..2115a27 100644 --- a/src/http_handler.cpp +++ b/src/http_handler.cpp @@ -122,19 +122,21 @@ void HttpHandler::on_batch_complete(std::span batch) { state->parsing_commit && state->basic_validation_passed) { *out_iter++ = CommitEntry{conn->get_weak_ref(), state->http_request_id, - state->connection_close, state->commit_request.get()}; + state->connection_close, state->commit_request.get(), + std::move(state->arena)}; } // Create StatusEntry for status requests else if (state->route == HttpRoute::GetStatus) { *out_iter++ = StatusEntry{conn->get_weak_ref(), state->http_request_id, - state->connection_close, state->status_request_id}; + state->connection_close, state->status_request_id, + std::move(state->arena)}; } // Create HealthCheckEntry for health check requests else if (state->route == HttpRoute::GetOk) { *out_iter++ = HealthCheckEntry{conn->get_weak_ref(), state->http_request_id, - state->connection_close}; + state->connection_close, std::move(state->arena)}; } } } @@ -488,6 +490,57 @@ void HttpHandler::send_error_response(MessageSender &conn, int status_code, http_request_id, close_connection); } +std::span +HttpHandler::format_response(int status_code, std::string_view content_type, + std::string_view body, Arena &response_arena, + int64_t http_request_id, bool close_connection) { + // Status text + std::string_view status_text; + switch (status_code) { + case 200: + status_text = "OK"; + break; + case 400: + status_text = "Bad Request"; + break; + case 404: + status_text = "Not Found"; + break; + case 500: + status_text = "Internal Server Error"; + break; + default: + status_text = "Unknown"; + break; + } + + const char *connection_header = close_connection ? "close" : "keep-alive"; + + auto response = std::span{response_arena.allocate(1), 1}; + + response[0] = + format(response_arena, + "HTTP/1.1 %d %.*s\r\n" + "Content-Type: %.*s\r\n" + "Content-Length: %zu\r\n" + "X-Response-ID: %ld\r\n" + "Connection: %s\r\n" + "\r\n%.*s", + status_code, static_cast(status_text.size()), + status_text.data(), static_cast(content_type.size()), + content_type.data(), body.size(), http_request_id, + connection_header, static_cast(body.size()), body.data()); + + return response; +} + +std::span HttpHandler::format_json_response( + int status_code, std::string_view json, Arena &response_arena, + int64_t http_request_id, bool close_connection) { + return format_response(status_code, "application/json", json, response_arena, + http_request_id, close_connection); +} + // llhttp callbacks int HttpHandler::onUrl(llhttp_t *parser, const char *at, size_t length) { auto *state = static_cast(parser->data); @@ -856,26 +909,26 @@ bool HttpHandler::process_persist_batch(BatchType &batch) { const CommitRequest &commit_request = *commit_entry.commit_request; - Arena response_arena; + // Generate success response with actual assigned version using + // request arena std::string_view response; - - // Generate success response with actual assigned version if (commit_request.request_id().has_value()) { response = format( - response_arena, + commit_entry.request_arena, R"({"request_id":"%.*s","status":"committed","version":%ld,"leader_id":"leader123"})", static_cast(commit_request.request_id().value().size()), commit_request.request_id().value().data(), commit_entry.assigned_version); } else { response = format( - response_arena, + commit_entry.request_arena, R"({"status":"committed","version":%ld,"leader_id":"leader123"})", commit_entry.assigned_version); } - send_json_response( - *conn_ref, 200, response, std::move(response_arena), + // Format response but don't send yet - store for release stage + commit_entry.response_message = format_json_response( + 200, response, commit_entry.request_arena, commit_entry.http_request_id, commit_entry.connection_close); return false; // Continue processing @@ -898,10 +951,11 @@ bool HttpHandler::process_persist_batch(BatchType &batch) { "http", "persist_health_check", perfetto::Flow::Global(health_check_entry.http_request_id)); - // Generate OK response - send_response(*conn_ref, 200, "text/plain", "OK", Arena{}, - health_check_entry.http_request_id, - health_check_entry.connection_close); + // Format OK response but don't send yet - store for release stage + health_check_entry.response_message = format_response( + 200, "text/plain", "OK", health_check_entry.request_arena, + health_check_entry.http_request_id, + health_check_entry.connection_close); return false; // Continue processing } @@ -944,6 +998,11 @@ bool HttpHandler::process_release_batch(BatchType &batch) { TRACE_EVENT("http", "release_commit", perfetto::Flow::Global(commit_entry.http_request_id)); + // Send the response that was formatted in persist stage + conn_ref->append_message(commit_entry.response_message, + std::move(commit_entry.request_arena), + commit_entry.connection_close); + return false; // Continue processing } else if constexpr (std::is_same_v) { // Process status entry: return connection to server @@ -975,6 +1034,12 @@ bool HttpHandler::process_release_batch(BatchType &batch) { "http", "release_health_check", perfetto::Flow::Global(health_check_entry.http_request_id)); + // Send the response that was formatted in persist stage + conn_ref->append_message( + health_check_entry.response_message, + std::move(health_check_entry.request_arena), + health_check_entry.connection_close); + return false; // Continue processing } diff --git a/src/http_handler.hpp b/src/http_handler.hpp index 9620a24..09c4b7b 100644 --- a/src/http_handler.hpp +++ b/src/http_handler.hpp @@ -169,7 +169,7 @@ private: // (string_views into arena) // Main commit processing pipeline: sequence -> resolve -> persist -> release - StaticThreadPipeline commitPipeline{lg_size}; @@ -181,8 +181,8 @@ private: // Pipeline stage processing methods (batch-based) using BatchType = - StaticThreadPipeline::Batch; + StaticThreadPipeline::Batch; bool process_sequence_batch(BatchType &batch); bool process_resolve_batch(BatchType &batch); bool process_persist_batch(BatchType &batch); @@ -225,4 +225,14 @@ private: std::string_view message, Arena response_arena, int64_t http_request_id, bool close_connection); + + // Helper functions for formatting responses without sending + static std::span + format_response(int status_code, std::string_view content_type, + std::string_view body, Arena &response_arena, + int64_t http_request_id, bool close_connection); + static std::span + format_json_response(int status_code, std::string_view json, + Arena &response_arena, int64_t http_request_id, + bool close_connection); }; diff --git a/src/pipeline_entry.hpp b/src/pipeline_entry.hpp index bffd8da..c3307b3 100644 --- a/src/pipeline_entry.hpp +++ b/src/pipeline_entry.hpp @@ -1,5 +1,6 @@ #pragma once +#include "arena.hpp" #include "connection.hpp" #include @@ -19,14 +20,21 @@ struct CommitEntry { // Copied HTTP state (pipeline threads cannot access connection user_data) int64_t http_request_id = 0; bool connection_close = false; - const CommitRequest *commit_request = - nullptr; // Points to connection's arena data + const CommitRequest *commit_request = nullptr; // Points to request_arena data + + // Request arena contains parsed request data and formatted response + Arena request_arena; + + // Response data (set by persist stage, consumed by release stage) + // Points to response formatted in request_arena + std::span response_message; CommitEntry() = default; // Default constructor for variant explicit CommitEntry(WeakRef conn, int64_t req_id, - bool close_conn, const CommitRequest *req) + bool close_conn, const CommitRequest *req, Arena arena) : connection(std::move(conn)), http_request_id(req_id), - connection_close(close_conn), commit_request(req) {} + connection_close(close_conn), commit_request(req), + request_arena(std::move(arena)) {} }; /** @@ -40,13 +48,18 @@ struct StatusEntry { // Copied HTTP state int64_t http_request_id = 0; bool connection_close = false; - std::string_view status_request_id; // Points to connection's arena data + std::string_view status_request_id; // Points to request_arena data + + // Request arena for HTTP request data + Arena request_arena; StatusEntry() = default; // Default constructor for variant explicit StatusEntry(WeakRef conn, int64_t req_id, - bool close_conn, std::string_view request_id) + bool close_conn, std::string_view request_id, + Arena arena) : connection(std::move(conn)), http_request_id(req_id), - connection_close(close_conn), status_request_id(request_id) {} + connection_close(close_conn), status_request_id(request_id), + request_arena(std::move(arena)) {} }; /** @@ -61,11 +74,18 @@ struct HealthCheckEntry { int64_t http_request_id = 0; bool connection_close = false; + // Request arena for formatting response + Arena request_arena; + + // Response data (set by persist stage, consumed by release stage) + // Points to response formatted in request_arena + std::span response_message; + HealthCheckEntry() = default; // Default constructor for variant explicit HealthCheckEntry(WeakRef conn, int64_t req_id, - bool close_conn) + bool close_conn, Arena arena) : connection(std::move(conn)), http_request_id(req_id), - connection_close(close_conn) {} + connection_close(close_conn), request_arena(std::move(arena)) {} }; /**