Persistence Thread Design
Overview
The persistence thread receives commit batches from the main processing pipeline (via the ThreadPipeline interface) and uploads them to S3. The transport layer is a single-threaded TCP client with connection pooling; protocol details (HTTP, authentication, etc.) are left to higher layers.
Core Flow
The persistence thread runs a single loop that integrates connection management and batching. A batch is sent as soon as either limit is reached: time (batch_timeout_ms) or size (batch_size_threshold). Flow control via max_in_flight_requests prevents new batches from being sent while the thread is at capacity.
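The trigger rules above can be sketched as a small pure function. This is only an illustration: BatchLimits, BatchAction, and next_action are hypothetical names, and the rule that an empty batch is never sent is an assumption rather than something the design states.

```cpp
#include <cassert>
#include <chrono>
#include <cstddef>

// Hypothetical container for the three limits named in the design.
struct BatchLimits {
    std::chrono::milliseconds batch_timeout_ms;
    std::size_t batch_size_threshold;
    std::size_t max_in_flight_requests;
};

enum class BatchAction {
    KeepCollecting,  // no trigger reached yet
    Send,            // time or size limit reached and capacity is available
    WaitForReply     // at max_in_flight_requests: block until a reply arrives
};

// Decide what the loop should do with the commits collected so far.
inline BatchAction next_action(const BatchLimits& limits,
                               std::size_t collected_commits,
                               std::chrono::milliseconds since_last_send,
                               std::size_t in_flight) {
    assert(limits.batch_size_threshold > 0);
    assert(limits.max_in_flight_requests > 0);

    // Assumption: there is nothing to send until at least one commit arrives.
    if (collected_commits == 0) return BatchAction::KeepCollecting;

    const bool size_hit = collected_commits >= limits.batch_size_threshold;
    const bool time_hit = since_last_send >= limits.batch_timeout_ms;
    if (!size_hit && !time_hit) return BatchAction::KeepCollecting;

    // A trigger fired, but flow control may still hold the batch back.
    if (in_flight >= limits.max_in_flight_requests) return BatchAction::WaitForReply;
    return BatchAction::Send;
}
```

Keeping the decision free of I/O makes it easy to unit test separately from the epoll loop.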
Main Loop
- Batch Acquisition - Use ThreadPipeline::acquire() to get commit batches:
  - If no in-flight requests: acquire blocking and process immediately
  - If in-flight requests exist: collect commits until any trigger condition:
    - Time limit: batch_timeout_ms elapsed since last batch sent
    - Size limit: batch_size_threshold commits collected
    - Flow control: at max_in_flight_requests, block until reply received
  - Use epoll_wait with timeout for I/O event handling during collection
  - If no commits available and no in-flight requests, switch to blocking acquire
- Connection Management - Acquire healthy connection from pool:
  - Create new connections if pool below target_pool_size
  - If no healthy connections available, block until one becomes available
  - Maintain automatic pool replenishment
- Data Transmission - Send batch to S3:
  - Write protocol data to connection
  - Publish accepted transactions to subscriber system
  - Track in-flight requests for flow control
- Event Processing - Drive I/O and monitor health:
  - Handle epoll events for all in-flight connections
  - Monitor connection health via heartbeats
  - Process responses and detect failures
- Response Handling - Process successful persistence:
  - Only acknowledge batch if all prior batches are durable (ordered acknowledgment)
  - Release batch via StageGuard destructor (publishes to next pipeline stage)
  - Publish durability events to subscriber system
  - Return healthy connection to pool
- Failure Handling - Manage connection failures:
  - Remove failed connection from pool
  - Retry batch with exponential backoff (up to max_retry_attempts)
  - Exponential backoff delays only affect the specific failing batch
  - If retries exhausted, abort process or escalate error
  - Initiate pool replenishment if below target
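The per-batch retry delay from the Failure Handling step might be computed as in the sketch below. The design says only "exponential backoff"; the doubling factor, the 1-based attempt numbering, the shift cap, and the names RetryPolicy and retry_delay are assumptions made for illustration.

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>
#include <optional>

// Hypothetical policy type; the two defaults are taken from the design.
struct RetryPolicy {
    std::uint32_t max_retry_attempts = 3;
    std::chrono::milliseconds retry_base_delay_ms{100};
};

// Delay to wait before retry number `attempt` (1-based) of one batch.
// Returns std::nullopt once retries are exhausted, at which point the
// caller aborts or escalates as described above.
inline std::optional<std::chrono::milliseconds>
retry_delay(const RetryPolicy& policy, std::uint32_t attempt) {
    assert(attempt >= 1);
    if (attempt > policy.max_retry_attempts) return std::nullopt;

    // Assumed schedule: base * 2^(attempt - 1). The shift is capped so a
    // very large attempt count cannot overflow the millisecond counter.
    const std::uint32_t shift = (attempt - 1 < 20) ? attempt - 1 : 20;
    return std::chrono::milliseconds(policy.retry_base_delay_ms.count() << shift);
}
```

Because the delay is tracked per batch, the loop can keep it as a deadline and fold it into the epoll_wait timeout instead of sleeping, so other batches are not held up.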
Connection Pool
The pool maintains target_pool_size connections with automatic replenishment. The recommended pool size is 2x max_in_flight_requests, so that healthy connections remain available under peak load and while failed connections are being replaced.
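A minimal sketch of the pool bookkeeping follows, under the assumption that connections can be represented by an opaque handle. ConnId, ConnectionPool, and all of its methods are hypothetical; socket creation, health checks, and heartbeats are left out.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <optional>

using ConnId = int;  // stand-in for a real connection handle (assumption)

class ConnectionPool {
public:
    explicit ConnectionPool(std::size_t target_pool_size)
        : target_(target_pool_size) {}

    // How many connections must be opened to get back to target_pool_size.
    std::size_t deficit() const {
        const std::size_t total = idle_.size() + checked_out_;
        return total < target_ ? target_ - total : 0;
    }

    // Register a newly established, healthy connection.
    void add(ConnId c) { idle_.push_back(c); }

    // Hand out a healthy connection, or nullopt if none is idle; the caller
    // is then expected to wait until one becomes available.
    std::optional<ConnId> acquire() {
        if (idle_.empty()) return std::nullopt;
        const ConnId c = idle_.front();
        idle_.pop_front();
        ++checked_out_;
        return c;
    }

    // Return a connection after a successful response.
    void release_healthy(ConnId c) {
        assert(checked_out_ > 0);
        --checked_out_;
        idle_.push_back(c);
    }

    // Drop a connection that failed while checked out.
    void discard_failed() {
        assert(checked_out_ > 0);
        --checked_out_;
    }

private:
    std::size_t target_;
    std::size_t checked_out_ = 0;
    std::deque<ConnId> idle_;
};
```

After discard_failed(), deficit() becomes non-zero, which is the signal the main loop can use to initiate replenishment.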
Implementation Considerations
Batch Ordering: Batches can be retried out-of-order for performance, but acknowledgment to the next pipeline stage must maintain ordering - a batch can only be acknowledged after all prior batches are durable.
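One way to enforce ordered acknowledgment is to buffer out-of-order completions until the gap before them closes. The sketch below assumes each batch carries a consecutive sequence number, which the design does not specify; OrderedAcker and mark_durable are hypothetical names.

```cpp
#include <cassert>
#include <cstdint>
#include <set>
#include <vector>

class OrderedAcker {
public:
    // Assumption: batches are numbered consecutively starting at first_seq.
    explicit OrderedAcker(std::uint64_t first_seq = 0)
        : next_to_ack_(first_seq) {}

    // Record that batch `seq` is durable. Returns the sequence numbers that
    // may now be acknowledged to the next pipeline stage, in order. The
    // result is empty while an earlier batch is still outstanding.
    std::vector<std::uint64_t> mark_durable(std::uint64_t seq) {
        assert(seq >= next_to_ack_);  // already-acknowledged batches never recur
        durable_.insert(seq);

        std::vector<std::uint64_t> ready;
        while (!durable_.empty() && *durable_.begin() == next_to_ack_) {
            ready.push_back(next_to_ack_);
            durable_.erase(durable_.begin());
            ++next_to_ack_;
        }
        return ready;
    }

private:
    std::uint64_t next_to_ack_;
    std::set<std::uint64_t> durable_;  // durable but not yet acknowledged
};
```

For example, if batches 1 and 2 become durable while batch 0 is still being retried, nothing is acknowledged until 0 completes, at which point all three are released in order.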
Backpressure: Retry delays for individual batches naturally create backpressure that eventually blocks the persistence thread when in-flight limits are reached.
Graceful Shutdown: On shutdown signal, drain all in-flight batches to completion before terminating the persistence thread.
Configurable Parameters
- batch_timeout_ms (default: 5ms) - Maximum time to wait collecting additional commits for batching
- batch_size_threshold - Threshold for triggering batch processing
- max_in_flight_requests - Maximum number of concurrent requests to persistence backend
- target_pool_size (recommended: 2x max_in_flight_requests) - Target number of connections to maintain in pool
- max_retry_attempts (default: 3) - Maximum retries for failed batches before aborting
- retry_base_delay_ms (default: 100ms) - Base delay for exponential backoff retries
Configuration Validation
The following validation rules should be enforced:
- batch_size_threshold > 0 (must process at least one commit per batch)
- max_in_flight_requests > 0 (must allow at least one concurrent request)
- target_pool_size >= max_in_flight_requests (pool must accommodate all in-flight requests)
- target_pool_size <= 2x max_in_flight_requests (recommended for optimal performance)
- batch_timeout_ms > 0 (timeout must be positive)
- max_retry_attempts >= 0 (zero disables retries)
- retry_base_delay_ms > 0 (delay must be positive if retries enabled)
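These rules could be checked at startup roughly as follows. PersistenceConfig, validate, and assert_valid are hypothetical names. Signed fields are used so negative inputs can be detected. Parameters without a documented default start at 0 and therefore fail validation until set. The sketch treats the 2x upper bound as a hard error, although the rule is marked "recommended" and an implementation might prefer a warning.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

struct PersistenceConfig {
    std::int64_t batch_timeout_ms = 5;        // default from the design
    std::int64_t batch_size_threshold = 0;    // no documented default: must be set
    std::int64_t max_in_flight_requests = 0;  // no documented default: must be set
    std::int64_t target_pool_size = 0;        // recommended: 2x max_in_flight_requests
    std::int64_t max_retry_attempts = 3;      // default from the design
    std::int64_t retry_base_delay_ms = 100;   // default from the design
};

// Returns one message per violated rule; an empty result means valid.
inline std::vector<std::string> validate(const PersistenceConfig& c) {
    std::vector<std::string> errors;
    if (c.batch_size_threshold <= 0)
        errors.push_back("batch_size_threshold must be > 0");
    if (c.max_in_flight_requests <= 0)
        errors.push_back("max_in_flight_requests must be > 0");
    if (c.target_pool_size < c.max_in_flight_requests)
        errors.push_back("target_pool_size must be >= max_in_flight_requests");
    if (c.target_pool_size > 2 * c.max_in_flight_requests)
        errors.push_back("target_pool_size must be <= 2x max_in_flight_requests");
    if (c.batch_timeout_ms <= 0)
        errors.push_back("batch_timeout_ms must be > 0");
    if (c.max_retry_attempts < 0)
        errors.push_back("max_retry_attempts must be >= 0");
    // The delay only matters when retries are enabled.
    if (c.max_retry_attempts > 0 && c.retry_base_delay_ms <= 0)
        errors.push_back("retry_base_delay_ms must be > 0 when retries are enabled");
    return errors;
}

// Convenience for debug builds: trap on an invalid configuration.
inline void assert_valid(const PersistenceConfig& c) {
    assert(validate(c).empty());
}
```

Returning every violation at once, rather than stopping at the first, lets an operator fix a bad configuration in a single pass.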