From b8d735f0744752c5479005178486cdfc44fbbd4f Mon Sep 17 00:00:00 2001 From: Andrew Noyes Date: Tue, 19 Aug 2025 16:50:06 -0400 Subject: [PATCH] Initial http implementation --- CMakeLists.txt | 13 +- src/http_handler.cpp | 309 ++++++++++++++++++++++++++++++++++++ src/http_handler.hpp | 92 +++++++++++ src/main.cpp | 22 +-- tests/test_http_handler.cpp | 85 ++++++++++ 5 files changed, 502 insertions(+), 19 deletions(-) create mode 100644 src/http_handler.cpp create mode 100644 src/http_handler.hpp create mode 100644 tests/test_http_handler.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 695de84..b41d002 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -128,13 +128,14 @@ set(SOURCES src/connection.cpp src/server.cpp src/json_commit_request_parser.cpp + src/http_handler.cpp src/arena_allocator.cpp ${CMAKE_BINARY_DIR}/json_tokens.cpp) add_executable(weaseldb ${SOURCES}) add_dependencies(weaseldb generate_json_tokens) target_link_libraries(weaseldb Threads::Threads toml11::toml11 weaseljson - simdutf::simdutf) + simdutf::simdutf llhttp_static) enable_testing() @@ -158,6 +159,15 @@ target_link_libraries(test_commit_request doctest::doctest weaseljson test_data nlohmann_json::nlohmann_json simdutf::simdutf) target_include_directories(test_commit_request PRIVATE src tests) +add_executable( + test_http_handler tests/test_http_handler.cpp src/http_handler.cpp + src/arena_allocator.cpp src/connection.cpp) +target_link_libraries(test_http_handler doctest::doctest llhttp_static + Threads::Threads) +target_include_directories(test_http_handler PRIVATE src) +target_compile_definitions(test_http_handler + PRIVATE DOCTEST_CONFIG_IMPLEMENT_WITH_MAIN) + add_executable(bench_arena_allocator benchmarks/bench_arena_allocator.cpp src/arena_allocator.cpp) target_link_libraries(bench_arena_allocator nanobench) @@ -192,6 +202,7 @@ target_include_directories(debug_arena PRIVATE src) add_test(NAME arena_allocator_tests COMMAND test_arena_allocator) add_test(NAME commit_request_tests COMMAND test_commit_request) +add_test(NAME http_handler_tests COMMAND test_http_handler) add_test(NAME arena_allocator_benchmarks COMMAND bench_arena_allocator) add_test(NAME commit_request_benchmarks COMMAND bench_commit_request) add_test(NAME parser_comparison_benchmarks COMMAND bench_parser_comparison) diff --git a/src/http_handler.cpp b/src/http_handler.cpp new file mode 100644 index 0000000..ca0dd0a --- /dev/null +++ b/src/http_handler.cpp @@ -0,0 +1,309 @@ +#include "http_handler.hpp" +#include "arena_allocator.hpp" +#include +#include + +// HttpConnectionState implementation +HttpConnectionState::HttpConnectionState( + [[maybe_unused]] ArenaAllocator &arena) { + llhttp_settings_init(&settings); + + // Set up llhttp callbacks + settings.on_url = HttpHandler::onUrl; + settings.on_header_field = HttpHandler::onHeaderField; + settings.on_header_value = HttpHandler::onHeaderValue; + settings.on_headers_complete = HttpHandler::onHeadersComplete; + settings.on_body = HttpHandler::onBody; + settings.on_message_complete = HttpHandler::onMessageComplete; + + llhttp_init(&parser, HTTP_REQUEST, &settings); + parser.data = this; +} + +// HttpHandler implementation +void HttpHandler::on_connection_established(Connection &conn) { + // Allocate HTTP state in connection's arena + ArenaAllocator &arena = conn.getArena(); + auto *state = arena.construct(arena); + conn.user_data = state; +} + +void HttpHandler::on_connection_closed(Connection &conn) { + // Arena cleanup happens automatically when connection is destroyed + conn.user_data = nullptr; +} + +void HttpHandler::on_write_progress(std::unique_ptr &conn_ptr) { + // Reset arena after all messages have been written for the next request + if (conn_ptr->outgoingBytesQueued() == 0) { + conn_ptr->reset(); + on_connection_established(*conn_ptr); + } +} + +void HttpHandler::on_data_arrived(std::string_view data, + std::unique_ptr &conn_ptr) { + auto *state = static_cast(conn_ptr->user_data); + if (!state) { + sendErrorResponse(*conn_ptr, 500, "Internal server error"); + return; + } + + // Parse HTTP data with llhttp + enum llhttp_errno err = + llhttp_execute(&state->parser, data.data(), data.size()); + + if (err != HPE_OK) { + sendErrorResponse(*conn_ptr, 400, "Bad request"); + return; + } + + // If message is complete, route and handle the request + if (state->message_complete) { + // Parse route from method and URL + state->route = parseRoute(state->method, state->url); + + // Route to appropriate handler + switch (state->route) { + case HttpRoute::GET_version: + handleGetVersion(*conn_ptr, *state); + break; + case HttpRoute::POST_commit: + handlePostCommit(*conn_ptr, *state); + break; + case HttpRoute::GET_subscribe: + handleGetSubscribe(*conn_ptr, *state); + break; + case HttpRoute::GET_status: + handleGetStatus(*conn_ptr, *state); + break; + case HttpRoute::PUT_retention: + handlePutRetention(*conn_ptr, *state); + break; + case HttpRoute::GET_retention: + handleGetRetention(*conn_ptr, *state); + break; + case HttpRoute::DELETE_retention: + handleDeleteRetention(*conn_ptr, *state); + break; + case HttpRoute::GET_metrics: + handleGetMetrics(*conn_ptr, *state); + break; + case HttpRoute::NotFound: + default: + handleNotFound(*conn_ptr, *state); + break; + } + + // Connection state will be reset when appropriate by the server + // For now, we don't reset immediately since messages may still be queued + } +} + +HttpRoute HttpHandler::parseRoute(std::string_view method, + std::string_view url) { + // Strip query parameters if present + size_t query_pos = url.find('?'); + if (query_pos != std::string_view::npos) { + url = url.substr(0, query_pos); + } + + // Route based on method and path + if (method == "GET") { + if (url == "/v1/version") + return HttpRoute::GET_version; + if (url == "/v1/subscribe") + return HttpRoute::GET_subscribe; + if (url.starts_with("/v1/status")) + return HttpRoute::GET_status; + if (url.starts_with("/v1/retention")) { + // Check if it's a specific retention policy or list all + return HttpRoute::GET_retention; + } + if (url == "/metrics") + return HttpRoute::GET_metrics; + } else if (method == "POST") { + if (url == "/v1/commit") + return HttpRoute::POST_commit; + } else if (method == "PUT") { + if (url.starts_with("/v1/retention/")) + return HttpRoute::PUT_retention; + } else if (method == "DELETE") { + if (url.starts_with("/v1/retention/")) + return HttpRoute::DELETE_retention; + } + + return HttpRoute::NotFound; +} + +// Route handlers (basic implementations) +void HttpHandler::handleGetVersion( + Connection &conn, [[maybe_unused]] const HttpConnectionState &state) { + sendJsonResponse( + conn, 200, + R"({"version":"0.0.1","leader":"node-1","committed_version":42})"); +} + +void HttpHandler::handlePostCommit( + Connection &conn, [[maybe_unused]] const HttpConnectionState &state) { + // TODO: Parse commit request from state.body and process + sendJsonResponse( + conn, 200, + R"({"request_id":"example","status":"committed","version":43})"); +} + +void HttpHandler::handleGetSubscribe( + Connection &conn, [[maybe_unused]] const HttpConnectionState &state) { + // TODO: Implement subscription streaming + sendJsonResponse( + conn, 200, + R"({"message":"Subscription endpoint - streaming not yet implemented"})"); +} + +void HttpHandler::handleGetStatus( + Connection &conn, [[maybe_unused]] const HttpConnectionState &state) { + // TODO: Extract request_id from URL and check status + sendJsonResponse( + conn, 200, + R"({"request_id":"example","status":"committed","version":43})"); +} + +void HttpHandler::handlePutRetention( + Connection &conn, [[maybe_unused]] const HttpConnectionState &state) { + // TODO: Parse retention policy from body and store + sendJsonResponse(conn, 200, R"({"policy_id":"example","status":"created"})"); +} + +void HttpHandler::handleGetRetention( + Connection &conn, [[maybe_unused]] const HttpConnectionState &state) { + // TODO: Extract policy_id from URL or return all policies + sendJsonResponse(conn, 200, R"({"policies":[]})"); +} + +void HttpHandler::handleDeleteRetention( + Connection &conn, [[maybe_unused]] const HttpConnectionState &state) { + // TODO: Extract policy_id from URL and delete + sendJsonResponse(conn, 200, R"({"policy_id":"example","status":"deleted"})"); +} + +void HttpHandler::handleGetMetrics( + Connection &conn, [[maybe_unused]] const HttpConnectionState &state) { + // TODO: Implement metrics collection and formatting + sendResponse(conn, 200, "text/plain", + "# WeaselDB metrics\nweaseldb_requests_total 0\n"); +} + +void HttpHandler::handleNotFound( + Connection &conn, [[maybe_unused]] const HttpConnectionState &state) { + sendErrorResponse(conn, 404, "Not found"); +} + +// HTTP utility methods +void HttpHandler::sendResponse(Connection &conn, int status_code, + std::string_view content_type, + std::string_view body) { + [[maybe_unused]] ArenaAllocator &arena = conn.getArena(); + + // Build HTTP response using arena + std::string response; + response.reserve(256 + body.size()); + + response += "HTTP/1.1 "; + response += std::to_string(status_code); + response += " "; + + // Status text + switch (status_code) { + case 200: + response += "OK"; + break; + case 400: + response += "Bad Request"; + break; + case 404: + response += "Not Found"; + break; + case 500: + response += "Internal Server Error"; + break; + default: + response += "Unknown"; + break; + } + + response += "\r\n"; + response += "Content-Type: "; + response += content_type; + response += "\r\n"; + response += "Content-Length: "; + response += std::to_string(body.size()); + response += "\r\n"; + response += "Connection: keep-alive\r\n"; + response += "\r\n"; + response += body; + + conn.appendMessage(response); +} + +void HttpHandler::sendJsonResponse(Connection &conn, int status_code, + std::string_view json) { + sendResponse(conn, status_code, "application/json", json); +} + +void HttpHandler::sendErrorResponse(Connection &conn, int status_code, + std::string_view message) { + [[maybe_unused]] ArenaAllocator &arena = conn.getArena(); + + std::string json = R"({"error":")"; + json += message; + json += R"("})"; + + sendJsonResponse(conn, status_code, json); +} + +// llhttp callbacks +int HttpHandler::onUrl(llhttp_t *parser, const char *at, size_t length) { + auto *state = static_cast(parser->data); + // Store URL in arena (simplified - would need to accumulate for streaming) + state->url = std::string_view(at, length); + return 0; +} + +int HttpHandler::onHeaderField([[maybe_unused]] llhttp_t *parser, + [[maybe_unused]] const char *at, + [[maybe_unused]] size_t length) { + // TODO: Store headers if needed + return 0; +} + +int HttpHandler::onHeaderValue([[maybe_unused]] llhttp_t *parser, + [[maybe_unused]] const char *at, + [[maybe_unused]] size_t length) { + // TODO: Store header values if needed + return 0; +} + +int HttpHandler::onHeadersComplete(llhttp_t *parser) { + auto *state = static_cast(parser->data); + state->headers_complete = true; + + // Get HTTP method + const char *method_str = + llhttp_method_name(static_cast(parser->method)); + state->method = std::string_view(method_str); + + return 0; +} + +int HttpHandler::onBody(llhttp_t *parser, const char *at, size_t length) { + auto *state = static_cast(parser->data); + // Store body (simplified - would need to accumulate for streaming) + state->body = std::string_view(at, length); + return 0; +} + +int HttpHandler::onMessageComplete(llhttp_t *parser) { + auto *state = static_cast(parser->data); + state->message_complete = true; + return 0; +} \ No newline at end of file diff --git a/src/http_handler.hpp b/src/http_handler.hpp new file mode 100644 index 0000000..50b5b93 --- /dev/null +++ b/src/http_handler.hpp @@ -0,0 +1,92 @@ +#pragma once + +#include "connection.hpp" +#include "connection_handler.hpp" +#include +#include +#include + +/** + * HTTP routes supported by WeaselDB server. + * Using enum for efficient switch-based routing. + */ +enum class HttpRoute { + GET_version, + POST_commit, + GET_subscribe, + GET_status, + PUT_retention, + GET_retention, + DELETE_retention, + GET_metrics, + NotFound +}; + +/** + * HTTP connection state stored in Connection::user_data. + * Manages llhttp parser state and request data. + */ +struct HttpConnectionState { + llhttp_t parser; + llhttp_settings_t settings; + + // Current request data (arena-allocated) + std::string_view method; + std::string_view url; + std::string_view body; + + // Parse state + bool headers_complete = false; + bool message_complete = false; + HttpRoute route = HttpRoute::NotFound; + + explicit HttpConnectionState(ArenaAllocator &arena); +}; + +/** + * HTTP/1.1 server implementation using llhttp for parsing. + * Supports the WeaselDB REST API endpoints with enum-based routing. + */ +class HttpHandler : public ConnectionHandler { +public: + HttpHandler() = default; + + void on_connection_established(Connection &conn) override; + void on_connection_closed(Connection &conn) override; + void on_data_arrived(std::string_view data, + std::unique_ptr &conn_ptr) override; + void on_write_progress(std::unique_ptr &conn_ptr) override; + + // Route parsing (public for testing) + static HttpRoute parseRoute(std::string_view method, std::string_view url); + + // llhttp callbacks (public for HttpConnectionState access) + static int onUrl(llhttp_t *parser, const char *at, size_t length); + static int onHeaderField(llhttp_t *parser, const char *at, size_t length); + static int onHeaderValue(llhttp_t *parser, const char *at, size_t length); + static int onHeadersComplete(llhttp_t *parser); + static int onBody(llhttp_t *parser, const char *at, size_t length); + static int onMessageComplete(llhttp_t *parser); + +private: + // Route handlers + void handleGetVersion(Connection &conn, const HttpConnectionState &state); + void handlePostCommit(Connection &conn, const HttpConnectionState &state); + void handleGetSubscribe(Connection &conn, const HttpConnectionState &state); + void handleGetStatus(Connection &conn, const HttpConnectionState &state); + void handlePutRetention(Connection &conn, const HttpConnectionState &state); + void handleGetRetention(Connection &conn, const HttpConnectionState &state); + void handleDeleteRetention(Connection &conn, + const HttpConnectionState &state); + void handleGetMetrics(Connection &conn, const HttpConnectionState &state); + void handleNotFound(Connection &conn, const HttpConnectionState &state); + + // HTTP utilities + static void sendResponse(Connection &conn, int status_code, + std::string_view content_type, + std::string_view body); + static void sendJsonResponse(Connection &conn, int status_code, + std::string_view json); + static void sendErrorResponse(Connection &conn, int status_code, + std::string_view message); +}; \ No newline at end of file diff --git a/src/main.cpp b/src/main.cpp index 6c3c90f..1203a62 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -1,6 +1,7 @@ #include "config.hpp" #include "connection.hpp" #include "connection_handler.hpp" +#include "http_handler.hpp" #include "server.hpp" #include #include @@ -21,21 +22,6 @@ void signal_handler(int sig) { } } -/** - * Simple echo handler that mirrors received data back to the client. - * - * This implementation preserves the current behavior of the server - * while demonstrating the ConnectionHandler interface pattern. - */ -class EchoHandler : public ConnectionHandler { -public: - void on_data_arrived(std::string_view data, - std::unique_ptr &conn_ptr) override { - // Echo the received data back to the client - conn_ptr->appendMessage(data); - } -}; - void print_help(const char *program_name) { std::cout << "WeaselDB - High-performance write-side database component\n\n"; std::cout << "Usage: " << program_name << " [OPTIONS]\n\n"; @@ -121,8 +107,8 @@ int main(int argc, char *argv[]) { << std::endl; // Create handler and server - EchoHandler echo_handler; - auto server = Server::create(*config, echo_handler); + HttpHandler http_handler; + auto server = Server::create(*config, http_handler); g_server = server.get(); // Setup signal handling @@ -130,7 +116,7 @@ int main(int argc, char *argv[]) { signal(SIGTERM, signal_handler); signal(SIGINT, signal_handler); - std::cout << "Starting WeaselDB server..." << std::endl; + std::cout << "Starting WeaselDB HTTP server..." << std::endl; server->run(); std::cout << "Server shutdown complete." << std::endl; diff --git a/tests/test_http_handler.cpp b/tests/test_http_handler.cpp new file mode 100644 index 0000000..c46fafd --- /dev/null +++ b/tests/test_http_handler.cpp @@ -0,0 +1,85 @@ +#include "arena_allocator.hpp" +#include "http_handler.hpp" +#include +#include + +// Global variable needed by Connection +std::atomic activeConnections{0}; + +// Simple test helper since Connection has complex constructor requirements +struct TestConnectionData { + ArenaAllocator arena; + std::string message_buffer; + void *user_data = nullptr; + + void appendMessage(std::string_view data) { message_buffer += data; } + + ArenaAllocator &getArena() { return arena; } + const std::string &getResponse() const { return message_buffer; } + void clearResponse() { message_buffer.clear(); } + void reset() { + arena.reset(); + message_buffer.clear(); + } +}; + +TEST_CASE("HttpHandler route parsing") { + SUBCASE("GET routes") { + CHECK(HttpHandler::parseRoute("GET", "/v1/version") == + HttpRoute::GET_version); + CHECK(HttpHandler::parseRoute("GET", "/v1/subscribe") == + HttpRoute::GET_subscribe); + CHECK(HttpHandler::parseRoute("GET", "/v1/status") == + HttpRoute::GET_status); + CHECK(HttpHandler::parseRoute("GET", "/v1/retention") == + HttpRoute::GET_retention); + CHECK(HttpHandler::parseRoute("GET", "/metrics") == HttpRoute::GET_metrics); + } + + SUBCASE("POST routes") { + CHECK(HttpHandler::parseRoute("POST", "/v1/commit") == + HttpRoute::POST_commit); + } + + SUBCASE("PUT routes") { + CHECK(HttpHandler::parseRoute("PUT", "/v1/retention/policy1") == + HttpRoute::PUT_retention); + } + + SUBCASE("DELETE routes") { + CHECK(HttpHandler::parseRoute("DELETE", "/v1/retention/policy1") == + HttpRoute::DELETE_retention); + } + + SUBCASE("Unknown routes") { + CHECK(HttpHandler::parseRoute("GET", "/unknown") == HttpRoute::NotFound); + CHECK(HttpHandler::parseRoute("PATCH", "/v1/version") == + HttpRoute::NotFound); + } + + SUBCASE("Query parameters stripped") { + CHECK(HttpHandler::parseRoute("GET", "/v1/version?foo=bar") == + HttpRoute::GET_version); + } +} + +TEST_CASE("HttpHandler route parsing functionality") { + // Test just the static route parsing method since full integration testing + // would require complex Connection setup with server dependencies + + SUBCASE("Route parsing with query parameters") { + CHECK(HttpHandler::parseRoute("GET", "/v1/version?param=value") == + HttpRoute::GET_version); + CHECK(HttpHandler::parseRoute("GET", "/v1/subscribe?stream=true") == + HttpRoute::GET_subscribe); + } + + SUBCASE("Retention policy routes") { + CHECK(HttpHandler::parseRoute("PUT", "/v1/retention/policy123") == + HttpRoute::PUT_retention); + CHECK(HttpHandler::parseRoute("DELETE", "/v1/retention/policy456") == + HttpRoute::DELETE_retention); + CHECK(HttpHandler::parseRoute("GET", "/v1/retention/policy789") == + HttpRoute::GET_retention); + } +}