Initial http implementation

This commit is contained in:
2025-08-19 16:50:06 -04:00
parent ee32b64c02
commit b8d735f074
5 changed files with 502 additions and 19 deletions

View File

@@ -128,13 +128,14 @@ set(SOURCES
src/connection.cpp src/connection.cpp
src/server.cpp src/server.cpp
src/json_commit_request_parser.cpp src/json_commit_request_parser.cpp
src/http_handler.cpp
src/arena_allocator.cpp src/arena_allocator.cpp
${CMAKE_BINARY_DIR}/json_tokens.cpp) ${CMAKE_BINARY_DIR}/json_tokens.cpp)
add_executable(weaseldb ${SOURCES}) add_executable(weaseldb ${SOURCES})
add_dependencies(weaseldb generate_json_tokens) add_dependencies(weaseldb generate_json_tokens)
target_link_libraries(weaseldb Threads::Threads toml11::toml11 weaseljson target_link_libraries(weaseldb Threads::Threads toml11::toml11 weaseljson
simdutf::simdutf) simdutf::simdutf llhttp_static)
enable_testing() enable_testing()
@@ -158,6 +159,15 @@ target_link_libraries(test_commit_request doctest::doctest weaseljson test_data
nlohmann_json::nlohmann_json simdutf::simdutf) nlohmann_json::nlohmann_json simdutf::simdutf)
target_include_directories(test_commit_request PRIVATE src tests) target_include_directories(test_commit_request PRIVATE src tests)
add_executable(
test_http_handler tests/test_http_handler.cpp src/http_handler.cpp
src/arena_allocator.cpp src/connection.cpp)
target_link_libraries(test_http_handler doctest::doctest llhttp_static
Threads::Threads)
target_include_directories(test_http_handler PRIVATE src)
target_compile_definitions(test_http_handler
PRIVATE DOCTEST_CONFIG_IMPLEMENT_WITH_MAIN)
add_executable(bench_arena_allocator benchmarks/bench_arena_allocator.cpp add_executable(bench_arena_allocator benchmarks/bench_arena_allocator.cpp
src/arena_allocator.cpp) src/arena_allocator.cpp)
target_link_libraries(bench_arena_allocator nanobench) target_link_libraries(bench_arena_allocator nanobench)
@@ -192,6 +202,7 @@ target_include_directories(debug_arena PRIVATE src)
add_test(NAME arena_allocator_tests COMMAND test_arena_allocator) add_test(NAME arena_allocator_tests COMMAND test_arena_allocator)
add_test(NAME commit_request_tests COMMAND test_commit_request) add_test(NAME commit_request_tests COMMAND test_commit_request)
add_test(NAME http_handler_tests COMMAND test_http_handler)
add_test(NAME arena_allocator_benchmarks COMMAND bench_arena_allocator) add_test(NAME arena_allocator_benchmarks COMMAND bench_arena_allocator)
add_test(NAME commit_request_benchmarks COMMAND bench_commit_request) add_test(NAME commit_request_benchmarks COMMAND bench_commit_request)
add_test(NAME parser_comparison_benchmarks COMMAND bench_parser_comparison) add_test(NAME parser_comparison_benchmarks COMMAND bench_parser_comparison)

309
src/http_handler.cpp Normal file
View File

