diff --git a/CMakeLists.txt b/CMakeLists.txt index 94f4cd7..76f0df5 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -156,13 +156,22 @@ add_executable( test_http_handler tests/test_http_handler.cpp src/http_handler.cpp + src/json_commit_request_parser.cpp src/arena_allocator.cpp src/format.cpp src/connection.cpp src/connection_registry.cpp - src/metric.cpp) -target_link_libraries(test_http_handler doctest::doctest llhttp_static - Threads::Threads perfetto simdutf::simdutf) + src/metric.cpp + ${CMAKE_BINARY_DIR}/json_tokens.cpp) +add_dependencies(test_http_handler generate_json_tokens) +target_link_libraries( + test_http_handler + doctest::doctest + llhttp_static + Threads::Threads + perfetto + simdutf::simdutf + weaseljson) target_include_directories(test_http_handler PRIVATE src) target_compile_definitions(test_http_handler PRIVATE DOCTEST_CONFIG_IMPLEMENT_WITH_MAIN) @@ -177,6 +186,7 @@ add_executable( src/arena_allocator.cpp src/config.cpp src/http_handler.cpp + src/json_commit_request_parser.cpp src/format.cpp src/metric.cpp ${CMAKE_BINARY_DIR}/json_tokens.cpp) diff --git a/src/http_handler.cpp b/src/http_handler.cpp index 8fddf48..0c1d85e 100644 --- a/src/http_handler.cpp +++ b/src/http_handler.cpp @@ -6,6 +6,7 @@ #include "arena_allocator.hpp" #include "format.hpp" +#include "json_commit_request_parser.hpp" #include "metric.hpp" #include "perfetto_categories.hpp" @@ -80,7 +81,7 @@ void HttpHandler::on_data_arrived(std::string_view data, std::unique_ptr &conn_ptr) { auto *state = static_cast(conn_ptr->user_data); if (!state) { - sendErrorResponse(*conn_ptr, 500, "Internal server error", true); + send_error_response(*conn_ptr, 500, "Internal server error", true); return; } @@ -94,7 +95,7 @@ void HttpHandler::on_data_arrived(std::string_view data, llhttp_execute(&state->parser, data.data(), data.size()); if (err != HPE_OK) { - sendErrorResponse(*conn_ptr, 400, "Bad request", true); + send_error_response(*conn_ptr, 400, "Bad request", true); return; } @@ -181,7 +182,7 @@ HttpRoute HttpHandler::parseRoute(std::string_view method, // Route handlers (basic implementations) void HttpHandler::handleGetVersion(Connection &conn, const HttpConnectionState &state) { - sendJsonResponse( + send_json_response( conn, 200, R"({"version":"0.0.1","leader":"node-1","committed_version":42})", state.connection_close); @@ -189,17 +190,43 @@ void HttpHandler::handleGetVersion(Connection &conn, void HttpHandler::handlePostCommit(Connection &conn, const HttpConnectionState &state) { - // TODO: Parse commit request from state.body and process - sendJsonResponse( - conn, 200, - R"({"request_id":"example","status":"committed","version":43})", - state.connection_close); + // Check if streaming parse was successful + if (!state.commit_request || !state.parsing_commit) { + const char *error = state.commit_parser + ? state.commit_parser->get_parse_error() + : "No parser initialized"; + std::string error_msg = "Parse failed: "; + error_msg += error ? error : "Unknown error"; + send_error_response(conn, 400, error_msg, state.connection_close); + return; + } + + const CommitRequest &commit_request = *state.commit_request; + + // TODO: Process the commit request with transaction engine + // For now, return a placeholder response with parsed request_id if available + ArenaAllocator &arena = conn.get_arena(); + std::string_view response; + + if (commit_request.request_id().has_value()) { + response = format( + arena, + R"({"request_id":"%.*s","status":"committed","version":43,"leader_id":"leader123"})", + static_cast(commit_request.request_id().value().size()), + commit_request.request_id().value().data()); + } else { + response = static_format( + arena, + R"({"status":"committed","version":43,"leader_id":"leader123"})"); + } + + send_json_response(conn, 200, response, state.connection_close); } void HttpHandler::handleGetSubscribe(Connection &conn, const HttpConnectionState &state) { // TODO: Implement subscription streaming - sendJsonResponse( + send_json_response( conn, 200, R"({"message":"Subscription endpoint - streaming not yet implemented"})", state.connection_close); @@ -208,7 +235,7 @@ void HttpHandler::handleGetSubscribe(Connection &conn, void HttpHandler::handleGetStatus(Connection &conn, const HttpConnectionState &state) { // TODO: Extract request_id from URL and check status - sendJsonResponse( + send_json_response( conn, 200, R"({"request_id":"example","status":"committed","version":43})", state.connection_close); @@ -217,21 +244,21 @@ void HttpHandler::handleGetStatus(Connection &conn, void HttpHandler::handlePutRetention(Connection &conn, const HttpConnectionState &state) { // TODO: Parse retention policy from body and store - sendJsonResponse(conn, 200, R"({"policy_id":"example","status":"created"})", - state.connection_close); + send_json_response(conn, 200, R"({"policy_id":"example","status":"created"})", + state.connection_close); } void HttpHandler::handleGetRetention(Connection &conn, const HttpConnectionState &state) { // TODO: Extract policy_id from URL or return all policies - sendJsonResponse(conn, 200, R"({"policies":[]})", state.connection_close); + send_json_response(conn, 200, R"({"policies":[]})", state.connection_close); } void HttpHandler::handleDeleteRetention(Connection &conn, const HttpConnectionState &state) { // TODO: Extract policy_id from URL and delete - sendJsonResponse(conn, 200, R"({"policy_id":"example","status":"deleted"})", - state.connection_close); + send_json_response(conn, 200, R"({"policy_id":"example","status":"deleted"})", + state.connection_close); } void HttpHandler::handleGetMetrics(Connection &conn, @@ -285,7 +312,7 @@ void HttpHandler::handleGetOk(Connection &conn, void HttpHandler::handleNotFound(Connection &conn, const HttpConnectionState &state) { - sendErrorResponse(conn, 404, "Not found", state.connection_close); + send_error_response(conn, 404, "Not found", state.connection_close); } // HTTP utility methods @@ -347,22 +374,22 @@ void HttpHandler::sendResponse(Connection &conn, int status_code, conn.append_message(response); } -void HttpHandler::sendJsonResponse(Connection &conn, int status_code, - std::string_view json, - bool close_connection) { +void HttpHandler::send_json_response(Connection &conn, int status_code, + std::string_view json, + bool close_connection) { sendResponse(conn, status_code, "application/json", json, close_connection); } -void HttpHandler::sendErrorResponse(Connection &conn, int status_code, - std::string_view message, - bool close_connection) { +void HttpHandler::send_error_response(Connection &conn, int status_code, + std::string_view message, + bool close_connection) { [[maybe_unused]] ArenaAllocator &arena = conn.get_arena(); std::string json = R"({"error":")"; json += message; json += R"("})"; - sendJsonResponse(conn, status_code, json, close_connection); + send_json_response(conn, status_code, json, close_connection); } // llhttp callbacks @@ -443,19 +470,49 @@ int HttpHandler::onHeadersComplete(llhttp_t *parser) { llhttp_method_name(static_cast(parser->method)); state->method = std::string_view(method_str); + // Check if this looks like a POST to /v1/commit to initialize streaming + // parser + if (state->method == "POST" && state->url.find("/v1/commit") == 0) { + // Initialize streaming commit request parsing + state->commit_parser = std::make_unique(); + state->commit_request = std::make_unique(); + state->parsing_commit = + state->commit_parser->begin_streaming_parse(*state->commit_request); + + if (!state->parsing_commit) { + return -1; // Signal parsing error to llhttp + } + } + return 0; } int HttpHandler::onBody(llhttp_t *parser, const char *at, size_t length) { - [[maybe_unused]] auto *state = - static_cast(parser->data); - (void)at; - (void)length; + auto *state = static_cast(parser->data); + + if (state->parsing_commit && state->commit_parser) { + // Stream data to commit request parser + auto status = + state->commit_parser->parse_chunk(const_cast(at), length); + if (status == CommitRequestParser::ParseStatus::Error) { + return -1; // Signal parsing error to llhttp + } + } + return 0; } int HttpHandler::onMessageComplete(llhttp_t *parser) { auto *state = static_cast(parser->data); state->message_complete = true; + + if (state->parsing_commit && state->commit_parser) { + // Finish streaming parse + auto status = state->commit_parser->finish_streaming_parse(); + if (status == CommitRequestParser::ParseStatus::Error) { + return -1; // Signal parsing error to llhttp + } + } + return 0; } diff --git a/src/http_handler.hpp b/src/http_handler.hpp index 0653840..753ec43 100644 --- a/src/http_handler.hpp +++ b/src/http_handler.hpp @@ -13,6 +13,10 @@ #include "server.hpp" #include "thread_pipeline.hpp" +// Forward declarations +struct CommitRequest; +struct JsonCommitRequestParser; + /** * HTTP routes supported by WeaselDB server. * Using enum for efficient switch-based routing. @@ -56,6 +60,11 @@ struct HttpConnectionState { bool header_field_complete = false; int64_t request_id = 0; // X-Request-Id header value + // Streaming parser for POST requests + std::unique_ptr commit_parser; + std::unique_ptr commit_request; + bool parsing_commit = false; + explicit HttpConnectionState(ArenaAllocator &arena); }; @@ -176,10 +185,10 @@ private: static void sendResponse(Connection &conn, int status_code, std::string_view content_type, std::string_view body, bool close_connection = false); - static void sendJsonResponse(Connection &conn, int status_code, - std::string_view json, - bool close_connection = false); - static void sendErrorResponse(Connection &conn, int status_code, - std::string_view message, - bool close_connection = false); + static void send_json_response(Connection &conn, int status_code, + std::string_view json, + bool close_connection = false); + static void send_error_response(Connection &conn, int status_code, + std::string_view message, + bool close_connection = false); };