Parse commit request

This commit is contained in:
2025-09-03 21:53:04 -04:00
parent 46edb7cd26
commit 978861c430
3 changed files with 112 additions and 36 deletions

View File

@@ -6,6 +6,7 @@
#include "arena_allocator.hpp"
#include "format.hpp"
#include "json_commit_request_parser.hpp"
#include "metric.hpp"
#include "perfetto_categories.hpp"
@@ -80,7 +81,7 @@ void HttpHandler::on_data_arrived(std::string_view data,
std::unique_ptr<Connection> &conn_ptr) {
auto *state = static_cast<HttpConnectionState *>(conn_ptr->user_data);
if (!state) {
sendErrorResponse(*conn_ptr, 500, "Internal server error", true);
send_error_response(*conn_ptr, 500, "Internal server error", true);
return;
}
@@ -94,7 +95,7 @@ void HttpHandler::on_data_arrived(std::string_view data,
llhttp_execute(&state->parser, data.data(), data.size());
if (err != HPE_OK) {
sendErrorResponse(*conn_ptr, 400, "Bad request", true);
send_error_response(*conn_ptr, 400, "Bad request", true);
return;
}
@@ -181,7 +182,7 @@ HttpRoute HttpHandler::parseRoute(std::string_view method,
// Route handlers (basic implementations)
void HttpHandler::handleGetVersion(Connection &conn,
const HttpConnectionState &state) {
sendJsonResponse(
send_json_response(
conn, 200,
R"({"version":"0.0.1","leader":"node-1","committed_version":42})",
state.connection_close);
@@ -189,17 +190,43 @@ void HttpHandler::handleGetVersion(Connection &conn,
void HttpHandler::handlePostCommit(Connection &conn,
const HttpConnectionState &state) {
// TODO: Parse commit request from state.body and process
sendJsonResponse(
conn, 200,
R"({"request_id":"example","status":"committed","version":43})",
state.connection_close);
// Check if streaming parse was successful
if (!state.commit_request || !state.parsing_commit) {
const char *error = state.commit_parser
? state.commit_parser->get_parse_error()
: "No parser initialized";
std::string error_msg = "Parse failed: ";
error_msg += error ? error : "Unknown error";
send_error_response(conn, 400, error_msg, state.connection_close);
return;
}
const CommitRequest &commit_request = *state.commit_request;
// TODO: Process the commit request with transaction engine
// For now, return a placeholder response with parsed request_id if available
ArenaAllocator &arena = conn.get_arena();
std::string_view response;
if (commit_request.request_id().has_value()) {
response = format(
arena,
R"({"request_id":"%.*s","status":"committed","version":43,"leader_id":"leader123"})",
static_cast<int>(commit_request.request_id().value().size()),
commit_request.request_id().value().data());
} else {
response = static_format(
arena,
R"({"status":"committed","version":43,"leader_id":"leader123"})");
}
send_json_response(conn, 200, response, state.connection_close);
}
void HttpHandler::handleGetSubscribe(Connection &conn,
const HttpConnectionState &state) {
// TODO: Implement subscription streaming
sendJsonResponse(
send_json_response(
conn, 200,
R"({"message":"Subscription endpoint - streaming not yet implemented"})",
state.connection_close);
@@ -208,7 +235,7 @@ void HttpHandler::handleGetSubscribe(Connection &conn,
void HttpHandler::handleGetStatus(Connection &conn,
const HttpConnectionState &state) {
// TODO: Extract request_id from URL and check status
sendJsonResponse(
send_json_response(
conn, 200,
R"({"request_id":"example","status":"committed","version":43})",
state.connection_close);
@@ -217,21 +244,21 @@ void HttpHandler::handleGetStatus(Connection &conn,
void HttpHandler::handlePutRetention(Connection &conn,
const HttpConnectionState &state) {
// TODO: Parse retention policy from body and store
sendJsonResponse(conn, 200, R"({"policy_id":"example","status":"created"})",
state.connection_close);
send_json_response(conn, 200, R"({"policy_id":"example","status":"created"})",
state.connection_close);
}
void HttpHandler::handleGetRetention(Connection &conn,
const HttpConnectionState &state) {
// TODO: Extract policy_id from URL or return all policies
sendJsonResponse(conn, 200, R"({"policies":[]})", state.connection_close);
send_json_response(conn, 200, R"({"policies":[]})", state.connection_close);
}
void HttpHandler::handleDeleteRetention(Connection &conn,
const HttpConnectionState &state) {
// TODO: Extract policy_id from URL and delete
sendJsonResponse(conn, 200, R"({"policy_id":"example","status":"deleted"})",
state.connection_close);
send_json_response(conn, 200, R"({"policy_id":"example","status":"deleted"})",
state.connection_close);
}
void HttpHandler::handleGetMetrics(Connection &conn,
@@ -285,7 +312,7 @@ void HttpHandler::handleGetOk(Connection &conn,
void HttpHandler::handleNotFound(Connection &conn,
const HttpConnectionState &state) {
sendErrorResponse(conn, 404, "Not found", state.connection_close);
send_error_response(conn, 404, "Not found", state.connection_close);
}
// HTTP utility methods
@@ -347,22 +374,22 @@ void HttpHandler::sendResponse(Connection &conn, int status_code,
conn.append_message(response);
}
void HttpHandler::sendJsonResponse(Connection &conn, int status_code,
std::string_view json,
bool close_connection) {
void HttpHandler::send_json_response(Connection &conn, int status_code,
std::string_view json,
bool close_connection) {
sendResponse(conn, status_code, "application/json", json, close_connection);
}
void HttpHandler::sendErrorResponse(Connection &conn, int status_code,
std::string_view message,
bool close_connection) {
void HttpHandler::send_error_response(Connection &conn, int status_code,
std::string_view message,
bool close_connection) {
[[maybe_unused]] ArenaAllocator &arena = conn.get_arena();
std::string json = R"({"error":")";
json += message;
json += R"("})";
sendJsonResponse(conn, status_code, json, close_connection);
send_json_response(conn, status_code, json, close_connection);
}
// llhttp callbacks
@@ -443,19 +470,49 @@ int HttpHandler::onHeadersComplete(llhttp_t *parser) {
llhttp_method_name(static_cast<llhttp_method_t>(parser->method));
state->method = std::string_view(method_str);
// Check if this looks like a POST to /v1/commit to initialize streaming
// parser
if (state->method == "POST" && state->url.find("/v1/commit") == 0) {
// Initialize streaming commit request parsing
state->commit_parser = std::make_unique<JsonCommitRequestParser>();
state->commit_request = std::make_unique<CommitRequest>();
state->parsing_commit =
state->commit_parser->begin_streaming_parse(*state->commit_request);
if (!state->parsing_commit) {
return -1; // Signal parsing error to llhttp
}
}
return 0;
}
int HttpHandler::onBody(llhttp_t *parser, const char *at, size_t length) {
[[maybe_unused]] auto *state =
static_cast<HttpConnectionState *>(parser->data);
(void)at;
(void)length;
auto *state = static_cast<HttpConnectionState *>(parser->data);
if (state->parsing_commit && state->commit_parser) {
// Stream data to commit request parser
auto status =
state->commit_parser->parse_chunk(const_cast<char *>(at), length);
if (status == CommitRequestParser::ParseStatus::Error) {
return -1; // Signal parsing error to llhttp
}
}
return 0;
}
int HttpHandler::onMessageComplete(llhttp_t *parser) {
auto *state = static_cast<HttpConnectionState *>(parser->data);
state->message_complete = true;
if (state->parsing_commit && state->commit_parser) {
// Finish streaming parse
auto status = state->commit_parser->finish_streaming_parse();
if (status == CommitRequestParser::ParseStatus::Error) {
return -1; // Signal parsing error to llhttp
}
}
return 0;
}

View File

@@ -13,6 +13,10 @@
#include "server.hpp"
#include "thread_pipeline.hpp"
// Forward declarations
struct CommitRequest;
struct JsonCommitRequestParser;
/**
* HTTP routes supported by WeaselDB server.
* Using enum for efficient switch-based routing.
@@ -56,6 +60,11 @@ struct HttpConnectionState {
bool header_field_complete = false;
int64_t request_id = 0; // X-Request-Id header value
// Streaming parser for POST requests
std::unique_ptr<JsonCommitRequestParser> commit_parser;
std::unique_ptr<CommitRequest> commit_request;
bool parsing_commit = false;
explicit HttpConnectionState(ArenaAllocator &arena);
};
@@ -176,10 +185,10 @@ private:
static void sendResponse(Connection &conn, int status_code,
std::string_view content_type, std::string_view body,
bool close_connection = false);
static void sendJsonResponse(Connection &conn, int status_code,
std::string_view json,
bool close_connection = false);
static void sendErrorResponse(Connection &conn, int status_code,
std::string_view message,
bool close_connection = false);
static void send_json_response(Connection &conn, int status_code,
std::string_view json,
bool close_connection = false);
static void send_error_response(Connection &conn, int status_code,
std::string_view message,
bool close_connection = false);
};