Files
weaseldb/src/http_handler.hpp

169 lines
5.8 KiB
C++

#pragma once
#include <pthread.h>

#include <cstdint>
#include <memory>
#include <span>
#include <string>
#include <string_view>
#include <thread>
#include <vector>

#include <llhttp.h>

#include "connection.hpp"
#include "connection_handler.hpp"
#include "perfetto_categories.hpp"
#include "server.hpp"
#include "thread_pipeline.hpp"
/**
 * Routes the WeaselDB HTTP server knows how to dispatch.
 *
 * A plain scoped enum so that request dispatch can be written as a `switch`.
 * Enumerator names follow a METHOD_resource pattern; the concrete URL paths
 * are decided by HttpHandler::parseRoute, not by this header.
 *
 * The declaration order is significant: enumerators carry no explicit values,
 * so reordering them changes their underlying integers.
 */
enum class HttpRoute {
  // Core API
  GET_version,
  POST_commit,
  GET_subscribe,
  GET_status,

  // Retention management
  PUT_retention,
  GET_retention,
  DELETE_retention,

  // Operational endpoints
  GET_metrics,
  GET_ok,

  // Fallback; also the initial value of HttpConnectionState::route.
  NotFound,
};
/**
* HTTP connection state stored in Connection::user_data.
* Manages llhttp parser state and request data.
*
* HttpHandler's pipeline threads recover this object by casting
* Connection::user_data back to HttpConnectionState*.
*
* NOTE(review): member order determines both layout and initialization order
* for the out-of-line constructor; do not reorder without checking the .cpp.
*/
struct HttpConnectionState {
// llhttp parser instance and the settings object it is configured with.
// Presumably `settings` carries the HttpHandler::on* callbacks - confirm in
// the constructor definition.
llhttp_t parser;
llhttp_settings_t settings;
// Current request data (arena-allocated)
// These are non-owning views; the bytes they refer to must outlive them.
std::string_view method;
std::string_view url;
// Parse state
// All flags start false and `route` starts at NotFound. Which callback sets
// each one is not visible in this header.
bool headers_complete = false;
bool message_complete = false;
bool connection_close = false; // Client requested connection close
HttpRoute route = HttpRoute::NotFound;
// Header accumulation buffers (arena-allocated)
// ArenaString is a std::basic_string<char> using ArenaStlAllocator<char>
// instead of the default allocator.
using ArenaString =
std::basic_string<char, std::char_traits<char>, ArenaStlAllocator<char>>;
ArenaString current_header_field_buf;
ArenaString current_header_value_buf;
// Presumably distinguishes "still accumulating the header name" from "name
// finished, now accumulating the value" - TODO confirm against the callbacks.
bool header_field_complete = false;
// Also passed to perfetto::Flow::Global by HttpHandler's pipeline threads.
int64_t request_id = 0; // X-Request-Id header value
// Defined out of line. `explicit` prevents implicit conversion from an
// ArenaAllocator. Presumably `arena` backs the ArenaString buffers - confirm.
explicit HttpConnectionState(ArenaAllocator &arena);
};
/**
 * HTTP/1.1 server implementation using llhttp for parsing.
 * Supports the WeaselDB REST API endpoints with enum-based routing.
 *
 * Threading model (as visible in this header):
 *  - The constructor starts one thread named "stage-0" and
 *    kFinalStageThreads threads named "stage-1-<N>". All of them loop forever
 *    acquiring batches from `pipeline`.
 *  - A null std::unique_ptr<Connection> in a batch is the shutdown sentinel:
 *    a worker that sees one returns from its thread function.
 *  - The destructor pushes kFinalStageThreads null entries and then joins
 *    every worker.
 *
 * Every worker lambda captures `this`, so the object must never be copied or
 * moved while the threads run. Copy and move are therefore deleted explicitly
 * (they were already unusable because of the std::thread members; this makes
 * the intent visible and the diagnostics clearer).
 */
