# Commit Processing Pipeline

## Overview

WeaselDB implements a high-performance 4-stage commit processing pipeline that transforms HTTP commit requests into durable transactions. The pipeline provides strict serialization where needed while maximizing throughput through batching and asynchronous processing.

## Architecture

The commit processing pipeline consists of four sequential stages, each running on a dedicated thread:

```
HTTP I/O Threads → [Sequence] → [Resolve] → [Persist] → [Release] → HTTP I/O Threads
```

### Pipeline Flow

1. **HTTP I/O Threads**: Parse and validate incoming commit requests
2. **Sequence Stage**: Assign sequential version numbers to commits
3. **Resolve Stage**: Validate preconditions and check for conflicts
4. **Persist Stage**: Write commits to durable storage and notify subscribers
5. **Release Stage**: Return connections to HTTP I/O threads for response handling

## Stage Details

### Stage 0: Sequence Assignment

**Thread**: `txn-sequence`

**Purpose**: Version assignment and request ID management

**Serialization**: **Required** - Must be single-threaded

**Responsibilities**:
- **For CommitEntry**: Check the request_id against the banned list; if it is not banned, assign the next sequential version number and forward the entry to the resolve stage
- **For StatusEntry**: Add the request_id to the banned list, record the current highest assigned version as an upper bound, and transfer the connection to the status threadpool
- Record version assignments for transaction tracking

**Why Serialization is Required**:
- Version numbers must be strictly sequential without gaps
- Banned list updates must be atomic with version assignment
- Status requests must get an accurate upper bound on potential commit versions

**Request ID Banned List**:
- Purpose: Ensure banned request IDs are no longer in flight (the sequence stage never assigns them a version) and establish version upper bounds for status queries
- Lifecycle: Grows indefinitely until process restart (leader change)
- Removal: Only on process restart/leader change, which invalidates all old request IDs

**Current Implementation**:
```cpp
bool HttpHandler::process_sequence_batch(BatchType &batch) {
  for (auto &entry : batch) {
    if (std::holds_alternative<ShutdownEntry>(entry)) {
      return true; // Shutdown signal
    }
    // TODO: Pattern match on CommitEntry vs StatusEntry
    // TODO: Implement sequence assignment logic for each type
  }
  return false; // Continue processing
}
```

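One way to fill in the two TODOs above is sketched below. This is a minimal sketch and not WeaselDB code. `SequenceState`, `assign_commit`, and `ban_for_status` are hypothetical names. The sketch assumes request IDs are strings and versions are 64-bit integers starting at 1.

```cpp
#include <cassert>
#include <cstdint>
#include <optional>
#include <string>
#include <unordered_set>

// Hypothetical state owned exclusively by the txn-sequence thread.
class SequenceState {
 public:
  // CommitEntry path: a banned request ID is rejected; otherwise the commit
  // receives the next version, so versions stay strictly sequential.
  std::optional<std::int64_t> assign_commit(const std::string &request_id) {
    if (banned_.count(request_id) != 0) {
      return std::nullopt;  // status was already queried: it may never commit
    }
    return ++last_assigned_;
  }

  // StatusEntry path: ban the request ID, then report the highest version
  // assigned so far as the upper bound for the status lookup.
  std::int64_t ban_for_status(const std::string &request_id) {
    banned_.insert(request_id);
    return last_assigned_;
  }

 private:
  std::int64_t last_assigned_ = 0;          // highest version handed out so far
  std::unordered_set<std::string> banned_;  // grows until process restart
};
```

Only the `txn-sequence` thread touches this state, so no locking is needed. That single-threaded ownership is what makes the ban and the upper-bound read atomic with respect to version assignment.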
### Stage 1: Precondition Resolution

**Thread**: `txn-resolve`

**Purpose**: Validate preconditions and detect conflicts

**Serialization**: **Required** - Must be single-threaded

**Responsibilities**:
- **For CommitEntry**: Check preconditions against the in-memory recent writes set; if the commit is accepted, add its writes to the recent writes set
- **For StatusEntry**: N/A (transferred to status threadpool after sequence stage)
- Mark failed commits with failure information (including which preconditions failed)

**Why Serialization is Required**:
- Must maintain a consistent view of the in-memory recent writes set
- Conflict detection requires atomic evaluation of all preconditions against recent writes
- Recent writes set updates must be synchronized

**Transaction State Transitions**:
- **Assigned Version** (from sequence) → **Semi-committed** (resolve accepts) → **Committed** (persist completes)
- Failed transactions continue through the pipeline, carrying failure information for the client response

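The state transitions above can be modeled as a small state machine. The `TxnState` enum and the `after_resolve` and `after_persist` helpers are hypothetical illustrations. The sketch assumes that a failed transaction keeps its `Failed` state for the rest of the pipeline.

```cpp
#include <cassert>

// Hypothetical encoding of the transaction states named in this document.
enum class TxnState { AssignedVersion, SemiCommitted, Committed, Failed };

// Resolve stage: accepted commits become semi-committed and rejected ones
// become Failed. Entries already marked Failed (for example, rejected by the
// sequence stage) pass through unchanged.
constexpr TxnState after_resolve(TxnState state, bool preconditions_ok) {
  if (state != TxnState::AssignedVersion) {
    return state;
  }
  return preconditions_ok ? TxnState::SemiCommitted : TxnState::Failed;
}

// Persist stage: only semi-committed transactions become committed; failed
// ones are forwarded untouched so the client still gets a response.
constexpr TxnState after_persist(TxnState state) {
  return state == TxnState::SemiCommitted ? TxnState::Committed : state;
}
```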
**Current Implementation**:
```cpp
bool HttpHandler::process_resolve_batch(BatchType &batch) {
  // TODO: Implement precondition resolution logic:
  // 1. For CommitEntry: Check preconditions against in-memory recent writes set
  // 2. If accepted: Add writes to in-memory recent writes set, mark as semi-committed
  // 3. If failed: Mark with failure info (which preconditions failed)
  // 4. For StatusEntry: N/A (already transferred to status threadpool)
  for (auto &entry : batch) {
    if (std::holds_alternative<ShutdownEntry>(entry)) {
      return true; // Shutdown signal
    }
  }
  return false; // Continue processing (a bool function must return a value)
}
```

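The resolve logic could take roughly the following shape. This is a minimal sketch under stated assumptions. A precondition is modeled as "this key has not been written after version V". The recent writes set is modeled as a map from each key to the version of its latest accepted write. `Precondition`, `RecentWrites`, and `resolve` are hypothetical names, because this document does not specify the real precondition format.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <string>
#include <unordered_map>
#include <vector>

// Assumed shape of a precondition: "key has not been written after version".
struct Precondition {
  std::string key;
  std::int64_t version;
};

// Hypothetical recent writes set, owned by the single txn-resolve thread.
class RecentWrites {
 public:
  // Returns the indices of failed preconditions; an empty result means the
  // commit is accepted and its writes are recorded at commit_version.
  std::vector<std::size_t> resolve(const std::vector<Precondition> &preconditions,
                                   const std::vector<std::string> &write_keys,
                                   std::int64_t commit_version) {
    std::vector<std::size_t> failed;
    // Evaluate every precondition before mutating anything.
    for (std::size_t i = 0; i < preconditions.size(); ++i) {
      auto it = last_write_.find(preconditions[i].key);
      if (it != last_write_.end() && it->second > preconditions[i].version) {
        failed.push_back(i);  // key changed after the version the client relied on
      }
    }
    if (failed.empty()) {
      for (const auto &key : write_keys) {
        last_write_[key] = commit_version;  // visible to later commits' checks
      }
    }
    return failed;
  }

 private:
  std::unordered_map<std::string, std::int64_t> last_write_;  // key -> latest write version
};
```

Every precondition is checked before any write is recorded, so a rejected commit leaves the set untouched. As written, the map grows without bound. A real implementation would need to evict old entries, and this sketch omits that step.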
### Stage 2: Transaction Persistence

**Thread**: `txn-persist`

**Purpose**: Write semi-committed transactions to durable storage

**Serialization**: **Required** - Must mark batches durable in order

**Responsibilities**:
- **For CommitEntry**: Apply operations to persistent storage and update the committed version high water mark
- **For StatusEntry**: N/A (transferred to status threadpool after sequence stage)
- Generate durability events for `/v1/subscribe` when the committed version advances
- Batch multiple commits for efficient persistence operations

**Why Serialization is Required**:
- Batches must be marked durable in sequential version order
- High water mark updates must reflect the strict ordering of committed versions
- Ensures consistency guarantees across all endpoints

**Committed Version High Water Mark**:
- Global atomic value tracking the highest durably committed version
- Updated after each batch commits: set to the highest version in the batch
- Read by the `/v1/version` endpoint using atomic `seq_cst` loads
- Enables `/v1/subscribe` durability events when the high water mark advances

**Batching Strategy**:
- Multiple semi-committed transactions can be persisted in a single batch
- High water mark updated once per batch to the highest version in that batch
- See `persistence.md` for detailed persistence design

**Current Implementation**:
```cpp
bool HttpHandler::process_persist_batch(BatchType &batch) {
  // TODO: Implement actual persistence logic:
  // 1. For CommitEntry: Apply operations to persistent storage
  // 2. Update committed version high water mark to highest version in batch
  // 3. Generate durability events for /v1/subscribe
  // 4. For StatusEntry: N/A (already transferred to status threadpool)
  for (auto &entry : batch) {
    if (std::holds_alternative<ShutdownEntry>(entry)) {
      return true; // Shutdown signal
    }
  }
  return false; // Continue processing (a bool function must return a value)
}
```

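Steps 2 and 3, the once-per-batch high water mark update, can be sketched with `std::atomic`. `committed_version` and `publish_durable_batch` are hypothetical names. The sketch assumes that every version in the batch is already durable when the function is called.

```cpp
#include <algorithm>
#include <atomic>
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical global high water mark: highest durably committed version.
std::atomic<std::int64_t> committed_version{0};

// Called only by the txn-persist thread, once per durable batch.
void publish_durable_batch(const std::vector<std::int64_t> &batch_versions) {
  if (batch_versions.empty()) {
    return;  // nothing new became durable
  }
  const std::int64_t highest =
      *std::max_element(batch_versions.begin(), batch_versions.end());
  // Batches are persisted in version order, so the mark never moves backwards.
  assert(highest >= committed_version.load(std::memory_order_relaxed));
  // One seq_cst store per batch; /v1/version readers load with seq_cst too.
  committed_version.store(highest, std::memory_order_seq_cst);
  // A durability event for /v1/subscribe would be emitted here (omitted).
}
```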
### Stage 3: Connection Release

**Thread**: `txn-release`

**Purpose**: Return connections to HTTP server for client response

**Serialization**: Not required - Independent connection handling

**Responsibilities**:
- Return processed commit connections to the HTTP server (status requests leave the pipeline after the sequence stage)
- Connection carries response data (success/failure) and status information
- Trigger response transmission to clients

**Response Handling**:
- **CommitRequests**: Response data comes from the pipeline itself (success with version once the persist stage completes, or failure with the conflicting preconditions recorded by the resolve stage)
- **StatusRequests**: Response generated by separate status lookup logic (not part of pipeline)
- Failed transactions carry failure information through the entire pipeline for proper client response

**Implementation**:
```cpp
bool HttpHandler::process_release_batch(BatchType &batch) {
  // Stage 3: Connection release
  // This code still operates on std::unique_ptr<Connection> entries (see the
  // TODO under Thread Pipeline Configuration); a null pointer signals shutdown.
  for (auto &conn : batch) {
    if (!conn) {
      return true; // Shutdown signal
    }
    // Return connection to server for further processing or cleanup
    Server::release_back_to_server(std::move(conn));
  }
  return false; // Continue processing
}
```

## Threading Model

### Thread Pipeline Configuration

```cpp
// Pipeline entry type (to be implemented); declared before its use below
using PipelineEntry = std::variant<CommitEntry, StatusEntry, ShutdownEntry>;

// 4-stage pipeline: sequence -> resolve -> persist -> release
// TODO: Update pipeline type from std::unique_ptr<Connection> to PipelineEntry variant
// 1, 1, 1, 1: one thread for each of the four stages
StaticThreadPipeline<PipelineEntry, // Was: std::unique_ptr<Connection>
                     WaitStrategy::WaitIfUpstreamIdle, 1, 1, 1, 1>
    commitPipeline{lg_size};
```

### Thread Creation and Management

```cpp
HttpHandler() {
  // Stage 0: Sequence assignment thread
  sequenceThread = std::thread{[this]() {
    pthread_setname_np(pthread_self(), "txn-sequence");
    for (;;) {
      auto guard = commitPipeline.acquire<0, 0>();
      if (process_sequence_batch(guard.batch)) {
        return; // Shutdown signal received
      }
    }
  }};

  // Similar pattern for resolve, persist, and release threads...
}
```

### Batch Processing

Each stage processes connections in batches using RAII guards:

```cpp
auto guard = commitPipeline.acquire<STAGE_NUM, 0>();
// Process batch
for (auto &conn : guard.batch) {
  // Stage-specific processing
}
// Guard destructor automatically publishes batch to next stage
```

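The RAII pattern can be shown with a small self-contained toy. `StageCursor` and `BatchGuard` are hypothetical types that model only the publish-on-destruction idea. They are not `StaticThreadPipeline`'s real guard, whose internals this document does not describe.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Toy stage state: slots [0, published) are visible to the next stage.
struct StageCursor {
  std::vector<int> slots;
  std::size_t published = 0;
};

// RAII guard: exposes a batch of slots and publishes them when destroyed,
// so an early return or exception cannot strand entries in this stage.
class BatchGuard {
 public:
  BatchGuard(StageCursor &cursor, std::size_t begin, std::size_t end)
      : cursor_(cursor), begin_(begin), end_(end) {}
  BatchGuard(const BatchGuard &) = delete;
  BatchGuard &operator=(const BatchGuard &) = delete;
  ~BatchGuard() { cursor_.published = end_; }  // hand the batch downstream

  // Allows range-for over the batch, like `for (auto &conn : guard.batch)`.
  int *begin() { return cursor_.slots.data() + begin_; }
  int *end() { return cursor_.slots.data() + end_; }

 private:
  StageCursor &cursor_;
  std::size_t begin_;
  std::size_t end_;
};
```

Publication happens in the destructor. As a result, leaving the processing scope by any path still hands the batch downstream.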
## Flow Control

### Pipeline Entry

Commit requests enter the pipeline via `HttpHandler::on_batch_complete()`:

```cpp
void HttpHandler::on_batch_complete(std::span<std::unique_ptr<Connection>> batch) {
  // Collect commit requests that passed basic validation for 4-stage pipeline processing
  int commit_count = 0;
  for (auto &conn : batch) {
    if (conn && conn->user_data) {
      auto *state = static_cast<HttpConnectionState *>(conn->user_data);
      if (state->route == HttpRoute::POST_commit &&
          state->commit_request &&
          state->parsing_commit) {
        commit_count++;
      }
    }
  }

  // Send commit requests to 4-stage pipeline in batch
  if (commit_count > 0) {
    auto guard = commitPipeline.push(commit_count, true);
    // Move qualifying connections into pipeline
  }
}
```

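The elided step, moving qualifying connections into the pipeline, could follow a count-then-move pattern. That pattern is why the count is computed before `push` is called. In this sketch, `Connection::is_valid_commit` is a hypothetical stand-in for the route and parsing checks above. `take_commits` is also a hypothetical name. A plain `std::vector` stands in for the slots obtained from `commitPipeline.push(commit_count, true)`.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <vector>

// Stand-in for the real Connection; is_valid_commit replaces the
// route/commit_request/parsing_commit checks.
struct Connection {
  bool is_valid_commit = false;
};

// Two passes: count first (so exactly that many slots are reserved in one
// push), then move each qualifying connection into the next free slot.
std::vector<std::unique_ptr<Connection>>
take_commits(std::vector<std::unique_ptr<Connection>> &batch) {
  std::size_t commit_count = 0;
  for (const auto &conn : batch) {
    if (conn && conn->is_valid_commit) ++commit_count;
  }
  std::vector<std::unique_ptr<Connection>> slots(commit_count);  // ~ push(commit_count)
  std::size_t next = 0;
  for (auto &conn : batch) {
    if (conn && conn->is_valid_commit) {
      slots[next++] = std::move(conn);  // leaves a null pointer in batch
    }
  }
  return slots;
}
```

In this sketch, moved-from `std::unique_ptr`s are null. That lets the caller see which connections now belong to the pipeline.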
### Backpressure Handling

The pipeline implements natural backpressure:
- Each stage blocks when downstream stages are full
- The `WaitIfUpstreamIdle` strategy balances latency vs throughput
- Ring buffer size (`lg_size = 16`, i.e. 2^16 entries) bounds the number of queued entries

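A bounded ring buffer produces backpressure, and the simplified model below shows why using power-of-two index arithmetic. `Ring` is hypothetical and single-threaded. It returns `false` where a real concurrent pipeline would block, and it does not describe `StaticThreadPipeline`'s internals. With `lg_size = 16` the capacity would be 65,536 entries; the test uses `lg_size = 2`.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Single-threaded model of a ring buffer with 2^lg_size slots.
class Ring {
 public:
  explicit Ring(int lg_size)
      : slots_(std::size_t{1} << lg_size), mask_(slots_.size() - 1) {}

  // Producer side: a full ring rejects the entry (a real stage would wait).
  bool try_push(int value) {
    if (head_ - tail_ == slots_.size()) {
      return false;  // full: this is the backpressure signal
    }
    slots_[head_++ & mask_] = value;  // mask replaces modulo for 2^n sizes
    return true;
  }

  // Consumer side: an empty ring has nothing to hand out.
  bool try_pop(int &out) {
    if (head_ == tail_) {
      return false;
    }
    out = slots_[tail_++ & mask_];
    return true;
  }

 private:
  std::vector<int> slots_;
  std::uint64_t mask_;
  std::uint64_t head_ = 0;  // total entries ever pushed
  std::uint64_t tail_ = 0;  // total entries ever popped
};
```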
### Shutdown Coordination

Pipeline shutdown is coordinated by sending a single ShutdownEntry that flows through all stages:

```cpp
~HttpHandler() {
  // Send single shutdown signal that flows through all pipeline stages
  {
    auto guard = commitPipeline.push(1, true);
    guard.batch[0] = ShutdownEntry{}; // Single ShutdownEntry flows through all stages
  } // Guard is destroyed here, publishing the entry before the joins below

  // Join all pipeline threads
  sequenceThread.join();
  resolveThread.join();
  persistThread.join();
  releaseThread.join();
}
```

**Note**: Multiple entries would only be needed if stages had multiple threads, with each thread needing its own shutdown signal.

## Error Handling

### Stage-Level Error Handling

Each stage handles different entry types:

```cpp
// Pattern matching on pipeline entry variant.
// Every branch returns bool: std::visit requires all visitor instantiations
// to return the same type.
bool shutdown = std::visit([&](auto &&entry) -> bool {
  using T = std::decay_t<decltype(entry)>;
  if constexpr (std::is_same_v<T, ShutdownEntry>) {
    return true; // Signal shutdown
  } else if constexpr (std::is_same_v<T, CommitEntry>) {
    // Process commit entry
    return false;
  } else {
    static_assert(std::is_same_v<T, StatusEntry>, "unhandled PipelineEntry type");
    // Handle status entry (or skip if transferred)
    return false;
  }
}, pipeline_entry);
```

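A complete, compilable version of this dispatch inside one stage's batch loop might look like the following. The entry structs are placeholders with made-up fields (`version`, `request`), whereas the real entries carry a `Connection`. `process_batch` is a hypothetical name.

```cpp
#include <cassert>
#include <type_traits>
#include <variant>
#include <vector>

// Placeholder entry types for illustration only.
struct CommitEntry {
  int version = 0;
};
struct StatusEntry {
  int request = 0;
};
struct ShutdownEntry {};
using PipelineEntry = std::variant<CommitEntry, StatusEntry, ShutdownEntry>;

// One stage's batch loop: returns true once a ShutdownEntry is seen.
bool process_batch(std::vector<PipelineEntry> &batch, int &commits_seen) {
  for (auto &pipeline_entry : batch) {
    const bool shutdown = std::visit(
        [&](auto &entry) -> bool {
          using T = std::decay_t<decltype(entry)>;
          if constexpr (std::is_same_v<T, ShutdownEntry>) {
            return true;
          } else if constexpr (std::is_same_v<T, CommitEntry>) {
            ++commits_seen;  // stage-specific commit processing goes here
            return false;
          } else {
            static_assert(std::is_same_v<T, StatusEntry>, "unhandled PipelineEntry type");
            return false;  // already transferred after the sequence stage
          }
        },
        pipeline_entry);
    if (shutdown) {
      return true;  // stop this stage's thread loop
    }
  }
  return false;
}
```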
### Connection Error States

- Failed CommitEntries are passed through the pipeline with error information
- Downstream stages skip processing for error connections but forward them
- Error responses are sent when connection reaches release stage
- Connection ownership is always transferred to ensure cleanup

### Pipeline Integrity

- ShutdownEntry signals shutdown to all stages
- Each stage checks for ShutdownEntry and returns true to signal shutdown
- RAII guards ensure entries are always published downstream
- No entries are lost even during error conditions

## Performance Characteristics

### Throughput Optimization

- **Batching**: Multiple connections processed per stage activation
- **Lock-Free Communication**: Ring buffer between stages
- **Minimal Context Switching**: Dedicated threads per stage
- **Arena Allocation**: Efficient memory management throughout pipeline

### Latency Optimization

- **Single-Pass Processing**: Each connection flows through all stages once
- **Streaming Design**: Stages process concurrently
- **Minimal Copying**: Connection ownership transfer, not data copying
- **Direct Response**: Release stage triggers immediate response transmission

### Scalability Characteristics

- **Batch Size Tuning**: Ring buffer size controls memory vs latency tradeoff
- **Thread Affinity**: Dedicated threads reduce scheduling overhead
- **NUMA Awareness**: Can pin threads to specific CPU cores

## Configuration

### Pipeline Parameters

```cpp
private:
  static constexpr int lg_size = 16; // Ring buffer size = 2^16 entries

  // 4-stage pipeline configuration
  StaticThreadPipeline<std::unique_ptr<Connection>,
                       WaitStrategy::WaitIfUpstreamIdle, 1, 1, 1, 1>
      commitPipeline{lg_size};
```

### Tuning Considerations

- **Ring Buffer Size**: Larger buffers increase memory usage but improve batching
- **Wait Strategy**: `WaitIfUpstreamIdle` balances CPU usage vs latency
- **Thread Affinity**: OS scheduling vs explicit CPU pinning tradeoffs

## Pipeline Entry Types

The pipeline processes different types of entries using a `std::variant` (`PipelineEntry`) instead of `std::unique_ptr<Connection>`. This migration is still a TODO in the current code.

### Pipeline Entry Variants
- **CommitEntry**: Contains `std::unique_ptr<Connection>` with CommitRequest and connection state
- **StatusEntry**: Contains `std::unique_ptr<Connection>` with StatusRequest (transferred to status threadpool after sequence)
- **ShutdownEntry**: Signals pipeline shutdown to all stages
- **Future types**: Pipeline design supports additional entry types

### Stage Processing by Type

| Stage | CommitEntry | StatusEntry | ShutdownEntry | Serialization |
|-------|-------------|-------------|---------------|---------------|
| **Sequence** | Check banned list, assign version | Add to banned list, transfer to status threadpool | Return true (shutdown) | **Required** |
| **Resolve** | Check preconditions, update recent writes | N/A (transferred) | Return true (shutdown) | **Required** |
| **Persist** | Apply operations, update high water mark | N/A (transferred) | Return true (shutdown) | **Required** |
| **Release** | Return connection to HTTP threads | N/A (transferred) | Return true (shutdown) | Not required |

## API Endpoint Integration

### `/v1/commit` - Transaction Submission

**Pipeline Interaction**: Full pipeline traversal as CommitEntry

#### Request Processing Flow

1. **HTTP I/O Thread Processing** (`src/http_handler.cpp:210-273`):
   ```cpp
   void HttpHandler::handlePostCommit(Connection &conn, HttpConnectionState &state) {
     // Parse and validate anything that doesn't need serialization:
     // - JSON parsing and CommitRequest construction
     // - Basic validation: leader_id check, operation format validation
     // - Check that we have at least one operation

     // If validation fails, send error response immediately and return
     // If validation succeeds, connection will enter pipeline in on_batch_complete()
   }
   ```

2. **Pipeline Entry**: Successfully parsed connections enter the pipeline as a CommitEntry (containing the connection with its CommitRequest)

3. **Pipeline Processing**:
   - **Sequence**: Check banned list → assign version (or reject)
   - **Resolve**: Check preconditions against in-memory recent writes → mark semi-committed (or failed with conflict details)
   - **Persist**: Apply operations → mark committed, update high water mark
   - **Release**: Return connection with response data

4. **Response Generation**: Based on pipeline results
   - **Success**: `{"status": "committed", "version": N, "leader_id": "...", "request_id": "..."}`
   - **Failure**: `{"status": "not_committed", "conflicts": [...], "version": N, "leader_id": "..."}`

### `/v1/status` - Commit Status Lookup

**Pipeline Interaction**: StatusEntry through sequence stage, then transfer to status threadpool

#### Request Processing Flow

1. **HTTP I/O Thread Processing**:
   ```cpp
   void HttpHandler::handleGetStatus(Connection &conn, const HttpConnectionState &state) {
     // TODO: Extract request_id from URL and min_version from query params
     // Current: Returns placeholder static response
   }
   ```

2. **Two-Phase Processing**:
   - **Phase 1 - Sequence Stage**: StatusEntry enters the pipeline to add the request_id to the banned list and obtain the version upper bound
   - **Phase 2 - Status Threadpool**: The connection is transferred from the sequence stage to a dedicated status threadpool for the actual status lookup logic

3. **Status Lookup Logic**: Performed in the status threadpool, which scans the transaction log to determine the actual commit status of the now-banned request_id

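The status lookup in step 3 might reduce to a bounded scan. This sketch rests on three assumptions. First, the transaction log is available as a vector of `LogRecord`s, which is a hypothetical type. Second, `min_version` is a client-supplied lower bound on where to look. Third, the caller has already waited until every version up to `upper_bound` is durable. `lookup_commit_version` is also a hypothetical name.

```cpp
#include <cassert>
#include <cstdint>
#include <optional>
#include <string>
#include <vector>

// Assumed log record: one committed transaction and its request ID.
struct LogRecord {
  std::int64_t version;
  std::string request_id;
};

// Returns the commit version if request_id committed in
// [min_version, upper_bound], or std::nullopt if it did not.
std::optional<std::int64_t> lookup_commit_version(const std::vector<LogRecord> &log,
                                                  const std::string &request_id,
                                                  std::int64_t min_version,
                                                  std::int64_t upper_bound) {
  for (const auto &record : log) {
    if (record.version < min_version || record.version > upper_bound) {
      continue;  // outside the window where this request could have committed
    }
    if (record.request_id == request_id) {
      return record.version;
    }
  }
  return std::nullopt;  // banned and absent: it can never commit later
}
```

The sequence stage banned the request ID before it recorded `upper_bound`, so no version above that bound can carry the ID. A miss inside the window is therefore a definitive "not committed".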
### `/v1/subscribe` - Real-time Transaction Stream

**Pipeline Integration**: Consumes events from resolve and persist stages

#### Event Sources
- **Resolve Stage**: Semi-committed transactions (accepted preconditions) for low-latency streaming
- **Persist Stage**: Durability events when committed version high water mark advances

#### Current Implementation
```cpp
void HttpHandler::handleGetSubscribe(Connection &conn, const HttpConnectionState &state) {
  // TODO: Parse query parameters (after, durable)
  // TODO: Establish Server-Sent Events stream
  // TODO: Subscribe to resolve stage (semi-committed) and persist stage (durability) events
}
```

### `/v1/version` - Version Information

**Pipeline Integration**: Direct atomic read, no pipeline interaction

```cpp
// TODO: Implement direct atomic read of committed version high water mark
// No pipeline interaction needed - seq_cst atomic read
// Leader ID is process-lifetime constant
```

**Response**: `{"version": <high_water_mark>, "leader_id": "<process_leader_id>"}`

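A sketch of the `/v1/version` handler body follows. It assumes the high water mark is a global `std::atomic<std::int64_t>` named `committed_version` and the leader ID is a process-lifetime `std::string`. Both names and the `"leader-1"` value are placeholders. The string is concatenated without JSON escaping, which a real handler would need if leader IDs could contain special characters.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>
#include <string>

// Hypothetical globals: high water mark (written by txn-persist) and the
// process-lifetime leader ID.
std::atomic<std::int64_t> committed_version{0};
const std::string leader_id = "leader-1";

// One seq_cst load; no pipeline interaction is required.
std::string version_response() {
  const std::int64_t version = committed_version.load(std::memory_order_seq_cst);
  return "{\"version\": " + std::to_string(version) + ", \"leader_id\": \"" +
         leader_id + "\"}";
}
```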
## Integration Points

### HTTP Handler Integration

The pipeline integrates with the HTTP handler at two points:

1. **Entry**: `on_batch_complete()` feeds connections into sequence stage
2. **Exit**: Release stage calls `Server::release_back_to_server()`

### Persistence Layer Integration

The persist stage interfaces with:
- **S3 Backend**: Batch writes for durability (see `persistence.md`)
- **Subscriber System**: Real-time change stream notifications
- **Metrics System**: Transaction throughput and latency tracking

### Database State Integration

- **Sequence Stage**: Updates version number generator
- **Resolve Stage**: Queries current database state for precondition validation
- **Persist Stage**: Applies mutations to authoritative database state

## Future Optimizations

### Potential Enhancements

1. **Dynamic Thread Counts**: Make resolve and release thread counts configurable
2. **NUMA Optimization**: Pin pipeline threads to specific CPU cores
3. **Batch Size Tuning**: Dynamic batch size based on load
4. **Stage Bypassing**: Skip resolve stage for transactions without preconditions
5. **Persistence Batching**: Aggregate multiple commits into larger S3 writes

### Monitoring and Observability

1. **Stage Metrics**: Throughput, latency, and queue depth per stage
2. **Error Tracking**: Error rates and types by stage
3. **Resource Utilization**: CPU and memory usage per pipeline thread
4. **Flow Control Events**: Backpressure and stall detection

## Implementation Status

### Current State

- ✅ Pipeline structure implemented with 4 stages
- ✅ Thread creation and management
- ✅ RAII batch processing
- ✅ Error handling framework
- ✅ Shutdown coordination

### TODO Items

- ⏳ Sequence assignment logic implementation
- ⏳ Precondition resolution implementation
- ⏳ S3 persistence batching implementation
- ⏳ Subscriber notification system
- ⏳ Performance monitoring and metrics
- ⏳ Configuration tuning and optimization