Files
weaseldb/src/http_handler.hpp

225 lines
8.1 KiB
C++

#pragma once
#include <pthread.h>

#include <atomic>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <span>
#include <string>
#include <string_view>
#include <thread>
#include <unordered_set>

#include <llhttp.h>

#include "api_url_parser.hpp"
#include "arena.hpp"
#include "config.hpp"
#include "connection.hpp"
#include "connection_handler.hpp"
#include "perfetto_categories.hpp"
#include "pipeline_entry.hpp"
#include "server.hpp"
#include "thread_pipeline.hpp"
// Forward declarations. In this header these types are only named as
// Arena::Ptr<> template arguments (CommitRequest, JsonCommitRequestParser) or
// as a const-reference parameter (RouteMatch).
struct CommitRequest;
struct JsonCommitRequestParser;
struct RouteMatch;
/**
 * Per-connection HTTP state, stored in Connection::user_data.
 *
 * Bundles the llhttp parser with the data collected for the request that is
 * currently being parsed.
 */
struct HttpConnectionState {
  // String type whose storage comes from an ArenaStlAllocator; used for the
  // header accumulation buffers below.
  using ArenaString =
      std::basic_string<char, std::char_traits<char>, ArenaStlAllocator<char>>;

  Arena &arena;
  llhttp_t parser;
  llhttp_settings_t settings;

  // --- Current request (arena-allocated) ---
  std::string_view method;
  std::string_view url;

  // --- Parse progress ---
  bool headers_complete{false};
  bool message_complete{false};
  bool connection_close{false}; // The client asked for the connection to close
  HttpRoute route{HttpRoute::NotFound};

  // --- Status request ---
  // Request ID taken from a /v1/status/{id} URL.
  std::string_view status_request_id;

  // --- Header accumulation (arena-allocated) ---
  ArenaString current_header_field_buf;
  ArenaString current_header_value_buf;
  bool header_field_complete{false};
  // Value of the X-Request-Id header (for tracing/logging).
  int64_t http_request_id{0};

  // --- Streaming parser for POST requests ---
  Arena::Ptr<JsonCommitRequestParser> commit_parser;
  Arena::Ptr<CommitRequest> commit_request;
  bool parsing_commit{false};
  // Becomes true once basic validation has passed.
  bool basic_validation_passed{false};

