#pragma once #include #include #include #include #include "connection.hpp" #include "connection_handler.hpp" #include "perfetto_categories.hpp" #include "server.hpp" #include "thread_pipeline.hpp" /** * HTTP routes supported by WeaselDB server. * Using enum for efficient switch-based routing. */ enum class HttpRoute { GET_version, POST_commit, GET_subscribe, GET_status, PUT_retention, GET_retention, DELETE_retention, GET_metrics, GET_ok, NotFound }; /** * HTTP connection state stored in Connection::user_data. * Manages llhttp parser state and request data. */ struct HttpConnectionState { llhttp_t parser; llhttp_settings_t settings; // Current request data (arena-allocated) std::string_view method; std::string_view url; // Parse state bool headers_complete = false; bool message_complete = false; bool connection_close = false; // Client requested connection close HttpRoute route = HttpRoute::NotFound; // Header accumulation buffers (arena-allocated) using ArenaString = std::basic_string, ArenaStlAllocator>; ArenaString current_header_field_buf; ArenaString current_header_value_buf; bool header_field_complete = false; int64_t request_id = 0; // X-Request-Id header value explicit HttpConnectionState(ArenaAllocator &arena); }; /** * HTTP/1.1 server implementation using llhttp for parsing. * Supports the WeaselDB REST API endpoints with enum-based routing. 
*/
// Threading overview (from the code in this struct):
//   - The constructor starts one "stage-0" thread and kFinalStageThreads
//     "stage-1-<id>" threads, all of which consume batches from `pipeline`.
//   - The destructor pushes kFinalStageThreads empty entries into the pipeline
//     and then joins every thread; each worker returns when it encounters an
//     empty entry.
// NOTE(review): several template argument lists in this block look truncated
// in this copy (`static_cast(...)`, `std::unique_ptr &`, `std::span>`,
// `ThreadPipeline>`, `std::vector finalStageThreads`); verify against the
// original source before building.
struct HttpHandler : ConnectionHandler {
  // Spawns the pipeline worker threads. Both lambdas capture `this`, so the
  // handler must stay alive until the threads are joined in the destructor.
  HttpHandler() {
    for (int threadId = 0; threadId < kFinalStageThreads; ++threadId) {
      finalStageThreads.emplace_back([this, threadId]() {
        pthread_setname_np(pthread_self(),
                           ("stage-1-" + std::to_string(threadId)).c_str());
        for (;;) {
          // Arguments are (1, threadId); presumably stage index and thread
          // index within that stage — confirm in thread_pipeline.hpp.
          auto guard = pipeline.acquire(1, threadId);
          for (auto it = guard.batch.begin(); it != guard.batch.end(); ++it) {
            // Entries are split between the final-stage threads by index:
            // this thread only handles entries whose index modulo
            // kFinalStageThreads equals its own id.
            if ((it.index() % kFinalStageThreads) == threadId) {
              auto &c = *it;
              // An empty entry ends this thread (see the destructor).
              if (!c) {
                return;
              }
              auto *state = static_cast(c->user_data);
              // Trace event carrying a flow keyed by the request id.
              TRACE_EVENT("http", "pipeline thread",
                          perfetto::Flow::Global(state->request_id));
              // Ownership of the connection is moved back to the server.
              Server::release_back_to_server(std::move(c));
            }
          }
        }
      });
    }
    stage0Thread = std::thread{[this]() {
      pthread_setname_np(pthread_self(), "stage-0");
      for (;;) {
        // NOTE(review): meaning of the (0, 0, 0, false) arguments is defined
        // by ThreadPipeline::acquire and is not visible here.
        auto guard = pipeline.acquire(0, 0, 0, false);
        for (auto it = guard.batch.begin(); it != guard.batch.end(); ++it) {
          auto &c = *it;
          // An empty entry ends this thread (see the destructor).
          if (!c) {
            return;
          }
          // Spin for 1200 iterations per entry; the volatile counter keeps
          // the loop from being optimized away. This looks like placeholder
          // work (the pipeline member labels this stage a "noop serial
          // thread") — confirm intent.
          for (volatile int i = 0; i < 1200; i = i + 1)
            ;
        }
      }
    }};
  }

