diff --git a/persistence.md b/persistence.md
index b30f49a..0ea57b4 100644
--- a/persistence.md
+++ b/persistence.md
@@ -2,80 +2,110 @@
 ## Overview
 
-The persistence thread receives commit batches from the main processing pipeline (via ThreadPipeline interface) and uploads them to S3. The transport layer uses a single-threaded TCP client with connection pooling, leaving protocol details (HTTP, authentication, etc.) to higher layers.
+The persistence thread receives commit batches from the main processing pipeline and uploads them to S3. It uses a single-threaded design with connection pooling and batching to amortize per-request overhead.
 
-## Core Flow
+## Architecture
 
-The persistence thread operates in a loop with integrated connection management and batching logic. Batches are triggered when any limit is reached: time (`batch_timeout_ms`) or size (`batch_size_threshold`). Flow control via `max_in_flight_requests` prevents new batches when at capacity.
+**Input**: Commits arrive via `ThreadPipeline` interface from upstream processing
+**Output**: Batched commits uploaded to S3 persistence backend
+**Transport**: Single-threaded TCP client with connection pooling
+**Protocol**: Higher layers handle HTTP, authentication, and S3-specific details
 
-### Main Loop
+## Batching Strategy
 
-1. **Batch Acquisition** - Use `ThreadPipeline::acquire()` to get commit batches:
-   - If no in-flight requests: acquire blocking and process immediately
-   - If in-flight requests exist: collect commits until any trigger condition:
-     - **Time limit**: `batch_timeout_ms` elapsed since last batch sent
-     - **Size limit**: `batch_size_threshold` commits collected
-     - **Flow control**: at `max_in_flight_requests`, block until reply received
-   - Use epoll_wait with timeout for I/O event handling during collection
-   - If no commits available and no in-flight requests, switch to blocking acquire
+The persistence thread collects commits into batches using two trigger conditions:
 
-2. **Connection Management** - Acquire healthy connection from pool:
-   - Create new connections if pool below `target_pool_size`
-   - If no healthy connections available, block until one becomes available
-   - Maintain automatic pool replenishment
+1. **Time Trigger**: `batch_timeout_ms` elapsed since batch collection started
+2. **Size Trigger**: `batch_size_threshold` commits collected (the commit that crosses the threshold is still included, so the final batch may exceed it)
 
-3. **Data Transmission** - Send batch to S3:
-   - Write protocol data to connection
-   - Publish accepted transactions to subscriber system
-   - Track in-flight requests for flow control
+**Flow Control**: When `max_in_flight_requests` is reached, block until responses are received.
 
-4. **Event Processing** - Drive I/O and monitor health:
-   - Handle epoll events for all in-flight connections
-   - Monitor connection health via heartbeats
-   - Process responses and detect failures
+## Main Processing Loop
 
-5. **Response Handling** - Process successful persistence:
-   - Only acknowledge batch if all prior batches are durable (ordered acknowledgment)
-   - Release batch via StageGuard destructor (publishes to next pipeline stage)
-   - Publish durability events to subscriber system
-   - Return healthy connection to pool
+### 1. Batch Collection
 
-6. **Failure Handling** - Manage connection failures:
-   - Remove failed connection from pool
-   - Retry batch with exponential backoff (up to `max_retry_attempts`)
-   - Exponential backoff delays only affect the specific failing batch
-   - If retries exhausted, abort process or escalate error
-   - Initiate pool replenishment if below target
+**No In-Flight Requests**:
+- Use blocking acquire to get first commit batch
+- Process immediately (no batching delay)
 
-### Connection Pool
+**With In-Flight Requests**:
+- Check flow control: if at `max_in_flight_requests`, block for responses
+- Collect commits using non-blocking acquire until a trigger condition fires:
+  - Check for available commits (non-blocking)
+  - If `batch_size_threshold` reached → process batch immediately
+  - If below threshold → use `epoll_wait(batch_timeout_ms)` for I/O and timeout
+  - On timeout → process collected commits
+- If no commits available and no in-flight requests → switch to blocking acquire
 
-Maintains `target_pool_size` connections with automatic replenishment. Pool sizing should be 2x `max_in_flight_requests` to ensure availability during peak load and connection replacement scenarios.
+### 2. Connection Management
 
-## Implementation Considerations
+- Acquire healthy connection from pool
+- Create new connections if pool below `target_pool_size`
+- If no healthy connections available, block until one becomes available
+- Maintain automatic pool replenishment
 
-**Batch Ordering**: Batches can be retried out-of-order for performance, but acknowledgment to the next pipeline stage must maintain ordering - a batch can only be acknowledged after all prior batches are durable.
+### 3. Data Transmission
 
-**Backpressure**: Retry delays for individual batches naturally create backpressure that eventually blocks the persistence thread when in-flight limits are reached.
+
+- Write batch data to S3 connection using appropriate protocol
+- Publish accepted transactions to subscriber system
+- Track request as in-flight for flow control
 
-**Graceful Shutdown**: On shutdown signal, drain all in-flight batches to completion before terminating the persistence thread.
+### 4. I/O Event Processing
 
-## Configurable Parameters
+- Handle epoll events for all in-flight connections
+- Monitor connection health via heartbeats
+- Process incoming responses and detect connection failures
 
-- **`batch_timeout_ms`** (default: 5ms) - Maximum time to wait collecting additional commits for batching
-- **`batch_size_threshold`** - Threshold for triggering batch processing
-- **`max_in_flight_requests`** - Maximum number of concurrent requests to persistence backend
-- **`target_pool_size`** (recommended: 2x `max_in_flight_requests`) - Target number of connections to maintain in pool
-- **`max_retry_attempts`** (default: 3) - Maximum retries for failed batches before aborting
-- **`retry_base_delay_ms`** (default: 100ms) - Base delay for exponential backoff retries
+### 5. Response Handling
+
+- **Ordered Acknowledgment**: Only acknowledge batch after all prior batches are durable
+- Release batch via `StageGuard` destructor (publishes to next pipeline stage)
+- Publish durability events to subscriber system
+- Return healthy connection to pool
+
+### 6. Failure Handling
+
+- Remove failed connection from pool
+- Retry batch with exponential backoff (up to `max_retry_attempts`)
+- Backoff delays only affect the specific failing batch
+- If retries exhausted, abort process or escalate error
+- Initiate pool replenishment if below target
+
+## Connection Pool
+
+**Target Size**: `target_pool_size` connections (recommended: 2x `max_in_flight_requests`)
+**Replenishment**: Automatic creation when below target
+**Health Monitoring**: Heartbeat-based connection validation
+**Sizing Rationale**: 2x multiplier ensures availability during peak load and connection replacement
+
+## Key Design Properties
+
+**Batch Ordering**: Batches may be retried out-of-order for performance, but acknowledgment to the next pipeline stage maintains strict ordering.
+
+**Backpressure**: Retry delays for failing batches create natural backpressure that eventually blocks the persistence thread when in-flight limits are reached.
+
+**Graceful Shutdown**: On shutdown signal, drain all in-flight batches to completion before terminating.
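The ordered-acknowledgment property above can be sketched as follows. This is illustrative Python, not the actual implementation; `OrderedAcknowledger` and its member names are invented for the example, and the real code releases batches via the `StageGuard` destructor rather than a list append.

```python
class OrderedAcknowledger:
    """Tracks out-of-order batch completions and acknowledges batches
    only once every prior batch is durable (a contiguous prefix)."""

    def __init__(self) -> None:
        self.next_to_ack = 0        # lowest sequence number not yet acknowledged
        self.pending = set()        # durable batches still waiting on prior ones
        self.acked = []             # acknowledgment order actually emitted

    def on_batch_durable(self, seq: int) -> None:
        self.pending.add(seq)
        # Acknowledge the longest contiguous prefix of durable batches.
        while self.next_to_ack in self.pending:
            self.pending.remove(self.next_to_ack)
            self.acked.append(self.next_to_ack)  # release to next pipeline stage
            self.next_to_ack += 1


ack = OrderedAcknowledger()
for seq in [1, 2, 0, 4, 3]:  # responses arrive out of order (e.g. after retries)
    ack.on_batch_durable(seq)
assert ack.acked == [0, 1, 2, 3, 4]  # acknowledged strictly in order
```

Note how batch 1 and 2 sit in `pending` until batch 0 becomes durable; this is exactly the backpressure-friendly behavior the design calls for, since a slow or retrying batch delays acknowledgment but not transmission of its successors.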
+
+## Configuration Parameters
+
+| Parameter | Default | Description |
+|-----------|---------|-------------|
+| `batch_timeout_ms` | 5ms | Maximum time to wait collecting commits for batching |
+| `batch_size_threshold` | - | Threshold for triggering batch processing |
+| `max_in_flight_requests` | - | Maximum concurrent requests to persistence backend |
+| `target_pool_size` | - | Target number of connections to maintain (recommended: 2x `max_in_flight_requests`) |
+| `max_retry_attempts` | 3 | Maximum retries for failed batches before aborting |
+| `retry_base_delay_ms` | 100ms | Base delay for exponential backoff retries |
 
 ## Configuration Validation
 
-The following validation rules should be enforced:
+**Required Constraints**:
+
+- `batch_size_threshold` > 0 (must process at least one commit per batch)
+- `max_in_flight_requests` > 0 (must allow at least one concurrent request)
+- `target_pool_size` >= `max_in_flight_requests` (pool must accommodate all in-flight requests)
+- `batch_timeout_ms` > 0 (timeout must be positive)
+- `max_retry_attempts` >= 0 (zero disables retries)
+- `retry_base_delay_ms` > 0 (delay must be positive if retries enabled)
 
-- **`batch_size_threshold`** > 0 (must process at least one commit per batch)
-- **`max_in_flight_requests`** > 0 (must allow at least one concurrent request)
-- **`target_pool_size`** >= `max_in_flight_requests` (pool must accommodate all in-flight requests)
-- **`target_pool_size`** <= 2x `max_in_flight_requests` (recommended for optimal performance)
-- **`batch_timeout_ms`** > 0 (timeout must be positive)
-- **`max_retry_attempts`** >= 0 (zero disables retries)
-- **`retry_base_delay_ms`** > 0 (delay must be positive if retries enabled)
\ No newline at end of file
+**Performance Recommendations**:
+
+- `target_pool_size` <= 2x `max_in_flight_requests` (recommended upper bound; larger pools exceed what peak load and connection replacement require)
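The constraint list above can be enforced at startup with a simple check. The sketch below is illustrative Python only: the `PersistenceConfig` dataclass is invented for the example, and the defaults for `batch_size_threshold`, `max_in_flight_requests`, and `target_pool_size` (which have no defaults in the table) are arbitrary placeholder values.

```python
from dataclasses import dataclass


@dataclass
class PersistenceConfig:
    # Defaults for the first, fifth, and sixth fields come from the table;
    # the remaining values are placeholders chosen for this example.
    batch_timeout_ms: int = 5
    batch_size_threshold: int = 1000
    max_in_flight_requests: int = 4
    target_pool_size: int = 8          # follows the 2x recommendation
    max_retry_attempts: int = 3
    retry_base_delay_ms: int = 100


def validate(cfg: PersistenceConfig) -> list:
    """Returns the list of violated required constraints (empty list = valid)."""
    errors = []
    if cfg.batch_size_threshold <= 0:
        errors.append("batch_size_threshold must be > 0")
    if cfg.max_in_flight_requests <= 0:
        errors.append("max_in_flight_requests must be > 0")
    if cfg.target_pool_size < cfg.max_in_flight_requests:
        errors.append("target_pool_size must be >= max_in_flight_requests")
    if cfg.batch_timeout_ms <= 0:
        errors.append("batch_timeout_ms must be > 0")
    if cfg.max_retry_attempts < 0:
        errors.append("max_retry_attempts must be >= 0")
    if cfg.max_retry_attempts > 0 and cfg.retry_base_delay_ms <= 0:
        errors.append("retry_base_delay_ms must be > 0 when retries are enabled")
    return errors


assert validate(PersistenceConfig()) == []
assert validate(PersistenceConfig(target_pool_size=2)) == [
    "target_pool_size must be >= max_in_flight_requests"
]
```

The 2x pool-size upper bound is deliberately left out of `validate`, matching the split above between required constraints (reject the config) and performance recommendations (warn at most).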