@@ -0,0 +1,309 @@
#include "http_handler.hpp"
#include "arena_allocator.hpp"
#include <cstring>
#include <string>
// HttpConnectionState implementation
HttpConnectionState::HttpConnectionState(
[[maybe_unused]] ArenaAllocator &arena) {
llhttp_settings_init(&settings);
// Set up llhttp callbacks
settings.on_url = HttpHandler::onUrl;
settings.on_header_field = HttpHandler::onHeaderField;
settings.on_header_value = HttpHandler::onHeaderValue;
settings.on_headers_complete = HttpHandler::onHeadersComplete;
settings.on_body = HttpHandler::onBody;
settings.on_message_complete = HttpHandler::onMessageComplete;
llhttp_init(&parser, HTTP_REQUEST, &settings);
parser.data = this;
}
// HttpHandler implementation
void HttpHandler::on_connection_established(Connection &conn) {
// Allocate HTTP state in connection's arena
ArenaAllocator &arena = conn.getArena();
auto *state = arena.construct<HttpConnectionState>(arena);
conn.user_data = state;
}
void HttpHandler::on_connection_closed(Connection &conn) {
// Arena cleanup happens automatically when connection is destroyed
conn.user_data = nullptr;
}
void HttpHandler::on_write_progress(std::unique_ptr<Connection> &conn_ptr) {
// Reset arena after all messages have been written for the next request
if (conn_ptr->outgoingBytesQueued() == 0) {
conn_ptr->reset();
on_connection_established(*conn_ptr);
}
}
void HttpHandler::on_data_arrived(std::string_view data,
std::unique_ptr<Connection> &conn_ptr) {
auto *state = static_cast<HttpConnectionState *>(conn_ptr->user_data);
if (!state) {
sendErrorResponse(*conn_ptr, 500, "Internal server error");
return;
}
// Parse HTTP data with llhttp
enum llhttp_errno err =
llhttp_execute(&state->parser, data.data(), data.size());
if (err != HPE_OK) {
sendErrorResponse(*conn_ptr, 400, "Bad request");
return;
}
// If message is complete, route and handle the request
if (state->message_complete) {
// Parse route from method and URL
state->route = parseRoute(state->method, state->url);
// Route to appropriate handler
switch (state->route) {
case HttpRoute::GET_version:
handleGetVersion(*conn_ptr, *state);
break;
case HttpRoute::POST_commit:
handlePostCommit(*conn_ptr, *state);
break;
case HttpRoute::GET_subscribe:
handleGetSubscribe(*conn_ptr, *state);
break;
case HttpRoute::GET_status:
handleGetStatus(*conn_ptr, *state);
break;
case HttpRoute::PUT_retention:
handlePutRetention(*conn_ptr, *state);
break;
case HttpRoute::GET_retention:
handleGetRetention(*conn_ptr, *state);
break;
case HttpRoute::DELETE_retention:
handleDeleteRetention(*conn_ptr, *state);
break;
case HttpRoute::GET_metrics:
handleGetMetrics(*conn_ptr, *state);
break;
case HttpRoute::NotFound:
default:
handleNotFound(*conn_ptr, *state);
break;
}
// Connection state will be reset when appropriate by the server
// For now, we don't reset immediately since messages may still be queued
}
}
HttpRoute HttpHandler::parseRoute(std::string_view method,
std::string_view url) {
// Strip query parameters if present
size_t query_pos = url.find('?');
if (query_pos != std::string_view::npos) {
url = url.substr(0, query_pos);
}
// Route based on method and path
if (method == "GET") {
if (url == "/v1/version")
return HttpRoute::GET_version;
if (url == "/v1/subscribe")
return HttpRoute::GET_subscribe;
if (url.starts_with("/v1/status"))
return HttpRoute::GET_status;
if (url.starts_with("/v1/retention")) {
// Check if it's a specific retention policy or list all
return HttpRoute::GET_retention;
}
if (url == "/metrics")
return HttpRoute::GET_metrics;
} else if (method == "POST") {
if (url == "/v1/commit")
return HttpRoute::POST_commit;
} else if (method == "PUT") {
if (url.starts_with("/v1/retention/"))
return HttpRoute::PUT_retention;
} else if (method == "DELETE") {
if (url.starts_with("/v1/retention/"))
return HttpRoute::DELETE_retention;
}
return HttpRoute::NotFound;
}
// Route handlers (basic implementations)
void HttpHandler::handleGetVersion(
Connection &conn, [[maybe_unused]] const HttpConnectionState &state) {
sendJsonResponse(
conn, 200,
R"({"version":"0.0.1","leader":"node-1","committed_version":42})");
}
void HttpHandler::handlePostCommit(
Connection &conn, [[maybe_unused]] const HttpConnectionState &state) {
// TODO: Parse commit request from state.body and process
sendJsonResponse(
conn, 200,
R"({"request_id":"example","status":"committed","version":43})");
}
void HttpHandler::handleGetSubscribe(
Connection &conn, [[maybe_unused]] const HttpConnectionState &state) {
// TODO: Implement subscription streaming
sendJsonResponse(
conn, 200,
R"({"message":"Subscription endpoint - streaming not yet implemented"})");
}
void HttpHandler::handleGetStatus(
Connection &conn, [[maybe_unused]] const HttpConnectionState &state) {
// TODO: Extract request_id from URL and check status
sendJsonResponse(
conn, 200,
R"({"request_id":"example","status":"committed","version":43})");
}
void HttpHandler::handlePutRetention(
Connection &conn, [[maybe_unused]] const HttpConnectionState &state) {
// TODO: Parse retention policy from body and store
sendJsonResponse(conn, 200, R"({"policy_id":"example","status":"created"})");
}
void HttpHandler::handleGetRetention(
Connection &conn, [[maybe_unused]] const HttpConnectionState &state) {
// TODO: Extract policy_id from URL or return all policies
sendJsonResponse(conn, 200, R"({"policies":[]})");
}
void HttpHandler::handleDeleteRetention(
Connection &conn, [[maybe_unused]] const HttpConnectionState &state) {
// TODO: Extract policy_id from URL and delete
sendJsonResponse(conn, 200, R"({"policy_id":"example","status":"deleted"})");
}
void HttpHandler::handleGetMetrics(
Connection &conn, [[maybe_unused]] const HttpConnectionState &state) {
// TODO: Implement metrics collection and formatting
sendResponse(conn, 200, "text/plain",
"# WeaselDB metrics\nweaseldb_requests_total 0\n");
}
void HttpHandler::handleNotFound(
Connection &conn, [[maybe_unused]] const HttpConnectionState &state) {
sendErrorResponse(conn, 404, "Not found");
}
// HTTP utility methods
void HttpHandler::sendResponse(Connection &conn, int status_code,
std::string_view content_type,
std::string_view body) {
[[maybe_unused]] ArenaAllocator &arena = conn.getArena();
// Build HTTP response using arena
std::string response;
response.reserve(256 + body.size());
response += "HTTP/1.1 ";
response += std::to_string(status_code);
response += " ";
// Status text
switch (status_code) {
case 200:
response += "OK";
break;
case 400:
response += "Bad Request";
break;
case 404:
response += "Not Found";
break;
case 500:
response += "Internal Server Error";
break;
default:
response += "Unknown";
break;
}
response += "\r\n";
response += "Content-Type: ";
response += content_type;
response += "\r\n";
response += "Content-Length: ";
response += std::to_string(body.size());
response += "\r\n";
response += "Connection: keep-alive\r\n";
response += "\r\n";
response += body;
conn.appendMessage(response);
}
void HttpHandler::sendJsonResponse(Connection &conn, int status_code,
std::string_view json) {
sendResponse(conn, status_code, "application/json", json);
}
void HttpHandler::sendErrorResponse(Connection &conn, int status_code,
std::string_view message) {
[[maybe_unused]] ArenaAllocator &arena = conn.getArena();
std::string json = R"({"error":")";
json += message;
json += R"("})";
sendJsonResponse(conn, status_code, json);
}
// llhttp callbacks
int HttpHandler::onUrl(llhttp_t *parser, const char *at, size_t length) {
auto *state = static_cast<HttpConnectionState *>(parser->data);
// Store URL in arena (simplified - would need to accumulate for streaming)
state->url = std::string_view(at, length);
return 0;
}
int HttpHandler::onHeaderField([[maybe_unused]] llhttp_t *parser,
[[maybe_unused]] const char *at,
[[maybe_unused]] size_t length) {
// TODO: Store headers if needed
return 0;
}
int HttpHandler::onHeaderValue([[maybe_unused]] llhttp_t *parser,
[[maybe_unused]] const char *at,
[[maybe_unused]] size_t length) {
// TODO: Store header values if needed
return 0;
}
int HttpHandler::onHeadersComplete(llhttp_t *parser) {
auto *state = static_cast<HttpConnectionState *>(parser->data);
state->headers_complete = true;
// Get HTTP method
const char *method_str =
llhttp_method_name(static_cast<llhttp_method_t>(parser->method));
state->method = std::string_view(method_str);
return 0;
}
int HttpHandler::onBody(llhttp_t *parser, const char *at, size_t length) {
auto *state = static_cast<HttpConnectionState *>(parser->data);
// Store body (simplified - would need to accumulate for streaming)
state->body = std::string_view(at, length);
return 0;
}
int HttpHandler::onMessageComplete(llhttp_t *parser) {
auto *state = static_cast<HttpConnectionState *>(parser->data);
state->message_complete = true;
return 0;
}

