# Commit Processing Pipeline

## Overview

WeaselDB implements a high-performance 4-stage commit processing pipeline that transforms HTTP commit requests into durable transactions. The pipeline provides strict serialization where needed while maximizing throughput through batching and asynchronous processing.

## Architecture

The commit processing pipeline consists of four sequential stages, each running on a dedicated thread:

```
HTTP I/O Threads → [Sequence] → [Resolve] → [Persist] → [Release] → HTTP I/O Threads
```

### Pipeline Flow

1. **HTTP I/O Threads**: Parse and validate incoming commit requests
1. **Sequence Stage**: Assign sequential version numbers to commits
1. **Resolve Stage**: Validate preconditions and check for conflicts
1. **Persist Stage**: Write commits to durable storage and notify subscribers
1. **Release Stage**: Return connections to HTTP I/O threads for response handling

## Stage Details

### Stage 0: Sequence Assignment

**Thread**: `txn-sequence`

**Purpose**: Version assignment and request ID management

**Serialization**: **Required** - Must be single-threaded

**Responsibilities**:

- **For CommitEntry**: Check request_id against banned list, assign sequential version number if not banned, forward to resolve stage
- **For StatusEntry**: Add request_id to banned list, note current highest assigned version as upper bound, transfer connection to status threadpool
- Record version assignments for transaction tracking

**Why Serialization is Required**:

- Version numbers must be strictly sequential without gaps
- Banned list updates must be atomic with version assignment
- Status requests must get an accurate upper bound on potential commit versions

**Request ID Banned List**:

- Purpose: Ensure a banned request can no longer be in flight, and establish version upper bounds for status queries
- Lifecycle: Grows indefinitely until process restart (leader change)
- Removal: Only on process restart/leader change, which invalidates all old request IDs

**Current Implementation**:

```cpp
bool HttpHandler::process_sequence_batch(BatchType &batch) {
  for (auto &entry : batch) {
    if (std::holds_alternative<ShutdownEntry>(entry)) {
      return true; // Shutdown signal
    }
    // TODO: Pattern match on CommitEntry vs StatusEntry
    // TODO: Implement sequence assignment logic for each type
  }
  return false; // Continue processing
}
```

### Stage 1: Precondition Resolution

**Thread**: `txn-resolve`

**Purpose**: Validate preconditions and detect conflicts

**Serialization**: **Required** - Must be single-threaded

**Responsibilities**:

- **For CommitEntry**: Check preconditions against in-memory recent writes set, add writes to recent writes set if accepted
- **For StatusEntry**: N/A (transferred to status threadpool after sequence stage)
- Mark failed commits with failure information (including which preconditions failed)

**Why Serialization is Required**:

- Must maintain a consistent view of the in-memory recent writes set
- Conflict detection requires atomic evaluation of all preconditions against recent writes
- Recent writes set updates must be synchronized

**Transaction State Transitions**:

- **Assigned Version** (from sequence) → **Semi-committed** (resolve accepts) → **Committed** (persist completes)
- Failed transactions continue through the pipeline with failure information for the client response

**Current Implementation**:

```cpp
bool HttpHandler::process_resolve_batch(BatchType &batch) {
  // TODO: Implement precondition resolution logic:
  // 1. For CommitEntry: Check preconditions against in-memory recent writes set
  // 2. If accepted: Add writes to in-memory recent writes set, mark as semi-committed
  // 3. If failed: Mark with failure info (which preconditions failed)
  // 4. For StatusEntry: N/A (already transferred to status threadpool)
  return false; // Continue processing
}
```

### Stage 2: Transaction Persistence

**Thread**: `txn-persist`

**Purpose**: Write semi-committed transactions to durable storage

**Serialization**: **Required** - Must mark batches durable in order

**Responsibilities**:

- **For CommitEntry**: Apply operations to persistent storage, update committed version high water mark
- **For StatusEntry**: N/A (transferred to status threadpool after sequence stage)
- Generate durability events for `/v1/subscribe` when committed version advances
- Batch multiple commits for efficient persistence operations

**Why Serialization is Required**:

- Batches must be marked durable in sequential version order
- High water mark updates must reflect strict ordering of committed versions
- Ensures consistency guarantees across all endpoints

**Committed Version High Water Mark**:

- Global atomic value tracking the highest durably committed version
- Updated after each batch commits: set to the highest version in the batch
- Read by the `/v1/version` endpoint using atomic seq_cst reads
- Enables `/v1/subscribe` durability events when the high water mark advances

**Batching Strategy**:

- Multiple semi-committed transactions can be persisted in a single batch
- High water mark updated once per batch to the highest version in that batch
- See `persistence.md` for detailed persistence design

**Current Implementation**:

```cpp
bool HttpHandler::process_persist_batch(BatchType &batch) {
  // TODO: Implement actual persistence logic:
  // 1. For CommitEntry: Apply operations to persistent storage
  // 2. Update committed version high water mark to highest version in batch
  // 3. Generate durability events for /v1/subscribe
  // 4. For StatusEntry: N/A (already transferred to status threadpool)
  return false; // Continue processing
}
```

### Stage 3: Connection Release

**Thread**: `txn-release`

**Purpose**: Return connections to HTTP server for client response

**Serialization**: Not required - Independent connection handling

**Responsibilities**:

- Return processed connections to HTTP server for all request types
- Connection carries response data (success/failure) and status information
- Trigger response transmission to clients

**Response Handling**:

- **CommitRequests**: Response generated by persist stage (success with version, or failure with conflicting preconditions)
- **StatusRequests**: Response generated by separate status lookup logic (not part of pipeline)
- Failed transactions carry failure information through the entire pipeline for proper client response

**Implementation**:

```cpp
bool HttpHandler::process_release_batch(BatchType &batch) {
  // Stage 3: Connection release
  for (auto &conn : batch) {
    if (!conn) {
      return true; // Shutdown signal
    }
    // Return connection to server for further processing or cleanup
    Server::release_back_to_server(std::move(conn));
  }
  return false; // Continue processing
}
```

## Threading Model

### Thread Pipeline Configuration

```cpp
// 4-stage pipeline: sequence -> resolve -> persist -> release
// TODO: Update pipeline type from std::unique_ptr<Connection> to PipelineEntry variant
StaticThreadPipeline<std::unique_ptr<Connection>,
                     WaitStrategy::WaitIfUpstreamIdle, 1, 1, 1, 1>
    commitPipeline{lg_size};

// Pipeline entry type (to be implemented)
using PipelineEntry = std::variant<CommitEntry, StatusEntry, ShutdownEntry>;
```

### Thread Creation and Management

```cpp
HttpHandler() {
  // Stage 0: Sequence assignment thread
  sequenceThread = std::thread{[this]() {
    pthread_setname_np(pthread_self(), "txn-sequence");
    for (;;) {
      auto guard = commitPipeline.acquire<0, 0>();
      if (process_sequence_batch(guard.batch)) {
        return; // Shutdown signal received
      }
    }
  }};
  // Similar pattern for resolve, persist, and release threads...
}
```

### Batch Processing

Each stage processes connections in batches using RAII guards:

```cpp
// Template arguments omitted here; the sequence thread above uses acquire<0, 0>()
auto guard = commitPipeline.acquire();
// Process batch
for (auto &conn : guard.batch) {
  // Stage-specific processing
}
// Guard destructor automatically publishes batch to next stage
```

## Flow Control

### Pipeline Entry

Commit requests enter the pipeline via `HttpHandler::on_batch_complete()`:

```cpp
void HttpHandler::on_batch_complete(std::span<std::unique_ptr<Connection>> batch) {
  // Collect commit requests that passed basic validation for 4-stage pipeline processing
  int commit_count = 0;
  for (auto &conn : batch) {
    if (conn && conn->user_data) {
      auto *state = static_cast<HttpConnectionState *>(conn->user_data);
      if (state->route == HttpRoute::POST_commit && state->commit_request &&
          state->parsing_commit) {
        commit_count++;
      }
    }
  }

  // Send commit requests to 4-stage pipeline in batch
  if (commit_count > 0) {
    auto guard = commitPipeline.push(commit_count, true);
    // Move qualifying connections into pipeline
  }
}
```

### Backpressure Handling

The pipeline implements natural backpressure:

- Each stage blocks if downstream stages are full
- `WaitIfUpstreamIdle` strategy balances latency vs throughput
- Ring buffer size (`lg_size = 16`, i.e. 2^16 entries) bounds how much work can be queued between stages

### Shutdown Coordination

Pipeline shutdown is coordinated by sending a single ShutdownEntry that flows through all stages:

```cpp
~HttpHandler() {
  // Send single shutdown signal that flows through all pipeline stages
  {
    auto guard = commitPipeline.push(1, true);
    guard.batch[0] = ShutdownEntry{}; // Single ShutdownEntry flows through all stages
  }

  // Join all pipeline threads
  sequenceThread.join();
  resolveThread.join();
  persistThread.join();
  releaseThread.join();
}
```

**Note**: Multiple entries would only be needed if stages had multiple threads, with each thread needing its own shutdown signal.
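The single-entry shutdown described above can be illustrated with a small standalone model. This is only a sketch under stated assumptions: `StageQueue`, `DemoResult`, and `run_shutdown_demo` are hypothetical names, and a mutex-guarded queue stands in for the real ring buffer. It is not WeaselDB's `StaticThreadPipeline`. Each stage has exactly one thread, forwards every entry downstream, and exits after forwarding the `ShutdownEntry`.

```cpp
#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <variant>
#include <vector>

// Hypothetical stand-ins for the pipeline entry types (illustration only).
struct CommitEntry { int id; };
struct ShutdownEntry {};
using PipelineEntry = std::variant<CommitEntry, ShutdownEntry>;

// Minimal blocking queue; an assumption standing in for the ring buffer.
class StageQueue {
public:
  void push(PipelineEntry e) {
    {
      std::lock_guard<std::mutex> lock(mu_);
      q_.push(std::move(e));
    }
    cv_.notify_one();
  }
  PipelineEntry pop() {
    std::unique_lock<std::mutex> lock(mu_);
    cv_.wait(lock, [this] { return !q_.empty(); });
    PipelineEntry e = std::move(q_.front());
    q_.pop();
    return e;
  }
  std::size_t size() {
    std::lock_guard<std::mutex> lock(mu_);
    return q_.size();
  }

private:
  std::mutex mu_;
  std::condition_variable cv_;
  std::queue<PipelineEntry> q_;
};

struct DemoResult {
  int stages_stopped;      // Stages that observed the ShutdownEntry and exited
  std::size_t entries_out; // Entries that reached the end of the pipeline
};

// Runs `stages` single-threaded stages over `commits` entries plus one ShutdownEntry.
DemoResult run_shutdown_demo(int stages, int commits) {
  std::vector<StageQueue> queues(static_cast<std::size_t>(stages) + 1);
  std::atomic<int> stopped{0};
  std::vector<std::thread> threads;
  for (int s = 0; s < stages; ++s) {
    threads.emplace_back([&queues, &stopped, s] {
      for (;;) {
        PipelineEntry e = queues[s].pop();
        const bool is_shutdown = std::holds_alternative<ShutdownEntry>(e);
        queues[s + 1].push(std::move(e)); // Always forward, including the shutdown entry
        if (is_shutdown) {
          ++stopped;
          return;
        }
      }
    });
  }
  for (int i = 0; i < commits; ++i) {
    queues[0].push(CommitEntry{i});
  }
  queues[0].push(ShutdownEntry{}); // One entry is enough: one thread per stage
  for (auto &t : threads) {
    t.join();
  }
  return DemoResult{stopped.load(), queues[stages].size()};
}
```

Calling `run_shutdown_demo(4, 3)` models the four-stage layout: because every stage forwards the shutdown entry before exiting, all four threads stop and no commit entry queued ahead of it is dropped. If a stage had two threads, only the one that popped the entry would stop, which is the case the note above describes.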
## Error Handling

### Stage-Level Error Handling

Each stage handles different entry types:

```cpp
// Pattern matching on pipeline entry variant
std::visit([&](auto&& entry) {
  using T = std::decay_t<decltype(entry)>;
  if constexpr (std::is_same_v<T, ShutdownEntry>) {
    return true; // Signal shutdown
  } else if constexpr (std::is_same_v<T, CommitEntry>) {
    // Process commit entry
    return false; // All branches must return the same type for std::visit
  } else if constexpr (std::is_same_v<T, StatusEntry>) {
    // Handle status entry (or skip if transferred)
    return false;
  }
}, pipeline_entry);
```

### Connection Error States

- Failed CommitEntries are passed through the pipeline with error information
- Downstream stages skip processing for error connections but forward them
- Error responses are sent when the connection reaches the release stage
- Connection ownership is always transferred to ensure cleanup

### Pipeline Integrity

- ShutdownEntry signals shutdown to all stages
- Each stage checks for ShutdownEntry and returns true to signal shutdown
- RAII guards ensure entries are always published downstream
- No entries are lost even during error conditions

## Performance Characteristics

### Throughput Optimization

- **Batching**: Multiple connections processed per stage activation
- **Lock-Free Communication**: Ring buffer between stages
- **Minimal Context Switching**: Dedicated threads per stage
- **Arena Allocation**: Efficient memory management throughout pipeline

### Latency Optimization

- **Single-Pass Processing**: Each connection flows through all stages once
- **Streaming Design**: Stages process concurrently
- **Minimal Copying**: Connection ownership transfer, not data copying
- **Direct Response**: Release stage triggers immediate response transmission

### Scalability Characteristics

- **Batch Size Tuning**: Ring buffer size controls memory vs latency tradeoff
- **Thread Affinity**: Dedicated threads reduce scheduling overhead
- **NUMA Awareness**: Can pin threads to specific CPU cores

## Configuration

### Pipeline Parameters

```cpp
private:
  static constexpr int lg_size = 16; // Ring buffer size = 2^16 entries

  // 4-stage pipeline configuration
  StaticThreadPipeline<std::unique_ptr<Connection>,
                       WaitStrategy::WaitIfUpstreamIdle, 1, 1, 1, 1>
      commitPipeline{lg_size};
```

### Tuning Considerations

- **Ring Buffer Size**: Larger buffers increase memory usage but improve batching
- **Wait Strategy**: `WaitIfUpstreamIdle` balances CPU usage vs latency
- **Thread Affinity**: OS scheduling vs explicit CPU pinning tradeoffs

## Pipeline Entry Types

The pipeline processes different types of entries using a variant/union type system instead of a bare `std::unique_ptr<Connection>`:

### Pipeline Entry Variants

- **CommitEntry**: Contains `std::unique_ptr<Connection>` with CommitRequest and connection state
- **StatusEntry**: Contains `std::unique_ptr<Connection>` with StatusRequest (transferred to status threadpool after sequence)
- **ShutdownEntry**: Signals pipeline shutdown to all stages
- **Future types**: Pipeline design supports additional entry types

### Stage Processing by Type

| Stage | CommitEntry | StatusEntry | ShutdownEntry | Serialization |
|-------|-------------|-------------|---------------|---------------|
| **Sequence** | Check banned list, assign version | Add to banned list, transfer to status threadpool | Return true (shutdown) | **Required** |
| **Resolve** | Check preconditions, update recent writes | N/A (transferred) | Return true (shutdown) | **Required** |
| **Persist** | Apply operations, update high water mark | N/A (transferred) | Return true (shutdown) | **Required** |
| **Release** | Return connection to HTTP threads | N/A (transferred) | Return true (shutdown) | Not required |

## API Endpoint Integration

### `/v1/commit` - Transaction Submission

**Pipeline Interaction**: Full pipeline traversal as CommitEntry

#### Request Processing Flow

1. **HTTP I/O Thread Processing** (`src/http_handler.cpp:210-273`):

   ```cpp
   void HttpHandler::handlePostCommit(Connection &conn, HttpConnectionState &state) {
     // Parse and validate anything that doesn't need serialization:
     // - JSON parsing and CommitRequest construction
     // - Basic validation: leader_id check, operation format validation
     // - Check that we have at least one operation
     // If validation fails, send error response immediately and return
     // If validation succeeds, connection will enter pipeline in on_batch_complete()
   }
   ```

1. **Pipeline Entry**: Successfully parsed connections enter the pipeline as CommitEntry (containing the connection with CommitRequest)

1. **Pipeline Processing**:
   - **Sequence**: Check banned list → assign version (or reject)
   - **Resolve**: Check preconditions against in-memory recent writes → mark semi-committed (or failed with conflict details)
   - **Persist**: Apply operations → mark committed, update high water mark
   - **Release**: Return connection with response data

1. **Response Generation**: Based on pipeline results
   - **Success**: `{"status": "committed", "version": N, "leader_id": "...", "request_id": "..."}`
   - **Failure**: `{"status": "not_committed", "conflicts": [...], "version": N, "leader_id": "..."}`

### `/v1/status` - Commit Status Lookup

**Pipeline Interaction**: StatusEntry through sequence stage, then transfer to status threadpool

#### Request Processing Flow

1. **HTTP I/O Thread Processing**:

   ```cpp
   void HttpHandler::handleGetStatus(Connection &conn, const HttpConnectionState &state) {
     // TODO: Extract request_id from URL and min_version from query params
     // Current: Returns placeholder static response
   }
   ```

1. **Two-Phase Processing**:
   - **Phase 1 - Sequence Stage**: StatusEntry enters the pipeline to add request_id to the banned list and get the version upper bound
   - **Phase 2 - Status Threadpool**: Connection transferred from sequence stage to dedicated status threadpool for actual status lookup logic

1. **Status Lookup Logic**: Performed in the status threadpool - scan the transaction log to determine the actual commit status of the now-banned request_id

### `/v1/subscribe` - Real-time Transaction Stream

**Pipeline Integration**: Consumes events from resolve and persist stages

#### Event Sources

- **Resolve Stage**: Semi-committed transactions (accepted preconditions) for low-latency streaming
- **Persist Stage**: Durability events when committed version high water mark advances

#### Current Implementation

```cpp
void HttpHandler::handleGetSubscribe(Connection &conn, const HttpConnectionState &state) {
  // TODO: Parse query parameters (after, durable)
  // TODO: Establish Server-Sent Events stream
  // TODO: Subscribe to resolve stage (semi-committed) and persist stage (durability) events
}
```

### `/v1/version` - Version Information

**Pipeline Integration**: Direct atomic read, no pipeline interaction

```cpp
// TODO: Implement direct atomic read of committed version high water mark
// No pipeline interaction needed - seq_cst atomic read
// Leader ID is process-lifetime constant
```

**Response**: `{"version": N, "leader_id": "..."}`

## Integration Points

### HTTP Handler Integration

The pipeline integrates with the HTTP handler at two points:

1. **Entry**: `on_batch_complete()` feeds connections into sequence stage
1. **Exit**: Release stage calls `Server::release_back_to_server()`

### Persistence Layer Integration

The persist stage interfaces with:

- **S3 Backend**: Batch writes for durability (see `persistence.md`)
- **Subscriber System**: Real-time change stream notifications
- **Metrics System**: Transaction throughput and latency tracking

### Database State Integration

- **Sequence Stage**: Updates version number generator
- **Resolve Stage**: Queries current database state for precondition validation
- **Persist Stage**: Applies mutations to authoritative database state

## Future Optimizations

### Potential Enhancements

1. **Dynamic Thread Counts**: Make resolve and release thread counts configurable
1. **NUMA Optimization**: Pin pipeline threads to specific CPU cores
1. **Batch Size Tuning**: Dynamic batch size based on load
1. **Stage Bypassing**: Skip resolve stage for transactions without preconditions
1. **Persistence Batching**: Aggregate multiple commits into larger S3 writes

### Monitoring and Observability

1. **Stage Metrics**: Throughput, latency, and queue depth per stage
1. **Error Tracking**: Error rates and types by stage
1. **Resource Utilization**: CPU and memory usage per pipeline thread
1. **Flow Control Events**: Backpressure and stall detection

## Implementation Status

### Current State

- ✅ Pipeline structure implemented with 4 stages
- ✅ Thread creation and management
- ✅ RAII batch processing
- ✅ Error handling framework
- ✅ Shutdown coordination

### TODO Items

- ⏳ Sequence assignment logic implementation
- ⏳ Precondition resolution implementation
- ⏳ S3 persistence batching implementation
- ⏳ Subscriber notification system
- ⏳ Performance monitoring and metrics
- ⏳ Configuration tuning and optimization
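
As a starting point for the first TODO item, the sequence-stage rules described under Stage 0 might be sketched as follows. This is a hedged illustration, not the planned implementation: the `Sequencer` class, its method names, and the choice to start versions at 1 are all assumptions. It has no locking because the stage is documented as single-threaded.

```cpp
#include <cstdint>
#include <optional>
#include <string>
#include <unordered_set>

// Hypothetical sequencer state; names are illustrative, not WeaselDB's API.
// Intended to be owned and called by one thread only (the sequence stage).
class Sequencer {
public:
  // CommitEntry path: returns the newly assigned version, or std::nullopt if
  // the request_id is banned. A rejected commit consumes no version, so the
  // assigned versions stay sequential without gaps.
  std::optional<std::uint64_t> on_commit(const std::string &request_id) {
    if (banned_.count(request_id) != 0) {
      return std::nullopt;
    }
    return ++last_assigned_;
  }

  // StatusEntry path: ban the request_id, then report the highest version
  // assigned so far. Any commit carrying this request_id must have been
  // sequenced at or below that version, or it will never be sequenced.
  std::uint64_t on_status(const std::string &request_id) {
    banned_.insert(request_id);
    return last_assigned_;
  }

private:
  std::unordered_set<std::string> banned_; // Grows until process restart
  std::uint64_t last_assigned_ = 0;        // Assumption: first version is 1
};
```

Because both paths run on the same thread, banning a request ID and reading the highest assigned version happen as one step relative to version assignment, which is the atomicity the sequence stage requires.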