Basic implementation of /commit, /version, and /status

No precondition checking, persistence, or log scanning yet.
This commit is contained in:
2025-09-04 15:40:17 -04:00
parent 8b6736127a
commit 96aae52853
5 changed files with 475 additions and 148 deletions

View File

@@ -156,6 +156,8 @@ add_executable(
test_http_handler test_http_handler
tests/test_http_handler.cpp tests/test_http_handler.cpp
src/http_handler.cpp src/http_handler.cpp
src/server.cpp
src/config.cpp
src/json_commit_request_parser.cpp src/json_commit_request_parser.cpp
src/arena_allocator.cpp src/arena_allocator.cpp
src/format.cpp src/format.cpp
@@ -169,6 +171,7 @@ target_link_libraries(
doctest::doctest doctest::doctest
llhttp_static llhttp_static
Threads::Threads Threads::Threads
toml11::toml11
perfetto perfetto
simdutf::simdutf simdutf::simdutf
weaseljson) weaseljson)

View File

@@ -209,7 +209,7 @@ struct Connection {
* metrics.recordQueueDepth(conn->get_id(), conn->outgoingBytesQueued()); * metrics.recordQueueDepth(conn->get_id(), conn->outgoingBytesQueued());
* ``` * ```
*/ */
int64_t outgoingBytesQueued() const { int64_t outgoing_bytes_queued() const {
#ifndef NDEBUG #ifndef NDEBUG
// Debug build: validate counter accuracy // Debug build: validate counter accuracy
int64_t computed_total = 0; int64_t computed_total = 0;

View File

@@ -1,5 +1,6 @@
#include "http_handler.hpp" #include "http_handler.hpp"
#include <atomic>
#include <cstring> #include <cstring>
#include <string> #include <string>
#include <strings.h> #include <strings.h>
@@ -9,6 +10,7 @@
#include "json_commit_request_parser.hpp" #include "json_commit_request_parser.hpp"
#include "metric.hpp" #include "metric.hpp"
#include "perfetto_categories.hpp" #include "perfetto_categories.hpp"
#include "pipeline_entry.hpp"
#include "server.hpp" #include "server.hpp"
auto requests_counter_family = metric::create_counter( auto requests_counter_family = metric::create_counter(
@@ -16,6 +18,12 @@ auto requests_counter_family = metric::create_counter(
thread_local auto metrics_counter = thread_local auto metrics_counter =
requests_counter_family.create({{"path", "/metrics"}}); requests_counter_family.create({{"path", "/metrics"}});
// Metric for banned request IDs memory usage
auto banned_request_ids_memory_gauge =
metric::create_gauge("weaseldb_banned_request_ids_memory_bytes",
"Memory used by banned request IDs arena")
.create({});
// HttpConnectionState implementation // HttpConnectionState implementation
HttpConnectionState::HttpConnectionState(ArenaAllocator &arena) HttpConnectionState::HttpConnectionState(ArenaAllocator &arena)
: current_header_field_buf(ArenaStlAllocator<char>(&arena)), : current_header_field_buf(ArenaStlAllocator<char>(&arena)),
@@ -63,30 +71,45 @@ void HttpHandler::on_write_buffer_drained(
void HttpHandler::on_batch_complete( void HttpHandler::on_batch_complete(
std::span<std::unique_ptr<Connection>> batch) { std::span<std::unique_ptr<Connection>> batch) {
// Collect commit requests that passed basic validation for pipeline // Collect commit requests and status requests for pipeline processing
// processing int pipeline_count = 0;
int commit_count = 0;
// Count both commit and status requests
for (auto &conn : batch) { for (auto &conn : batch) {
if (conn && conn->user_data) { if (conn && conn->user_data) {
auto *state = static_cast<HttpConnectionState *>(conn->user_data); auto *state = static_cast<HttpConnectionState *>(conn->user_data);
// Count commit requests that passed basic validation
if (state->route == HttpRoute::POST_commit && state->commit_request && if (state->route == HttpRoute::POST_commit && state->commit_request &&
state->parsing_commit && state->basic_validation_passed) { state->parsing_commit && state->basic_validation_passed) {
commit_count++; pipeline_count++;
}
// Count status requests
else if (state->route == HttpRoute::GET_status &&
// Error message not already queued
conn->outgoing_bytes_queued() == 0) {
pipeline_count++;
} }
} }
} }
// Send commit requests to pipeline in batch // Send requests to 4-stage pipeline in batch
if (commit_count > 0) { if (pipeline_count > 0) {
auto guard = commitPipeline.push(commit_count, true); auto guard = commitPipeline.push(pipeline_count, true);
auto out_iter = guard.batch.begin(); auto out_iter = guard.batch.begin();
for (auto &conn : batch) { for (auto &conn : batch) {
if (conn && conn->user_data) { if (conn && conn->user_data) {
auto *state = static_cast<HttpConnectionState *>(conn->user_data); auto *state = static_cast<HttpConnectionState *>(conn->user_data);
// Create CommitEntry for commit requests
if (state->route == HttpRoute::POST_commit && state->commit_request && if (state->route == HttpRoute::POST_commit && state->commit_request &&
state->parsing_commit && state->basic_validation_passed) { state->parsing_commit && state->basic_validation_passed) {
*out_iter++ = std::move(conn); *out_iter++ = CommitEntry{std::move(conn)};
}
// Create StatusEntry for status requests
else if (state->route == HttpRoute::GET_status) {
*out_iter++ = StatusEntry{std::move(conn)};
} }
} }
} }
@@ -127,9 +150,6 @@ void HttpHandler::on_data_arrived(std::string_view data,
break; break;
case HttpRoute::POST_commit: case HttpRoute::POST_commit:
handlePostCommit(*conn_ptr, *state); handlePostCommit(*conn_ptr, *state);
// Commit requests will be sent to pipeline in batch via on_batch_complete
// If basic validation failed, connection stays in batch but won't be sent
// to pipeline
break; break;
case HttpRoute::GET_subscribe: case HttpRoute::GET_subscribe:
handleGetSubscribe(*conn_ptr, *state); handleGetSubscribe(*conn_ptr, *state);
@@ -203,7 +223,8 @@ void HttpHandler::handleGetVersion(Connection &conn,
const HttpConnectionState &state) { const HttpConnectionState &state) {
send_json_response( send_json_response(
conn, 200, conn, 200,
R"({"version":"0.0.1","leader":"node-1","committed_version":42})", format(conn.get_arena(), R"({"version":%ld,"leader":""})",
this->committed_version.load(std::memory_order_seq_cst)),
state.connection_close); state.connection_close);
} }
@@ -240,12 +261,6 @@ void HttpHandler::handlePostCommit(Connection &conn,
error_msg = "Commit request must specify a leader_id"; error_msg = "Commit request must specify a leader_id";
} }
// Check leader_id matches current leader (static for process lifetime)
if (valid && commit_request.leader_id() != "test-leader") {
valid = false;
error_msg = "Leader ID mismatch - not current leader";
}
// Check operations are well-formed // Check operations are well-formed
if (valid) { if (valid) {
for (const auto &op : commit_request.operations()) { for (const auto &op : commit_request.operations()) {
@@ -267,9 +282,9 @@ void HttpHandler::handlePostCommit(Connection &conn,
return; return;
} }
// Basic validation passed - mark for pipeline processing // Basic validation passed - mark for 4-stage pipeline processing
const_cast<HttpConnectionState &>(state).basic_validation_passed = true; const_cast<HttpConnectionState &>(state).basic_validation_passed = true;
// Response will be sent after pipeline processing is complete // Response will be sent after 4-stage pipeline processing is complete
} }
void HttpHandler::handleGetSubscribe(Connection &conn, void HttpHandler::handleGetSubscribe(Connection &conn,
@@ -282,12 +297,62 @@ void HttpHandler::handleGetSubscribe(Connection &conn,
} }
void HttpHandler::handleGetStatus(Connection &conn, void HttpHandler::handleGetStatus(Connection &conn,
const HttpConnectionState &state) { HttpConnectionState &state) {
// TODO: Extract request_id from URL and check status // Status requests are processed through the pipeline
send_json_response( // Response will be generated in the sequence stage
conn, 200, // This handler extracts request_id from query parameters and prepares for
R"({"request_id":"example","status":"committed","version":43})", // pipeline processing
state.connection_close);
// Extract request_id from query parameters:
// /v1/status?request_id=<ID>&min_version=<VERSION>
std::string_view url = state.url;
// Find query parameters
size_t query_pos = url.find('?');
if (query_pos == std::string_view::npos) {
// No query parameters
send_error_response(conn, 400,
"Missing required query parameter: request_id",
state.connection_close);
return;
}
std::string_view query_string = url.substr(query_pos + 1);
// Simple query parameter parsing for request_id
// Look for "request_id=" in the query string
size_t request_id_pos = query_string.find("request_id=");
if (request_id_pos == std::string_view::npos) {
send_error_response(conn, 400,
"Missing required query parameter: request_id",
state.connection_close);
return;
}
// Extract the request_id value
size_t value_start = request_id_pos + 11; // length of "request_id="
if (value_start >= query_string.length()) {
send_error_response(conn, 400, "Empty request_id parameter",
state.connection_close);
return;
}
// Find the end of the request_id value (next & or end of string)
size_t value_end = query_string.find('&', value_start);
if (value_end == std::string_view::npos) {
value_end = query_string.length();
}
state.status_request_id =
query_string.substr(value_start, value_end - value_start);
if (state.status_request_id.empty()) {
send_error_response(conn, 400, "Empty request_id parameter",
state.connection_close);
return;
}
// Ready for pipeline processing
} }
void HttpHandler::handlePutRetention(Connection &conn, void HttpHandler::handlePutRetention(Connection &conn,
@@ -331,16 +396,16 @@ void HttpHandler::handleGetMetrics(Connection &conn,
arena, "HTTP/1.1 200 OK\r\n", arena, "HTTP/1.1 200 OK\r\n",
"Content-Type: text/plain; version=0.0.4\r\n", "Content-Type: text/plain; version=0.0.4\r\n",
"Content-Length: ", static_cast<uint64_t>(total_size), "\r\n", "Content-Length: ", static_cast<uint64_t>(total_size), "\r\n",
"X-Response-ID: ", static_cast<int64_t>(http_state->request_id), "\r\n", "X-Response-ID: ", static_cast<int64_t>(http_state->http_request_id),
"Connection: close\r\n", "\r\n"); "\r\n", "Connection: close\r\n", "\r\n");
conn.close_after_send(); conn.close_after_send();
} else { } else {
headers = static_format( headers = static_format(
arena, "HTTP/1.1 200 OK\r\n", arena, "HTTP/1.1 200 OK\r\n",
"Content-Type: text/plain; version=0.0.4\r\n", "Content-Type: text/plain; version=0.0.4\r\n",
"Content-Length: ", static_cast<uint64_t>(total_size), "\r\n", "Content-Length: ", static_cast<uint64_t>(total_size), "\r\n",
"X-Response-ID: ", static_cast<int64_t>(http_state->request_id), "\r\n", "X-Response-ID: ", static_cast<int64_t>(http_state->http_request_id),
"Connection: keep-alive\r\n", "\r\n"); "\r\n", "Connection: keep-alive\r\n", "\r\n");
} }
// Send headers // Send headers
@@ -354,7 +419,7 @@ void HttpHandler::handleGetMetrics(Connection &conn,
void HttpHandler::handleGetOk(Connection &conn, void HttpHandler::handleGetOk(Connection &conn,
const HttpConnectionState &state) { const HttpConnectionState &state) {
TRACE_EVENT("http", "GET /ok", perfetto::Flow::Global(state.request_id)); TRACE_EVENT("http", "GET /ok", perfetto::Flow::Global(state.http_request_id));
sendResponse(conn, 200, "text/plain", "OK", state.connection_close); sendResponse(conn, 200, "text/plain", "OK", state.connection_close);
} }
@@ -403,7 +468,7 @@ void HttpHandler::sendResponse(Connection &conn, int status_code,
"\r\n%.*s", "\r\n%.*s",
status_code, static_cast<int>(status_text.size()), status_code, static_cast<int>(status_text.size()),
status_text.data(), static_cast<int>(content_type.size()), status_text.data(), static_cast<int>(content_type.size()),
content_type.data(), body.size(), state->request_id, content_type.data(), body.size(), state->http_request_id,
connection_header, static_cast<int>(body.size()), body.data()); connection_header, static_cast<int>(body.size()), body.data());
if (close_connection) { if (close_connection) {
@@ -489,7 +554,7 @@ int HttpHandler::onHeaderValueComplete(llhttp_t *parser) {
id = id * 10 + (c - '0'); id = id * 10 + (c - '0');
} }
} }
state->request_id = id; state->http_request_id = id;
} }
// Clear buffers for next header // Clear buffers for next header
@@ -557,110 +622,281 @@ int HttpHandler::onMessageComplete(llhttp_t *parser) {
} }
// Pipeline stage implementations (batch-based) // Pipeline stage implementations (batch-based)
bool HttpHandler::process_validation_batch(BatchType &batch) { bool HttpHandler::process_sequence_batch(BatchType &batch) {
// Stage 0: Sequence assignment
// This stage performs ONLY work that requires serial processing: // This stage performs ONLY work that requires serial processing:
// - Version assignment (must be sequential) // - Version/sequence number assignment (must be sequential)
// - Precondition validation (requires consistent database state view) // - Request ID banned list management
for (auto &conn : batch) { for (auto &entry : batch) {
if (!conn) { // Pattern match on pipeline entry variant
return true; // Shutdown signal bool should_shutdown = std::visit(
[&](auto &&e) -> bool {
using T = std::decay_t<decltype(e)>;
if constexpr (std::is_same_v<T, ShutdownEntry>) {
return true; // Signal shutdown
} else if constexpr (std::is_same_v<T, CommitEntry>) {
// Process commit entry: check banned list, assign version
auto &commit_entry = e;
auto *state = static_cast<HttpConnectionState *>(
commit_entry.connection->user_data);
if (!state || !state->commit_request) {
// Should not happen - basic validation was done on I/O thread
send_error_response(*commit_entry.connection, 500,
"Internal server error", true);
return false;
}
// Check if request_id is banned (for status queries)
// Only check CommitRequest request_id, not HTTP header
if (state->commit_request &&
state->commit_request->request_id().has_value()) {
auto commit_request_id =
state->commit_request->request_id().value();
if (banned_request_ids.find(commit_request_id) !=
banned_request_ids.end()) {
// Request ID is banned, this commit should fail
send_json_response(
*commit_entry.connection, 409,
R"({"status": "not_committed", "error": "request_id_banned"})",
state->connection_close);
return false;
}
}
// Assign sequential version number
commit_entry.assigned_version = next_version++;
TRACE_EVENT("http", "sequence_commit",
perfetto::Flow::Global(state->http_request_id));
return false; // Continue processing
} else if constexpr (std::is_same_v<T, StatusEntry>) {
// Process status entry: add request_id to banned list, get version
// upper bound
auto &status_entry = e;
auto *state = static_cast<HttpConnectionState *>(
status_entry.connection->user_data);
if (state && !state->status_request_id.empty()) {
// Add request_id to banned list - store the string in arena and
// use string_view
char *arena_chars = banned_request_arena.allocate<char>(
state->status_request_id.size());
std::memcpy(arena_chars, state->status_request_id.data(),
state->status_request_id.size());
std::string_view request_id_view(arena_chars,
state->status_request_id.size());
banned_request_ids.insert(request_id_view);
// Update memory usage metric
banned_request_ids_memory_gauge.set(
banned_request_arena.total_allocated());
// Set version upper bound to current highest assigned version
status_entry.version_upper_bound = next_version - 1;
}
TRACE_EVENT("http", "sequence_status",
perfetto::Flow::Global(state->http_request_id));
// TODO: Transfer to status threadpool - for now just respond
// not_committed
send_json_response(*status_entry.connection, 200,
R"({"status": "not_committed"})",
state->connection_close);
return false; // Continue processing
}
return false; // Unknown type, continue
},
entry);
if (should_shutdown) {
return true;
} }
TRACE_EVENT(
"http", "validation",
perfetto::Flow::Global(
static_cast<HttpConnectionState *>(conn->user_data)->request_id));
auto *state = static_cast<HttpConnectionState *>(conn->user_data);
if (!state || !state->commit_request) {
// Should not happen - basic validation was done on I/O thread
send_error_response(*conn, 500, "Internal server error", true);
state = nullptr; // Mark as error
continue;
}
// TODO: Implement serialized validation logic:
// 1. Assign version number to this commit (must be sequential)
// 2. Validate preconditions against current database state:
// - Check point reads match expected values/existence
// - Check range reads haven't been modified since read_version
// - Ensure no conflicting writes from other transactions
// For now, just simulate version assignment
// All basic validation (including leader check) was done on I/O thread
// TODO: Actual precondition validation would go here
// This is where we'd check each precondition in
// commit_request.preconditions() against the current database state to
// ensure no conflicts
// For now, assume all preconditions pass since basic validation was done
// Precondition validation passed - continue to persistence stage
} }
return false; // Continue processing return false; // Continue processing
} }
bool HttpHandler::process_persistence_batch(BatchType &batch) { bool HttpHandler::process_resolve_batch(BatchType &batch) {
// TODO: This is where we can implement efficient batching for S3 writes // Stage 1: Precondition resolution
// - Collect all valid commits in the batch // This stage must be serialized to maintain consistent database state view
// - Write them as a single S3 batch operation // - Validate preconditions against current database state
// - Generate responses for all commits in the batch // - Check for conflicts with other transactions
for (auto &conn : batch) { for (auto &entry : batch) {
if (!conn) { // Pattern match on pipeline entry variant
return true; // Shutdown signal bool should_shutdown = std::visit(
[&](auto &&e) -> bool {
using T = std::decay_t<decltype(e)>;
if constexpr (std::is_same_v<T, ShutdownEntry>) {
return true; // Signal shutdown
} else if constexpr (std::is_same_v<T, CommitEntry>) {
// Process commit entry: accept all commits (simplified
// implementation)
auto &commit_entry = e;
auto *state = static_cast<HttpConnectionState *>(
commit_entry.connection->user_data);
if (!state || !state->commit_request) {
// Skip processing for failed sequence stage
return false;
}
// Accept all commits (simplified implementation)
commit_entry.resolve_success = true;
TRACE_EVENT("http", "resolve_commit",
perfetto::Flow::Global(state->http_request_id));
return false; // Continue processing
} else if constexpr (std::is_same_v<T, StatusEntry>) {
// Status entries are not processed in resolve stage
// They were already handled in sequence stage
return false;
}
return false; // Unknown type, continue
},
entry);
if (should_shutdown) {
return true;
} }
}
return false; // Continue processing
}
auto *state = static_cast<HttpConnectionState *>(conn->user_data); bool HttpHandler::process_persist_batch(BatchType &batch) {
// Stage 2: Transaction persistence
// Mark everything as durable immediately (simplified implementation)
// In real implementation: batch S3 writes, update subscribers, etc.
// Check if validation stage failed or connection is in error state for (auto &entry : batch) {
if (!state || !state->commit_request || !state->message_complete) { // Pattern match on pipeline entry variant
// Skip persistence processing for failed validation bool should_shutdown = std::visit(
continue; [&](auto &&e) -> bool {
using T = std::decay_t<decltype(e)>;
if constexpr (std::is_same_v<T, ShutdownEntry>) {
return true; // Signal shutdown
} else if constexpr (std::is_same_v<T, CommitEntry>) {
// Process commit entry: mark as durable, generate response
auto &commit_entry = e;
auto *state = static_cast<HttpConnectionState *>(
commit_entry.connection->user_data);
// Skip if resolve failed or connection is in error state
if (!state || !state->commit_request ||
!commit_entry.resolve_success) {
return false;
}
// Mark as persisted and update committed version high water mark
commit_entry.persist_success = true;
committed_version.store(commit_entry.assigned_version,
std::memory_order_seq_cst);
TRACE_EVENT("http", "persist_commit",
perfetto::Flow::Global(state->http_request_id));
const CommitRequest &commit_request = *state->commit_request;
ArenaAllocator &arena = commit_entry.connection->get_arena();
std::string_view response;
// Generate success response with actual assigned version
if (commit_request.request_id().has_value()) {
response = format(
arena,
R"({"request_id":"%.*s","status":"committed","version":%ld,"leader_id":"leader123"})",
static_cast<int>(commit_request.request_id().value().size()),
commit_request.request_id().value().data(),
commit_entry.assigned_version);
} else {
response = format(
arena,
R"({"status":"committed","version":%ld,"leader_id":"leader123"})",
commit_entry.assigned_version);
}
send_json_response(*commit_entry.connection, 200, response,
state->connection_close);
return false; // Continue processing
} else if constexpr (std::is_same_v<T, StatusEntry>) {
// Status entries are not processed in persist stage
// They were already handled in sequence stage
return false;
}
return false; // Unknown type, continue
},
entry);
if (should_shutdown) {
return true;
} }
TRACE_EVENT("http", "persistence",
perfetto::Flow::Global(state->request_id));
const CommitRequest &commit_request = *state->commit_request;
// For now, process individually but this could be batched
ArenaAllocator &arena = conn->get_arena();
std::string_view response;
if (commit_request.request_id().has_value()) {
response = format(
arena,
R"({"request_id":"%.*s","status":"committed","version":43,"leader_id":"leader123"})",
static_cast<int>(commit_request.request_id().value().size()),
commit_request.request_id().value().data());
} else {
response = static_format(
arena,
R"({"status":"committed","version":43,"leader_id":"leader123"})");
}
send_json_response(*conn, 200, response, state->connection_close);
} }
return false; // Continue processing return false; // Continue processing
} }
bool HttpHandler::process_release_batch(BatchType &batch) { bool HttpHandler::process_release_batch(BatchType &batch) {
for (auto &conn : batch) { // Stage 3: Connection release
if (!conn) { // Return connections to server for response transmission
return true; // Shutdown signal
for (auto &entry : batch) {
// Pattern match on pipeline entry variant
bool should_shutdown = std::visit(
[&](auto &&e) -> bool {
using T = std::decay_t<decltype(e)>;
if constexpr (std::is_same_v<T, ShutdownEntry>) {
return true; // Signal shutdown
} else if constexpr (std::is_same_v<T, CommitEntry>) {
// Process commit entry: return connection to server
auto &commit_entry = e;
auto *state = static_cast<HttpConnectionState *>(
commit_entry.connection->user_data);
if (state) {
TRACE_EVENT("http", "release_commit",
perfetto::Flow::Global(state->http_request_id));
}
// Return connection to server for further processing or cleanup
Server::release_back_to_server(std::move(commit_entry.connection));
return false; // Continue processing
} else if constexpr (std::is_same_v<T, StatusEntry>) {
// Process status entry: return connection to server
auto &status_entry = e;
auto *state = static_cast<HttpConnectionState *>(
status_entry.connection->user_data);
if (state) {
TRACE_EVENT("http", "release_status",
perfetto::Flow::Global(state->http_request_id));
}
// Return connection to server for further processing or cleanup
Server::release_back_to_server(std::move(status_entry.connection));
return false; // Continue processing
}
return false; // Unknown type, continue
},
entry);
if (should_shutdown) {
return true;
} }
TRACE_EVENT(
"http", "release",
perfetto::Flow::Global(
static_cast<HttpConnectionState *>(conn->user_data)->request_id));
// Return connection to server for further processing or cleanup
Server::release_back_to_server(std::move(conn));
} }
return false; // Continue processing return false; // Continue processing

View File

@@ -1,15 +1,19 @@
#pragma once #pragma once
#include <atomic>
#include <memory> #include <memory>
#include <string_view> #include <string_view>
#include <thread> #include <thread>
#include <unordered_set>
#include <llhttp.h> #include <llhttp.h>
#include "arena_allocator.hpp"
#include "connection.hpp" #include "connection.hpp"
#include "connection_handler.hpp" #include "connection_handler.hpp"
#include "loop_iterations.hpp" #include "loop_iterations.hpp"
#include "perfetto_categories.hpp" #include "perfetto_categories.hpp"
#include "pipeline_entry.hpp"
#include "server.hpp" #include "server.hpp"
#include "thread_pipeline.hpp" #include "thread_pipeline.hpp"
@@ -52,13 +56,18 @@ struct HttpConnectionState {
bool connection_close = false; // Client requested connection close bool connection_close = false; // Client requested connection close
HttpRoute route = HttpRoute::NotFound; HttpRoute route = HttpRoute::NotFound;
// Status request data
std::string_view
status_request_id; // Request ID extracted from /v1/status/{id} URL
// Header accumulation buffers (arena-allocated) // Header accumulation buffers (arena-allocated)
using ArenaString = using ArenaString =
std::basic_string<char, std::char_traits<char>, ArenaStlAllocator<char>>; std::basic_string<char, std::char_traits<char>, ArenaStlAllocator<char>>;
ArenaString current_header_field_buf; ArenaString current_header_field_buf;
ArenaString current_header_value_buf; ArenaString current_header_value_buf;
bool header_field_complete = false; bool header_field_complete = false;
int64_t request_id = 0; // X-Request-Id header value int64_t http_request_id =
0; // X-Request-Id header value (for tracing/logging)
// Streaming parser for POST requests // Streaming parser for POST requests
std::unique_ptr<JsonCommitRequestParser> commit_parser; std::unique_ptr<JsonCommitRequestParser> commit_parser;
@@ -75,34 +84,47 @@ struct HttpConnectionState {
* Supports the WeaselDB REST API endpoints with enum-based routing. * Supports the WeaselDB REST API endpoints with enum-based routing.
*/ */
struct HttpHandler : ConnectionHandler { struct HttpHandler : ConnectionHandler {
HttpHandler() { HttpHandler()
// Stage 0: Version assignment and precondition validation thread : banned_request_ids(
validationThread = std::thread{[this]() { ArenaStlAllocator<std::string_view>(&banned_request_arena)) {
pthread_setname_np(pthread_self(), "txn-validate"); // Stage 0: Sequence assignment thread
sequenceThread = std::thread{[this]() {
pthread_setname_np(pthread_self(), "txn-sequence");
for (;;) { for (;;) {
auto guard = commitPipeline.acquire<0, 0>(); auto guard = commitPipeline.acquire<0, 0>();
if (process_validation_batch(guard.batch)) { if (process_sequence_batch(guard.batch)) {
return; // Shutdown signal received return; // Shutdown signal received
} }
} }
}}; }};
// Stage 1: Transaction persistence and subscriber streaming thread // Stage 1: Precondition resolution thread
persistenceThread = std::thread{[this]() { resolveThread = std::thread{[this]() {
pthread_setname_np(pthread_self(), "txn-persist"); pthread_setname_np(pthread_self(), "txn-resolve");
for (;;) { for (;;) {
auto guard = commitPipeline.acquire<1, 0>(); auto guard = commitPipeline.acquire<1, 0>();
if (process_persistence_batch(guard.batch)) { if (process_resolve_batch(guard.batch)) {
return; // Shutdown signal received return; // Shutdown signal received
} }
} }
}}; }};
// Stage 2: Connection return to server thread // Stage 2: Transaction persistence thread
persistThread = std::thread{[this]() {
pthread_setname_np(pthread_self(), "txn-persist");
for (;;) {
auto guard = commitPipeline.acquire<2, 0>();
if (process_persist_batch(guard.batch)) {
return; // Shutdown signal received
}
}
}};
// Stage 3: Connection return to server thread
releaseThread = std::thread{[this]() { releaseThread = std::thread{[this]() {
pthread_setname_np(pthread_self(), "txn-release"); pthread_setname_np(pthread_self(), "txn-release");
for (;;) { for (;;) {
auto guard = commitPipeline.acquire<2, 0>(); auto guard = commitPipeline.acquire<3, 0>();
if (process_release_batch(guard.batch)) { if (process_release_batch(guard.batch)) {
return; // Shutdown signal received return; // Shutdown signal received
} }
@@ -110,17 +132,17 @@ struct HttpHandler : ConnectionHandler {
}}; }};
} }
~HttpHandler() { ~HttpHandler() {
// Send shutdown signals to all pipeline stages // Send single shutdown signal that flows through all pipeline stages
{ {
auto guard = commitPipeline.push(3, true); auto guard = commitPipeline.push(1, true);
for (auto &c : guard.batch) { guard.batch[0] =
c = {}; // null connection signals shutdown ShutdownEntry{}; // Single ShutdownEntry flows through all stages
}
} }
// Join all pipeline threads // Join all pipeline threads
validationThread.join(); sequenceThread.join();
persistenceThread.join(); resolveThread.join();
persistThread.join();
releaseThread.join(); releaseThread.join();
} }
@@ -148,29 +170,48 @@ struct HttpHandler : ConnectionHandler {
private: private:
static constexpr int lg_size = 16; static constexpr int lg_size = 16;
// Main commit processing pipeline: validation -> persistence -> release // Pipeline state (sequence thread only)
StaticThreadPipeline<std::unique_ptr<Connection>, int64_t next_version = 1; // Next version to assign (sequence thread only)
WaitStrategy::WaitIfUpstreamIdle, 1, 1, 1>
// Pipeline state (persist thread writes, I/O threads read)
std::atomic<int64_t> committed_version{
0}; // Highest committed version (persist thread writes, I/O threads read)
// Arena for banned request IDs and related data structures (sequence thread
// only)
ArenaAllocator banned_request_arena;
using BannedRequestIdSet =
std::unordered_set<std::string_view, std::hash<std::string_view>,
std::equal_to<std::string_view>,
ArenaStlAllocator<std::string_view>>;
BannedRequestIdSet banned_request_ids; // Request IDs that should not commit
// (string_views into arena)
// Main commit processing pipeline: sequence -> resolve -> persist -> release
StaticThreadPipeline<PipelineEntry, WaitStrategy::WaitIfUpstreamIdle, 1, 1, 1,
1>
commitPipeline{lg_size}; commitPipeline{lg_size};
// Pipeline stage threads // Pipeline stage threads
std::thread validationThread; std::thread sequenceThread;
std::thread persistenceThread; std::thread resolveThread;
std::thread persistThread;
std::thread releaseThread; std::thread releaseThread;
// Pipeline stage processing methods (batch-based) // Pipeline stage processing methods (batch-based)
using BatchType = using BatchType =
StaticThreadPipeline<std::unique_ptr<Connection>, StaticThreadPipeline<PipelineEntry, WaitStrategy::WaitIfUpstreamIdle, 1,
WaitStrategy::WaitIfUpstreamIdle, 1, 1, 1>::Batch; 1, 1, 1>::Batch;
bool process_validation_batch(BatchType &batch); bool process_sequence_batch(BatchType &batch);
bool process_persistence_batch(BatchType &batch); bool process_resolve_batch(BatchType &batch);
bool process_persist_batch(BatchType &batch);
bool process_release_batch(BatchType &batch); bool process_release_batch(BatchType &batch);
// Route handlers // Route handlers
void handleGetVersion(Connection &conn, const HttpConnectionState &state); void handleGetVersion(Connection &conn, const HttpConnectionState &state);
void handlePostCommit(Connection &conn, const HttpConnectionState &state); void handlePostCommit(Connection &conn, const HttpConnectionState &state);
void handleGetSubscribe(Connection &conn, const HttpConnectionState &state); void handleGetSubscribe(Connection &conn, const HttpConnectionState &state);
void handleGetStatus(Connection &conn, const HttpConnectionState &state); void handleGetStatus(Connection &conn, HttpConnectionState &state);
void handlePutRetention(Connection &conn, const HttpConnectionState &state); void handlePutRetention(Connection &conn, const HttpConnectionState &state);
void handleGetRetention(Connection &conn, const HttpConnectionState &state); void handleGetRetention(Connection &conn, const HttpConnectionState &state);
void handleDeleteRetention(Connection &conn, void handleDeleteRetention(Connection &conn,

47
src/pipeline_entry.hpp Normal file
View File

@@ -0,0 +1,47 @@
#pragma once
#include "connection.hpp"
#include <memory>
#include <variant>
/**
* Pipeline entry for commit requests that need full 4-stage processing.
* Contains connection with parsed CommitRequest.
*/
struct CommitEntry {
std::unique_ptr<Connection> connection;
int64_t assigned_version = 0; // Set by sequence stage
bool resolve_success = false; // Set by resolve stage
bool persist_success = false; // Set by persist stage
CommitEntry() = default; // Default constructor for variant
explicit CommitEntry(std::unique_ptr<Connection> conn)
: connection(std::move(conn)) {}
};
/**
* Pipeline entry for status requests that need sequence stage processing
* then transfer to status threadpool.
*/
struct StatusEntry {
std::unique_ptr<Connection> connection;
int64_t version_upper_bound = 0; // Set by sequence stage
StatusEntry() = default; // Default constructor for variant
explicit StatusEntry(std::unique_ptr<Connection> conn)
: connection(std::move(conn)) {}
};
/**
* Pipeline entry for coordinated shutdown of all stages.
* Flows through all stages to ensure proper cleanup.
*/
struct ShutdownEntry {
// Empty struct - presence indicates shutdown
};
/**
* Pipeline entry variant type used by the commit processing pipeline.
* Each stage pattern-matches on the variant type to handle appropriately.
*/
using PipelineEntry = std::variant<CommitEntry, StatusEntry, ShutdownEntry>;