  // Shuts the worker threads down and joins them.
  ~HttpHandler() {
    {
      // Push kFinalStageThreads entries and reset each one to empty. The inner
      // scope ends the guard's lifetime before joining; presumably that is
      // what publishes the batch to the consumers — confirm in
      // thread_pipeline.hpp.
      auto guard = pipeline.push(kFinalStageThreads, true);
      for (auto &c : guard.batch) {
        c = {};
      }
    }
    stage0Thread.join();
    for (auto &thread : finalStageThreads) {
      thread.join();
    }
  }

  // ConnectionHandler overrides; definitions are outside this header.
  void on_connection_established(Connection &conn) override;
  void on_connection_closed(Connection &conn) override;
  void on_data_arrived(std::string_view data,
                       std::unique_ptr &conn_ptr) override;
  void on_write_buffer_drained(std::unique_ptr &conn_ptr) override;
  void on_batch_complete(
      std::span> /*batch*/) override;

  // Route parsing (public for testing)
  static HttpRoute parseRoute(std::string_view method, std::string_view url);

  // llhttp callbacks (public for HttpConnectionState access)
  // Data callbacks receive a pointer/length pair; the others receive only the
  // parser. All return int, following the llhttp callback signatures.
  static int onUrl(llhttp_t *parser, const char *at, size_t length);
  static int onHeaderField(llhttp_t *parser, const char *at, size_t length);
  static int onHeaderFieldComplete(llhttp_t *parser);
  static int onHeaderValue(llhttp_t *parser, const char *at, size_t length);
  static int onHeaderValueComplete(llhttp_t *parser);
  static int onHeadersComplete(llhttp_t *parser);
  static int onBody(llhttp_t *parser, const char *at, size_t length);
  static int onMessageComplete(llhttp_t *parser);

private:
  // Number of "stage-1-<id>" threads; also the number of empty entries pushed
  // at shutdown.
  static constexpr int kFinalStageThreads = 2;
  // First argument to the pipeline constructor; presumably log2 of the
  // pipeline's slot count — confirm in thread_pipeline.hpp.
  static constexpr int kLogSize = 12;
  // Two-stage pipeline: one thread in the first stage, kFinalStageThreads in
  // the second.
  ThreadPipeline> pipeline{
      kLogSize, {/*noop serial thread*/ 1, kFinalStageThreads}};
  std::thread stage0Thread;
  std::vector finalStageThreads;

  // Route handlers
  // One per HttpRoute enumerator; definitions are outside this header.
  void handleGetVersion(Connection &conn, const HttpConnectionState &state);
  void handlePostCommit(Connection &conn, const HttpConnectionState &state);
  void handleGetSubscribe(Connection &conn, const HttpConnectionState &state);
  void handleGetStatus(Connection &conn, const HttpConnectionState &state);
  void handlePutRetention(Connection &conn, const HttpConnectionState &state);
  void handleGetRetention(Connection &conn, const HttpConnectionState &state);
  void handleDeleteRetention(Connection &conn,
                             const HttpConnectionState &state);
  void handleGetMetrics(Connection &conn, const HttpConnectionState &state);
  void handleGetOk(Connection &conn, const HttpConnectionState &state);
  void handleNotFound(Connection &conn, const HttpConnectionState &state);

  // HTTP utilities
  // close_connection defaults to false in all three helpers.
  static void sendResponse(Connection &conn, int status_code,
                           std::string_view content_type,
                           std::string_view body,
                           bool close_connection = false);
  static void sendJsonResponse(Connection &conn, int status_code,
                               std::string_view json,
                               bool close_connection = false);
  static void sendErrorResponse(Connection &conn, int status_code,
                                std::string_view message,
                                bool close_connection = false);
};