92
src/http_handler.hpp Normal file
View File

@@ -0,0 +1,92 @@
#pragma once
#include "connection.hpp"
#include "connection_handler.hpp"
#include <llhttp.h>
#include <memory>
#include <string_view>
/**
* HTTP routes supported by WeaselDB server.
* Using enum for efficient switch-based routing.
*/
enum class HttpRoute {
GET_version,
POST_commit,
GET_subscribe,
GET_status,
PUT_retention,
GET_retention,
DELETE_retention,
GET_metrics,
NotFound
};
/**
* HTTP connection state stored in Connection::user_data.
* Manages llhttp parser state and request data.
*/
struct HttpConnectionState {
llhttp_t parser;
llhttp_settings_t settings;
// Current request data (arena-allocated)
std::string_view method;
std::string_view url;
std::string_view body;
// Parse state
bool headers_complete = false;
bool message_complete = false;
HttpRoute route = HttpRoute::NotFound;
explicit HttpConnectionState(ArenaAllocator &arena);
};
/**
* HTTP/1.1 server implementation using llhttp for parsing.
* Supports the WeaselDB REST API endpoints with enum-based routing.
*/
class HttpHandler : public ConnectionHandler {
public:
HttpHandler() = default;
void on_connection_established(Connection &conn) override;
void on_connection_closed(Connection &conn) override;
void on_data_arrived(std::string_view data,
std::unique_ptr<Connection> &conn_ptr) override;
void on_write_progress(std::unique_ptr<Connection> &conn_ptr) override;
// Route parsing (public for testing)
static HttpRoute parseRoute(std::string_view method, std::string_view url);
// llhttp callbacks (public for HttpConnectionState access)
static int onUrl(llhttp_t *parser, const char *at, size_t length);
static int onHeaderField(llhttp_t *parser, const char *at, size_t length);
static int onHeaderValue(llhttp_t *parser, const char *at, size_t length);
static int onHeadersComplete(llhttp_t *parser);
static int onBody(llhttp_t *parser, const char *at, size_t length);
static int onMessageComplete(llhttp_t *parser);
private:
// Route handlers
void handleGetVersion(Connection &conn, const HttpConnectionState &state);
void handlePostCommit(Connection &conn, const HttpConnectionState &state);
void handleGetSubscribe(Connection &conn, const HttpConnectionState &state);
void handleGetStatus(Connection &conn, const HttpConnectionState &state);
void handlePutRetention(Connection &conn, const HttpConnectionState &state);
void handleGetRetention(Connection &conn, const HttpConnectionState &state);
void handleDeleteRetention(Connection &conn,
const HttpConnectionState &state);
void handleGetMetrics(Connection &conn, const HttpConnectionState &state);
void handleNotFound(Connection &conn, const HttpConnectionState &state);
// HTTP utilities
static void sendResponse(Connection &conn, int status_code,
std::string_view content_type,
std::string_view body);
static void sendJsonResponse(Connection &conn, int status_code,
std::string_view json);
static void sendErrorResponse(Connection &conn, int status_code,
std::string_view message);
};