  explicit HttpConnectionState(Arena &arena);
};
/**
* HTTP/1.1 server implementation using llhttp for parsing.
* Supports the WeaselDB REST API endpoints with enum-based routing.
*/
struct HttpHandler : ConnectionHandler {
explicit HttpHandler(const weaseldb::Config &config)
: config_(config), banned_request_ids(ArenaStlAllocator<std::string_view>(
&banned_request_arena)) {
// Stage 0: Sequence assignment thread
sequenceThread = std::thread{[this]() {
pthread_setname_np(pthread_self(), "txn-sequence");
for (;;) {
auto guard = commitPipeline.acquire<0, 0>();
if (process_sequence_batch(guard.batch)) {
return; // Shutdown signal received
}
}
}};
// Stage 1: Precondition resolution thread
resolveThread = std::thread{[this]() {
pthread_setname_np(pthread_self(), "txn-resolve");
for (;;) {
auto guard = commitPipeline.acquire<1, 0>(/*maxBatch*/ 1);
if (process_resolve_batch(guard.batch)) {
return; // Shutdown signal received
}
}
}};
// Stage 2: Transaction persistence thread
persistThread = std::thread{[this]() {
pthread_setname_np(pthread_self(), "txn-persist");
for (;;) {
auto guard = commitPipeline.acquire<2, 0>();
if (process_persist_batch(guard.batch)) {
return; // Shutdown signal received
}
}
}};
// Stage 3: Connection return to server thread
releaseThread = std::thread{[this]() {
pthread_setname_np(pthread_self(), "txn-release");
for (;;) {
auto guard = commitPipeline.acquire<3, 0>();
if (process_release_batch(guard.batch)) {
return; // Shutdown signal received
}
}
}};
}
~HttpHandler() {
// Send single shutdown signal that flows through all pipeline stages
{
auto guard = commitPipeline.push(1, true);
guard.batch[0] =
ShutdownEntry{}; // Single ShutdownEntry flows through all stages
}
// Join all pipeline threads
sequenceThread.join();
resolveThread.join();
persistThread.join();
releaseThread.join();
}
void on_connection_established(Connection &conn) override;
void on_connection_closed(Connection &conn) override;
void on_data_arrived(std::string_view data,
std::unique_ptr<Connection> &conn_ptr) override;
void on_write_buffer_drained(std::unique_ptr<Connection> &conn_ptr) override;
void on_batch_complete(
std::span<std::unique_ptr<Connection>> /*batch*/) override;
// llhttp callbacks (public for HttpConnectionState access)
static int onUrl(llhttp_t *parser, const char *at, size_t length);
static int onHeaderField(llhttp_t *parser, const char *at, size_t length);
static int onHeaderFieldComplete(llhttp_t *parser);
static int onHeaderValue(llhttp_t *parser, const char *at, size_t length);
static int onHeaderValueComplete(llhttp_t *parser);
static int onHeadersComplete(llhttp_t *parser);
static int onBody(llhttp_t *parser, const char *at, size_t length);
static int onMessageComplete(llhttp_t *parser);
private:
static constexpr int lg_size = 16;
// Configuration reference
const weaseldb::Config &config_;
// Pipeline state (sequence thread only)
int64_t next_version = 1; // Next version to assign (sequence thread only)
// Pipeline state (persist thread writes, I/O threads read)
std::atomic<int64_t> committed_version{
0}; // Highest committed version (persist thread writes, I/O threads read)
// Arena for banned request IDs and related data structures (sequence thread
// only)
Arena banned_request_arena;
using BannedRequestIdSet =
std::unordered_set<std::string_view, std::hash<std::string_view>,
std::equal_to<std::string_view>,
ArenaStlAllocator<std::string_view>>;
BannedRequestIdSet banned_request_ids; // Request IDs that should not commit
// (string_views into arena)
// Main commit processing pipeline: sequence -> resolve -> persist -> release
StaticThreadPipeline<PipelineEntry, WaitStrategy::WaitIfUpstreamIdle, 1, 1, 1,
1>
commitPipeline{lg_size};
// Pipeline stage threads
std::thread sequenceThread;
std::thread resolveThread;
std::thread persistThread;
std::thread releaseThread;
// Pipeline stage processing methods (batch-based)
using BatchType =
StaticThreadPipeline<PipelineEntry, WaitStrategy::WaitIfUpstreamIdle, 1,
1, 1, 1>::Batch;
bool process_sequence_batch(BatchType &batch);
bool process_resolve_batch(BatchType &batch);
bool process_persist_batch(BatchType &batch);
bool process_release_batch(BatchType &batch);
// Route handlers
void handle_get_version(Connection &conn, const HttpConnectionState &state);
void handle_post_commit(Connection &conn, const HttpConnectionState &state);
void handle_get_subscribe(Connection &conn, const HttpConnectionState &state);
void handle_get_status(Connection &conn, HttpConnectionState &state,
const RouteMatch &route_match);
void handle_put_retention(Connection &conn, const HttpConnectionState &state,
const RouteMatch &route_match);
void handle_get_retention(Connection &conn, const HttpConnectionState &state,
const RouteMatch &route_match);
void handle_delete_retention(Connection &conn,
const HttpConnectionState &state,
const RouteMatch &route_match);
void handle_get_metrics(Connection &conn, const HttpConnectionState &state);
void handle_get_ok(Connection &conn, const HttpConnectionState &state);
void handle_not_found(Connection &conn, const HttpConnectionState &state);
// HTTP utilities
static void send_response(Connection &conn, int status_code,
std::string_view content_type,
std::string_view body,
bool close_connection = false);
static void send_json_response(Connection &conn, int status_code,
std::string_view json,
bool close_connection = false);
static void send_error_response(Connection &conn, int status_code,
std::string_view message,
bool close_connection = false);
};