From cb66d6547951070bb0f29534b30d31e0c2e6c711 Mon Sep 17 00:00:00 2001 From: Andrew Noyes Date: Tue, 16 Sep 2025 00:44:00 -0400 Subject: [PATCH] Add a bit more precision to docs, plus philosophy --- api.md | 8 ++++-- commit_pipeline.md | 49 ++++++++++++++++++--------------- design.md | 11 ++++++++ threading_performance_report.md | 4 +-- 4 files changed, 46 insertions(+), 26 deletions(-) diff --git a/api.md b/api.md index ed9b40c..b5e3d46 100644 --- a/api.md +++ b/api.md @@ -91,7 +91,9 @@ Submits a transaction to be committed. The transaction consists of read precondi // If not committed, a more recent version that the client can use to retry. "version": 123456, // The unique ID of the leader at this version. - "leader_id": "abcdefg" + "leader_id": "abcdefg", + // Echo back the request_id if it was provided in the original request + "request_id": "abcdefg" } ``` @@ -169,7 +171,7 @@ The response is a stream of events compliant with the SSE protocol. ``` event: transaction -data: {"request_id":"abcdefg","version":123456,"timestamp":"2025-08-07T20:27:42.555Z","leader_id":"abcdefg","operations":[...]} +data: {"request_id":"abcdefg","version":123456,"prev_version":123455,"timestamp":"2025-08-07T20:27:42.555Z","leader_id":"abcdefg","operations":[...]} ``` @@ -194,6 +196,8 @@ data: {"committed_version":123456,"leader_id":"abcdefg"} 1. **Leader Changes & Reconnection**: When `durable=false`, if the leader changes, clients **must** discard all of that leader's `transaction` events received after their last-seen `checkpoint` event. They must then manually reconnect (as the server connection will likely be terminated) and restart the subscription by setting the `after` query parameter to the version specified in that last-known checkpoint. Clients should implement a randomized exponential backoff strategy (backoff with jitter) when reconnecting. +1. **Gap Detection**: Each `transaction` event includes a `prev_version` field linking to the previous transaction version, forming a linked list. Clients can detect gaps in the transaction stream by checking that each transaction's `prev_version` matches the previous transaction's `version`. This ensures gapless transitions between historical data from S3 and live events from the server. + 1. **Connection Handling & Errors**: The server may periodically send `keepalive` comments to prevent idle timeouts on network proxies. The server will buffer unconsumed data up to a configurable limit; if the client falls too far behind, the connection will be closed. If the `after` version has been truncated from the log, this endpoint will return a standard `410 Gone` HTTP error instead of an event stream. ## `PUT /v1/retention/` diff --git a/commit_pipeline.md b/commit_pipeline.md index dafdfd3..3c30c96 100644 --- a/commit_pipeline.md +++ b/commit_pipeline.md @@ -31,7 +31,7 @@ HTTP I/O Threads → [Sequence] → [Resolve] → [Persist] → [Release] → HT **Responsibilities**: - **For CommitEntry**: Check request_id against banned list, assign sequential version number if not banned, forward to resolve stage -- **For StatusEntry**: Add request_id to banned list, note current highest assigned version as upper bound, transfer connection to status threadpool +- **For StatusEntry**: Add request_id to banned list, note current highest assigned version as upper bound for version range scanning - Record version assignments for transaction tracking **Why Serialization is Required**: @@ -104,8 +104,8 @@ bool HttpHandler::process_resolve_batch(BatchType &batch) { **Responsibilities**: -- **For CommitEntry**: Apply operations to persistent storage, update committed version high water mark -- **For StatusEntry**: N/A (transferred to status threadpool after sequence stage) +- **For CommitEntry**: Apply operations to persistent storage, update committed version high water mark, generate success response JSON +- **For StatusEntry**: N/A (empty husk, connection transferred to status threadpool after sequence stage) - Generate durability events for `/v1/subscribe` when committed version advances - Batch multiple commits for efficient persistence operations @@ -132,17 +132,16 @@ bool HttpHandler::process_resolve_batch(BatchType &batch) { ```cpp bool HttpHandler::process_persist_batch(BatchType &batch) { - // TODO: Implement actual persistence logic: - // 1. For CommitEntry: Apply operations to persistent storage - // 2. Update committed version high water mark to highest version in batch - // 3. Generate durability events for /v1/subscribe - // 4. For StatusEntry: N/A (already transferred to status threadpool) + // For CommitEntry: Apply operations to persistent storage, update high water mark, generate response JSON + // For StatusEntry: N/A (empty husk, connection transferred to status threadpool) + // Generate durability events for /v1/subscribe when committed version advances + // Semi-committed transactions are retried until durable or leader fails } ``` ### Stage 3: Connection Release -**Thread**: `txn-release` +**Threads**: Multiple `txn-release` threads (configurable) **Purpose**: Return connections to HTTP server for client response **Serialization**: Not required - Independent connection handling @@ -154,8 +153,8 @@ bool HttpHandler::process_persist_batch(BatchType &batch) { **Response Handling**: -- **CommitRequests**: Response generated by persist stage (success with version, or failure with conflicting preconditions) -- **StatusRequests**: Response generated by separate status lookup logic (not part of pipeline) +- **CommitRequests**: Response JSON generated by persist stage (success with version, or failure with conflicting preconditions from resolve stage) +- **StatusRequests**: Response generated by separate status threadpool (connection transferred after sequence stage) - Failed transactions carry failure information through entire pipeline for proper client response **Implementation**: @@ -255,7 +254,9 @@ void HttpHandler::on_batch_complete(std::span batch) { The pipeline implements natural backpressure: -- Each stage blocks if downstream stages are full +- Fixed-size pipeline buffer causes I/O threads to block when pipeline is full +- This prevents unbounded memory growth under high load +- I/O threads blocking may impact accept() rate, but provides system-wide flow control - `WaitIfUpstreamIdle` strategy balances latency vs throughput - Ring buffer size (`lg_size = 16`) controls maximum queued batches @@ -373,9 +374,9 @@ The pipeline processes different types of entries using a variant/union type sys | Stage | CommitEntry | StatusEntry | ShutdownEntry | Serialization | |-------|-------------|-------------|---------------|---------------| | **Sequence** | Check banned list, assign version | Add to banned list, transfer to status threadpool | Return true (shutdown) | **Required** | -| **Resolve** | Check preconditions, update recent writes | N/A (transferred) | Return true (shutdown) | **Required** | -| **Persist** | Apply operations, update high water mark | N/A (transferred) | Return true (shutdown) | **Required** | -| **Release** | Return connection to HTTP threads | N/A (transferred) | Return true (shutdown) | Not required | +| **Resolve** | Check preconditions, update recent writes | N/A (empty husk) | Return true (shutdown) | **Required** | +| **Persist** | Apply operations, update high water mark | N/A (empty husk) | Return true (shutdown) | **Required** | +| **Release** | Return connection to HTTP threads | N/A (empty husk) | Return true (shutdown) | Not required (multiple threads) | ## API Endpoint Integration @@ -415,7 +416,7 @@ The pipeline processes different types of entries using a variant/union type sys ### `/v1/status` - Commit Status Lookup -**Pipeline Interaction**: StatusEntry through sequence stage, then transfer to status threadpool +**Pipeline Interaction**: StatusEntry through sequence stage only #### Request Processing Flow @@ -423,17 +424,21 @@ The pipeline processes different types of entries using a variant/union type sys ```cpp void HttpHandler::handleGetStatus(Connection &conn, const HttpConnectionState &state) { - // TODO: Extract request_id from URL and min_version from query params - // Current: Returns placeholder static response + // Extract request_id from URL and min_version from query params + // Create StatusEntry for pipeline processing } ``` -1. **Two-Phase Processing**: +1. **Pipeline Processing**: - - **Phase 1 - Sequence Stage**: StatusEntry enters pipeline to add request_id to banned list and get version upper bound - - **Phase 2 - Status Threadpool**: Connection transferred from sequence stage to dedicated status threadpool for actual status lookup logic + - **Sequence Stage**: StatusEntry adds request_id to banned list, establishes version scanning range, transfers connection to status threadpool + - **Subsequent Stages**: Empty StatusEntry husk flows through resolve/persist/release as no-op -1. **Status Lookup Logic**: Performed in status threadpool - scan transaction log to determine actual commit status of the now-banned request_id +1. **Status Lookup Logic**: + + - Version range determined in sequence stage (min_version parameter to version upper bound) + - Actual S3 scanning performed by separate status threadpool outside the pipeline + - Return "committed" with version if found, "not_found" if not found in scanned range ### `/v1/subscribe` - Real-time Transaction Stream diff --git a/design.md b/design.md index ff6cba0..ad518e3 100644 --- a/design.md +++ b/design.md @@ -22,10 +22,20 @@ WeaselDB is a high-performance write-side database component designed for system - **Ultra-fast arena allocation** (~1ns vs ~20-270ns for malloc) - **High-performance JSON parsing** with streaming support and SIMD optimization - **Multi-threaded networking** using multiple epoll instances with unified I/O thread pool +- **Multi-stage commit pipeline** with serial processing for consistency and parallel I/O for performance +- **Non-blocking metrics system** with try-lock optimization preventing pipeline stalls - **Configurable epoll instances** to eliminate kernel-level contention - **Optimized memory management** with arena allocation and efficient copying - **Factory pattern safety** ensuring correct object lifecycle management +### Design Philosophy + +**"Two machines once you've mastered one"** - Optimize aggressively for single-machine performance before distributing. Most systems prematurely scale horizontally and never fully utilize their hardware. How are you supposed to horizontally scale strict serializability anyway? + +**Boring formats, fast implementations** - Use standard data formats (JSON, HTTP, base64) with heavily optimized parsing. Universal compatibility without sacrificing performance. + +**Read/write separation** - Fan out reads from the single write stream (persist stage to many subscribers), with true horizontal scaling via S3 for historical data. Keep writes simple and fast. + ______________________________________________________________________ ## Quick Start @@ -359,6 +369,7 @@ See [style.md](style.md) for comprehensive C++ coding standards and conventions. - **CPU**: Perfect hashing and SIMD operations are critical paths - avoid alternatives - **I/O**: Streaming parser design supports incremental network data processing - **Cache**: String views avoid copying, keeping data cache-friendly +- **Pipeline**: Serial stages must never block - only parallel release stage can take locks ### Configuration & Testing diff --git a/threading_performance_report.md b/threading_performance_report.md index ce52578..62e3f41 100644 --- a/threading_performance_report.md +++ b/threading_performance_report.md @@ -66,10 +66,10 @@ I/O Threads (8) → HttpHandler::on_batch_complete() → Commit Pipeline | Stage 2: Persist (generate response) | (send "OK" response) | ↓ - | Stage 3: Release (connection return) + | Stage 3: Release (wake I/O threads) | (optimized futex wake) | ↓ - └─────────────────────── Connection returned to server pool + └─────────────────────── I/O threads send response to client ``` ## Test Configuration