Improve clarity

commit 333148bb5a (parent f54d1e0dc1), 2025-08-24 20:02:11 -04:00
## Overview
The persistence thread receives commit batches from the main processing pipeline and uploads them to S3. It uses a single-threaded design with connection pooling and batching for high throughput.
## Architecture
**Input**: Commits arrive via `ThreadPipeline` interface from upstream processing
**Output**: Batched commits uploaded to S3 persistence backend
**Transport**: Single-threaded TCP client with connection pooling
**Protocol**: Higher layers handle HTTP, authentication, and S3-specific details
## Batching Strategy
The persistence thread collects commits into batches using two trigger conditions:
1. **Time Trigger**: `batch_timeout_ms` elapsed since batch collection started
2. **Size Trigger**: `batch_size_threshold` commits collected (the final acquire may push the batch past the threshold)
**Flow Control**: When `max_in_flight_requests` is reached, block until responses are received.
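The two triggers plus flow control reduce to a pair of small predicates. A minimal sketch (the `BatchConfig` struct and function names are hypothetical, not from the actual codebase; field names follow the configuration table):

```cpp
#include <cstddef>
#include <cstdint>

// Hypothetical parameter struct mirroring the configuration table.
struct BatchConfig {
    uint64_t batch_timeout_ms;
    size_t   batch_size_threshold;
    size_t   max_in_flight_requests;
};

// True when a collected batch should be sent: either the size threshold
// is met, or the collection timeout has elapsed with something to send.
bool should_send(const BatchConfig& cfg, size_t collected, uint64_t elapsed_ms) {
    if (collected == 0) return false;                        // nothing to send
    if (collected >= cfg.batch_size_threshold) return true;  // size trigger
    return elapsed_ms >= cfg.batch_timeout_ms;               // time trigger
}

// True when flow control forbids sending another batch right now.
bool at_capacity(const BatchConfig& cfg, size_t in_flight) {
    return in_flight >= cfg.max_in_flight_requests;
}
```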
## Main Processing Loop
### 1. Batch Collection
**No In-Flight Requests**:
- Use blocking acquire to get first commit batch
- Process immediately (no batching delay)
**With In-Flight Requests**:
- Check flow control: if at `max_in_flight_requests`, block for responses
- Collect commits using non-blocking acquire until a trigger condition fires:
  - Check for available commits (non-blocking)
  - If `batch_size_threshold` reached → process batch immediately
  - If below threshold → use `epoll_wait(batch_timeout_ms)` for I/O and timeout
  - On timeout → process collected commits
- If no commits are available and all in-flight requests have completed → switch back to blocking acquire
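The mode selection above can be sketched as a single decision function. This is an illustrative sketch only; the `Action` enum and `next_action` signature are hypothetical:

```cpp
#include <cstddef>

// Next step for the batch-collection loop described above.
enum class Action {
    BlockingAcquire,   // idle: nothing collected, nothing in flight
    WaitForResponse,   // flow control: at max_in_flight_requests
    SendBatch,         // size threshold reached or timeout expired
    PollForMore,       // keep collecting; epoll_wait(batch_timeout_ms)
};

Action next_action(size_t in_flight, size_t max_in_flight,
                   size_t collected, size_t threshold, bool timed_out) {
    if (in_flight >= max_in_flight)
        return Action::WaitForResponse;        // block until a reply frees a slot
    if (collected >= threshold || (timed_out && collected > 0))
        return Action::SendBatch;              // a trigger condition fired
    if (collected == 0 && in_flight == 0)
        return Action::BlockingAcquire;        // no work anywhere: sleep on the pipeline
    return Action::PollForMore;                // drive I/O while waiting for commits
}
```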
### 2. Connection Management
- Acquire healthy connection from pool
- Create new connections if pool below `target_pool_size`
- If no healthy connections available, block until one becomes available
- Maintain automatic pool replenishment
### 3. Data Transmission
- Write batch data to S3 connection using appropriate protocol
- Publish accepted transactions to subscriber system
- Track request as in-flight for flow control
### 4. I/O Event Processing
- Handle epoll events for all in-flight connections
- Monitor connection health via heartbeats
- Process incoming responses and detect connection failures
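A minimal Linux sketch of this event-driven step, using a pipe in place of a pooled S3 connection (real code would register every in-flight connection's descriptor and dispatch per event):

```cpp
#include <sys/epoll.h>
#include <unistd.h>

// Minimal epoll-driven readiness check. A self-pipe stands in for an
// in-flight S3 connection whose response has arrived.
int ready_events_demo() {
    int fds[2];
    if (pipe(fds) != 0) return -1;

    int ep = epoll_create1(0);
    epoll_event ev{};
    ev.events = EPOLLIN;           // wake when a response is readable
    ev.data.fd = fds[0];
    epoll_ctl(ep, EPOLL_CTL_ADD, fds[0], &ev);

    ssize_t wrote = write(fds[1], "r", 1);   // simulate a response arriving
    if (wrote != 1) return -1;

    epoll_event out[8];
    // The timeout plays the role of batch_timeout_ms in the main loop.
    int n = epoll_wait(ep, out, 8, 5 /* ms */);

    close(fds[0]); close(fds[1]); close(ep);
    return n;                      // number of connections with pending I/O
}
```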
### 5. Response Handling
- **Ordered Acknowledgment**: Only acknowledge batch after all prior batches are durable
- Release batch via `StageGuard` destructor (publishes to next pipeline stage)
- Publish durability events to subscriber system
- Return healthy connection to pool
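Ordered acknowledgment amounts to releasing only the contiguous prefix of durable batches. A sketch under assumed sequence numbering (the `AckTracker` class is hypothetical; in the real pipeline the release happens via the `StageGuard` destructor):

```cpp
#include <cstdint>
#include <set>
#include <vector>

// Tracks which batches are durable and releases acknowledgments strictly
// in order: batch n is acknowledged only once batches 0..n-1 are durable.
class AckTracker {
    uint64_t next_to_ack_ = 0;     // lowest sequence not yet acknowledged
    std::set<uint64_t> durable_;   // durable batches waiting on predecessors
public:
    // Record that `seq` is durable; return the batches acknowledged now.
    std::vector<uint64_t> on_durable(uint64_t seq) {
        durable_.insert(seq);
        std::vector<uint64_t> acked;
        while (!durable_.empty() && *durable_.begin() == next_to_ack_) {
            durable_.erase(durable_.begin());
            acked.push_back(next_to_ack_++);  // release to next pipeline stage
        }
        return acked;
    }
};
```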
### 6. Failure Handling
- Remove failed connection from pool
- Retry batch with exponential backoff (up to `max_retry_attempts`)
- Backoff delays only affect the specific failing batch
- If retries exhausted, abort process or escalate error
- Initiate pool replenishment if below target
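The per-batch backoff schedule follows directly from `retry_base_delay_ms` and `max_retry_attempts`; a sketch using the table defaults (the function name and signature are illustrative):

```cpp
#include <cstdint>
#include <optional>

// Exponential backoff for a single failing batch (defaults from the
// configuration table: 3 attempts, 100 ms base delay). Returns the delay
// before retry `attempt` (0-based), or nullopt when retries are exhausted
// and the error must be escalated.
std::optional<uint64_t> retry_delay_ms(uint32_t attempt,
                                       uint32_t max_retry_attempts = 3,
                                       uint64_t retry_base_delay_ms = 100) {
    if (attempt >= max_retry_attempts)
        return std::nullopt;                   // exhausted: abort or escalate
    return retry_base_delay_ms << attempt;     // 100, 200, 400, ...
}
```

Because each delay applies only to the failing batch, healthy batches keep flowing until the in-flight limit turns the stall into backpressure.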
## Connection Pool
**Target Size**: `target_pool_size` connections (recommended: 2x `max_in_flight_requests`)
**Replenishment**: Automatic creation when below target
**Health Monitoring**: Heartbeat-based connection validation
**Sizing Rationale**: 2x multiplier ensures availability during peak load and connection replacement
## Key Design Properties
**Batch Ordering**: Batches may be retried out-of-order for performance, but acknowledgment to next pipeline stage maintains strict ordering.
**Backpressure**: Retry delays for failing batches create natural backpressure that eventually blocks the persistence thread when in-flight limits are reached.
**Graceful Shutdown**: On shutdown signal, drain all in-flight batches to completion before terminating.
## Configuration Parameters
| Parameter | Default | Description |
|-----------|---------|-------------|
| `batch_timeout_ms` | 5ms | Maximum time to wait collecting commits for batching |
| `batch_size_threshold` | - | Threshold for triggering batch processing |
| `max_in_flight_requests` | - | Maximum concurrent requests to persistence backend |
| `target_pool_size` | 2x in-flight | Target number of connections to maintain |
| `max_retry_attempts` | 3 | Maximum retries for failed batches before aborting |
| `retry_base_delay_ms` | 100ms | Base delay for exponential backoff retries |
## Configuration Validation
The following validation rules should be enforced:
**Required Constraints**:
- **`batch_size_threshold`** > 0 (must process at least one commit per batch)
- **`max_in_flight_requests`** > 0 (must allow at least one concurrent request)
- **`target_pool_size`** >= `max_in_flight_requests` (pool must accommodate all in-flight requests)
- **`batch_timeout_ms`** > 0 (timeout must be positive)
- **`max_retry_attempts`** >= 0 (zero disables retries)
- **`retry_base_delay_ms`** > 0 (delay must be positive if retries enabled)
**Performance Recommendations**:
- `target_pool_size` <= 2x `max_in_flight_requests` (optimal for performance)
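The required constraints above can be enforced at startup. A minimal sketch, assuming a configuration struct whose field names follow the parameter table (the struct and `validate` function are hypothetical):

```cpp
#include <cstddef>
#include <cstdint>
#include <string>

// Hypothetical configuration struct; defaults from the parameter table.
struct PersistenceConfig {
    uint64_t batch_timeout_ms       = 5;
    size_t   batch_size_threshold   = 0;
    size_t   max_in_flight_requests = 0;
    size_t   target_pool_size       = 0;
    uint32_t max_retry_attempts     = 3;
    uint64_t retry_base_delay_ms    = 100;
};

// Returns an empty string when the config satisfies the required
// constraints, otherwise the first violated rule.
std::string validate(const PersistenceConfig& c) {
    if (c.batch_size_threshold == 0)   return "batch_size_threshold must be > 0";
    if (c.max_in_flight_requests == 0) return "max_in_flight_requests must be > 0";
    if (c.target_pool_size < c.max_in_flight_requests)
        return "target_pool_size must be >= max_in_flight_requests";
    if (c.batch_timeout_ms == 0)       return "batch_timeout_ms must be > 0";
    if (c.max_retry_attempts > 0 && c.retry_base_delay_ms == 0)
        return "retry_base_delay_ms must be > 0 when retries are enabled";
    return "";
}
```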