Add a bit more precision to docs, plus philosophy
@@ -31,7 +31,7 @@ HTTP I/O Threads → [Sequence] → [Resolve] → [Persist] → [Release] → HT
 **Responsibilities**:
 
 - **For CommitEntry**: Check request_id against banned list, assign sequential version number if not banned, forward to resolve stage
-- **For StatusEntry**: Add request_id to banned list, note current highest assigned version as upper bound, transfer connection to status threadpool
+- **For StatusEntry**: Add request_id to banned list, note current highest assigned version as upper bound for version range scanning
 - Record version assignments for transaction tracking
 
 **Why Serialization is Required**:
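
Note on the hunk above: the sequence-stage responsibilities can be sketched in a few lines of C++. This is a minimal illustration under stated assumptions, not the project's code: `Sequencer`, `on_commit`, and `on_status` are hypothetical names, and the banned list is modeled as an in-memory set. It is deliberately single-threaded, which is consistent with the stage being described as serialized.

```cpp
#include <cstdint>
#include <optional>
#include <string>
#include <unordered_set>

// Hypothetical sequencer state; names are illustrative, not from the codebase.
class Sequencer {
public:
  // CommitEntry path: returns the newly assigned version, or std::nullopt if
  // the request_id was banned by an earlier status query.
  std::optional<std::uint64_t> on_commit(const std::string &request_id) {
    if (banned_.count(request_id) != 0) {
      return std::nullopt;
    }
    return ++highest_assigned_;
  }

  // StatusEntry path: bans the request_id and returns the highest version
  // assigned so far, which bounds the range a status lookup has to scan.
  std::uint64_t on_status(const std::string &request_id) {
    banned_.insert(request_id);
    return highest_assigned_;
  }

private:
  std::unordered_set<std::string> banned_;
  std::uint64_t highest_assigned_ = 0;
};
```

Because a status query bans the request_id before reporting the upper bound, a commit with that id can never be assigned a version above the bound, so a scan up to the bound is sufficient.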
@@ -104,8 +104,8 @@ bool HttpHandler::process_resolve_batch(BatchType &batch) {
 
 **Responsibilities**:
 
-- **For CommitEntry**: Apply operations to persistent storage, update committed version high water mark
-- **For StatusEntry**: N/A (transferred to status threadpool after sequence stage)
+- **For CommitEntry**: Apply operations to persistent storage, update committed version high water mark, generate success response JSON
+- **For StatusEntry**: N/A (empty husk, connection transferred to status threadpool after sequence stage)
 - Generate durability events for `/v1/subscribe` when committed version advances
 - Batch multiple commits for efficient persistence operations
 
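
Note on the hunk above: the persist-stage bookkeeping can be sketched as follows. Everything here is an assumption made for illustration: the type names, the response JSON shape, and the use of a vector as a stand-in for `/v1/subscribe` events are hypothetical, and the actual storage write is elided.

```cpp
#include <algorithm>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical entry type; the real CommitEntry layout is not shown in the docs.
struct CommitEntrySketch {
  std::uint64_t version = 0;  // assigned by the sequence stage
  std::string response_json;  // filled in by the persist stage
};

struct PersistStateSketch {
  std::uint64_t committed_high_water_mark = 0;
  std::vector<std::uint64_t> durability_events;  // stand-in for /v1/subscribe
};

// Persists one batch. Storage writes are elided; only the bookkeeping is shown.
void persist_batch(PersistStateSketch &state, std::vector<CommitEntrySketch> &batch) {
  std::uint64_t highest = state.committed_high_water_mark;
  for (CommitEntrySketch &entry : batch) {
    // (apply the entry's operations to persistent storage here)
    entry.response_json =
        "{\"status\":\"committed\",\"version\":" + std::to_string(entry.version) + "}";
    highest = std::max(highest, entry.version);
  }
  // Emit a single durability event per batch, and only if the mark advanced.
  if (highest > state.committed_high_water_mark) {
    state.committed_high_water_mark = highest;
    state.durability_events.push_back(highest);
  }
}
```

Updating the high water mark once per batch rather than once per entry matches the batching bullet: subscribers see one event for the whole batch.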
@@ -132,17 +132,16 @@ bool HttpHandler::process_resolve_batch(BatchType &batch) {
 
 ```cpp
 bool HttpHandler::process_persist_batch(BatchType &batch) {
-  // TODO: Implement actual persistence logic:
-  // 1. For CommitEntry: Apply operations to persistent storage
-  // 2. Update committed version high water mark to highest version in batch
-  // 3. Generate durability events for /v1/subscribe
-  // 4. For StatusEntry: N/A (already transferred to status threadpool)
+  // For CommitEntry: Apply operations to persistent storage, update high water mark, generate response JSON
+  // For StatusEntry: N/A (empty husk, connection transferred to status threadpool)
+  // Generate durability events for /v1/subscribe when committed version advances
+  // Semi-committed transactions are retried until durable or leader fails
 }
 ```
 
 ### Stage 3: Connection Release
 
-**Thread**: `txn-release`
+**Threads**: Multiple `txn-release` threads (configurable)
 **Purpose**: Return connections to HTTP server for client response
 **Serialization**: Not required - Independent connection handling
 
@@ -154,8 +153,8 @@ bool HttpHandler::process_persist_batch(BatchType &batch) {
 
 **Response Handling**:
 
-- **CommitRequests**: Response generated by persist stage (success with version, or failure with conflicting preconditions)
-- **StatusRequests**: Response generated by separate status lookup logic (not part of pipeline)
+- **CommitRequests**: Response JSON generated by persist stage (success with version, or failure with conflicting preconditions from resolve stage)
+- **StatusRequests**: Response generated by separate status threadpool (connection transferred after sequence stage)
 - Failed transactions carry failure information through entire pipeline for proper client response
 
 **Implementation**:
@@ -255,7 +254,9 @@ void HttpHandler::on_batch_complete(std::span<Connection*> batch) {
 
 The pipeline implements natural backpressure:
 
-- Each stage blocks if downstream stages are full
+- Fixed-size pipeline buffer causes I/O threads to block when pipeline is full
+- This prevents unbounded memory growth under high load
+- I/O threads blocking may impact accept() rate, but provides system-wide flow control
 - `WaitIfUpstreamIdle` strategy balances latency vs throughput
 - Ring buffer size (`lg_size = 16`) controls maximum queued batches
 
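
Note on the hunk above: the way a fixed-size buffer produces backpressure can be sketched with a small ring. This assumes `lg_size` is the base-2 logarithm of the slot count, which the docs do not state explicitly. `RingSketch` is a hypothetical, single-threaded illustration and omits the synchronization and wait strategy a real pipeline needs.

```cpp
#include <cstddef>
#include <optional>
#include <utility>
#include <vector>

// Single-threaded illustration only; a real pipeline ring needs synchronization.
template <typename T>
class RingSketch {
public:
  // Assumption: lg_size is log2 of the number of slots.
  explicit RingSketch(unsigned lg_size)
      : slots_(std::size_t{1} << lg_size), mask_(slots_.size() - 1) {}

  std::size_t capacity() const { return slots_.size(); }

  // Returns false when every slot is occupied. A blocking producer (such as an
  // I/O thread) would wait here until a consumer frees a slot.
  bool try_push(T value) {
    if (tail_ - head_ == slots_.size()) {
      return false;
    }
    slots_[tail_ & mask_] = std::move(value);
    ++tail_;
    return true;
  }

  // Returns the oldest queued item, or std::nullopt if the ring is empty.
  std::optional<T> try_pop() {
    if (head_ == tail_) {
      return std::nullopt;
    }
    T value = std::move(slots_[head_ & mask_]);
    ++head_;
    return value;
  }

private:
  std::vector<T> slots_;
  std::size_t mask_;
  std::size_t head_ = 0;
  std::size_t tail_ = 0;
};
```

Under that assumption, `lg_size = 16` gives 65536 slots. Memory use is bounded by the slot count, and a producer that finds the ring full has to wait, which is the flow control the bullets describe.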
@@ -373,9 +374,9 @@ The pipeline processes different types of entries using a variant/union type sys
 | Stage | CommitEntry | StatusEntry | ShutdownEntry | Serialization |
 |-------|-------------|-------------|---------------|---------------|
 | **Sequence** | Check banned list, assign version | Add to banned list, transfer to status threadpool | Return true (shutdown) | **Required** |
-| **Resolve** | Check preconditions, update recent writes | N/A (transferred) | Return true (shutdown) | **Required** |
-| **Persist** | Apply operations, update high water mark | N/A (transferred) | Return true (shutdown) | **Required** |
-| **Release** | Return connection to HTTP threads | N/A (transferred) | Return true (shutdown) | Not required |
+| **Resolve** | Check preconditions, update recent writes | N/A (empty husk) | Return true (shutdown) | **Required** |
+| **Persist** | Apply operations, update high water mark | N/A (empty husk) | Return true (shutdown) | **Required** |
+| **Release** | Return connection to HTTP threads | N/A (empty husk) | Return true (shutdown) | Not required (multiple threads) |
 
 ## API Endpoint Integration
 
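
Note on the stage table in the hunk above: the per-entry behavior of one row can be sketched with `std::variant` and `std::visit`. The hunk header mentions a "variant/union type system" but does not show it, so the use of `std::variant` is an assumption, and the minimal entry structs and `process_resolve_entry` below are hypothetical.

```cpp
#include <string>
#include <type_traits>
#include <variant>

// Hypothetical minimal entry types; the real definitions are not shown in the docs.
struct CommitEntry { std::string request_id; };
struct StatusEntry { std::string request_id; };
struct ShutdownEntry {};

using PipelineEntry = std::variant<CommitEntry, StatusEntry, ShutdownEntry>;

// Resolve-stage row of the table. Returns true when the stage should shut down,
// mirroring the "Return true (shutdown)" column.
bool process_resolve_entry(const PipelineEntry &entry, int &commits_checked) {
  return std::visit(
      [&](const auto &e) -> bool {
        using E = std::decay_t<decltype(e)>;
        if constexpr (std::is_same_v<E, CommitEntry>) {
          ++commits_checked;  // stand-in for checking preconditions
          return false;
        } else if constexpr (std::is_same_v<E, StatusEntry>) {
          return false;  // empty husk: nothing to do in this stage
        } else {
          return true;  // ShutdownEntry
        }
      },
      entry);
}
```

The `if constexpr` chain handles each alternative by type, and the final `else` covers the one remaining alternative. If a fourth alternative were added to the variant, it would also fall into that branch, so a stricter version might use a `static_assert` there.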
@@ -415,7 +416,7 @@ The pipeline processes different types of entries using a variant/union type sys
 
 ### `/v1/status` - Commit Status Lookup
 
-**Pipeline Interaction**: StatusEntry through sequence stage, then transfer to status threadpool
+**Pipeline Interaction**: StatusEntry through sequence stage only
 
 #### Request Processing Flow
 
@@ -423,17 +424,21 @@ The pipeline processes different types of entries using a variant/union type sys
 
 ```cpp
 void HttpHandler::handleGetStatus(Connection &conn, const HttpConnectionState &state) {
-  // TODO: Extract request_id from URL and min_version from query params
-  // Current: Returns placeholder static response
+  // Extract request_id from URL and min_version from query params
+  // Create StatusEntry for pipeline processing
 }
 ```
 
-1. **Two-Phase Processing**:
+1. **Pipeline Processing**:
 
-   - **Phase 1 - Sequence Stage**: StatusEntry enters pipeline to add request_id to banned list and get version upper bound
-   - **Phase 2 - Status Threadpool**: Connection transferred from sequence stage to dedicated status threadpool for actual status lookup logic
+   - **Sequence Stage**: StatusEntry adds request_id to banned list, establishes version scanning range, transfers connection to status threadpool
+   - **Subsequent Stages**: Empty StatusEntry husk flows through resolve/persist/release as no-op
 
-1. **Status Lookup Logic**: Performed in status threadpool - scan transaction log to determine actual commit status of the now-banned request_id
+1. **Status Lookup Logic**:
+
+   - Version range determined in sequence stage (min_version parameter to version upper bound)
+   - Actual S3 scanning performed by separate status threadpool outside the pipeline
+   - Return "committed" with version if found, "not_found" if not found in scanned range
 
 ### `/v1/subscribe` - Real-time Transaction Stream
 
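
Note on the status lookup logic in the hunk above: the bounded scan can be sketched as below. The docs describe scanning S3, which is not modeled here. `TxnLogSketch` is a hypothetical in-memory stand-in that maps version to request_id, and `find_commit` is an illustrative name.

```cpp
#include <cstdint>
#include <map>
#include <optional>
#include <string>

// Hypothetical in-memory stand-in for the transaction log: version -> request_id.
using TxnLogSketch = std::map<std::uint64_t, std::string>;

// Scans versions in [min_version, upper_bound]. Returns the committed version if
// the request_id is found in that range, or std::nullopt ("not_found") otherwise.
std::optional<std::uint64_t> find_commit(const TxnLogSketch &log,
                                         const std::string &request_id,
                                         std::uint64_t min_version,
                                         std::uint64_t upper_bound) {
  for (auto it = log.lower_bound(min_version);
       it != log.end() && it->first <= upper_bound; ++it) {
    if (it->second == request_id) {
      return it->first;
    }
  }
  return std::nullopt;
}
```

A returned version corresponds to the "committed" response. An empty result corresponds to "not_found", and it only says the id is absent from the scanned range, not from the whole log.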