# Commit Processing Pipeline
## Overview
WeaselDB implements a high-performance 4-stage commit processing pipeline that transforms HTTP commit requests into durable transactions. The pipeline provides strict serialization where needed while maximizing throughput through batching and asynchronous processing.
## Architecture
The commit processing pipeline consists of four sequential stages, each running on a dedicated thread:
```
HTTP I/O Threads → [Sequence] → [Resolve] → [Persist] → [Release] → HTTP I/O Threads
```
### Pipeline Flow
1. **HTTP I/O Threads**: Parse and validate incoming commit requests
1. **Sequence Stage**: Assign sequential version numbers to commits
1. **Resolve Stage**: Validate preconditions and check for conflicts
1. **Persist Stage**: Write commits to durable storage and notify subscribers
1. **Release Stage**: Return connections to HTTP I/O threads for response handling
## Stage Details
### Stage 0: Sequence Assignment
**Thread**: `txn-sequence`
**Purpose**: Version assignment and request ID management
**Serialization**: **Required** - Must be single-threaded
**Responsibilities**:
- **For CommitEntry**: Check request_id against banned list, assign sequential version number if not banned, forward to resolve stage
- **For StatusEntry**: Add request_id to banned list, note current highest assigned version as upper bound, transfer connection to status threadpool
- Record version assignments for transaction tracking
**Why Serialization is Required**:
- Version numbers must be strictly sequential without gaps
- Banned list updates must be atomic with version assignment
- Status requests must get accurate upper bound on potential commit versions
**Request ID Banned List**:
- Purpose: Ensure a request_id can no longer commit (taking it out of flight) and establish a version upper bound for status queries
- Lifecycle: Grows indefinitely until process restart (leader change)
- Removal: Only on process restart/leader change, which invalidates all old request IDs
**Current Implementation**:
```cpp
bool HttpHandler::process_sequence_batch(BatchType &batch) {
  for (auto &entry : batch) {
    if (std::holds_alternative<ShutdownEntry>(entry)) {
      return true; // Shutdown signal
    }
    // TODO: Pattern match on CommitEntry vs StatusEntry
    // TODO: Implement sequence assignment logic for each type
  }
  return false; // Continue processing
}
```
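Since the TODOs above are not yet filled in, the following is a sketch of what the sequence stage could look like. The entry types and banned set here are minimal stand-ins for illustration, not the real definitions:

```cpp
#include <cstdint>
#include <string>
#include <unordered_set>
#include <variant>
#include <vector>

// Hypothetical minimal entry types; the real CommitEntry/StatusEntry also
// carry connection state and the parsed request.
struct CommitEntry { std::string request_id; uint64_t version = 0; bool banned = false; };
struct StatusEntry { std::string request_id; uint64_t version_upper_bound = 0; };
struct ShutdownEntry {};
using PipelineEntry = std::variant<CommitEntry, StatusEntry, ShutdownEntry>;

// Sequence-stage state: touched only by the single txn-sequence thread, which
// is what makes banned-list updates atomic with version assignment.
struct SequenceState {
  uint64_t next_version = 1;
  std::unordered_set<std::string> banned;  // grows until process restart
};

// Returns true when a ShutdownEntry is seen, mirroring process_sequence_batch.
bool sequence_batch(SequenceState &st, std::vector<PipelineEntry> &batch) {
  for (auto &entry : batch) {
    if (std::holds_alternative<ShutdownEntry>(entry)) return true;
    if (auto *c = std::get_if<CommitEntry>(&entry)) {
      if (st.banned.count(c->request_id)) {
        c->banned = true;                 // rejected: id was banned by a status query
      } else {
        c->version = st.next_version++;   // strictly sequential, no gaps
      }
    } else if (auto *s = std::get_if<StatusEntry>(&entry)) {
      st.banned.insert(s->request_id);               // no future commit may use this id
      s->version_upper_bound = st.next_version - 1;  // highest version assigned so far
    }
  }
  return false;
}
```

Because one thread owns both the counter and the banned set, a status query observes an upper bound that is exact at the moment its request_id becomes unusable.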
### Stage 1: Precondition Resolution
**Thread**: `txn-resolve`
**Purpose**: Validate preconditions and detect conflicts
**Serialization**: **Required** - Must be single-threaded
**Responsibilities**:
- **For CommitEntry**: Check preconditions against in-memory recent writes set, add writes to recent writes set if accepted
- **For StatusEntry**: N/A (transferred to status threadpool after sequence stage)
- Mark failed commits with failure information (including which preconditions failed)
**Why Serialization is Required**:
- Must maintain consistent view of in-memory recent writes set
- Conflict detection requires atomic evaluation of all preconditions against recent writes
- Recent writes set updates must be synchronized
**Transaction State Transitions**:
- **Assigned Version** (from sequence) → **Semi-committed** (resolve accepts) → **Committed** (persist completes)
- Failed transactions continue through pipeline with failure information for client response
**Current Implementation**:
```cpp
bool HttpHandler::process_resolve_batch(BatchType &batch) {
  // TODO: Implement precondition resolution logic:
  // 1. For CommitEntry: Check preconditions against in-memory recent writes set
  // 2. If accepted: Add writes to in-memory recent writes set, mark as semi-committed
  // 3. If failed: Mark with failure info (which preconditions failed)
  // 4. For StatusEntry: N/A (already transferred to status threadpool)
  return false; // Continue processing
}
```
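One plausible shape for the resolve logic is a map from key to the version of the most recent accepted write; a precondition conflicts when a newer write exists. The types below are illustrative assumptions, not the real structures:

```cpp
#include <cstdint>
#include <string>
#include <unordered_map>
#include <vector>

// Hypothetical precondition: "key was last observed at read_version".
struct Precondition { std::string key; uint64_t read_version; };

struct ResolveCommit {
  uint64_t version = 0;                  // assigned by the sequence stage
  std::vector<Precondition> preconditions;
  std::vector<std::string> writes;       // keys this transaction writes
  bool semi_committed = false;
  std::vector<std::string> conflicts;    // failed precondition keys, for the client
};

// key -> version of the most recent accepted write.
// Single-threaded access from txn-resolve keeps this view consistent.
using RecentWrites = std::unordered_map<std::string, uint64_t>;

void resolve_one(RecentWrites &recent, ResolveCommit &c) {
  // 1. Evaluate every precondition atomically against the recent writes set.
  for (const auto &p : c.preconditions) {
    auto it = recent.find(p.key);
    if (it != recent.end() && it->second > p.read_version)
      c.conflicts.push_back(p.key);  // someone wrote after this txn read
  }
  // 2. Only an accepted commit publishes its writes at its assigned version.
  if (c.conflicts.empty()) {
    c.semi_committed = true;
    for (const auto &k : c.writes) recent[k] = c.version;
  }
}
```

Note that a rejected commit publishes nothing, so later transactions never conflict against writes that were never applied.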
### Stage 2: Transaction Persistence
**Thread**: `txn-persist`
**Purpose**: Write semi-committed transactions to durable storage
**Serialization**: **Required** - Must mark batches durable in order
**Responsibilities**:
- **For CommitEntry**: Apply operations to persistent storage, update committed version high water mark
- **For StatusEntry**: N/A (transferred to status threadpool after sequence stage)
- Generate durability events for `/v1/subscribe` when committed version advances
- Batch multiple commits for efficient persistence operations
**Why Serialization is Required**:
- Batches must be marked durable in sequential version order
- High water mark updates must reflect strict ordering of committed versions
- Ensures consistency guarantees across all endpoints
**Committed Version High Water Mark**:
- Global atomic value tracking highest durably committed version
- Updated after each batch commits: set to highest version in the batch
- Read by `/v1/version` endpoint using atomic seq_cst reads
- Enables `/v1/subscribe` durability events when high water mark advances
**Batching Strategy**:
- Multiple semi-committed transactions can be persisted in a single batch
- High water mark updated once per batch to highest version in that batch
- See `persistence.md` for detailed persistence design
**Current Implementation**:
```cpp
bool HttpHandler::process_persist_batch(BatchType &batch) {
  // TODO: Implement actual persistence logic:
  // 1. For CommitEntry: Apply operations to persistent storage
  // 2. Update committed version high water mark to highest version in batch
  // 3. Generate durability events for /v1/subscribe
  // 4. For StatusEntry: N/A (already transferred to status threadpool)
  return false; // Continue processing
}
```
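The high-water-mark update itself is small; a sketch under the assumptions above (one advance per durable batch, seq_cst visibility for readers) might look like this. The variable and function names are placeholders:

```cpp
#include <algorithm>
#include <atomic>
#include <cstdint>
#include <vector>

// Global committed-version high water mark, read with seq_cst by /v1/version.
std::atomic<uint64_t> committed_hwm{0};

// Called after a batch is durably written. Versions arrive in order, so a
// plain store from the single txn-persist thread suffices; seq_cst ordering
// keeps concurrent readers consistent with the durability guarantee.
void mark_batch_durable(const std::vector<uint64_t> &batch_versions) {
  if (batch_versions.empty()) return;
  uint64_t highest =
      *std::max_element(batch_versions.begin(), batch_versions.end());
  committed_hwm.store(highest, std::memory_order_seq_cst);
  // ...the real implementation would fire /v1/subscribe durability events here...
}
```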
### Stage 3: Connection Release
**Thread**: `txn-release`
**Purpose**: Return connections to HTTP server for client response
**Serialization**: Not required - Independent connection handling
**Responsibilities**:
- Return processed connections to HTTP server for all request types
- Connection carries response data (success/failure) and status information
- Trigger response transmission to clients
**Response Handling**:
- **CommitRequests**: Response generated by persist stage (success with version, or failure with conflicting preconditions)
- **StatusRequests**: Response generated by separate status lookup logic (not part of pipeline)
- Failed transactions carry failure information through entire pipeline for proper client response
**Implementation**:
```cpp
bool HttpHandler::process_release_batch(BatchType &batch) {
  // Stage 3: Connection release
  for (auto &conn : batch) {
    if (!conn) {
      return true; // Shutdown signal
    }
    // Connection is server-owned: respond to the client; the connection
    // remains managed by the server's connection registry
    // TODO: Implement response sending with new server-owned connection model
  }
  return false; // Continue processing
}
```
## Threading Model
### Thread Pipeline Configuration
```cpp
// Pipeline entry type for server-owned connection model
using PipelineEntry = std::variant<CommitEntry, StatusEntry, ShutdownEntry>;

// 4-stage pipeline: sequence -> resolve -> persist -> release
// Carries PipelineEntry variants instead of transferring connection ownership
StaticThreadPipeline<PipelineEntry, // Was: std::unique_ptr<Connection>
                     WaitStrategy::WaitIfUpstreamIdle, 1, 1, 1, 1>
    commitPipeline{lg_size};
```
### Thread Creation and Management
```cpp
HttpHandler() {
  // Stage 0: Sequence assignment thread
  sequenceThread = std::thread{[this]() {
    pthread_setname_np(pthread_self(), "txn-sequence");
    for (;;) {
      auto guard = commitPipeline.acquire<0, 0>();
      if (process_sequence_batch(guard.batch)) {
        return; // Shutdown signal received
      }
    }
  }};
  // Similar pattern for resolve, persist, and release threads...
}
```
### Batch Processing
Each stage processes connections in batches using RAII guards:
```cpp
auto guard = commitPipeline.acquire<STAGE_NUM, 0>();
// Process batch
for (auto &conn : guard.batch) {
  // Stage-specific processing
}
// Guard destructor automatically publishes batch to next stage
```
## Flow Control
### Pipeline Entry
Commit requests enter the pipeline via `HttpHandler::on_batch_complete()`:
```cpp
void HttpHandler::on_batch_complete(std::span<Connection *> batch) {
  // Collect commit requests that passed basic validation for 4-stage pipeline processing
  int commit_count = 0;
  for (auto &conn : batch) {
    if (conn && conn->user_data) {
      auto *state = static_cast<HttpConnectionState *>(conn->user_data);
      if (state->route == HttpRoute::POST_commit && state->commit_request &&
          state->parsing_commit) {
        commit_count++;
      }
    }
  }
  // Send commit requests to 4-stage pipeline in batch
  if (commit_count > 0) {
    auto guard = commitPipeline.push(commit_count, true);
    // Move qualifying connections into pipeline
  }
}
```
### Backpressure Handling
The pipeline implements natural backpressure:
- Each stage blocks if downstream stages are full
- `WaitIfUpstreamIdle` strategy balances latency vs throughput
- Ring buffer capacity (`2^lg_size` = 65536 entries with `lg_size = 16`) bounds the number of queued entries
### Shutdown Coordination
Pipeline shutdown is coordinated by sending a single ShutdownEntry that flows through all stages:
```cpp
~HttpHandler() {
  // Send single shutdown signal that flows through all pipeline stages
  {
    auto guard = commitPipeline.push(1, true);
    guard.batch[0] = ShutdownEntry{}; // Single ShutdownEntry flows through all stages
  }
  // Join all pipeline threads
  sequenceThread.join();
  resolveThread.join();
  persistThread.join();
  releaseThread.join();
}
```
**Note**: Multiple entries would only be needed if stages had multiple threads, with each thread needing its own shutdown signal.
## Error Handling
### Stage-Level Error Handling
Each stage handles different entry types:
```cpp
// Pattern matching on pipeline entry variant. Every branch must return the
// same type, so non-shutdown branches return false explicitly.
bool is_shutdown = std::visit(
    [&](auto &&entry) {
      using T = std::decay_t<decltype(entry)>;
      if constexpr (std::is_same_v<T, ShutdownEntry>) {
        return true; // Signal shutdown
      } else if constexpr (std::is_same_v<T, CommitEntry>) {
        // Process commit entry
        return false;
      } else {
        // StatusEntry: handle (or skip if already transferred)
        return false;
      }
    },
    pipeline_entry);
```
### Connection Error States
- Failed CommitEntries are passed through the pipeline with error information
- Downstream stages skip processing for error connections but forward them
- Error responses are sent when connection reaches release stage
- Server-owned connections ensure proper cleanup and response handling
### Pipeline Integrity
- ShutdownEntry signals shutdown to all stages
- Each stage checks for ShutdownEntry and returns true to signal shutdown
- RAII guards ensure entries are always published downstream
- No entries are lost even during error conditions
## Performance Characteristics
### Throughput Optimization
- **Batching**: Multiple connections processed per stage activation
- **Lock-Free Communication**: Ring buffer between stages
- **Minimal Context Switching**: Dedicated threads per stage
- **Arena Allocation**: Efficient memory management throughout pipeline
### Latency Optimization
- **Single-Pass Processing**: Each connection flows through all stages once
- **Streaming Design**: Stages process concurrently
- **Minimal Copying**: Request processing with server-owned connections
- **Direct Response**: Release stage triggers immediate response transmission
### Scalability Characteristics
- **Batch Size Tuning**: Ring buffer size controls memory vs latency tradeoff
- **Thread Affinity**: Dedicated threads reduce scheduling overhead
- **NUMA Awareness**: Can pin threads to specific CPU cores
## Configuration
### Pipeline Parameters
```cpp
private:
  static constexpr int lg_size = 16; // Ring buffer holds 2^16 entries
  // 4-stage pipeline configuration
  StaticThreadPipeline<PipelineEntry,
                       WaitStrategy::WaitIfUpstreamIdle, 1, 1, 1, 1>
      commitPipeline{lg_size};
```
### Tuning Considerations
- **Ring Buffer Size**: Larger buffers increase memory usage but improve batching
- **Wait Strategy**: `WaitIfUpstreamIdle` balances CPU usage vs latency
- **Thread Affinity**: OS scheduling vs explicit CPU pinning tradeoffs
## Pipeline Entry Types
The pipeline processes different types of entries using a variant/union type system instead of `std::unique_ptr<Connection>`:
### Pipeline Entry Variants
- **CommitEntry**: Contains connection reference/ID with CommitRequest and connection state
- **StatusEntry**: Contains connection reference/ID with StatusRequest (transferred to status threadpool after sequence)
- **ShutdownEntry**: Signals pipeline shutdown to all stages
- **Future types**: Pipeline design supports additional entry types
### Stage Processing by Type
| Stage | CommitEntry | StatusEntry | ShutdownEntry | Serialization |
|-------|-------------|-------------|---------------|---------------|
| **Sequence** | Check banned list, assign version | Add to banned list, transfer to status threadpool | Return true (shutdown) | **Required** |
| **Resolve** | Check preconditions, update recent writes | N/A (transferred) | Return true (shutdown) | **Required** |
| **Persist** | Apply operations, update high water mark | N/A (transferred) | Return true (shutdown) | **Required** |
| **Release** | Return connection to HTTP threads | N/A (transferred) | Return true (shutdown) | Not required |
## API Endpoint Integration
### `/v1/commit` - Transaction Submission
**Pipeline Interaction**: Full pipeline traversal as CommitEntry
#### Request Processing Flow
1. **HTTP I/O Thread Processing** (`src/http_handler.cpp:210-273`):
```cpp
void HttpHandler::handlePostCommit(Connection &conn, HttpConnectionState &state) {
  // Parse and validate anything that doesn't need serialization:
  // - JSON parsing and CommitRequest construction
  // - Basic validation: leader_id check, operation format validation
  // - Check that we have at least one operation
  // If validation fails, send error response immediately and return
  // If validation succeeds, connection will enter pipeline in on_batch_complete()
}
```
1. **Pipeline Entry**: Successfully parsed connections enter pipeline as CommitEntry (containing the connection with CommitRequest)
1. **Pipeline Processing**:
- **Sequence**: Check banned list → assign version (or reject)
- **Resolve**: Check preconditions against in-memory recent writes → mark semi-committed (or failed with conflict details)
- **Persist**: Apply operations → mark committed, update high water mark
- **Release**: Return connection with response data
1. **Response Generation**: Based on pipeline results
- **Success**: `{"status": "committed", "version": N, "leader_id": "...", "request_id": "..."}`
- **Failure**: `{"status": "not_committed", "conflicts": [...], "version": N, "leader_id": "..."}`
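The two payloads above can be assembled from whatever result the release stage observes on the connection; this sketch assumes a hypothetical `CommitResult` holder rather than the real connection state:

```cpp
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical result carried by a CommitEntry when it reaches release.
struct CommitResult {
  bool committed = false;
  uint64_t version = 0;
  std::string leader_id;
  std::string request_id;
  std::vector<std::string> conflicts;  // failed precondition keys
};

std::string commit_response_json(const CommitResult &r) {
  if (r.committed) {
    // {"status": "committed", "version": N, "leader_id": "...", "request_id": "..."}
    return "{\"status\": \"committed\", \"version\": " + std::to_string(r.version) +
           ", \"leader_id\": \"" + r.leader_id + "\", \"request_id\": \"" +
           r.request_id + "\"}";
  }
  // {"status": "not_committed", "conflicts": [...], "version": N, "leader_id": "..."}
  std::string conflicts;
  for (size_t i = 0; i < r.conflicts.size(); ++i) {
    if (i) conflicts += ", ";
    conflicts += "\"" + r.conflicts[i] + "\"";
  }
  return "{\"status\": \"not_committed\", \"conflicts\": [" + conflicts +
         "], \"version\": " + std::to_string(r.version) + ", \"leader_id\": \"" +
         r.leader_id + "\"}";
}
```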
### `/v1/status` - Commit Status Lookup
**Pipeline Interaction**: StatusEntry through sequence stage, then transfer to status threadpool
#### Request Processing Flow
1. **HTTP I/O Thread Processing**:
```cpp
void HttpHandler::handleGetStatus(Connection &conn, const HttpConnectionState &state) {
  // TODO: Extract request_id from URL and min_version from query params
  // Current: Returns placeholder static response
}
```
1. **Two-Phase Processing**:
- **Phase 1 - Sequence Stage**: StatusEntry enters pipeline to add request_id to banned list and get version upper bound
- **Phase 2 - Status Threadpool**: Connection transferred from sequence stage to dedicated status threadpool for actual status lookup logic
1. **Status Lookup Logic**: Performed in status threadpool - scan transaction log to determine actual commit status of the now-banned request_id
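The lookup in phase 2 can be definitive because phase 1 already banned the request_id: if the id is absent from the log at or below the upper bound, it can never commit later. A sketch, with a hypothetical in-memory log index standing in for the real transaction-log scan:

```cpp
#include <cstdint>
#include <optional>
#include <string>
#include <unordered_map>

// Hypothetical log index: request_id -> committed version. The real status
// threadpool would scan the transaction log instead.
using TxnLog = std::unordered_map<std::string, uint64_t>;

// Returns the committed version, or nullopt for "definitively not committed".
std::optional<uint64_t> lookup_status(const TxnLog &log,
                                      const std::string &request_id,
                                      uint64_t version_upper_bound) {
  auto it = log.find(request_id);
  if (it != log.end() && it->second <= version_upper_bound) return it->second;
  // The id is banned, so no commit above the bound (or in the future) is
  // possible: absence here is a final answer, not "still in flight".
  return std::nullopt;
}
```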
### `/v1/subscribe` - Real-time Transaction Stream
**Pipeline Integration**: Consumes events from resolve and persist stages
#### Event Sources
- **Resolve Stage**: Semi-committed transactions (accepted preconditions) for low-latency streaming
- **Persist Stage**: Durability events when committed version high water mark advances
#### Current Implementation
```cpp
void HttpHandler::handleGetSubscribe(Connection &conn, const HttpConnectionState &state) {
  // TODO: Parse query parameters (after, durable)
  // TODO: Establish Server-Sent Events stream
  // TODO: Subscribe to resolve stage (semi-committed) and persist stage (durability) events
}
```
### `/v1/version` - Version Information
**Pipeline Integration**: Direct atomic read, no pipeline interaction
```cpp
// TODO: Implement direct atomic read of committed version high water mark
// No pipeline interaction needed - seq_cst atomic read
// Leader ID is process-lifetime constant
```
**Response**: `{"version": <high_water_mark>, "leader_id": "<process_leader_id>"}`
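A sketch of the handler: one seq_cst load plus a process-lifetime constant. The variable names here are assumptions for illustration:

```cpp
#include <atomic>
#include <cstdint>
#include <string>

std::atomic<uint64_t> committed_hwm{0};      // advanced by the persist stage
const std::string leader_id = "leader-abc";  // hypothetical process-lifetime constant

// /v1/version needs no pipeline interaction: read the high water mark and
// combine it with the constant leader ID.
std::string version_response() {
  uint64_t v = committed_hwm.load(std::memory_order_seq_cst);
  return "{\"version\": " + std::to_string(v) + ", \"leader_id\": \"" +
         leader_id + "\"}";
}
```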
## Integration Points
### HTTP Handler Integration
The pipeline integrates with the HTTP handler at two points:
1. **Entry**: `on_batch_complete()` feeds connections into sequence stage
1. **Exit**: Release stage responds to clients with server-owned connections
### Persistence Layer Integration
The persist stage interfaces with:
- **S3 Backend**: Batch writes for durability (see `persistence.md`)
- **Subscriber System**: Real-time change stream notifications
- **Metrics System**: Transaction throughput and latency tracking
### Database State Integration
- **Sequence Stage**: Updates version number generator
- **Resolve Stage**: Queries current database state for precondition validation
- **Persist Stage**: Applies mutations to authoritative database state
## Future Optimizations
### Potential Enhancements
1. **Dynamic Thread Counts**: Make resolve and release thread counts configurable
1. **NUMA Optimization**: Pin pipeline threads to specific CPU cores
1. **Batch Size Tuning**: Dynamic batch size based on load
1. **Stage Bypassing**: Skip resolve stage for transactions without preconditions
1. **Persistence Batching**: Aggregate multiple commits into larger S3 writes
### Monitoring and Observability
1. **Stage Metrics**: Throughput, latency, and queue depth per stage
1. **Error Tracking**: Error rates and types by stage
1. **Resource Utilization**: CPU and memory usage per pipeline thread
1. **Flow Control Events**: Backpressure and stall detection
## Implementation Status
### Current State
- ✅ Pipeline structure implemented with 4 stages
- ✅ Thread creation and management
- ✅ RAII batch processing
- ✅ Error handling framework
- ✅ Shutdown coordination
### TODO Items
- ⏳ Sequence assignment logic implementation
- ⏳ Precondition resolution implementation
- ⏳ S3 persistence batching implementation
- ⏳ Subscriber notification system
- ⏳ Performance monitoring and metrics
- ⏳ Configuration tuning and optimization