struct HttpHandler : ConnectionHandler {
  /**
   * Starts the pipeline worker threads.
   *
   * NOTE(review): if creating a later thread throws, the already-started
   * threads are still joinable when the members are destroyed, which ends in
   * std::terminate. Acceptable for a server singleton, but worth knowing.
   */
  HttpHandler() {
    finalStageThreads.reserve(kFinalStageThreads);
    for (int threadId = 0; threadId < kFinalStageThreads; ++threadId) {
      finalStageThreads.emplace_back([this, threadId]() {
        const std::string threadName = "stage-1-" + std::to_string(threadId);
        pthread_setname_np(pthread_self(), threadName.c_str());
        for (;;) {
          // Presumably (stage index, thread index within the stage) - confirm
          // against ThreadPipeline::acquire.
          auto guard = pipeline.acquire(1, threadId);
          for (auto it = guard.batch.begin(); it != guard.batch.end(); ++it) {
            // Entries are striped across the final-stage threads by index;
            // skip the ones that belong to a sibling thread.
            if ((it.index() % kFinalStageThreads) != threadId) {
              continue;
            }
            auto &c = *it;
            if (!c) {
              return; // Null connection: shutdown sentinel from ~HttpHandler.
            }
            auto *state = static_cast<HttpConnectionState *>(c->user_data);
            TRACE_EVENT("http", "pipeline thread",
                        perfetto::Flow::Global(state->request_id));
            Server::release_back_to_server(std::move(c));
          }
        }
      });
    }
    stage0Thread = std::thread{[this]() {
      pthread_setname_np(pthread_self(), "stage-0");
      for (;;) {
        auto guard = pipeline.acquire(0, 0, 0, false);
        for (auto it = guard.batch.begin(); it != guard.batch.end(); ++it) {
          auto &c = *it;
          if (!c) {
            return; // Null connection: shutdown sentinel from ~HttpHandler.
          }
          // Placeholder busy-wait standing in for per-connection serial work.
          // `volatile` keeps the loop from being optimised away; it is
          // written `i = i + 1` because ++ on a volatile is deprecated in
          // C++20.
          for (volatile int i = 0; i < kStage0SpinIterations; i = i + 1)
            ;
        }
      }
    }};
  }

  /**
   * Signals shutdown and waits for all workers.
   *
   * One null entry is pushed per final-stage thread; the push guard is scoped
   * so that it is released before joining.
   */
  ~HttpHandler() {
    {
      auto guard = pipeline.push(kFinalStageThreads, true);
      for (auto &c : guard.batch) {
        c = {};
      }
    }
    stage0Thread.join();
    for (auto &thread : finalStageThreads) {
      thread.join();
    }
  }

  // Worker threads hold `this`; the handler must stay at a fixed address.
  HttpHandler(const HttpHandler &) = delete;
  HttpHandler &operator=(const HttpHandler &) = delete;
  HttpHandler(HttpHandler &&) = delete;
  HttpHandler &operator=(HttpHandler &&) = delete;

  // ConnectionHandler interface (contracts are defined by the base class).
  void on_connection_established(Connection &conn) override;
  void on_connection_closed(Connection &conn) override;
  void on_data_arrived(std::string_view data,
                       std::unique_ptr<Connection> &conn_ptr) override;
  void on_write_buffer_drained(std::unique_ptr<Connection> &conn_ptr) override;
  void on_batch_complete(
      std::span<std::unique_ptr<Connection>> /*batch*/) override;

  // Route parsing (public for testing)
  static HttpRoute parseRoute(std::string_view method, std::string_view url);

  // llhttp callbacks (public for HttpConnectionState access)
  static int onUrl(llhttp_t *parser, const char *at, size_t length);
  static int onHeaderField(llhttp_t *parser, const char *at, size_t length);
  static int onHeaderFieldComplete(llhttp_t *parser);
  static int onHeaderValue(llhttp_t *parser, const char *at, size_t length);
  static int onHeaderValueComplete(llhttp_t *parser);
  static int onHeadersComplete(llhttp_t *parser);
  static int onBody(llhttp_t *parser, const char *at, size_t length);
  static int onMessageComplete(llhttp_t *parser);

private:
  // Number of "stage-1-<N>" threads; also the number of shutdown sentinels.
  static constexpr int kFinalStageThreads = 2;
  // First ThreadPipeline constructor argument; presumably log2 of the ring
  // capacity - confirm against thread_pipeline.hpp.
  static constexpr int kLogSize = 12;
  // Iterations of the placeholder busy-wait in the stage-0 thread.
  static constexpr int kStage0SpinIterations = 1200;

  // Declared before the threads so it is constructed first and destroyed
  // last relative to them.
  ThreadPipeline<std::unique_ptr<Connection>> pipeline{
      kLogSize, {/*noop serial thread*/ 1, kFinalStageThreads}};
  std::thread stage0Thread;
  std::vector<std::thread> finalStageThreads;

  // Route handlers
  void handleGetVersion(Connection &conn, const HttpConnectionState &state);
  void handlePostCommit(Connection &conn, const HttpConnectionState &state);
  void handleGetSubscribe(Connection &conn, const HttpConnectionState &state);
  void handleGetStatus(Connection &conn, const HttpConnectionState &state);
  void handlePutRetention(Connection &conn, const HttpConnectionState &state);
  void handleGetRetention(Connection &conn, const HttpConnectionState &state);
  void handleDeleteRetention(Connection &conn,
                             const HttpConnectionState &state);
  void handleGetMetrics(Connection &conn, const HttpConnectionState &state);
  void handleGetOk(Connection &conn, const HttpConnectionState &state);
  void handleNotFound(Connection &conn, const HttpConnectionState &state);

  // HTTP utilities
  static void sendResponse(Connection &conn, int status_code,
                           std::string_view content_type, std::string_view body,
                           bool close_connection = false);
  static void sendJsonResponse(Connection &conn, int status_code,
                               std::string_view json,
                               bool close_connection = false);
  static void sendErrorResponse(Connection &conn, int status_code,
                                std::string_view message,
                                bool close_connection = false);
};