# Commit Processing Pipeline

## Overview

WeaselDB implements a high-performance 4-stage commit processing pipeline that transforms HTTP commit requests into durable transactions. The pipeline provides strict serialization where needed while maximizing throughput through batching and asynchronous processing.

## Architecture

The commit processing pipeline consists of four sequential stages, each running on a dedicated thread:

```
HTTP I/O Threads → [Sequence] → [Resolve] → [Persist] → [Release] → HTTP I/O Threads
```

### Pipeline Flow

1. **HTTP I/O Threads**: Parse and validate incoming commit requests
2. **Sequence Stage**: Assign sequential version numbers to commits
3. **Resolve Stage**: Validate preconditions and check for conflicts
4. **Persist Stage**: Write commits to durable storage and notify subscribers
5. **Release Stage**: Return connections to HTTP I/O threads for response handling

## Stage Details

### Stage 0: Sequence Assignment

**Thread**: `txn-sequence`
**Purpose**: Version assignment and request ID management
**Serialization**: **Required** - Must be single-threaded

**Responsibilities**:
- **For CommitEntry**: Check the request_id against the banned list, assign a sequential version number if not banned, forward to the resolve stage
- **For StatusEntry**: Add the request_id to the banned list, note the current highest assigned version as an upper bound, transfer the connection to the status threadpool
- Record version assignments for transaction tracking

**Why Serialization is Required**:
- Version numbers must be strictly sequential without gaps
- Banned list updates must be atomic with version assignment
- Status requests must get an accurate upper bound on potential commit versions

**Request ID Banned List**:
- Purpose: Guarantee that a request_id is no longer in flight and establish a version upper bound for status queries
- Lifecycle: Grows indefinitely until process restart (leader change)
- Removal: Only on process restart/leader change, which invalidates all old request IDs

**Current Implementation**:
```cpp
bool HttpHandler::process_sequence_batch(BatchType &batch) {
  for (auto &entry : batch) {
    if (std::holds_alternative<ShutdownEntry>(entry)) {
      return true; // Shutdown signal
    }
    // TODO: Pattern match on CommitEntry vs StatusEntry
    // TODO: Implement sequence assignment logic for each type (see the sketch below)
  }
  return false; // Continue processing
}
```
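The sketch below shows one way the TODOs above might be filled in. It is illustrative only: `bannedRequestIds_`, `nextVersion_`, `mark_failed()`, and `transfer_to_status_pool()` are hypothetical names, and the real entry types may carry different fields.

```cpp
// Illustrative sketch, not the actual implementation. Assumed members (hypothetical):
//   std::unordered_set<std::string> bannedRequestIds_;
//   uint64_t nextVersion_;
bool HttpHandler::process_sequence_batch(BatchType &batch) {
  for (auto &entry : batch) {
    if (std::holds_alternative<ShutdownEntry>(entry)) {
      return true; // Shutdown signal
    }
    if (auto *commit = std::get_if<CommitEntry>(&entry)) {
      if (bannedRequestIds_.contains(commit->request_id)) {
        commit->mark_failed("request_id banned by a status query"); // hypothetical helper
      } else {
        commit->version = ++nextVersion_; // strictly sequential, no gaps
      }
    } else if (auto *status = std::get_if<StatusEntry>(&entry)) {
      bannedRequestIds_.insert(status->request_id);
      status->version_upper_bound = nextVersion_;  // highest version assigned so far
      transfer_to_status_pool(std::move(*status)); // hypothetical; leaves the pipeline here
    }
  }
  return false; // Continue processing
}
```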
### Stage 1: Precondition Resolution

**Thread**: `txn-resolve`
**Purpose**: Validate preconditions and detect conflicts
**Serialization**: **Required** - Must be single-threaded

**Responsibilities**:
- **For CommitEntry**: Check preconditions against the in-memory recent writes set; if accepted, add the commit's writes to the recent writes set
- **For StatusEntry**: N/A (transferred to status threadpool after sequence stage)
- Mark failed commits with failure information (including which preconditions failed)

**Why Serialization is Required**:
- Must maintain a consistent view of the in-memory recent writes set
- Conflict detection requires atomic evaluation of all preconditions against recent writes
- Recent writes set updates must be synchronized

**Transaction State Transitions**:
- **Assigned Version** (from sequence) → **Semi-committed** (resolve accepts) → **Committed** (persist completes)
- Failed transactions continue through the pipeline carrying failure information for the client response

**Current Implementation**:
```cpp
bool HttpHandler::process_resolve_batch(BatchType &batch) {
  // TODO: Implement precondition resolution logic (see the sketch below):
  // 1. For CommitEntry: Check preconditions against in-memory recent writes set
  // 2. If accepted: Add writes to in-memory recent writes set, mark as semi-committed
  // 3. If failed: Mark with failure info (which preconditions failed)
  // 4. For StatusEntry: N/A (already transferred to status threadpool)
  return false; // Continue processing
}
```
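A minimal sketch of the intended resolution logic, assuming preconditions of the form "key unchanged since version V" and a hypothetical `recentWrites_` map from key to the version of its most recent accepted write; `Precondition`, `TxnState`, and `mark_failed()` are likewise assumed names:

```cpp
// Illustrative sketch, not the actual implementation.
// Assumed member (hypothetical): std::unordered_map<std::string, uint64_t> recentWrites_;
bool HttpHandler::process_resolve_batch(BatchType &batch) {
  for (auto &entry : batch) {
    if (std::holds_alternative<ShutdownEntry>(entry)) {
      return true; // Shutdown signal
    }
    auto *commit = std::get_if<CommitEntry>(&entry);
    if (!commit || commit->failed()) {
      continue; // StatusEntries were already transferred; failed commits just flow through
    }
    std::vector<Precondition> conflicts;
    for (const auto &pre : commit->request->preconditions) {
      auto it = recentWrites_.find(pre.key);
      if (it != recentWrites_.end() && it->second > pre.expected_version) {
        conflicts.push_back(pre); // key was written after the expected version
      }
    }
    if (conflicts.empty()) {
      for (const auto &op : commit->request->operations) {
        recentWrites_[op.key] = commit->version; // record writes at the assigned version
      }
      commit->state = TxnState::SemiCommitted;
    } else {
      commit->mark_failed(std::move(conflicts)); // carries conflict details to the client
    }
  }
  return false; // Continue processing
}
```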
### Stage 2: Transaction Persistence

**Thread**: `txn-persist`
**Purpose**: Write semi-committed transactions to durable storage
**Serialization**: **Required** - Must mark batches durable in order

**Responsibilities**:
- **For CommitEntry**: Apply operations to persistent storage, update the committed version high water mark
- **For StatusEntry**: N/A (transferred to status threadpool after sequence stage)
- Generate durability events for `/v1/subscribe` when the committed version advances
- Batch multiple commits for efficient persistence operations

**Why Serialization is Required**:
- Batches must be marked durable in sequential version order
- High water mark updates must reflect the strict ordering of committed versions
- Ensures consistency guarantees across all endpoints

**Committed Version High Water Mark**:
- Global atomic value tracking the highest durably committed version
- Updated after each batch commits: set to the highest version in the batch
- Read by the `/v1/version` endpoint using seq_cst atomic reads
- Enables `/v1/subscribe` durability events when the high water mark advances

**Batching Strategy**:
- Multiple semi-committed transactions can be persisted in a single batch
- The high water mark is updated once per batch, to the highest version in that batch
- See `persistence.md` for the detailed persistence design

**Current Implementation**:
```cpp
bool HttpHandler::process_persist_batch(BatchType &batch) {
  // TODO: Implement actual persistence logic (see the sketch below):
  // 1. For CommitEntry: Apply operations to persistent storage
  // 2. Update committed version high water mark to highest version in batch
  // 3. Generate durability events for /v1/subscribe
  // 4. For StatusEntry: N/A (already transferred to status threadpool)
  return false; // Continue processing
}
```
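One possible shape for this stage, assuming a hypothetical `storage_` backend, a `std::atomic<uint64_t> committedVersion_` high water mark, and a `notify_durable_up_to()` hook for `/v1/subscribe`; none of these names come from the actual code:

```cpp
// Illustrative sketch, not the actual implementation.
bool HttpHandler::process_persist_batch(BatchType &batch) {
  uint64_t batch_high_water = 0;
  std::vector<CommitEntry *> to_persist;
  for (auto &entry : batch) {
    if (std::holds_alternative<ShutdownEntry>(entry)) {
      return true; // Shutdown signal
    }
    auto *commit = std::get_if<CommitEntry>(&entry);
    if (commit && commit->state == TxnState::SemiCommitted) {
      to_persist.push_back(commit);
      batch_high_water = std::max(batch_high_water, commit->version);
    }
  }
  if (!to_persist.empty()) {
    storage_.write_batch(to_persist); // durable batch write (see persistence.md)
    for (auto *commit : to_persist) {
      commit->state = TxnState::Committed;
    }
    // Publish the new high water mark; /v1/version reads this with seq_cst loads.
    committedVersion_.store(batch_high_water, std::memory_order_seq_cst);
    notify_durable_up_to(batch_high_water); // durability events for /v1/subscribe
  }
  return false; // Continue processing
}
```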
### Stage 3: Connection Release

**Thread**: `txn-release`
**Purpose**: Return connections to the HTTP server for client responses
**Serialization**: Not required - Independent connection handling

**Responsibilities**:
- Return processed connections to the HTTP server for all request types
- Connection carries response data (success/failure) and status information
- Trigger response transmission to clients

**Response Handling**:
- **CommitRequests**: Response generated by the persist stage (success with version, or failure with conflicting preconditions)
- **StatusRequests**: Response generated by separate status lookup logic (not part of the pipeline)
- Failed transactions carry failure information through the entire pipeline for a proper client response

**Implementation**:
```cpp
bool HttpHandler::process_release_batch(BatchType &batch) {
  // Stage 3: Connection release
  for (auto &conn : batch) {
    if (!conn) {
      return true; // Shutdown signal
    }
    // Return connection to server for further processing or cleanup
    Server::release_back_to_server(std::move(conn));
  }
  return false; // Continue processing
}
```

## Threading Model

### Thread Pipeline Configuration

```cpp
// 4-stage pipeline: sequence -> resolve -> persist -> release
// TODO: Update pipeline type from std::unique_ptr to PipelineEntry variant
StaticThreadPipeline<std::unique_ptr<Connection>,
                     WaitStrategy::WaitIfUpstreamIdle, 1, 1, 1, 1>
    commitPipeline{lg_size};

// Pipeline entry type (to be implemented)
using PipelineEntry = std::variant<CommitEntry, StatusEntry, ShutdownEntry>;
```

### Thread Creation and Management

```cpp
HttpHandler() {
  // Stage 0: Sequence assignment thread
  sequenceThread = std::thread{[this]() {
    pthread_setname_np(pthread_self(), "txn-sequence");
    for (;;) {
      auto guard = commitPipeline.acquire<0, 0>();
      if (process_sequence_batch(guard.batch)) {
        return; // Shutdown signal received
      }
    }
  }};

  // Similar pattern for resolve, persist, and release threads...
}
```

### Batch Processing

Each stage processes connections in batches using RAII guards:

```cpp
auto guard = commitPipeline.acquire<stage, thread>(); // e.g. acquire<0, 0>() on the sequence thread
// Process batch
for (auto &conn : guard.batch) {
  // Stage-specific processing
}
// Guard destructor automatically publishes batch to next stage
```

## Flow Control

### Pipeline Entry

Commit requests enter the pipeline via `HttpHandler::on_batch_complete()`:

```cpp
void HttpHandler::on_batch_complete(std::span<std::unique_ptr<Connection>> batch) {
  // Collect commit requests that passed basic validation for 4-stage pipeline processing
  int commit_count = 0;
  for (auto &conn : batch) {
    if (conn && conn->user_data) {
      auto *state = static_cast<HttpConnectionState *>(conn->user_data);
      if (state->route == HttpRoute::POST_commit &&
          state->commit_request &&
          state->parsing_commit) {
        commit_count++;
      }
    }
  }

  // Send commit requests to 4-stage pipeline in batch
  if (commit_count > 0) {
    auto guard = commitPipeline.push(commit_count, true);
    // Move qualifying connections into pipeline (see the sketch below)
  }
}
```
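One way the final "move qualifying connections into pipeline" step might look, assuming `guard.batch` exposes indexable slots (as in the shutdown example later in this document) and that the pipeline currently carries `std::unique_ptr<Connection>`:

```cpp
// Illustrative sketch of filling the pushed slots; not the actual implementation.
if (commit_count > 0) {
  auto guard = commitPipeline.push(commit_count, true);
  size_t slot = 0;
  for (auto &conn : batch) {
    if (conn && conn->user_data) {
      auto *state = static_cast<HttpConnectionState *>(conn->user_data);
      if (state->route == HttpRoute::POST_commit && state->commit_request &&
          state->parsing_commit) {
        guard.batch[slot++] = std::move(conn); // connection is now owned by the pipeline
      }
    }
  }
  // The guard destructor publishes the filled slots to the sequence stage.
}
```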
### Backpressure Handling

The pipeline implements natural backpressure:
- Each stage blocks if downstream stages are full
- The `WaitIfUpstreamIdle` strategy balances latency vs throughput
- The ring buffer size (`lg_size = 16`, i.e. 2^16 entries) bounds how much work can be queued between stages

### Shutdown Coordination

Pipeline shutdown is coordinated by sending a single ShutdownEntry that flows through all stages:

```cpp
~HttpHandler() {
  // Send single shutdown signal that flows through all pipeline stages
  {
    auto guard = commitPipeline.push(1, true);
    guard.batch[0] = ShutdownEntry{}; // Single ShutdownEntry flows through all stages
  }

  // Join all pipeline threads
  sequenceThread.join();
  resolveThread.join();
  persistThread.join();
  releaseThread.join();
}
```

**Note**: Multiple shutdown entries would only be needed if stages had multiple threads, with each thread needing its own shutdown signal.

## Error Handling

### Stage-Level Error Handling

Each stage handles the different entry types:

```cpp
// Pattern matching on pipeline entry variant
bool shutdown = std::visit([&](auto &&entry) {
  using T = std::decay_t<decltype(entry)>;
  if constexpr (std::is_same_v<T, ShutdownEntry>) {
    return true; // Signal shutdown
  } else if constexpr (std::is_same_v<T, CommitEntry>) {
    // Process commit entry
  } else if constexpr (std::is_same_v<T, StatusEntry>) {
    // Handle status entry (or skip if transferred)
  }
  return false; // Continue processing
}, pipeline_entry);
```

### Connection Error States

- Failed CommitEntries are passed through the pipeline with error information
- Downstream stages skip processing for error connections but forward them
- Error responses are sent when the connection reaches the release stage
- Connection ownership is always transferred to ensure cleanup

### Pipeline Integrity

- ShutdownEntry signals shutdown to all stages
- Each stage checks for ShutdownEntry and returns true to signal shutdown
- RAII guards ensure entries are always published downstream
- No entries are lost even during error conditions

## Performance Characteristics

### Throughput Optimization

- **Batching**: Multiple connections processed per stage activation
- **Lock-Free Communication**: Ring buffer between stages
- **Minimal Context Switching**: Dedicated threads per stage
- **Arena Allocation**: Efficient memory management throughout the pipeline

### Latency Optimization

- **Single-Pass Processing**: Each connection flows through all stages once
- **Streaming Design**: Stages process concurrently
- **Minimal Copying**: Connection ownership transfer, not data copying
- **Direct Response**: Release stage triggers immediate response transmission

### Scalability Characteristics

- **Batch Size Tuning**: Ring buffer size controls the memory vs latency tradeoff
- **Thread Affinity**: Dedicated threads reduce scheduling overhead
- **NUMA Awareness**: Can pin threads to specific CPU cores

## Configuration

### Pipeline Parameters

```cpp
private:
  static constexpr int lg_size = 16; // Ring buffer size = 2^16 entries

  // 4-stage pipeline configuration
  StaticThreadPipeline<std::unique_ptr<Connection>,
                       WaitStrategy::WaitIfUpstreamIdle, 1, 1, 1, 1>
      commitPipeline{lg_size};
```

### Tuning Considerations

- **Ring Buffer Size**: Larger buffers increase memory usage but improve batching
- **Wait Strategy**: `WaitIfUpstreamIdle` balances CPU usage vs latency
- **Thread Affinity**: OS scheduling vs explicit CPU pinning tradeoffs

## Pipeline Entry Types

The pipeline processes different types of entries using a variant/union type system instead of `std::unique_ptr` (see the sketch below):

### Pipeline Entry Variants
- **CommitEntry**: Contains a `std::unique_ptr<Connection>` with the CommitRequest and connection state
- **StatusEntry**: Contains a `std::unique_ptr<Connection>` with the StatusRequest (transferred to status threadpool after sequence)
- **ShutdownEntry**: Signals pipeline shutdown to all stages
- **Future types**: The pipeline design supports additional entry types
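A minimal sketch of how these entry types might be declared; the field names are assumptions based on the stage descriptions above, not the actual definitions:

```cpp
#include <cstdint>
#include <memory>
#include <variant>

class Connection; // project type, defined elsewhere

// Illustrative sketch only; real field names and members may differ.
struct CommitEntry {
  std::unique_ptr<Connection> conn; // connection carrying the parsed CommitRequest
  uint64_t version = 0;             // assigned by the sequence stage
  // failure/conflict details filled in by the resolve stage, response data by persist
};

struct StatusEntry {
  std::unique_ptr<Connection> conn; // connection carrying the StatusRequest
  uint64_t version_upper_bound = 0; // noted by the sequence stage before transfer
};

struct ShutdownEntry {}; // empty marker type; signals shutdown to every stage

using PipelineEntry = std::variant<CommitEntry, StatusEntry, ShutdownEntry>;
```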
### Stage Processing by Type

| Stage | CommitEntry | StatusEntry | ShutdownEntry | Serialization |
|-------|-------------|-------------|---------------|---------------|
| **Sequence** | Check banned list, assign version | Add to banned list, transfer to status threadpool | Return true (shutdown) | **Required** |
| **Resolve** | Check preconditions, update recent writes | N/A (transferred) | Return true (shutdown) | **Required** |
| **Persist** | Apply operations, update high water mark | N/A (transferred) | Return true (shutdown) | **Required** |
| **Release** | Return connection to HTTP threads | N/A (transferred) | Return true (shutdown) | Not required |

## API Endpoint Integration

### `/v1/commit` - Transaction Submission

**Pipeline Interaction**: Full pipeline traversal as CommitEntry

#### Request Processing Flow

1. **HTTP I/O Thread Processing** (`src/http_handler.cpp:210-273`):
   ```cpp
   void HttpHandler::handlePostCommit(Connection &conn, HttpConnectionState &state) {
     // Parse and validate anything that doesn't need serialization:
     // - JSON parsing and CommitRequest construction
     // - Basic validation: leader_id check, operation format validation
     // - Check that we have at least one operation

     // If validation fails, send error response immediately and return
     // If validation succeeds, connection will enter pipeline in on_batch_complete()
   }
   ```

2. **Pipeline Entry**: Successfully parsed connections enter the pipeline as CommitEntry (containing the connection with its CommitRequest)

3. **Pipeline Processing**:
   - **Sequence**: Check banned list → assign version (or reject)
   - **Resolve**: Check preconditions against in-memory recent writes → mark semi-committed (or failed with conflict details)
   - **Persist**: Apply operations → mark committed, update high water mark
   - **Release**: Return connection with response data

4. **Response Generation**: Based on pipeline results (a sketch of assembling these responses follows)
   - **Success**: `{"status": "committed", "version": N, "leader_id": "...", "request_id": "..."}`
   - **Failure**: `{"status": "not_committed", "conflicts": [...], "version": N, "leader_id": "..."}`
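A minimal sketch of assembling the documented response bodies; `TxnState`, the `CommitEntry` fields, and `conflicts_json()` are assumed names, and a real implementation would likely use the project's JSON serialization rather than string concatenation:

```cpp
#include <string>

// Illustrative sketch only; field layout follows the response formats above.
std::string build_commit_response(const CommitEntry &entry, const std::string &leader_id) {
  if (entry.state == TxnState::Committed) {
    return "{\"status\":\"committed\",\"version\":" + std::to_string(entry.version) +
           ",\"leader_id\":\"" + leader_id + "\",\"request_id\":\"" + entry.request_id + "\"}";
  }
  return "{\"status\":\"not_committed\",\"conflicts\":" + conflicts_json(entry) +
         ",\"version\":" + std::to_string(entry.version) +
         ",\"leader_id\":\"" + leader_id + "\"}";
}
```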
### `/v1/status` - Commit Status Lookup

**Pipeline Interaction**: StatusEntry through the sequence stage, then transfer to the status threadpool

#### Request Processing Flow

1. **HTTP I/O Thread Processing**:
   ```cpp
   void HttpHandler::handleGetStatus(Connection &conn, const HttpConnectionState &state) {
     // TODO: Extract request_id from URL and min_version from query params
     // Current: Returns placeholder static response
   }
   ```

2. **Two-Phase Processing**:
   - **Phase 1 - Sequence Stage**: StatusEntry enters the pipeline to add the request_id to the banned list and obtain a version upper bound
   - **Phase 2 - Status Threadpool**: Connection is transferred from the sequence stage to a dedicated status threadpool for the actual status lookup logic

3. **Status Lookup Logic**: Performed in the status threadpool - scan the transaction log to determine the actual commit status of the now-banned request_id

### `/v1/subscribe` - Real-time Transaction Stream

**Pipeline Integration**: Consumes events from the resolve and persist stages

#### Event Sources
- **Resolve Stage**: Semi-committed transactions (accepted preconditions) for low-latency streaming
- **Persist Stage**: Durability events when the committed version high water mark advances

#### Current Implementation
```cpp
void HttpHandler::handleGetSubscribe(Connection &conn, const HttpConnectionState &state) {
  // TODO: Parse query parameters (after, durable)
  // TODO: Establish Server-Sent Events stream
  // TODO: Subscribe to resolve stage (semi-committed) and persist stage (durability) events
}
```

### `/v1/version` - Version Information

**Pipeline Integration**: Direct atomic read, no pipeline interaction

```cpp
// TODO: Implement direct atomic read of committed version high water mark
// No pipeline interaction needed - seq_cst atomic read
// Leader ID is process-lifetime constant
```

**Response**: `{"version": <committed version high water mark>, "leader_id": "<leader id>"}`

## Integration Points

### HTTP Handler Integration

The pipeline integrates with the HTTP handler at two points:

1. **Entry**: `on_batch_complete()` feeds connections into the sequence stage
2. **Exit**: The release stage calls `Server::release_back_to_server()`

### Persistence Layer Integration

The persist stage interfaces with:
- **S3 Backend**: Batch writes for durability (see `persistence.md`)
- **Subscriber System**: Real-time change stream notifications
- **Metrics System**: Transaction throughput and latency tracking

### Database State Integration

- **Sequence Stage**: Updates the version number generator
- **Resolve Stage**: Queries current database state for precondition validation
- **Persist Stage**: Applies mutations to the authoritative database state

## Future Optimizations

### Potential Enhancements

1. **Dynamic Thread Counts**: Make resolve and release thread counts configurable
2. **NUMA Optimization**: Pin pipeline threads to specific CPU cores
3. **Batch Size Tuning**: Adjust batch size dynamically based on load
4. **Stage Bypassing**: Skip the resolve stage for transactions without preconditions
5. **Persistence Batching**: Aggregate multiple commits into larger S3 writes

### Monitoring and Observability

1. **Stage Metrics**: Throughput, latency, and queue depth per stage
2. **Error Tracking**: Error rates and types by stage
3. **Resource Utilization**: CPU and memory usage per pipeline thread
4. **Flow Control Events**: Backpressure and stall detection

## Implementation Status

### Current State

- ✅ Pipeline structure implemented with 4 stages
- ✅ Thread creation and management
- ✅ RAII batch processing
- ✅ Error handling framework
- ✅ Shutdown coordination

### TODO Items

- ⏳ Sequence assignment logic implementation
- ⏳ Precondition resolution implementation
- ⏳ S3 persistence batching implementation
- ⏳ Subscriber notification system
- ⏳ Performance monitoring and metrics
- ⏳ Configuration tuning and optimization