From 92720481082b41212040faa51caa92dec00c8461 Mon Sep 17 00:00:00 2001 From: Andrew Noyes Date: Wed, 3 Sep 2025 23:43:03 -0400 Subject: [PATCH] Outline commit pipeline --- src/http_handler.cpp | 206 ++++++++++++++++++++++++++++++++++++++----- src/http_handler.hpp | 100 ++++++++++----------- 2 files changed, 231 insertions(+), 75 deletions(-) diff --git a/src/http_handler.cpp b/src/http_handler.cpp index 2425ae2..4600cc9 100644 --- a/src/http_handler.cpp +++ b/src/http_handler.cpp @@ -9,6 +9,7 @@ #include "json_commit_request_parser.hpp" #include "metric.hpp" #include "perfetto_categories.hpp" +#include "server.hpp" auto requests_counter_family = metric::create_counter( "weaseldb_http_requests_total", "Total http requests"); @@ -62,16 +63,31 @@ void HttpHandler::on_write_buffer_drained( void HttpHandler::on_batch_complete( std::span> batch) { - int readyCount = 0; - for (int i = 0; i < int(batch.size()); ++i) { - readyCount += batch[i] && batch[i]->outgoingBytesQueued() > 0; + // Collect commit requests that passed basic validation for pipeline + // processing + int commit_count = 0; + for (auto &conn : batch) { + if (conn && conn->user_data) { + auto *state = static_cast(conn->user_data); + if (state->route == HttpRoute::POST_commit && state->commit_request && + state->parsing_commit && state->basic_validation_passed) { + commit_count++; + } + } } - if (readyCount > 0) { - auto guard = pipeline.push(readyCount, /*block=*/true); - auto outIter = guard.batch.begin(); - for (int i = 0; i < int(batch.size()); ++i) { - if (batch[i] && batch[i]->outgoingBytesQueued() > 0) { - *outIter++ = std::move(batch[i]); + + // Send commit requests to pipeline in batch + if (commit_count > 0) { + auto guard = commitPipeline.push(commit_count, true); + auto out_iter = guard.batch.begin(); + + for (auto &conn : batch) { + if (conn && conn->user_data) { + auto *state = static_cast(conn->user_data); + if (state->route == HttpRoute::POST_commit && state->commit_request && + state->parsing_commit && state->basic_validation_passed) { + *out_iter++ = std::move(conn); + } } } } @@ -111,6 +127,9 @@ void HttpHandler::on_data_arrived(std::string_view data, break; case HttpRoute::POST_commit: handlePostCommit(*conn_ptr, *state); + // Commit requests will be sent to pipeline in batch via on_batch_complete + // If basic validation failed, connection stays in batch but won't be sent + // to pipeline break; case HttpRoute::GET_subscribe: handleGetSubscribe(*conn_ptr, *state); @@ -204,24 +223,53 @@ void HttpHandler::handlePostCommit(Connection &conn, const CommitRequest &commit_request = *state.commit_request; - // TODO: Process the commit request with transaction engine - // For now, return a placeholder response with parsed request_id if available - ArenaAllocator &arena = conn.get_arena(); - std::string_view response; + // Perform basic validation that doesn't require serialization (done on I/O + // threads) + bool valid = true; + std::string_view error_msg; - if (commit_request.request_id().has_value()) { - response = format( - arena, - R"({"request_id":"%.*s","status":"committed","version":43,"leader_id":"leader123"})", - static_cast(commit_request.request_id().value().size()), - commit_request.request_id().value().data()); - } else { - response = static_format( - arena, - R"({"status":"committed","version":43,"leader_id":"leader123"})"); + // Check that we have at least one operation + if (commit_request.operations().empty()) { + valid = false; + error_msg = "Commit request must contain at least one operation"; } - send_json_response(conn, 200, response, state.connection_close); + // Check leader_id is not empty + if (valid && commit_request.leader_id().empty()) { + valid = false; + error_msg = "Commit request must specify a leader_id"; + } + + // Check leader_id matches current leader (static for process lifetime) + if (valid && commit_request.leader_id() != "test-leader") { + valid = false; + error_msg = "Leader ID mismatch - not current leader"; + } + + // Check operations are well-formed + if (valid) { + for (const auto &op : commit_request.operations()) { + if (op.param1.empty()) { + valid = false; + error_msg = "Operation key cannot be empty"; + break; + } + if (op.type == Operation::Type::Write && op.param2.empty()) { + valid = false; + error_msg = "Write operation value cannot be empty"; + break; + } + } + } + + if (!valid) { + send_error_response(conn, 400, error_msg, state.connection_close); + return; + } + + // Basic validation passed - mark for pipeline processing + const_cast(state).basic_validation_passed = true; + // Response will be sent after pipeline processing is complete } void HttpHandler::handleGetSubscribe(Connection &conn, @@ -507,3 +555,113 @@ int HttpHandler::onMessageComplete(llhttp_t *parser) { return 0; } + +// Pipeline stage implementations (batch-based) +bool HttpHandler::process_validation_batch(BatchType &batch) { + // This stage performs ONLY work that requires serial processing: + // - Version assignment (must be sequential) + // - Precondition validation (requires consistent database state view) + + for (auto &conn : batch) { + if (!conn) { + return true; // Shutdown signal + } + + TRACE_EVENT( + "http", "validation", + perfetto::Flow::Global( + static_cast(conn->user_data)->request_id)); + + auto *state = static_cast(conn->user_data); + if (!state || !state->commit_request) { + // Should not happen - basic validation was done on I/O thread + send_error_response(*conn, 500, "Internal server error", true); + state = nullptr; // Mark as error + continue; + } + + // TODO: Implement serialized validation logic: + // 1. Assign version number to this commit (must be sequential) + // 2. Validate preconditions against current database state: + // - Check point reads match expected values/existence + // - Check range reads haven't been modified since read_version + // - Ensure no conflicting writes from other transactions + + // For now, just simulate version assignment + // All basic validation (including leader check) was done on I/O thread + + // TODO: Actual precondition validation would go here + // This is where we'd check each precondition in + // commit_request.preconditions() against the current database state to + // ensure no conflicts + + // For now, assume all preconditions pass since basic validation was done + + // Precondition validation passed - continue to persistence stage + } + return false; // Continue processing +} + +bool HttpHandler::process_persistence_batch(BatchType &batch) { + // TODO: This is where we can implement efficient batching for S3 writes + // - Collect all valid commits in the batch + // - Write them as a single S3 batch operation + // - Generate responses for all commits in the batch + + for (auto &conn : batch) { + if (!conn) { + return true; // Shutdown signal + } + + auto *state = static_cast(conn->user_data); + + // Check if validation stage failed or connection is in error state + if (!state || !state->commit_request || !state->message_complete) { + // Skip persistence processing for failed validation + continue; + } + + TRACE_EVENT("http", "persistence", + perfetto::Flow::Global(state->request_id)); + + const CommitRequest &commit_request = *state->commit_request; + + // For now, process individually but this could be batched + ArenaAllocator &arena = conn->get_arena(); + std::string_view response; + + if (commit_request.request_id().has_value()) { + response = format( + arena, + R"({"request_id":"%.*s","status":"committed","version":43,"leader_id":"leader123"})", + static_cast(commit_request.request_id().value().size()), + commit_request.request_id().value().data()); + } else { + response = static_format( + arena, + R"({"status":"committed","version":43,"leader_id":"leader123"})"); + } + + send_json_response(*conn, 200, response, state->connection_close); + } + + return false; // Continue processing +} + +bool HttpHandler::process_release_batch(BatchType &batch) { + for (auto &conn : batch) { + if (!conn) { + return true; // Shutdown signal + } + + TRACE_EVENT( + "http", "release", + perfetto::Flow::Global( + static_cast(conn->user_data)->request_id)); + + // Return connection to server for further processing or cleanup + Server::release_back_to_server(std::move(conn)); + } + + return false; // Continue processing +} diff --git a/src/http_handler.hpp b/src/http_handler.hpp index 753ec43..1b0846c 100644 --- a/src/http_handler.hpp +++ b/src/http_handler.hpp @@ -64,6 +64,8 @@ struct HttpConnectionState { std::unique_ptr commit_parser; std::unique_ptr commit_request; bool parsing_commit = false; + bool basic_validation_passed = + false; // Set to true if basic validation passes explicit HttpConnectionState(ArenaAllocator &arena); }; @@ -74,69 +76,52 @@ struct HttpConnectionState { */ struct HttpHandler : ConnectionHandler { HttpHandler() { - finalStageThreads.emplace_back([this]() { - pthread_setname_np(pthread_self(), "stage-1-0"); + // Stage 0: Version assignment and precondition validation thread + validationThread = std::thread{[this]() { + pthread_setname_np(pthread_self(), "txn-validate"); for (;;) { - auto guard = pipeline.acquire<1, 0>(); - for (auto it = guard.batch.begin(); it != guard.batch.end(); ++it) { - if ((it.index() % 2) == 0) { // Thread 0 handles even indices - auto &c = *it; - if (!c) { - return; - } - auto *state = static_cast(c->user_data); - TRACE_EVENT("http", "release", - perfetto::Flow::Global(state->request_id)); - Server::release_back_to_server(std::move(c)); - } + auto guard = commitPipeline.acquire<0, 0>(); + if (process_validation_batch(guard.batch)) { + return; // Shutdown signal received } } - }); - finalStageThreads.emplace_back([this]() { - pthread_setname_np(pthread_self(), "stage-1-1"); + }}; + + // Stage 1: Transaction persistence and subscriber streaming thread + persistenceThread = std::thread{[this]() { + pthread_setname_np(pthread_self(), "txn-persist"); for (;;) { - auto guard = pipeline.acquire<1, 1>(); - for (auto it = guard.batch.begin(); it != guard.batch.end(); ++it) { - if ((it.index() % 2) == 1) { // Thread 1 handles odd indices - auto &c = *it; - if (!c) { - return; - } - auto *state = static_cast(c->user_data); - TRACE_EVENT("http", "release", - perfetto::Flow::Global(state->request_id)); - Server::release_back_to_server(std::move(c)); - } + auto guard = commitPipeline.acquire<1, 0>(); + if (process_persistence_batch(guard.batch)) { + return; // Shutdown signal received } } - }); - stage0Thread = std::thread{[this]() { - pthread_setname_np(pthread_self(), "stage-0"); - int nulls = 0; + }}; + + // Stage 2: Connection return to server thread + releaseThread = std::thread{[this]() { + pthread_setname_np(pthread_self(), "txn-release"); for (;;) { - auto guard = pipeline.acquire<0, 0>(1); - for (auto &c : guard.batch) { - nulls += !c; - if (nulls == 2) { - return; - } - for (volatile int i = 0; i < loop_iterations; i = i + 1) - ; + auto guard = commitPipeline.acquire<2, 0>(); + if (process_release_batch(guard.batch)) { + return; // Shutdown signal received } } }}; } ~HttpHandler() { + // Send shutdown signals to all pipeline stages { - auto guard = pipeline.push(2, true); + auto guard = commitPipeline.push(3, true); for (auto &c : guard.batch) { - c = {}; + c = {}; // null connection signals shutdown } } - stage0Thread.join(); - for (auto &thread : finalStageThreads) { - thread.join(); - } + + // Join all pipeline threads + validationThread.join(); + persistenceThread.join(); + releaseThread.join(); } void on_connection_established(Connection &conn) override; @@ -162,11 +147,24 @@ struct HttpHandler : ConnectionHandler { private: static constexpr int lg_size = 16; + + // Main commit processing pipeline: validation -> persistence -> release StaticThreadPipeline, - WaitStrategy::WaitIfUpstreamIdle, 1, 2> - pipeline{lg_size}; - std::thread stage0Thread; - std::vector finalStageThreads; + WaitStrategy::WaitIfUpstreamIdle, 1, 1, 1> + commitPipeline{lg_size}; + + // Pipeline stage threads + std::thread validationThread; + std::thread persistenceThread; + std::thread releaseThread; + + // Pipeline stage processing methods (batch-based) + using BatchType = + StaticThreadPipeline, + WaitStrategy::WaitIfUpstreamIdle, 1, 1, 1>::Batch; + bool process_validation_batch(BatchType &batch); + bool process_persistence_batch(BatchType &batch); + bool process_release_batch(BatchType &batch); // Route handlers void handleGetVersion(Connection &conn, const HttpConnectionState &state);