Outline commit pipeline
This commit is contained in:
@@ -9,6 +9,7 @@
|
|||||||
#include "json_commit_request_parser.hpp"
|
#include "json_commit_request_parser.hpp"
|
||||||
#include "metric.hpp"
|
#include "metric.hpp"
|
||||||
#include "perfetto_categories.hpp"
|
#include "perfetto_categories.hpp"
|
||||||
|
#include "server.hpp"
|
||||||
|
|
||||||
auto requests_counter_family = metric::create_counter(
|
auto requests_counter_family = metric::create_counter(
|
||||||
"weaseldb_http_requests_total", "Total http requests");
|
"weaseldb_http_requests_total", "Total http requests");
|
||||||
@@ -62,16 +63,31 @@ void HttpHandler::on_write_buffer_drained(
|
|||||||
|
|
||||||
void HttpHandler::on_batch_complete(
|
void HttpHandler::on_batch_complete(
|
||||||
std::span<std::unique_ptr<Connection>> batch) {
|
std::span<std::unique_ptr<Connection>> batch) {
|
||||||
int readyCount = 0;
|
// Collect commit requests that passed basic validation for pipeline
|
||||||
for (int i = 0; i < int(batch.size()); ++i) {
|
// processing
|
||||||
readyCount += batch[i] && batch[i]->outgoingBytesQueued() > 0;
|
int commit_count = 0;
|
||||||
|
for (auto &conn : batch) {
|
||||||
|
if (conn && conn->user_data) {
|
||||||
|
auto *state = static_cast<HttpConnectionState *>(conn->user_data);
|
||||||
|
if (state->route == HttpRoute::POST_commit && state->commit_request &&
|
||||||
|
state->parsing_commit && state->basic_validation_passed) {
|
||||||
|
commit_count++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send commit requests to pipeline in batch
|
||||||
|
if (commit_count > 0) {
|
||||||
|
auto guard = commitPipeline.push(commit_count, true);
|
||||||
|
auto out_iter = guard.batch.begin();
|
||||||
|
|
||||||
|
for (auto &conn : batch) {
|
||||||
|
if (conn && conn->user_data) {
|
||||||
|
auto *state = static_cast<HttpConnectionState *>(conn->user_data);
|
||||||
|
if (state->route == HttpRoute::POST_commit && state->commit_request &&
|
||||||
|
state->parsing_commit && state->basic_validation_passed) {
|
||||||
|
*out_iter++ = std::move(conn);
|
||||||
}
|
}
|
||||||
if (readyCount > 0) {
|
|
||||||
auto guard = pipeline.push(readyCount, /*block=*/true);
|
|
||||||
auto outIter = guard.batch.begin();
|
|
||||||
for (int i = 0; i < int(batch.size()); ++i) {
|
|
||||||
if (batch[i] && batch[i]->outgoingBytesQueued() > 0) {
|
|
||||||
*outIter++ = std::move(batch[i]);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -111,6 +127,9 @@ void HttpHandler::on_data_arrived(std::string_view data,
|
|||||||
break;
|
break;
|
||||||
case HttpRoute::POST_commit:
|
case HttpRoute::POST_commit:
|
||||||
handlePostCommit(*conn_ptr, *state);
|
handlePostCommit(*conn_ptr, *state);
|
||||||
|
// Commit requests will be sent to pipeline in batch via on_batch_complete
|
||||||
|
// If basic validation failed, connection stays in batch but won't be sent
|
||||||
|
// to pipeline
|
||||||
break;
|
break;
|
||||||
case HttpRoute::GET_subscribe:
|
case HttpRoute::GET_subscribe:
|
||||||
handleGetSubscribe(*conn_ptr, *state);
|
handleGetSubscribe(*conn_ptr, *state);
|
||||||
@@ -204,24 +223,53 @@ void HttpHandler::handlePostCommit(Connection &conn,
|
|||||||
|
|
||||||
const CommitRequest &commit_request = *state.commit_request;
|
const CommitRequest &commit_request = *state.commit_request;
|
||||||
|
|
||||||
// TODO: Process the commit request with transaction engine
|
// Perform basic validation that doesn't require serialization (done on I/O
|
||||||
// For now, return a placeholder response with parsed request_id if available
|
// threads)
|
||||||
ArenaAllocator &arena = conn.get_arena();
|
bool valid = true;
|
||||||
std::string_view response;
|
std::string_view error_msg;
|
||||||
|
|
||||||
if (commit_request.request_id().has_value()) {
|
// Check that we have at least one operation
|
||||||
response = format(
|
if (commit_request.operations().empty()) {
|
||||||
arena,
|
valid = false;
|
||||||
R"({"request_id":"%.*s","status":"committed","version":43,"leader_id":"leader123"})",
|
error_msg = "Commit request must contain at least one operation";
|
||||||
static_cast<int>(commit_request.request_id().value().size()),
|
|
||||||
commit_request.request_id().value().data());
|
|
||||||
} else {
|
|
||||||
response = static_format(
|
|
||||||
arena,
|
|
||||||
R"({"status":"committed","version":43,"leader_id":"leader123"})");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
send_json_response(conn, 200, response, state.connection_close);
|
// Check leader_id is not empty
|
||||||
|
if (valid && commit_request.leader_id().empty()) {
|
||||||
|
valid = false;
|
||||||
|
error_msg = "Commit request must specify a leader_id";
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check leader_id matches current leader (static for process lifetime)
|
||||||
|
if (valid && commit_request.leader_id() != "test-leader") {
|
||||||
|
valid = false;
|
||||||
|
error_msg = "Leader ID mismatch - not current leader";
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check operations are well-formed
|
||||||
|
if (valid) {
|
||||||
|
for (const auto &op : commit_request.operations()) {
|
||||||
|
if (op.param1.empty()) {
|
||||||
|
valid = false;
|
||||||
|
error_msg = "Operation key cannot be empty";
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
if (op.type == Operation::Type::Write && op.param2.empty()) {
|
||||||
|
valid = false;
|
||||||
|
error_msg = "Write operation value cannot be empty";
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!valid) {
|
||||||
|
send_error_response(conn, 400, error_msg, state.connection_close);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Basic validation passed - mark for pipeline processing
|
||||||
|
const_cast<HttpConnectionState &>(state).basic_validation_passed = true;
|
||||||
|
// Response will be sent after pipeline processing is complete
|
||||||
}
|
}
|
||||||
|
|
||||||
void HttpHandler::handleGetSubscribe(Connection &conn,
|
void HttpHandler::handleGetSubscribe(Connection &conn,
|
||||||
@@ -507,3 +555,113 @@ int HttpHandler::onMessageComplete(llhttp_t *parser) {
|
|||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Pipeline stage implementations (batch-based)
|
||||||
|
bool HttpHandler::process_validation_batch(BatchType &batch) {
|
||||||
|
// This stage performs ONLY work that requires serial processing:
|
||||||
|
// - Version assignment (must be sequential)
|
||||||
|
// - Precondition validation (requires consistent database state view)
|
||||||
|
|
||||||
|
for (auto &conn : batch) {
|
||||||
|
if (!conn) {
|
||||||
|
return true; // Shutdown signal
|
||||||
|
}
|
||||||
|
|
||||||
|
TRACE_EVENT(
|
||||||
|
"http", "validation",
|
||||||
|
perfetto::Flow::Global(
|
||||||
|
static_cast<HttpConnectionState *>(conn->user_data)->request_id));
|
||||||
|
|
||||||
|
auto *state = static_cast<HttpConnectionState *>(conn->user_data);
|
||||||
|
if (!state || !state->commit_request) {
|
||||||
|
// Should not happen - basic validation was done on I/O thread
|
||||||
|
send_error_response(*conn, 500, "Internal server error", true);
|
||||||
|
state = nullptr; // Mark as error
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: Implement serialized validation logic:
|
||||||
|
// 1. Assign version number to this commit (must be sequential)
|
||||||
|
// 2. Validate preconditions against current database state:
|
||||||
|
// - Check point reads match expected values/existence
|
||||||
|
// - Check range reads haven't been modified since read_version
|
||||||
|
// - Ensure no conflicting writes from other transactions
|
||||||
|
|
||||||
|
// For now, just simulate version assignment
|
||||||
|
// All basic validation (including leader check) was done on I/O thread
|
||||||
|
|
||||||
|
// TODO: Actual precondition validation would go here
|
||||||
|
// This is where we'd check each precondition in
|
||||||
|
// commit_request.preconditions() against the current database state to
|
||||||
|
// ensure no conflicts
|
||||||
|
|
||||||
|
// For now, assume all preconditions pass since basic validation was done
|
||||||
|
|
||||||
|
// Precondition validation passed - continue to persistence stage
|
||||||
|
}
|
||||||
|
return false; // Continue processing
|
||||||
|
}
|
||||||
|
|
||||||
|
bool HttpHandler::process_persistence_batch(BatchType &batch) {
|
||||||
|
// TODO: This is where we can implement efficient batching for S3 writes
|
||||||
|
// - Collect all valid commits in the batch
|
||||||
|
// - Write them as a single S3 batch operation
|
||||||
|
// - Generate responses for all commits in the batch
|
||||||
|
|
||||||
|
for (auto &conn : batch) {
|
||||||
|
if (!conn) {
|
||||||
|
return true; // Shutdown signal
|
||||||
|
}
|
||||||
|
|
||||||
|
auto *state = static_cast<HttpConnectionState *>(conn->user_data);
|
||||||
|
|
||||||
|
// Check if validation stage failed or connection is in error state
|
||||||
|
if (!state || !state->commit_request || !state->message_complete) {
|
||||||
|
// Skip persistence processing for failed validation
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
TRACE_EVENT("http", "persistence",
|
||||||
|
perfetto::Flow::Global(state->request_id));
|
||||||
|
|
||||||
|
const CommitRequest &commit_request = *state->commit_request;
|
||||||
|
|
||||||
|
// For now, process individually but this could be batched
|
||||||
|
ArenaAllocator &arena = conn->get_arena();
|
||||||
|
std::string_view response;
|
||||||
|
|
||||||
|
if (commit_request.request_id().has_value()) {
|
||||||
|
response = format(
|
||||||
|
arena,
|
||||||
|
R"({"request_id":"%.*s","status":"committed","version":43,"leader_id":"leader123"})",
|
||||||
|
static_cast<int>(commit_request.request_id().value().size()),
|
||||||
|
commit_request.request_id().value().data());
|
||||||
|
} else {
|
||||||
|
response = static_format(
|
||||||
|
arena,
|
||||||
|
R"({"status":"committed","version":43,"leader_id":"leader123"})");
|
||||||
|
}
|
||||||
|
|
||||||
|
send_json_response(*conn, 200, response, state->connection_close);
|
||||||
|
}
|
||||||
|
|
||||||
|
return false; // Continue processing
|
||||||
|
}
|
||||||
|
|
||||||
|
bool HttpHandler::process_release_batch(BatchType &batch) {
|
||||||
|
for (auto &conn : batch) {
|
||||||
|
if (!conn) {
|
||||||
|
return true; // Shutdown signal
|
||||||
|
}
|
||||||
|
|
||||||
|
TRACE_EVENT(
|
||||||
|
"http", "release",
|
||||||
|
perfetto::Flow::Global(
|
||||||
|
static_cast<HttpConnectionState *>(conn->user_data)->request_id));
|
||||||
|
|
||||||
|
// Return connection to server for further processing or cleanup
|
||||||
|
Server::release_back_to_server(std::move(conn));
|
||||||
|
}
|
||||||
|
|
||||||
|
return false; // Continue processing
|
||||||
|
}
|
||||||
|
|||||||
@@ -64,6 +64,8 @@ struct HttpConnectionState {
|
|||||||
std::unique_ptr<JsonCommitRequestParser> commit_parser;
|
std::unique_ptr<JsonCommitRequestParser> commit_parser;
|
||||||
std::unique_ptr<CommitRequest> commit_request;
|
std::unique_ptr<CommitRequest> commit_request;
|
||||||
bool parsing_commit = false;
|
bool parsing_commit = false;
|
||||||
|
bool basic_validation_passed =
|
||||||
|
false; // Set to true if basic validation passes
|
||||||
|
|
||||||
explicit HttpConnectionState(ArenaAllocator &arena);
|
explicit HttpConnectionState(ArenaAllocator &arena);
|
||||||
};
|
};
|
||||||
@@ -74,69 +76,52 @@ struct HttpConnectionState {
|
|||||||
*/
|
*/
|
||||||
struct HttpHandler : ConnectionHandler {
|
struct HttpHandler : ConnectionHandler {
|
||||||
HttpHandler() {
|
HttpHandler() {
|
||||||
finalStageThreads.emplace_back([this]() {
|
// Stage 0: Version assignment and precondition validation thread
|
||||||
pthread_setname_np(pthread_self(), "stage-1-0");
|
validationThread = std::thread{[this]() {
|
||||||
|
pthread_setname_np(pthread_self(), "txn-validate");
|
||||||
for (;;) {
|
for (;;) {
|
||||||
auto guard = pipeline.acquire<1, 0>();
|
auto guard = commitPipeline.acquire<0, 0>();
|
||||||
for (auto it = guard.batch.begin(); it != guard.batch.end(); ++it) {
|
if (process_validation_batch(guard.batch)) {
|
||||||
if ((it.index() % 2) == 0) { // Thread 0 handles even indices
|
return; // Shutdown signal received
|
||||||
auto &c = *it;
|
|
||||||
if (!c) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
auto *state = static_cast<HttpConnectionState *>(c->user_data);
|
|
||||||
TRACE_EVENT("http", "release",
|
|
||||||
perfetto::Flow::Global(state->request_id));
|
|
||||||
Server::release_back_to_server(std::move(c));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}};
|
||||||
});
|
|
||||||
finalStageThreads.emplace_back([this]() {
|
// Stage 1: Transaction persistence and subscriber streaming thread
|
||||||
pthread_setname_np(pthread_self(), "stage-1-1");
|
persistenceThread = std::thread{[this]() {
|
||||||
|
pthread_setname_np(pthread_self(), "txn-persist");
|
||||||
for (;;) {
|
for (;;) {
|
||||||
auto guard = pipeline.acquire<1, 1>();
|
auto guard = commitPipeline.acquire<1, 0>();
|
||||||
for (auto it = guard.batch.begin(); it != guard.batch.end(); ++it) {
|
if (process_persistence_batch(guard.batch)) {
|
||||||
if ((it.index() % 2) == 1) { // Thread 1 handles odd indices
|
return; // Shutdown signal received
|
||||||
auto &c = *it;
|
|
||||||
if (!c) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
auto *state = static_cast<HttpConnectionState *>(c->user_data);
|
|
||||||
TRACE_EVENT("http", "release",
|
|
||||||
perfetto::Flow::Global(state->request_id));
|
|
||||||
Server::release_back_to_server(std::move(c));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}};
|
||||||
});
|
|
||||||
stage0Thread = std::thread{[this]() {
|
// Stage 2: Connection return to server thread
|
||||||
pthread_setname_np(pthread_self(), "stage-0");
|
releaseThread = std::thread{[this]() {
|
||||||
int nulls = 0;
|
pthread_setname_np(pthread_self(), "txn-release");
|
||||||
for (;;) {
|
for (;;) {
|
||||||
auto guard = pipeline.acquire<0, 0>(1);
|
auto guard = commitPipeline.acquire<2, 0>();
|
||||||
for (auto &c : guard.batch) {
|
if (process_release_batch(guard.batch)) {
|
||||||
nulls += !c;
|
return; // Shutdown signal received
|
||||||
if (nulls == 2) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
for (volatile int i = 0; i < loop_iterations; i = i + 1)
|
|
||||||
;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}};
|
}};
|
||||||
}
|
}
|
||||||
~HttpHandler() {
|
~HttpHandler() {
|
||||||
|
// Send shutdown signals to all pipeline stages
|
||||||
{
|
{
|
||||||
auto guard = pipeline.push(2, true);
|
auto guard = commitPipeline.push(3, true);
|
||||||
for (auto &c : guard.batch) {
|
for (auto &c : guard.batch) {
|
||||||
c = {};
|
c = {}; // null connection signals shutdown
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
stage0Thread.join();
|
|
||||||
for (auto &thread : finalStageThreads) {
|
// Join all pipeline threads
|
||||||
thread.join();
|
validationThread.join();
|
||||||
}
|
persistenceThread.join();
|
||||||
|
releaseThread.join();
|
||||||
}
|
}
|
||||||
|
|
||||||
void on_connection_established(Connection &conn) override;
|
void on_connection_established(Connection &conn) override;
|
||||||
@@ -162,11 +147,24 @@ struct HttpHandler : ConnectionHandler {
|
|||||||
|
|
||||||
private:
|
private:
|
||||||
static constexpr int lg_size = 16;
|
static constexpr int lg_size = 16;
|
||||||
|
|
||||||
|
// Main commit processing pipeline: validation -> persistence -> release
|
||||||
StaticThreadPipeline<std::unique_ptr<Connection>,
|
StaticThreadPipeline<std::unique_ptr<Connection>,
|
||||||
WaitStrategy::WaitIfUpstreamIdle, 1, 2>
|
WaitStrategy::WaitIfUpstreamIdle, 1, 1, 1>
|
||||||
pipeline{lg_size};
|
commitPipeline{lg_size};
|
||||||
std::thread stage0Thread;
|
|
||||||
std::vector<std::thread> finalStageThreads;
|
// Pipeline stage threads
|
||||||
|
std::thread validationThread;
|
||||||
|
std::thread persistenceThread;
|
||||||
|
std::thread releaseThread;
|
||||||
|
|
||||||
|
// Pipeline stage processing methods (batch-based)
|
||||||
|
using BatchType =
|
||||||
|
StaticThreadPipeline<std::unique_ptr<Connection>,
|
||||||
|
WaitStrategy::WaitIfUpstreamIdle, 1, 1, 1>::Batch;
|
||||||
|
bool process_validation_batch(BatchType &batch);
|
||||||
|
bool process_persistence_batch(BatchType &batch);
|
||||||
|
bool process_release_batch(BatchType &batch);
|
||||||
|
|
||||||
// Route handlers
|
// Route handlers
|
||||||
void handleGetVersion(Connection &conn, const HttpConnectionState &state);
|
void handleGetVersion(Connection &conn, const HttpConnectionState &state);
|
||||||
|
|||||||
Reference in New Issue
Block a user