View File

@@ -1,6 +1,7 @@
#include "config.hpp" #include "config.hpp"
#include "connection.hpp" #include "connection.hpp"
#include "connection_handler.hpp" #include "connection_handler.hpp"
#include "http_handler.hpp"
#include "server.hpp" #include "server.hpp"
#include <atomic> #include <atomic>
#include <csignal> #include <csignal>
@@ -21,21 +22,6 @@ void signal_handler(int sig) {
} }
} }
/**
* Simple echo handler that mirrors received data back to the client.
*
* This implementation preserves the current behavior of the server
* while demonstrating the ConnectionHandler interface pattern.
*/
class EchoHandler : public ConnectionHandler {
public:
void on_data_arrived(std::string_view data,
std::unique_ptr<Connection> &conn_ptr) override {
// Echo the received data back to the client
conn_ptr->appendMessage(data);
}
};
void print_help(const char *program_name) { void print_help(const char *program_name) {
std::cout << "WeaselDB - High-performance write-side database component\n\n"; std::cout << "WeaselDB - High-performance write-side database component\n\n";
std::cout << "Usage: " << program_name << " [OPTIONS]\n\n"; std::cout << "Usage: " << program_name << " [OPTIONS]\n\n";
@@ -121,8 +107,8 @@ int main(int argc, char *argv[]) {
<< std::endl; << std::endl;
// Create handler and server // Create handler and server
EchoHandler echo_handler; HttpHandler http_handler;
auto server = Server::create(*config, echo_handler); auto server = Server::create(*config, http_handler);
g_server = server.get(); g_server = server.get();
// Setup signal handling // Setup signal handling
@@ -130,7 +116,7 @@ int main(int argc, char *argv[]) {
signal(SIGTERM, signal_handler); signal(SIGTERM, signal_handler);
signal(SIGINT, signal_handler); signal(SIGINT, signal_handler);
std::cout << "Starting WeaselDB server..." << std::endl; std::cout << "Starting WeaselDB HTTP server..." << std::endl;
server->run(); server->run();
std::cout << "Server shutdown complete." << std::endl; std::cout << "Server shutdown complete." << std::endl;

View File

@@ -0,0 +1,85 @@
#include "arena_allocator.hpp"
#include "http_handler.hpp"
#include <atomic>
#include <doctest/doctest.h>
// Global variable needed by Connection
std::atomic<int> activeConnections{0};
// Simple test helper since Connection has complex constructor requirements
struct TestConnectionData {
ArenaAllocator arena;
std::string message_buffer;
void *user_data = nullptr;
void appendMessage(std::string_view data) { message_buffer += data; }
ArenaAllocator &getArena() { return arena; }
const std::string &getResponse() const { return message_buffer; }
void clearResponse() { message_buffer.clear(); }
void reset() {
arena.reset();
message_buffer.clear();
}
};
TEST_CASE("HttpHandler route parsing") {
SUBCASE("GET routes") {
CHECK(HttpHandler::parseRoute("GET", "/v1/version") ==
HttpRoute::GET_version);
CHECK(HttpHandler::parseRoute("GET", "/v1/subscribe") ==
HttpRoute::GET_subscribe);
CHECK(HttpHandler::parseRoute("GET", "/v1/status") ==
HttpRoute::GET_status);
CHECK(HttpHandler::parseRoute("GET", "/v1/retention") ==
HttpRoute::GET_retention);
CHECK(HttpHandler::parseRoute("GET", "/metrics") == HttpRoute::GET_metrics);
}
SUBCASE("POST routes") {
CHECK(HttpHandler::parseRoute("POST", "/v1/commit") ==
HttpRoute::POST_commit);
}
SUBCASE("PUT routes") {
CHECK(HttpHandler::parseRoute("PUT", "/v1/retention/policy1") ==
HttpRoute::PUT_retention);
}
SUBCASE("DELETE routes") {
CHECK(HttpHandler::parseRoute("DELETE", "/v1/retention/policy1") ==
HttpRoute::DELETE_retention);
}
SUBCASE("Unknown routes") {
CHECK(HttpHandler::parseRoute("GET", "/unknown") == HttpRoute::NotFound);
CHECK(HttpHandler::parseRoute("PATCH", "/v1/version") ==
HttpRoute::NotFound);
}
SUBCASE("Query parameters stripped") {
CHECK(HttpHandler::parseRoute("GET", "/v1/version?foo=bar") ==
HttpRoute::GET_version);
}
}
TEST_CASE("HttpHandler route parsing functionality") {
// Test just the static route parsing method since full integration testing
// would require complex Connection setup with server dependencies
SUBCASE("Route parsing with query parameters") {
CHECK(HttpHandler::parseRoute("GET", "/v1/version?param=value") ==
HttpRoute::GET_version);
CHECK(HttpHandler::parseRoute("GET", "/v1/subscribe?stream=true") ==
HttpRoute::GET_subscribe);
}
SUBCASE("Retention policy routes") {
CHECK(HttpHandler::parseRoute("PUT", "/v1/retention/policy123") ==
HttpRoute::PUT_retention);
CHECK(HttpHandler::parseRoute("DELETE", "/v1/retention/policy456") ==
HttpRoute::DELETE_retention);
CHECK(HttpHandler::parseRoute("GET", "/v1/retention/policy789") ==
HttpRoute::GET_retention);
}
}