Compare commits

...

5 Commits

4 changed files with 2290 additions and 0 deletions

View File

@@ -0,0 +1,970 @@
#!/usr/bin/env python3
"""
Database Persistence Pattern Comparison
Compares WeaselDB's batched S3 persistence approach against traditional
database persistence patterns to understand trade-offs in latency,
throughput, consistency, and operational complexity.
Simulated approaches:
1. WeaselDB S3: Batched async S3 persistence with optimistic concurrency
2. WeaselDB EBS: The same batching logic, writing batch files to EBS-backed disk
3. Traditional WAL: Write-ahead log with periodic sync to disk
4. Synchronous: Immediate disk sync per transaction (like PostgreSQL with synchronous_commit=on)
Declared in PersistencePattern but not simulated in this file:
5. Group Commit: Batched disk writes with configurable group size
6. Async Replication: Immediate response with async durability
"""
import heapq
import random
import statistics
from dataclasses import dataclass, field
from typing import List, Optional, Dict, Tuple, Any
import numpy as np
import matplotlib.pyplot as plt
from collections import defaultdict, deque
from abc import ABC, abstractmethod
from enum import Enum
import time
class PersistencePattern(Enum):
    """Labels for the persistence strategies being compared.

    Each member's value is the human-readable display name of the pattern.
    """

    # Batched writes shipped to S3
    WEASELDB_S3 = "WeaselDB S3 Batched"
    # Write-ahead log flushed on a periodic schedule
    TRADITIONAL_WAL = "Traditional WAL"
    # One disk sync per transaction
    SYNCHRONOUS = "Synchronous Disk"
    # Disk writes grouped into batches
    GROUP_COMMIT = "Group Commit"
    # Respond immediately, make durable asynchronously
    ASYNC_REPLICATION = "Async Replication"
@dataclass
class Transaction:
    """A single database transaction/commit flowing through a simulator."""

    # Identifier handed out by the simulator that generated the transaction
    txn_id: int
    # Simulation clock value (seconds) at which the transaction arrived
    arrival_time: float
    # Payload size in bytes
    size_bytes: int = 1024
    # Some transactions can be async
    requires_durability: bool = True
@dataclass
class PersistenceMetrics:
    """Summary of one simulation run for a single persistence pattern."""

    pattern: PersistencePattern
    total_transactions: int
    completed_transactions: int

    # --- Commit latency distribution, in milliseconds ---
    min_latency: float
    mean_latency: float
    median_latency: float
    p95_latency: float
    p99_latency: float
    max_latency: float

    # --- Throughput, in transactions per second ---
    avg_throughput_tps: float
    peak_throughput_tps: float

    # --- Resource usage ---
    avg_disk_iops: float
    avg_network_mbps: float
    storage_efficiency: float  # GB stored / GB logical data

    # --- Consistency and recovery characteristics ---
    durability_guarantee: str
    consistency_model: str
    recovery_time_estimate: float

    # --- Operational characteristics ---
    operational_complexity: int  # 1-10 scale
    infrastructure_cost: float  # Relative cost per transaction
class PersistenceSimulator(ABC):
    """Abstract base class for persistence pattern simulators.

    Drives a discrete-event simulation: Poisson transaction arrivals are
    generated up front, then events are processed in timestamp order until
    the simulation horizon. Subclasses implement ``process_transaction``
    (and usually ``_handle_custom_event``) and record finished work by
    appending dicts carrying at least the ``latency_ms`` and
    ``completion_time`` keys to ``self.completed_transactions``.
    """

    def __init__(self,
                 simulation_duration: float = 60.0,
                 arrival_rate_per_sec: float = 1000.0):
        """Set up an empty simulation.

        Args:
            simulation_duration: Length of the simulated window, in seconds.
            arrival_rate_per_sec: Mean Poisson arrival rate of transactions.
        """
        self.simulation_duration = simulation_duration
        self.arrival_rate_per_sec = arrival_rate_per_sec

        # Simulation state
        self.current_time = 0.0
        # Min-heap of (time, sequence, event_type, data); always push through
        # schedule_event() so the sequence number is filled in.
        self.event_queue = []
        self._event_seq = 0
        self.completed_transactions = []
        self.next_txn_id = 0

        # Metrics tracking
        self.disk_iops = []
        self.network_usage = []
        self.throughput_samples = []

        # Fixed seed so repeated runs are reproducible
        self.rng = np.random.RandomState(42)

    def generate_transaction(self, arrival_time: float) -> "Transaction":
        """Generate a transaction with a small/medium/large size mix.

        A single uniform draw selects the size class so the split really is
        80% / 15% / 5%. (Drawing a second, independent number for the medium
        branch would skew the split to roughly 80% / 19% / 1%.)
        """
        bucket = self.rng.random()
        if bucket < 0.8:  # 80% small transactions
            size = self.rng.randint(100, 2048)  # 100B - 2KB
        elif bucket < 0.95:  # 15% medium
            size = self.rng.randint(2048, 20480)  # 2KB - 20KB
        else:  # 5% large transactions
            size = self.rng.randint(20480, 102400)  # 20KB - 100KB

        return Transaction(
            txn_id=self.next_txn_id,
            arrival_time=arrival_time,
            size_bytes=int(size),
            requires_durability=True
        )

    @abstractmethod
    def process_transaction(self, txn: "Transaction") -> None:
        """Process a transaction according to the persistence pattern"""
        pass

    @abstractmethod
    def get_pattern_name(self) -> "PersistencePattern":
        """Return the persistence pattern this simulator implements"""
        pass

    def run_simulation(self) -> "PersistenceMetrics":
        """Run the simulation and return metrics.

        Raises:
            ValueError: If no transaction completed inside the simulated window.
        """
        self._generate_arrivals()

        while self.event_queue:
            # Stop at the horizon: events scheduled past simulation_duration
            # (e.g. completions of work still in flight) stay unprocessed.
            if self.event_queue[0][0] > self.simulation_duration:
                break
            event_time, _seq, event_type, data = heapq.heappop(self.event_queue)
            self.current_time = event_time

            if event_type == 'transaction_arrival':
                self.process_transaction(data)
            elif event_type == 'custom':
                self._handle_custom_event(data)

        return self._calculate_metrics()

    def _generate_arrivals(self):
        """Generate Poisson arrival events (exponential inter-arrival gaps)"""
        clock = 0.0
        mean_gap = 1.0 / self.arrival_rate_per_sec
        while clock < self.simulation_duration:
            clock += self.rng.exponential(mean_gap)
            if clock >= self.simulation_duration:
                break
            txn = self.generate_transaction(clock)
            self.next_txn_id += 1
            self.schedule_event(clock, 'transaction_arrival', txn)

    def _handle_custom_event(self, data):
        """Handle custom events - override in subclasses"""
        pass

    def schedule_event(self, time: float, event_type: str, data: Any):
        """Schedule an event of ``event_type`` carrying ``data`` at ``time``.

        A monotonically increasing sequence number is stored right after the
        timestamp so that two events with the same time are ordered FIFO and
        the heap never has to compare payloads (dicts and Transaction objects
        are not orderable and would raise TypeError).
        """
        seq = self._event_seq
        self._event_seq = seq + 1
        heapq.heappush(self.event_queue, (time, seq, event_type, data))

    def _calculate_metrics(self) -> "PersistenceMetrics":
        """Calculate performance metrics from completed transactions.

        Raises:
            ValueError: If ``completed_transactions`` is empty.
        """
        if not self.completed_transactions:
            raise ValueError("No transactions completed")

        latencies = [txn['latency_ms'] for txn in self.completed_transactions]

        return PersistenceMetrics(
            pattern=self.get_pattern_name(),
            total_transactions=self.next_txn_id,
            completed_transactions=len(self.completed_transactions),
            # Latency metrics
            min_latency=min(latencies),
            mean_latency=statistics.mean(latencies),
            median_latency=statistics.median(latencies),
            p95_latency=float(np.percentile(latencies, 95)),
            p99_latency=float(np.percentile(latencies, 99)),
            max_latency=max(latencies),
            # Throughput metrics
            avg_throughput_tps=len(self.completed_transactions) / self.simulation_duration,
            peak_throughput_tps=self._calculate_peak_throughput(),
            # Resource metrics - samples are recorded by subclasses
            avg_disk_iops=statistics.mean(self.disk_iops) if self.disk_iops else 0,
            avg_network_mbps=statistics.mean(self.network_usage) if self.network_usage else 0,
            storage_efficiency=self._calculate_storage_efficiency(),
            # Pattern-specific characteristics
            durability_guarantee=self._get_durability_guarantee(),
            consistency_model=self._get_consistency_model(),
            recovery_time_estimate=self._get_recovery_time(),
            operational_complexity=self._get_operational_complexity(),
            infrastructure_cost=self._get_infrastructure_cost()
        )

    def _calculate_peak_throughput(self) -> float:
        """Return the largest number of completions in any 1-second window.

        Windows are aligned to whole seconds of simulation time.
        """
        if not self.completed_transactions:
            return 0.0

        # Group transactions by the whole second in which they completed
        throughput_by_second = defaultdict(int)
        for txn in self.completed_transactions:
            throughput_by_second[int(txn['completion_time'])] += 1

        return float(max(throughput_by_second.values()))

    # Abstract methods for pattern-specific characteristics
    @abstractmethod
    def _calculate_storage_efficiency(self) -> float:
        """Return the value reported as ``storage_efficiency``."""
        pass

    @abstractmethod
    def _get_durability_guarantee(self) -> str:
        """Return the text reported as ``durability_guarantee``."""
        pass

    @abstractmethod
    def _get_consistency_model(self) -> str:
        """Return the text reported as ``consistency_model``."""
        pass

    @abstractmethod
    def _get_recovery_time(self) -> float:
        """Return the value reported as ``recovery_time_estimate``."""
        pass

    @abstractmethod
    def _get_operational_complexity(self) -> int:
        """Return the value reported as ``operational_complexity``."""
        pass

    @abstractmethod
    def _get_infrastructure_cost(self) -> float:
        """Return the value reported as ``infrastructure_cost``."""
        pass
class WeaselDBSimulator(PersistenceSimulator):
    """WeaselDB's batched S3 persistence simulation.

    Arriving transactions accumulate in an open batch. The batch is shipped
    to S3 once it reaches ``batch_size_threshold`` bytes or has been open for
    ``batch_timeout_ms``, provided fewer than ``max_in_flight`` uploads are
    outstanding. A transaction completes when its batch's upload completes.
    """

    def __init__(self,
                 batch_timeout_ms: float = 1.0,
                 batch_size_threshold: int = 800000,
                 max_in_flight: int = 50,
                 **kwargs):
        """Configure batching; remaining kwargs go to PersistenceSimulator."""
        super().__init__(**kwargs)

        self.batch_timeout_ms = batch_timeout_ms
        self.batch_size_threshold = batch_size_threshold
        self.max_in_flight = max_in_flight

        # State
        self.current_batch = []
        self.batch_start_time = None
        self.in_flight_batches = {}
        self.next_batch_id = 0
        # Incremented every time a batch is sent; a timeout event carrying an
        # older generation belongs to a batch that already left and is ignored.
        self._batch_generation = 0
        # Set when the open batch's timeout fired but the in-flight cap
        # prevented sending; the batch then goes out as soon as a slot frees.
        self._batch_timed_out = False

        # S3 characteristics
        # Mean latency of the model in _sample_s3_latency for a tiny batch
        # (30ms RTT + gamma mean of 30ms); informational only.
        self.s3_base_latency_ms = 60.0  # From our simulation
        self.s3_size_penalty_per_mb = 20.0

    def get_pattern_name(self) -> PersistencePattern:
        return PersistencePattern.WEASELDB_S3

    def process_transaction(self, txn: Transaction):
        """Add the transaction to the open batch and send it if a trigger fired.

        Opening a new batch also schedules its timeout event, so the time
        trigger fires on schedule instead of waiting for the next arrival
        (which at low arrival rates can be many timeouts away).
        """
        if not self.current_batch:
            self.batch_start_time = self.current_time
            self._batch_timed_out = False
            self.schedule_event(
                self.current_time + self.batch_timeout_ms / 1000.0,
                'custom',
                {'type': 'batch_timeout', 'generation': self._batch_generation},
            )

        self.current_batch.append(txn)
        self._send_batch_if_ready()

    def _send_batch_if_ready(self):
        """Send the open batch when a trigger fired and an upload slot is free"""
        if len(self.in_flight_batches) >= self.max_in_flight:
            return
        if self._should_send_batch():
            self._send_current_batch()

    def _should_send_batch(self) -> bool:
        """Check batch triggers (size, then time) for the open batch"""
        if not self.current_batch:
            return False

        # The timeout event already fired for this batch
        if self._batch_timed_out:
            return True

        # Size trigger
        batch_size = sum(txn.size_bytes for txn in self.current_batch)
        if batch_size >= self.batch_size_threshold:
            return True

        # Time trigger ("is not None" so a batch opened at t=0.0 still counts)
        if self.batch_start_time is not None:
            elapsed = self.current_time - self.batch_start_time
            if elapsed >= self.batch_timeout_ms / 1000.0:
                return True

        return False

    def _send_current_batch(self):
        """Send current batch to S3 and schedule its completion event"""
        if not self.current_batch:
            return

        batch_id = self.next_batch_id
        self.next_batch_id += 1
        batch_size = sum(txn.size_bytes for txn in self.current_batch)

        # Sample S3 latency (model returns ms; the clock runs in seconds)
        s3_latency = self._sample_s3_latency(batch_size) / 1000.0
        completion_time = self.current_time + s3_latency

        # Track batch
        self.in_flight_batches[batch_id] = {
            'transactions': self.current_batch.copy(),
            'sent_time': self.current_time,
            'completion_time': completion_time,
            'size_bytes': batch_size
        }

        # Schedule completion
        self.schedule_event(completion_time, 'custom',
                            {'type': 'batch_complete', 'batch_id': batch_id})

        # Track network usage
        self.network_usage.append(batch_size / (1024 * 1024))  # MB

        # Clear batch and invalidate any pending timeout for it
        self.current_batch.clear()
        self.batch_start_time = None
        self._batch_generation += 1
        self._batch_timed_out = False

    def _sample_s3_latency(self, batch_size_bytes: int) -> float:
        """Sample S3 latency in milliseconds, scaling with batch size"""
        size_penalty = (batch_size_bytes / (1024 * 1024)) * self.s3_size_penalty_per_mb
        variable = self.rng.gamma(2.0, 15.0)  # Variable component
        return 30.0 + variable + size_penalty  # 30ms RTT + variable + size

    def _handle_custom_event(self, data):
        """Handle batch timeout and batch completion events"""
        event_kind = data['type']

        if event_kind == 'batch_timeout':
            # Only act if the batch that armed this timer is still open
            if data['generation'] == self._batch_generation and self.current_batch:
                self._batch_timed_out = True
                self._send_batch_if_ready()

        elif event_kind == 'batch_complete':
            batch_id = data['batch_id']
            if batch_id in self.in_flight_batches:
                batch = self.in_flight_batches.pop(batch_id)

                # Mark transactions complete
                for txn in batch['transactions']:
                    latency_ms = (self.current_time - txn.arrival_time) * 1000
                    self.completed_transactions.append({
                        'txn_id': txn.txn_id,
                        'arrival_time': txn.arrival_time,
                        'completion_time': self.current_time,
                        'latency_ms': latency_ms,
                        'size_bytes': txn.size_bytes
                    })

                # An upload slot just freed: a batch that was held back by
                # the in-flight cap can go out now.
                self._send_batch_if_ready()

    def _calculate_storage_efficiency(self) -> float:
        return 1.0  # S3 has no storage overhead

    def _get_durability_guarantee(self) -> str:
        return "Eventually durable (S3 11 9's)"

    def _get_consistency_model(self) -> str:
        return "Optimistic concurrency, eventual consistency"

    def _get_recovery_time(self) -> float:
        return 0.1  # Fast recovery, just reconnect to S3

    def _get_operational_complexity(self) -> int:
        return 3  # Moderate - S3 managed, but need batching logic

    def _get_infrastructure_cost(self) -> float:
        return 1.0  # Baseline cost
class TraditionalWALSimulator(PersistenceSimulator):
    """Traditional Write-Ahead Log with periodic sync.

    Transactions are appended to an in-memory WAL buffer on arrival. A sync
    event fires every ``wal_sync_interval_ms``; it flushes the whole buffer
    and, once the modelled fsync latency has elapsed, every flushed
    transaction is recorded as completed.
    """

    def __init__(self,
                 wal_sync_interval_ms: float = 10.0,
                 checkpoint_interval_sec: float = 30.0,
                 **kwargs):
        """Configure the WAL; remaining kwargs go to PersistenceSimulator."""
        super().__init__(**kwargs)

        self.wal_sync_interval_ms = wal_sync_interval_ms
        self.checkpoint_interval_sec = checkpoint_interval_sec

        # WAL state
        self.wal_buffer = []
        self.last_sync_time = 0.0
        self.pending_transactions = {}  # Waiting for sync

        # EBS characteristics
        self.disk_latency_ms = 1.0  # EBS base latency
        self.disk_iops_limit = 10000  # Typical SSD

        # Schedule periodic syncs (needs the event queue set up by super())
        self._schedule_periodic_syncs()

    def get_pattern_name(self) -> PersistencePattern:
        return PersistencePattern.TRADITIONAL_WAL

    def process_transaction(self, txn: Transaction):
        """Write to WAL buffer, wait for sync"""
        # Write to WAL buffer (immediate)
        self.wal_buffer.append(txn)
        self.pending_transactions[txn.txn_id] = txn

        # Track disk IOPS (write to WAL)
        self.disk_iops.append(1.0)

    def _schedule_periodic_syncs(self):
        """Schedule a WAL sync event at every interval up to the horizon"""
        interval_sec = self.wal_sync_interval_ms / 1000.0
        sync_time = interval_sec
        while sync_time < self.simulation_duration:
            self.schedule_event(sync_time, 'custom', {'type': 'wal_sync'})
            sync_time += interval_sec

    def _perform_wal_sync(self):
        """Sync WAL buffer to disk and schedule the matching completion"""
        if not self.wal_buffer:
            return

        # Calculate sync latency based on buffer size
        buffer_size = sum(txn.size_bytes for txn in self.wal_buffer)
        sync_latency = self._calculate_disk_sync_latency(buffer_size)
        completion_time = self.current_time + sync_latency / 1000.0

        # Schedule sync completion with a snapshot of the buffered txns
        syncing_txns = list(self.wal_buffer)
        self.schedule_event(completion_time, 'custom', {
            'type': 'sync_complete',
            'transactions': syncing_txns
        })

        # Track IOPS for sync operation
        self.disk_iops.append(len(self.wal_buffer))

        # Clear buffer
        self.wal_buffer.clear()
        self.last_sync_time = self.current_time

    def _calculate_disk_sync_latency(self, size_bytes: int) -> float:
        """Return the modelled WAL sync latency in milliseconds.

        The result is the sum of a page-cache write, a throughput-bound
        transfer, and a file fsync with a random component.
        """
        # Data write to page cache
        write_latency = 0.1  # Fast write to page cache

        # EBS sequential write throughput
        throughput_mbps = 1000.0  # EBS gp3 throughput
        size_mb = size_bytes / (1024 * 1024)
        transfer_latency = size_mb * (1000.0 / throughput_mbps)

        # WAL file fsync latency on EBS
        # fdatasync on EBS with network replication
        file_fsync_base = 0.8  # Higher base latency on EBS
        file_fsync_variable = self.rng.gamma(2.2, 0.4)  # More variable due to network

        # Size penalty for large WAL syncs
        size_penalty = min(size_mb * 0.05, 1.0)  # Smaller penalty than batched writes

        file_fsync_latency = file_fsync_base + file_fsync_variable + size_penalty

        # WAL typically appends to existing files, so no directory fsync needed
        # Directory fsync only required when creating new WAL segments
        return write_latency + transfer_latency + file_fsync_latency

    def _handle_custom_event(self, data):
        """Handle WAL sync triggers and sync completions.

        This is the single handler for both event types; an earlier duplicate
        definition that only knew about ``wal_sync`` was silently overridden
        by this one and has been removed.
        """
        if data['type'] == 'wal_sync':
            self._perform_wal_sync()
        elif data['type'] == 'sync_complete':
            # Mark transactions as durable
            for txn in data['transactions']:
                if txn.txn_id in self.pending_transactions:
                    del self.pending_transactions[txn.txn_id]
                    latency_ms = (self.current_time - txn.arrival_time) * 1000
                    self.completed_transactions.append({
                        'txn_id': txn.txn_id,
                        'arrival_time': txn.arrival_time,
                        'completion_time': self.current_time,
                        'latency_ms': latency_ms,
                        'size_bytes': txn.size_bytes
                    })

    def _calculate_storage_efficiency(self) -> float:
        return 2.0  # WAL + main storage

    def _get_durability_guarantee(self) -> str:
        return "ACID durable after WAL sync"

    def _get_consistency_model(self) -> str:
        return "Strict ACID consistency"

    def _get_recovery_time(self) -> float:
        return 30.0  # WAL replay time

    def _get_operational_complexity(self) -> int:
        return 7  # Complex - WAL management, checkpoints, recovery

    def _get_infrastructure_cost(self) -> float:
        return 1.5  # Higher due to disk I/O overhead
class SynchronousSimulator(PersistenceSimulator):
    """Synchronous disk persistence (like PostgreSQL with synchronous_commit=on).

    Every transaction gets its own disk write + fsync; it completes once the
    modelled latency for that single write has elapsed.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.disk_latency_ms = 1.0  # EBS base latency

    def get_pattern_name(self) -> PersistencePattern:
        return PersistencePattern.SYNCHRONOUS

    def process_transaction(self, txn: Transaction):
        """Start an individual durable write for ``txn``"""
        # Latency model works in milliseconds, the clock in seconds
        latency_sec = self._calculate_disk_latency(txn.size_bytes) / 1000.0
        done_at = self.current_time + latency_sec

        completion_event = {
            'type': 'disk_write_complete',
            'transaction': txn
        }
        self.schedule_event(done_at, 'custom', completion_event)

        # One disk operation per transaction
        self.disk_iops.append(1.0)

    def _calculate_disk_latency(self, size_bytes: int) -> float:
        """Return the modelled latency (ms) of one write followed by fsync"""
        # Page-cache write is fast
        write_latency = 0.1

        # Per-transaction fsync on EBS: fixed part plus a random part
        # Each transaction requires its own fsync - expensive on network storage!
        fsync_base = 1.5
        fsync_variable = self.rng.gamma(2.3, 0.5)

        # Individual writes below 4 KiB pay an extra inefficiency penalty
        penalty = 0.5 if size_bytes < 4096 else 0.0

        fsync_latency = fsync_base + fsync_variable + penalty

        # No directory fsync: commits go to already-existing database files
        return write_latency + fsync_latency

    def _handle_custom_event(self, data):
        """Record the transaction as completed when its disk write finishes"""
        if data['type'] != 'disk_write_complete':
            return

        txn = data['transaction']
        now = self.current_time
        self.completed_transactions.append({
            'txn_id': txn.txn_id,
            'arrival_time': txn.arrival_time,
            'completion_time': now,
            'latency_ms': (now - txn.arrival_time) * 1000,
            'size_bytes': txn.size_bytes
        })

    def _calculate_storage_efficiency(self) -> float:
        # Some overhead for metadata
        return 1.5

    def _get_durability_guarantee(self) -> str:
        return "Immediate ACID durability"

    def _get_consistency_model(self) -> str:
        return "Strict ACID with immediate consistency"

    def _get_recovery_time(self) -> float:
        # Fast recovery, data already on disk
        return 5.0

    def _get_operational_complexity(self) -> int:
        # Moderate - standard database operations
        return 5

    def _get_infrastructure_cost(self) -> float:
        # High due to disk I/O per transaction
        return 2.0
class WeaselDBDiskSimulator(PersistenceSimulator):
    """WeaselDB's batched disk persistence simulation.

    Uses the same batching rules as the S3 variant (size threshold, timeout,
    in-flight cap) but completes a batch after a modelled EBS write that
    includes both a file fsync and a directory fsync.
    """

    def __init__(self,
                 batch_timeout_ms: float = 1.0,
                 batch_size_threshold: int = 800000,
                 max_in_flight: int = 50,
                 **kwargs):
        """Configure batching; remaining kwargs go to PersistenceSimulator."""
        super().__init__(**kwargs)

        self.batch_timeout_ms = batch_timeout_ms
        self.batch_size_threshold = batch_size_threshold
        self.max_in_flight = max_in_flight

        # State
        self.current_batch = []
        self.batch_start_time = None
        self.in_flight_batches = {}
        self.next_batch_id = 0
        # Incremented every time a batch is sent; a timeout event carrying an
        # older generation belongs to a batch that already left and is ignored.
        self._batch_generation = 0
        # Set when the open batch's timeout fired but the in-flight cap
        # prevented sending; the batch then goes out as soon as a slot frees.
        self._batch_timed_out = False

        # EBS characteristics (gp3 or io2 volumes)
        self.disk_base_latency_ms = 0.5  # EBS has higher base latency than local NVMe
        self.disk_throughput_mbps = 1000.0  # EBS gp3 max throughput

    def get_pattern_name(self) -> PersistencePattern:
        return PersistencePattern.WEASELDB_S3  # Reuse enum, but it's actually disk

    def process_transaction(self, txn: Transaction):
        """Add the transaction to the open batch and send it if a trigger fired.

        Opening a new batch also schedules its timeout event, so the time
        trigger fires on schedule instead of waiting for the next arrival
        (which at low arrival rates can be many timeouts away).
        """
        if not self.current_batch:
            self.batch_start_time = self.current_time
            self._batch_timed_out = False
            self.schedule_event(
                self.current_time + self.batch_timeout_ms / 1000.0,
                'custom',
                {'type': 'batch_timeout', 'generation': self._batch_generation},
            )

        self.current_batch.append(txn)
        self._send_batch_if_ready()

    def _send_batch_if_ready(self):
        """Send the open batch when a trigger fired and a write slot is free"""
        if len(self.in_flight_batches) >= self.max_in_flight:
            return
        if self._should_send_batch():
            self._send_current_batch()

    def _should_send_batch(self) -> bool:
        """Check batch triggers (size, then time) for the open batch"""
        if not self.current_batch:
            return False

        # The timeout event already fired for this batch
        if self._batch_timed_out:
            return True

        # Size trigger
        batch_size = sum(txn.size_bytes for txn in self.current_batch)
        if batch_size >= self.batch_size_threshold:
            return True

        # Time trigger ("is not None" so a batch opened at t=0.0 still counts)
        if self.batch_start_time is not None:
            elapsed = self.current_time - self.batch_start_time
            if elapsed >= self.batch_timeout_ms / 1000.0:
                return True

        return False

    def _send_current_batch(self):
        """Send current batch to disk and schedule its completion event"""
        if not self.current_batch:
            return

        batch_id = self.next_batch_id
        self.next_batch_id += 1
        batch_size = sum(txn.size_bytes for txn in self.current_batch)

        # Sample disk write latency (model returns ms; the clock runs in seconds)
        disk_latency = self._sample_disk_latency(batch_size) / 1000.0
        completion_time = self.current_time + disk_latency

        # Track batch
        self.in_flight_batches[batch_id] = {
            'transactions': self.current_batch.copy(),
            'sent_time': self.current_time,
            'completion_time': completion_time,
            'size_bytes': batch_size
        }

        # Schedule completion
        self.schedule_event(completion_time, 'custom',
                            {'type': 'batch_complete', 'batch_id': batch_id})

        # Track disk IOPS (one write operation per batch)
        self.disk_iops.append(1.0)

        # Clear batch and invalidate any pending timeout for it
        self.current_batch.clear()
        self.batch_start_time = None
        self._batch_generation += 1
        self._batch_timed_out = False

    def _sample_disk_latency(self, batch_size_bytes: int) -> float:
        """Sample the batch write latency in milliseconds.

        The result sums a base write cost, a throughput-bound transfer, a
        file fsync and a directory fsync; both fsyncs have a random part.
        """
        # Base latency for the write command (data goes to page cache)
        write_latency = self.disk_base_latency_ms

        # Throughput-based latency for data transfer to page cache
        size_mb = batch_size_bytes / (1024 * 1024)
        transfer_latency = (size_mb / self.disk_throughput_mbps) * 1000.0  # Convert to ms

        # FSYNC latency for EBS - forces write to replicated storage
        # EBS has higher fsync latency due to network replication
        file_fsync_base = 1.0  # Higher base fsync latency for EBS
        file_fsync_variable = self.rng.gamma(2.5, 0.6)  # More variable due to network

        # Size-dependent fsync penalty for large batches
        size_penalty = min(size_mb * 0.1, 2.0)  # Max 2ms penalty

        file_fsync_latency = file_fsync_base + file_fsync_variable + size_penalty

        # Directory fsync latency - required for WeaselDB batch file creation on EBS
        # EBS directory metadata sync is also network-replicated
        dir_fsync_base = 0.5  # Higher directory metadata sync latency on EBS
        dir_fsync_variable = self.rng.gamma(1.8, 0.3)  # More variable due to network
        dir_fsync_latency = dir_fsync_base + dir_fsync_variable

        # Total latency: write + file fsync + directory fsync
        # WeaselDB needs directory fsync for batch file durability guarantees
        return write_latency + transfer_latency + file_fsync_latency + dir_fsync_latency

    def _handle_custom_event(self, data):
        """Handle batch timeout and batch completion events"""
        event_kind = data['type']

        if event_kind == 'batch_timeout':
            # Only act if the batch that armed this timer is still open
            if data['generation'] == self._batch_generation and self.current_batch:
                self._batch_timed_out = True
                self._send_batch_if_ready()

        elif event_kind == 'batch_complete':
            batch_id = data['batch_id']
            if batch_id in self.in_flight_batches:
                batch = self.in_flight_batches.pop(batch_id)

                # Mark transactions complete
                for txn in batch['transactions']:
                    latency_ms = (self.current_time - txn.arrival_time) * 1000
                    self.completed_transactions.append({
                        'txn_id': txn.txn_id,
                        'arrival_time': txn.arrival_time,
                        'completion_time': self.current_time,
                        'latency_ms': latency_ms,
                        'size_bytes': txn.size_bytes
                    })

                # A write slot just freed: a batch that was held back by the
                # in-flight cap can go out now.
                self._send_batch_if_ready()

    def _calculate_storage_efficiency(self) -> float:
        return 1.2  # Some overhead for batching metadata

    def _get_durability_guarantee(self) -> str:
        return "ACID durable after EBS replication"

    def _get_consistency_model(self) -> str:
        return "Strict serializable with optimistic concurrency"

    def _get_recovery_time(self) -> float:
        return 10.0  # EBS volume attachment + recovery

    def _get_operational_complexity(self) -> int:
        return 4  # Moderate - batching logic + disk management

    def _get_infrastructure_cost(self) -> float:
        return 1.4  # Higher than S3 due to EBS provisioned storage + replication
class DatabaseComparisonFramework:
    """Framework for comparing different database persistence patterns.

    ``run_comparison`` sweeps a list of arrival rates across every simulator;
    the report and plot helpers label their columns and axes with the rates
    of the most recent sweep (``self.arrival_rates``).
    """

    # Arrival rates (TPS) swept when the caller does not supply any
    DEFAULT_ARRIVAL_RATES = (100, 500, 1000, 2000)

    # Table layout: width of the name column and of each per-rate column.
    # The per-rate width matches the header cell f"{rate:>8} TPS".
    _NAME_WIDTH = 20
    _RATE_WIDTH = 12

    def __init__(self, simulation_duration: float = 30.0):
        self.simulation_duration = simulation_duration
        self.arrival_rates = list(self.DEFAULT_ARRIVAL_RATES)

    def run_comparison(self,
                       arrival_rates: Optional[List[float]] = None
                       ) -> Dict[str, List["PersistenceMetrics"]]:
        """Run every simulator at each arrival rate.

        Args:
            arrival_rates: Rates (TPS) to test; ``None`` selects
                ``DEFAULT_ARRIVAL_RATES``. The rates used are remembered in
                ``self.arrival_rates`` for the report/plot helpers.

        Returns:
            Mapping of pattern label to one metrics object per arrival rate,
            in the order the rates were given.
        """
        rates = list(self.DEFAULT_ARRIVAL_RATES if arrival_rates is None else arrival_rates)
        self.arrival_rates = rates

        batching = dict(batch_timeout_ms=1.0, batch_size_threshold=800000, max_in_flight=50)
        # (results key, progress label, simulator class, pattern-specific kwargs)
        contenders = [
            ('WeaselDB S3', 'WeaselDB S3', WeaselDBSimulator, batching),
            ('WeaselDB EBS', 'WeaselDB EBS', WeaselDBDiskSimulator, batching),
            ('Traditional WAL', 'WAL', TraditionalWALSimulator, dict(wal_sync_interval_ms=10.0)),
            ('Synchronous', 'Synchronous', SynchronousSimulator, {}),
        ]

        results = defaultdict(list)
        for rate in rates:
            print(f"\nTesting at {rate} TPS...")
            for key, label, simulator_cls, extra in contenders:
                simulator = simulator_cls(
                    simulation_duration=self.simulation_duration,
                    arrival_rate_per_sec=rate,
                    **extra
                )
                metrics = simulator.run_simulation()
                results[key].append(metrics)
                print(f" {label} P95: {metrics.p95_latency:.1f}ms")

        return dict(results)

    def _print_metric_table(self, title: str, rates: List[float],
                            results: Dict[str, List["PersistenceMetrics"]],
                            attribute: str) -> None:
        """Print one pattern-by-rate table of ``attribute`` with aligned columns"""
        print(f"\n{title}")
        print(f"{'Pattern':<{self._NAME_WIDTH}}", end="")
        for rate in rates:
            print(f"{rate:>8} TPS", end="")
        print()
        print("-" * (self._NAME_WIDTH + self._RATE_WIDTH * len(rates)))
        for pattern_name, metrics_list in results.items():
            print(f"{pattern_name:<{self._NAME_WIDTH}}", end="")
            for metrics in metrics_list:
                print(f"{getattr(metrics, attribute):>{self._RATE_WIDTH}.1f}", end="")
            print()

    def print_comparison_report(self, results: Dict[str, List["PersistenceMetrics"]]):
        """Print comprehensive comparison report.

        Column headers come from ``self.arrival_rates`` (the rates of the
        last ``run_comparison`` call, or the defaults).
        """
        print("\n" + "="*80)
        print("DATABASE PERSISTENCE PATTERN COMPARISON")
        print("="*80)

        rates = list(self.arrival_rates)

        self._print_metric_table("P95 LATENCY COMPARISON (ms)", rates, results, 'p95_latency')
        self._print_metric_table("THROUGHPUT ACHIEVED (TPS)", rates, results, 'avg_throughput_tps')

        # Characteristics comparison
        print(f"\nSYSTEM CHARACTERISTICS")
        print(f"{'Pattern':<20} {'Durability':<25} {'Consistency':<20} {'OpComplx':<8} {'Cost':<6}")
        print("-" * 85)
        for pattern_name, metrics_list in results.items():
            if not metrics_list:
                continue
            metrics = metrics_list[0]  # Characteristics do not depend on the rate
            print(f"{pattern_name:<20} {metrics.durability_guarantee:<25} "
                  f"{metrics.consistency_model:<20} {metrics.operational_complexity:<8} "
                  f"{metrics.infrastructure_cost:<6.1f}")

        # Performance sweet spots
        print(f"\nPERFORMANCE SWEET SPOTS")
        print("-" * 40)
        medals = ("🥇", "🥈", "🥉")
        for idx, rate in enumerate(rates):
            print(f"\nAt {rate} TPS:")
            rate_results = [(name, metrics_list[idx])
                            for name, metrics_list in results.items()
                            if idx < len(metrics_list)]
            # Sort by P95 latency
            rate_results.sort(key=lambda x: x[1].p95_latency)
            for i, (name, metrics) in enumerate(rate_results):
                rank_symbol = medals[i] if i < len(medals) else " "
                print(f" {rank_symbol} {name}: {metrics.p95_latency:.1f}ms P95, "
                      f"{metrics.avg_throughput_tps:.0f} TPS achieved")

    def plot_comparison_results(self, results: Dict[str, List["PersistenceMetrics"]],
                                save_path: Optional[str] = None):
        """Plot comparison results.

        Saves to ``save_path`` when given, otherwise shows the figure.
        Plotting is best-effort: any failure is reported on stdout instead
        of being raised.
        """
        try:
            rates = list(self.arrival_rates)
            if not rates:
                raise ValueError("no arrival rates to plot")

            fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(15, 12))
            fig.suptitle('Database Persistence Pattern Comparison', fontsize=16)

            # Plot 1: P95 Latency vs Load
            for pattern_name, metrics_list in results.items():
                p95_latencies = [m.p95_latency for m in metrics_list]
                # zip-style truncation keeps x and y the same length
                ax1.plot(rates[:len(p95_latencies)], p95_latencies[:len(rates)],
                         marker='o', linewidth=2, label=pattern_name)
            ax1.set_xlabel('Arrival Rate (TPS)')
            ax1.set_ylabel('P95 Latency (ms)')
            ax1.set_title('P95 Latency vs Load')
            ax1.legend()
            ax1.grid(True, alpha=0.3)
            ax1.set_yscale('log')

            # Plot 2: Throughput Achieved
            for pattern_name, metrics_list in results.items():
                throughputs = [m.avg_throughput_tps for m in metrics_list]
                ax2.plot(rates[:len(throughputs)], throughputs[:len(rates)],
                         marker='s', linewidth=2, label=pattern_name)
            # Perfect throughput line
            ax2.plot(rates, rates, 'k--', alpha=0.5, label='Perfect (no loss)')
            ax2.set_xlabel('Target Rate (TPS)')
            ax2.set_ylabel('Achieved Throughput (TPS)')
            ax2.set_title('Throughput: Target vs Achieved')
            ax2.legend()
            ax2.grid(True, alpha=0.3)

            # Plot 3: Latency distribution at 1000 TPS when that rate was
            # tested, otherwise at the middle rate of the sweep
            rate_idx = rates.index(1000) if 1000 in rates else len(rates) // 2
            focus_rate = rates[rate_idx]
            percentiles = [50, 95, 99]
            for pattern_name, metrics_list in results.items():
                if rate_idx >= len(metrics_list):
                    continue
                metrics = metrics_list[rate_idx]
                values = [metrics.median_latency, metrics.p95_latency, metrics.p99_latency]
                ax3.bar([f"{pattern_name}\nP{p}" for p in percentiles], values,
                        alpha=0.7, label=pattern_name)
            ax3.set_ylabel('Latency (ms)')
            ax3.set_title(f'Latency Percentiles at {focus_rate} TPS')
            ax3.set_yscale('log')
            ax3.grid(True, alpha=0.3)

            # Plot 4: Cost vs Performance
            # One marker per pattern, keyed by the labels run_comparison emits
            markers = {'WeaselDB S3': 'o', 'WeaselDB EBS': 'D',
                       'Traditional WAL': 's', 'Synchronous': '^'}
            for pattern_name, metrics_list in results.items():
                costs = [m.infrastructure_cost for m in metrics_list]
                p95s = [m.p95_latency for m in metrics_list]
                marker = markers.get(pattern_name, 'o')
                ax4.scatter(costs, p95s, s=100, marker=marker, alpha=0.7, label=pattern_name)
            ax4.set_xlabel('Infrastructure Cost (relative)')
            ax4.set_ylabel('P95 Latency (ms)')
            ax4.set_title('Cost vs Performance Trade-off')
            ax4.legend()
            ax4.grid(True, alpha=0.3)
            ax4.set_yscale('log')

            plt.tight_layout()
            if save_path:
                plt.savefig(save_path, dpi=300, bbox_inches='tight')
                # The figure is on disk; release it so repeated calls do not
                # accumulate open figures
                plt.close(fig)
                print(f"Comparison plots saved to {save_path}")
            else:
                plt.show()
        except Exception as e:
            print(f"Could not generate plots: {e}")
def main():
    """Entry point: run the comparison sweep, print the report, save plots."""
    banner = (
        "Database Persistence Pattern Comparison",
        "Comparing WeaselDB vs Traditional Database Approaches",
    )
    for banner_line in banner:
        print(banner_line)
    print()

    framework = DatabaseComparisonFramework(simulation_duration=20.0)
    outcome = framework.run_comparison()
    framework.print_comparison_report(outcome)

    # Plotting is optional; a failure here must not abort the run
    try:
        framework.plot_comparison_results(outcome, 'database_comparison.png')
    except Exception as exc:
        print(f"Could not generate plots: {exc}")


if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,461 @@
#!/usr/bin/env python3
"""
Persistence Thread Parameter Optimization
Uses Bayesian Optimization to automatically find the optimal configuration
parameters that minimize commit latency. This is much more efficient than
grid search since it uses a probabilistic model to guide parameter exploration.
Key advantages:
- Efficiently explores high-dimensional parameter spaces
- Uses previous simulation results to guide future parameter choices
- Handles expensive objective function evaluations (our simulation)
- Provides uncertainty estimates for parameter importance
"""
import numpy as np
from typing import Dict, List, Tuple, Optional
import time
from persistence_simulation import PersistenceSimulation, print_results
# Try to import scikit-optimize for Bayesian optimization
try:
from skopt import gp_minimize, forest_minimize
from skopt.space import Real, Integer
from skopt.utils import use_named_args
from skopt.plots import plot_convergence, plot_objective
import matplotlib.pyplot as plt
OPTIMIZE_AVAILABLE = True
except ImportError:
print("scikit-optimize not available. Install with: pip install scikit-optimize")
print("Falling back to grid search...")
OPTIMIZE_AVAILABLE = False
class PersistenceOptimizer:
"""
Automated parameter optimization for the persistence thread using Bayesian optimization.
This class finds the optimal configuration parameters to minimize commit latency
by intelligently exploring the parameter space using Gaussian Process models.
"""
def __init__(self,
             optimization_budget: int = 50,
             simulation_duration: float = 20.0,
             arrival_rate: float = 1000.0,
             objective_metric: str = "p95_latency",
             random_seed: int = 42):
    """Store the optimization settings and build the search space.

    Args:
        optimization_budget: Stored as ``self.optimization_budget``.
        simulation_duration: Passed to each simulation as its duration.
        arrival_rate: Passed to each simulation as its arrival rate.
        objective_metric: Name of the latency metric to minimize.
        random_seed: Stored as ``self.random_seed``.
    """
    # Caller-supplied configuration
    self.optimization_budget = optimization_budget
    self.simulation_duration = simulation_duration
    self.arrival_rate = arrival_rate
    self.objective_metric = objective_metric
    self.random_seed = random_seed

    # Nothing evaluated yet: empty history, no best candidate
    self.best_score = float('inf')
    self.best_params = None
    self.optimization_history = []

    # Search dimensions and their names, in matching order
    search_space = self._define_search_space()
    self.parameter_space = search_space
    self.parameter_names = [dimension.name for dimension in search_space]
def _define_search_space(self) -> List:
    """
    Define the parameter search space for optimization.

    Focus on the 3 core parameters that matter for persistence thread performance
    with 100% reliable S3. Retry parameters removed since S3 never fails.

    Returns:
        A list of scikit-optimize dimensions, or an empty list when
        scikit-optimize could not be imported.
    """
    if not OPTIMIZE_AVAILABLE:
        # skopt failed to import, so Real/Integer are undefined and building
        # the dimensions would raise NameError during construction.
        # NOTE(review): assumes the non-Bayesian fallback does not need
        # skopt dimension objects - confirm against the grid-search code.
        return []

    return [
        # Core batching parameters
        Real(1.0, 50.0, name='batch_timeout_ms',
             prior='log-uniform'),  # Log scale since small changes matter
        Integer(64 * 1024, 4 * 1024 * 1024, name='batch_size_threshold',  # 64KB - 4MB
                prior='log-uniform'),
        # Flow control parameters - likely the most impactful
        Integer(1, 50, name='max_in_flight_requests'),
    ]
def _run_simulation_with_params(self, params: Dict[str, float]) -> Dict:
"""Run simulation with given parameters and return results"""
try:
sim = PersistenceSimulation(
batch_timeout_ms=params['batch_timeout_ms'],
batch_size_threshold=int(params['batch_size_threshold']),
max_in_flight_requests=int(params['max_in_flight_requests']),
# Retry parameters fixed since S3 is 100% reliable
max_retry_attempts=0, # No retries needed
retry_base_delay_ms=100.0, # Irrelevant but needs a value
# S3 parameters kept fixed - 100% reliable for optimization focus
s3_latency_shape=2.0, # Fixed Gamma shape
s3_latency_scale=15.0, # Fixed Gamma scale (30ms RTT + ~30ms variable = ~60ms mean)
s3_failure_rate=0.0, # 100% reliable S3
arrival_rate_per_sec=self.arrival_rate,
simulation_duration_sec=self.simulation_duration
)
return sim.run_simulation()
except Exception as e:
print(f"Simulation failed with params {params}: {e}")
# Return a high penalty score for failed simulations
return {
'commit_metrics': {
'latency_ms': {
'mean': 10000,
'p95': 10000,
'p99': 10000
}
},
'error': str(e)
}
def _extract_objective_value(self, results: Dict) -> float:
"""Extract the objective value to minimize from simulation results"""
try:
commit_metrics = results['commit_metrics']['latency_ms']
if self.objective_metric == "mean_latency":
return commit_metrics['mean']
elif self.objective_metric == "p95_latency":
return commit_metrics['p95']
elif self.objective_metric == "p99_latency":
return commit_metrics['p99']
elif self.objective_metric == "weighted_latency":
# Weighted combination emphasizing tail latencies
return (0.3 * commit_metrics['mean'] +
0.5 * commit_metrics['p95'] +
0.2 * commit_metrics['p99'])
else:
return commit_metrics['p95'] # Default to P95
except KeyError as e:
print(f"Failed to extract objective from results: {e}")
return 10000 # High penalty for invalid results
def optimize_with_bayesian(self) -> Tuple[Dict, float]:
"""
Use Bayesian Optimization to find optimal parameters.
This uses Gaussian Process models to build a probabilistic model
of the objective function and intelligently choose where to sample next.
"""
if not OPTIMIZE_AVAILABLE:
return self.optimize_with_grid_search()
print(f"Starting Bayesian Optimization with {self.optimization_budget} evaluations")
print(f"Objective: Minimize {self.objective_metric}")
print(f"Parameter space: {len(self.parameter_space)} dimensions")
print()
@use_named_args(self.parameter_space)
def objective(**params):
"""Objective function for Bayesian optimization"""
print(f"Evaluating: {params}")
start_time = time.time()
results = self._run_simulation_with_params(params)
eval_time = time.time() - start_time
objective_value = self._extract_objective_value(results)
# Track optimization history
history_entry = {
'params': params.copy(),
'objective_value': objective_value,
'results': results,
'eval_time': eval_time,
'iteration': len(self.optimization_history) + 1
}
self.optimization_history.append(history_entry)
# Update best if improved
if objective_value < self.best_score:
self.best_score = objective_value
self.best_params = params.copy()
print(f"✓ NEW BEST: {objective_value:.2f}ms (evaluation {history_entry['iteration']})")
else:
print(f" Score: {objective_value:.2f}ms")
print(f" Time: {eval_time:.1f}s")
print()
return objective_value
# Run Bayesian optimization
result = gp_minimize(
func=objective,
dimensions=self.parameter_space,
n_calls=self.optimization_budget,
n_initial_points=10, # Random exploration first
acq_func='EI', # Expected Improvement acquisition
random_state=self.random_seed
)
# Extract best parameters
best_params_list = result.x
best_params_dict = dict(zip(self.parameter_names, best_params_list))
best_objective = result.fun
return best_params_dict, best_objective
def optimize_with_grid_search(self) -> Tuple[Dict, float]:
"""Fallback grid search optimization if scikit-optimize not available"""
print("Using grid search optimization (install scikit-optimize for better results)")
print()
# Define a smaller grid for key parameters
grid_configs = [
# Vary max_in_flight and batch_timeout
{'max_in_flight_requests': 5, 'batch_timeout_ms': 5.0},
{'max_in_flight_requests': 10, 'batch_timeout_ms': 5.0},
{'max_in_flight_requests': 20, 'batch_timeout_ms': 5.0},
{'max_in_flight_requests': 10, 'batch_timeout_ms': 2.0},
{'max_in_flight_requests': 10, 'batch_timeout_ms': 10.0},
{'max_in_flight_requests': 15, 'batch_timeout_ms': 3.0},
{'max_in_flight_requests': 25, 'batch_timeout_ms': 7.0},
]
best_params = None
best_score = float('inf')
for i, config in enumerate(grid_configs):
print(f"Evaluating config {i+1}/{len(grid_configs)}: {config}")
# Use default values for unspecified parameters
full_params = {
'batch_timeout_ms': 5.0,
'batch_size_threshold': 1024 * 1024,
'max_in_flight_requests': 5
}
full_params.update(config)
results = self._run_simulation_with_params(full_params)
objective_value = self._extract_objective_value(results)
if objective_value < best_score:
best_score = objective_value
best_params = full_params.copy()
print(f"✓ NEW BEST: {objective_value:.2f}ms")
else:
print(f" Score: {objective_value:.2f}ms")
print()
return best_params, best_score
def analyze_parameter_importance(self):
"""Analyze which parameters have the most impact on performance"""
if not self.optimization_history:
print("No optimization history available")
return
print("Parameter Importance Analysis")
print("=" * 50)
# Extract parameter values and objectives
param_data = {}
objectives = []
for entry in self.optimization_history:
objectives.append(entry['objective_value'])
for param_name, param_value in entry['params'].items():
if param_name not in param_data:
param_data[param_name] = []
param_data[param_name].append(param_value)
objectives = np.array(objectives)
# Simple correlation analysis
print("Parameter correlations with objective (lower is better):")
correlations = []
for param_name, values in param_data.items():
correlation = np.corrcoef(values, objectives)[0, 1]
correlations.append((param_name, correlation))
print(f" {param_name:<25}: {correlation:+.3f}")
print("\nMost impactful parameters (by absolute correlation):")
correlations.sort(key=lambda x: abs(x[1]), reverse=True)
for param_name, correlation in correlations[:5]:
impact = "reduces latency" if correlation < 0 else "increases latency"
print(f" {param_name:<25}: {impact} (r={correlation:+.3f})")
def plot_optimization_progress(self, save_path: Optional[str] = None):
"""Plot optimization convergence"""
if not OPTIMIZE_AVAILABLE or not self.optimization_history:
return
iterations = [entry['iteration'] for entry in self.optimization_history]
objectives = [entry['objective_value'] for entry in self.optimization_history]
# Calculate running minimum (best so far)
running_min = []
current_min = float('inf')
for obj in objectives:
current_min = min(current_min, obj)
running_min.append(current_min)
plt.figure(figsize=(12, 8))
# Plot 1: Objective value over iterations
plt.subplot(2, 2, 1)
plt.scatter(iterations, objectives, alpha=0.6, s=30)
plt.plot(iterations, running_min, 'r-', linewidth=2, label='Best so far')
plt.xlabel('Iteration')
plt.ylabel(f'{self.objective_metric} (ms)')
plt.title('Optimization Progress')
plt.legend()
plt.grid(True, alpha=0.3)
# Plot 2: Parameter evolution for key parameters
plt.subplot(2, 2, 2)
key_params = ['max_in_flight_requests', 'batch_timeout_ms']
for param in key_params:
if param in self.optimization_history[0]['params']:
values = [entry['params'][param] for entry in self.optimization_history]
plt.scatter(iterations, values, alpha=0.6, label=param, s=30)
plt.xlabel('Iteration')
plt.ylabel('Parameter Value')
plt.title('Key Parameter Evolution')
plt.legend()
plt.grid(True, alpha=0.3)
# Plot 3: Objective distribution
plt.subplot(2, 2, 3)
plt.hist(objectives, bins=20, alpha=0.7, edgecolor='black')
plt.axvline(self.best_score, color='red', linestyle='--',
label=f'Best: {self.best_score:.1f}ms')
plt.xlabel(f'{self.objective_metric} (ms)')
plt.ylabel('Count')
plt.title('Objective Value Distribution')
plt.legend()
plt.grid(True, alpha=0.3)
# Plot 4: Convergence rate
plt.subplot(2, 2, 4)
improvements = []
for i, entry in enumerate(self.optimization_history):
if i == 0:
improvements.append(0)
else:
prev_best = running_min[i-1]
curr_best = running_min[i]
improvement = prev_best - curr_best
improvements.append(improvement)
plt.plot(iterations, improvements, 'g-', marker='o', markersize=3)
plt.xlabel('Iteration')
plt.ylabel('Improvement (ms)')
plt.title('Per-Iteration Improvement')
plt.grid(True, alpha=0.3)
plt.tight_layout()
if save_path:
plt.savefig(save_path, dpi=300, bbox_inches='tight')
print(f"Optimization plots saved to {save_path}")
else:
plt.show()
def run_optimization(self) -> Dict:
"""Run the full optimization process and return results"""
start_time = time.time()
# Run optimization
if OPTIMIZE_AVAILABLE:
best_params, best_score = self.optimize_with_bayesian()
else:
best_params, best_score = self.optimize_with_grid_search()
total_time = time.time() - start_time
# Run final simulation with best parameters for detailed results
print("Running final simulation with optimal parameters...")
final_results = self._run_simulation_with_params(best_params)
# Prepare optimization summary
optimization_summary = {
'best_parameters': best_params,
'best_objective_value': best_score,
'optimization_time': total_time,
'evaluations_performed': len(self.optimization_history),
'final_simulation_results': final_results,
'optimization_history': self.optimization_history
}
return optimization_summary
def print_optimization_summary(self, summary: Dict):
"""Print a comprehensive summary of optimization results"""
print("=" * 80)
print("BAYESIAN OPTIMIZATION RESULTS")
print("=" * 80)
print(f"Optimization completed in {summary['optimization_time']:.1f} seconds")
print(f"Performed {summary['evaluations_performed']} parameter evaluations")
print(f"Best {self.objective_metric}: {summary['best_objective_value']:.2f}ms")
print()
print("OPTIMAL PARAMETERS:")
print("-" * 40)
for param, value in summary['best_parameters'].items():
if isinstance(value, float):
if param.endswith('_rate'):
print(f" {param:<25}: {value:.4f}")
else:
print(f" {param:<25}: {value:.2f}")
else:
print(f" {param:<25}: {value}")
print("\nDETAILED PERFORMANCE WITH OPTIMAL PARAMETERS:")
print("-" * 50)
final_results = summary['final_simulation_results']
print_results(final_results)
print("\nPARAMETER IMPACT ANALYSIS:")
print("-" * 30)
self.analyze_parameter_importance()
def main():
    """Run one full optimization per objective metric, printing and plotting each result."""
    banner = "=" * 80

    print("Persistence Thread Parameter Optimization")
    print("Using Bayesian Optimization for intelligent parameter search")
    print()

    # Each objective gets its own optimizer so histories do not mix
    for objective in ("p95_latency", "weighted_latency"):
        print(f"\n{banner}")
        print(f"OPTIMIZING FOR: {objective.upper()}")
        print(f"{banner}")

        optimizer = PersistenceOptimizer(
            optimization_budget=30,  # Reasonable for demo
            simulation_duration=15.0,  # Shorter sims for faster optimization
            arrival_rate=1000.0,
            objective_metric=objective,
            random_seed=42
        )

        # Optimize, then report
        optimizer.print_optimization_summary(optimizer.run_optimization())

        # Plotting is best-effort: a failure here must not stop the next objective
        plot_file = f'optimization_{objective}.png'
        try:
            optimizer.plot_optimization_progress(plot_file)
        except Exception as e:
            print(f"Could not generate plots: {e}")

        print(f"\nOptimization for {objective} completed!")
        print(banner)
# Run the optimization workflow only when executed as a script, not on import.
if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,748 @@
#!/usr/bin/env python3
"""
Persistence Thread Simulation
Simulates the persistence thread design from persistence.md to analyze
commit latency distributions with Poisson-process arrivals and realistic
S3 response characteristics.
Key metrics tracked:
- End-to-end commit latency (arrival to acknowledgment)
- Batch processing latencies
- Queue depths and flow control behavior
- Retry patterns and failure handling
"""
import heapq
import random
import statistics
from dataclasses import dataclass, field
from typing import List, Optional, Dict, Tuple
import numpy as np
import matplotlib.pyplot as plt
from collections import defaultdict, deque
import time
@dataclass
class Commit:
    """One commit request flowing through the simulated persistence thread."""

    # Sequential identifier handed out when the arrival is generated
    commit_id: int
    # Simulated time (seconds) at which the commit arrived
    arrival_time: float
    # Payload size in bytes; defaults to 1KB
    size_bytes: int = 1024
@dataclass
class Batch:
    """A group of commits that is persisted with a single request."""

    batch_id: int
    commits: List[Commit]
    # Simulated time (seconds) at which the batch was closed for sending
    created_time: float
    # Derived in __post_init__ from the commits; not a constructor argument
    size_bytes: int = field(init=False)
    # Number of retries attempted so far for this batch
    retry_count: int = 0

    def __post_init__(self):
        """Compute the batch payload size as the total of its commits' sizes."""
        total = 0
        for commit in self.commits:
            total += commit.size_bytes
        self.size_bytes = total
@dataclass
class InFlightRequest:
    """Bookkeeping record for a batch that has been sent and not yet resolved."""

    # The batch carried by this request
    batch: Batch
    # Simulated time (seconds) the request was issued
    start_time: float
    # Simulated time (seconds) at which the response is scheduled
    expected_completion: float
    # Key under which the request is tracked while in flight
    connection_id: int
class PersistenceSimulation:
    """
    Discrete-event simulation of the persistence thread described in persistence.md.

    Commits arrive as a Poisson process, are grouped into batches (flushed on
    a size threshold or a timeout), and each batch is sent to a simulated S3
    subject to a cap on in-flight requests. Failed requests are retried with
    exponential backoff.

    S3 latency is modeled as a fixed minimum RTT plus a Gamma-distributed
    component, which gives a natural lower bound and a right-skewed tail; the
    shape parameter controls how heavy that tail is.

    All times inside the simulation are in seconds unless a name ends in ``_ms``.
    """

    # Slack (seconds) allowed when testing whether the batch timeout has
    # elapsed. A timeout event fires at start + T, and (start + T) - start can
    # round to just under T in floating point, which would skip the flush.
    _TIMEOUT_EPSILON_SEC = 1e-9

    # Upper bound on processed events; guards against a runaway event loop
    _MAX_EVENTS = 1000000

    def __init__(self,
                 # Configuration from persistence.md defaults
                 batch_timeout_ms: float = 5.0,
                 batch_size_threshold: int = 1024 * 1024,  # 1MB
                 max_in_flight_requests: int = 5,
                 max_retry_attempts: int = 3,
                 retry_base_delay_ms: float = 100.0,
                 # S3 latency modeling (Gamma distribution parameters)
                 s3_latency_shape: float = 2.0,  # Shape parameter (α)
                 s3_latency_scale: float = 25.0,  # Scale parameter (β)
                 s3_failure_rate: float = 0.01,  # 1% failure rate
                 # Arrival rate modeling
                 arrival_rate_per_sec: float = 1000.0,  # Lambda for Poisson
                 # Simulation parameters
                 simulation_duration_sec: float = 60.0):
        # Configuration
        self.batch_timeout_ms = batch_timeout_ms
        self.batch_size_threshold = batch_size_threshold
        self.max_in_flight_requests = max_in_flight_requests
        self.max_retry_attempts = max_retry_attempts
        self.retry_base_delay_ms = retry_base_delay_ms

        # S3 modeling parameters
        self.s3_latency_shape = s3_latency_shape
        self.s3_latency_scale = s3_latency_scale
        self.s3_failure_rate = s3_failure_rate

        # Arrival modeling
        self.arrival_rate_per_sec = arrival_rate_per_sec
        self.simulation_duration_sec = simulation_duration_sec

        # Simulation state
        self.current_time = 0.0
        # Priority queue of (time, event_type, sequence, event_data)
        self.event_queue = []
        # Monotonic counter used as a heap tie-breaker (see schedule_event)
        self._event_seq = 0
        self.pending_commits = deque()
        self.current_batch = []
        self.batch_start_time = None  # Will be set when first commit added to batch
        self.in_flight_requests: Dict[int, "InFlightRequest"] = {}
        self.next_batch_id = 0
        self.next_connection_id = 0
        self.next_commit_id = 0

        # Metrics collection
        self.completed_commits = []
        self.batch_metrics = []
        self.retry_counts = defaultdict(int)
        self.queue_depth_samples = []
        self.timeline_events = []  # For debugging/visualization

        # Random number generators
        self.rng = random.Random(42)  # Reproducible results
        self.np_rng = np.random.RandomState(42)

    def sample_s3_latency(self, batch_size_bytes: int = 0) -> float:
        """
        Sample an S3 response latency in milliseconds.

        The latency is the sum of:
        - a fixed 30ms minimum round-trip time,
        - a Gamma(s3_latency_shape, s3_latency_scale) variable component,
          whose mean is shape * scale (50ms with this class's defaults),
        - a size penalty of 20ms per MB of batch payload.

        Args:
            batch_size_bytes: Payload size of the batch being sent.

        Returns:
            Latency in milliseconds (always at least 30.0).
        """
        # Minimum network RTT (realistic for cloud storage)
        min_rtt = 30.0  # 30ms minimum round-trip time

        # Variable latency component from Gamma distribution
        variable_latency = self.np_rng.gamma(self.s3_latency_shape, self.s3_latency_scale)

        # Size-dependent scaling: +20ms per MB
        size_mb = batch_size_bytes / (1024 * 1024)
        size_penalty = size_mb * 20.0  # 20ms per MB

        return min_rtt + variable_latency + size_penalty

    def sample_inter_arrival_time(self) -> float:
        """Sample the time (seconds) between commit arrivals of a Poisson process."""
        return self.np_rng.exponential(1.0 / self.arrival_rate_per_sec)

    def sample_commit_size(self) -> int:
        """
        Sample a commit size in bytes from a three-tier mix.

        Distribution:
        - 70% small commits: 500B - 10KB
        - 25% medium commits: 10KB - 100KB
        - 5% large commits: 100KB - 1MB

        Most commits are small, but some are large enough to trigger
        size-based batching or to become single-commit batches.
        """
        rand = self.rng.random()
        if rand < 0.70:  # 70% small commits
            return self.rng.randint(500, 10 * 1024)  # 500B - 10KB
        elif rand < 0.95:  # 25% medium commits
            return self.rng.randint(10 * 1024, 100 * 1024)  # 10KB - 100KB
        else:  # 5% large commits
            return self.rng.randint(100 * 1024, 1024 * 1024)  # 100KB - 1MB

    def schedule_event(self, time: float, event_type: str, data=None):
        """
        Add an event to the priority queue.

        A sequence number sits between the event type and the payload so that
        two events with the same time and type are ordered by insertion and
        the heap never has to compare payload objects (which define no
        ordering and would raise TypeError).
        """
        heapq.heappush(self.event_queue, (time, event_type, self._event_seq, data))
        self._event_seq += 1

    def should_process_batch(self) -> bool:
        """Return True when the current batch has hit its size or timeout trigger."""
        if not self.current_batch:
            return False

        # Size trigger
        batch_size = sum(c.size_bytes for c in self.current_batch)
        if batch_size >= self.batch_size_threshold:
            return True

        # Time trigger - only if we have a valid batch start time
        if self.batch_start_time is not None:
            elapsed = self.current_time - self.batch_start_time
            timeout_sec = self.batch_timeout_ms / 1000.0
            # Epsilon absorbs the rounding error of (start + T) - start
            if elapsed >= timeout_sec - self._TIMEOUT_EPSILON_SEC:
                return True

        return False

    def can_start_new_request(self) -> bool:
        """Return True when flow control allows another in-flight request."""
        return len(self.in_flight_requests) < self.max_in_flight_requests

    def process_current_batch(self):
        """Send the current batch if it is non-empty, triggered, and capacity exists."""
        if not self.current_batch or not self.can_start_new_request():
            return

        if self.should_process_batch():
            batch = Batch(
                batch_id=self.next_batch_id,
                commits=self.current_batch.copy(),
                created_time=self.current_time
            )
            self.next_batch_id += 1

            # Clear current batch
            self.current_batch.clear()
            self.batch_start_time = None  # Reset to None, will be set on next batch

            self.send_batch_to_s3(batch)

    def send_batch_to_s3(self, batch: "Batch", is_retry: bool = False):
        """
        Send a batch to the simulated S3 and track it as an in-flight request.

        If no in-flight slot is free, the send is re-attempted 1ms later via a
        'retry_batch' event. Otherwise a latency and success/failure outcome
        are sampled and the matching completion event is scheduled.
        """
        if not self.can_start_new_request():
            # This shouldn't happen due to flow control, but handle gracefully
            self.schedule_event(self.current_time + 0.001, 'retry_batch', batch)
            return

        # Sample S3 response characteristics (pass batch size for latency modeling)
        s3_latency = self.sample_s3_latency(batch.size_bytes) / 1000.0  # Convert ms to seconds
        will_fail = self.rng.random() < self.s3_failure_rate
        if will_fail:
            s3_latency *= 2  # Failed requests typically take longer

        completion_time = self.current_time + s3_latency
        connection_id = self.next_connection_id
        self.next_connection_id += 1

        # Track in-flight request
        self.in_flight_requests[connection_id] = InFlightRequest(
            batch=batch,
            start_time=self.current_time,
            expected_completion=completion_time,
            connection_id=connection_id
        )

        # Schedule completion event
        outcome_event = 'batch_failed' if will_fail else 'batch_completed'
        self.schedule_event(completion_time, outcome_event, connection_id)

        # Log event for analysis
        self.timeline_events.append({
            'time': self.current_time,
            'event': 'batch_sent',
            'batch_id': batch.batch_id,
            'batch_size': batch.size_bytes,
            'commit_count': len(batch.commits),
            'retry_count': batch.retry_count,
            'is_retry': is_retry
        })

    def handle_batch_completed(self, connection_id: int):
        """Record metrics for a successfully persisted batch and use the freed slot."""
        if connection_id not in self.in_flight_requests:
            return

        in_flight = self.in_flight_requests.pop(connection_id)
        batch = in_flight.batch

        # Calculate metrics
        batch_latency = self.current_time - batch.created_time
        self.batch_metrics.append({
            'batch_id': batch.batch_id,
            'latency': batch_latency,
            'size_bytes': batch.size_bytes,
            'commit_count': len(batch.commits),
            'retry_count': batch.retry_count
        })

        # Mark commits as completed and calculate end-to-end latency
        for commit in batch.commits:
            self.completed_commits.append({
                'commit_id': commit.commit_id,
                'arrival_time': commit.arrival_time,
                'completion_time': self.current_time,
                'latency': self.current_time - commit.arrival_time,
                'batch_id': batch.batch_id,
                'retry_count': batch.retry_count
            })

        self.timeline_events.append({
            'time': self.current_time,
            'event': 'batch_completed',
            'batch_id': batch.batch_id,
            'latency': batch_latency,
            'retry_count': batch.retry_count
        })

        # Try to process any pending work now that we have capacity
        self.process_pending_work()

    def handle_batch_failed(self, connection_id: int):
        """
        Handle a failed batch: retry with exponential backoff or abandon it.

        The in-flight slot is released either way, so queued work is given a
        chance to use it immediately.
        """
        if connection_id not in self.in_flight_requests:
            return

        in_flight = self.in_flight_requests.pop(connection_id)
        batch = in_flight.batch

        self.timeline_events.append({
            'time': self.current_time,
            'event': 'batch_failed',
            'batch_id': batch.batch_id,
            'retry_count': batch.retry_count
        })

        if batch.retry_count < self.max_retry_attempts:
            # Exponential backoff retry
            batch.retry_count += 1
            backoff_delay = (self.retry_base_delay_ms / 1000.0) * (2 ** (batch.retry_count - 1))
            self.schedule_event(self.current_time + backoff_delay, 'retry_batch', batch)
            self.retry_counts[batch.retry_count] += 1
        else:
            # Max retries exhausted - this would be a fatal error in real system
            self.timeline_events.append({
                'time': self.current_time,
                'event': 'batch_abandoned',
                'batch_id': batch.batch_id,
                'retry_count': batch.retry_count
            })

        # The slot is free during the backoff (or for good); don't leave it idle
        self.process_pending_work()

    def handle_retry_batch(self, batch: "Batch"):
        """Re-send a batch whose previous attempt failed or found no free slot."""
        self.send_batch_to_s3(batch, is_retry=True)

    def _add_to_current_batch(self, commit: "Commit"):
        """
        Append a commit to the current batch.

        When the commit opens a new batch, the batch start time is recorded
        and that batch's timeout event is scheduled, so every batch gets one
        regardless of how its first commit reached it. A timeout that fires
        after its batch was already sent is harmless: should_process_batch
        re-checks the elapsed time of whichever batch is current.
        """
        if not self.current_batch:
            self.batch_start_time = self.current_time
            timeout_time = self.current_time + (self.batch_timeout_ms / 1000.0)
            self.schedule_event(timeout_time, 'batch_timeout', None)
        self.current_batch.append(commit)

    def process_pending_work(self):
        """
        Drain queued commits into batches for as long as capacity allows.

        Commits are taken in FIFO order; after each one the batch is sent if a
        trigger is met. Draining continues after a send so that older commits
        are not left behind newer arrivals. Finally, a batch whose timeout
        elapsed while no slot was free is flushed.
        """
        while self.pending_commits and self.can_start_new_request():
            self._add_to_current_batch(self.pending_commits.popleft())
            self.process_current_batch()

        # Covers the case where nothing was pending but the current batch timed out
        self.process_current_batch()

    def handle_commit_arrival(self, commit: "Commit"):
        """
        Handle a new commit arrival.

        The commit always joins the FIFO pending queue first; it is moved into
        the current batch immediately when an in-flight slot is free, and
        otherwise waits there under flow control.
        """
        self.pending_commits.append(commit)
        self.process_pending_work()

    def handle_batch_timeout(self):
        """Flush the current batch if its timeout has elapsed and a slot is free."""
        if self.current_batch and self.can_start_new_request():
            self.process_current_batch()

    def sample_queue_depth(self):
        """Record a snapshot of the pending, batching and in-flight queue depths."""
        pending_count = len(self.pending_commits)
        current_batch_count = len(self.current_batch)
        in_flight_commits = sum(
            len(req.batch.commits) for req in self.in_flight_requests.values())

        self.queue_depth_samples.append({
            'time': self.current_time,
            'pending_commits': pending_count,
            'current_batch_size': current_batch_count,
            'in_flight_requests': len(self.in_flight_requests),
            'total_unacknowledged': pending_count + current_batch_count + in_flight_commits
        })

    def generate_arrivals(self):
        """Schedule all commit arrivals and the periodic queue-depth samples."""
        current_time = 0.0
        while current_time < self.simulation_duration_sec:
            # Sample next arrival time
            current_time += self.sample_inter_arrival_time()
            if current_time >= self.simulation_duration_sec:
                break

            # Create commit
            commit = Commit(
                commit_id=self.next_commit_id,
                arrival_time=current_time,
                size_bytes=self.sample_commit_size()  # Realistic size distribution
            )
            self.next_commit_id += 1

            # Schedule arrival event
            self.schedule_event(current_time, 'commit_arrival', commit)

        # Schedule periodic queue depth sampling
        sample_time = 0.0
        while sample_time < self.simulation_duration_sec:
            self.schedule_event(sample_time, 'sample_queue_depth', None)
            sample_time += 0.1  # Sample every 100ms

    def run_simulation(self):
        """
        Run the complete simulation.

        Events are processed in time order until the queue is empty, the
        simulated duration is exceeded, or the safety limit on processed
        events is reached (a warning is printed in that last case).

        Returns:
            The metrics dict produced by ``analyze_results``.
        """
        print(f"Starting persistence simulation...")
        print(f"Arrival rate: {self.arrival_rate_per_sec} commits/sec")
        print(f"Duration: {self.simulation_duration_sec} seconds")
        print(f"Expected commits: ~{int(self.arrival_rate_per_sec * self.simulation_duration_sec)}")
        print()

        # Generate all arrival events
        self.generate_arrivals()

        # Process events in time order
        events_processed = 0
        while self.event_queue and events_processed < self._MAX_EVENTS:  # Safety limit
            event_time, event_type, _seq, data = heapq.heappop(self.event_queue)
            self.current_time = event_time

            if event_time > self.simulation_duration_sec:
                break

            if event_type == 'commit_arrival':
                self.handle_commit_arrival(data)
            elif event_type == 'batch_completed':
                self.handle_batch_completed(data)
            elif event_type == 'batch_failed':
                self.handle_batch_failed(data)
            elif event_type == 'retry_batch':
                self.handle_retry_batch(data)
            elif event_type == 'batch_timeout':
                self.handle_batch_timeout()
            elif event_type == 'sample_queue_depth':
                self.sample_queue_depth()

            events_processed += 1

        if events_processed >= self._MAX_EVENTS and self.event_queue:
            print(f"WARNING: stopped at the {self._MAX_EVENTS:,}-event safety limit; "
                  f"results cover only part of the simulated duration.")

        print(f"Simulation completed. Processed {events_processed} events.")
        return self.analyze_results()

    def analyze_results(self) -> Dict:
        """
        Summarize the collected metrics.

        Returns:
            A dict with 'simulation_config', 'commit_metrics' (latencies in
            ms), 'batch_metrics', 'retry_analysis' and 'queue_depth_analysis';
            or ``{"error": ...}`` when no commit completed.
        """
        if not self.completed_commits:
            return {"error": "No commits completed during simulation"}

        # Calculate latency statistics
        latencies = [c['latency'] * 1000 for c in self.completed_commits]  # Convert to ms

        results = {
            'simulation_config': {
                'duration_sec': self.simulation_duration_sec,
                'arrival_rate_per_sec': self.arrival_rate_per_sec,
                'batch_timeout_ms': self.batch_timeout_ms,
                'batch_size_threshold': self.batch_size_threshold,
                'max_in_flight_requests': self.max_in_flight_requests,
                's3_latency_params': f"Gamma(shape={self.s3_latency_shape}, scale={self.s3_latency_scale})",
                's3_failure_rate': self.s3_failure_rate
            },
            'commit_metrics': {
                'total_commits': len(self.completed_commits),
                'latency_ms': {
                    'mean': statistics.mean(latencies),
                    'median': statistics.median(latencies),
                    'std': statistics.stdev(latencies) if len(latencies) > 1 else 0,
                    'min': min(latencies),
                    'max': max(latencies),
                    'p95': np.percentile(latencies, 95),
                    'p99': np.percentile(latencies, 99)
                }
            },
            'batch_metrics': {
                'total_batches': len(self.batch_metrics),
                'avg_commits_per_batch': statistics.mean([b['commit_count'] for b in self.batch_metrics]),
                'avg_batch_size_bytes': statistics.mean([b['size_bytes'] for b in self.batch_metrics]),
                'avg_batch_latency_ms': statistics.mean([b['latency'] * 1000 for b in self.batch_metrics])
            },
            'retry_analysis': dict(self.retry_counts),
            'queue_depth_analysis': self._analyze_queue_depths()
        }
        return results

    def _analyze_queue_depths(self) -> Dict:
        """Return mean / max / P95 for each sampled queue-depth series."""
        if not self.queue_depth_samples:
            return {}

        analysis = {}
        for key in ('pending_commits', 'in_flight_requests', 'total_unacknowledged'):
            series = [s[key] for s in self.queue_depth_samples]
            analysis[key] = {
                'mean': statistics.mean(series),
                'max': max(series),
                'p95': np.percentile(series, 95)
            }
        return analysis

    def plot_results(self, results: Dict, save_path: Optional[str] = None):
        """
        Generate a 2x2 figure visualizing the simulation results.

        Args:
            results: The dict returned by ``analyze_results``.
            save_path: File to save the figure to; shown interactively if None.
        """
        if not self.completed_commits:
            print("No data to plot")
            return

        fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(15, 12))
        fig.suptitle('Persistence Thread Simulation Results', fontsize=16)

        # Plot 1: Commit latency histogram
        latency_stats = results['commit_metrics']['latency_ms']
        latencies_ms = [c['latency'] * 1000 for c in self.completed_commits]
        ax1.hist(latencies_ms, bins=50, alpha=0.7, edgecolor='black')
        ax1.set_xlabel('Commit Latency (ms)')
        ax1.set_ylabel('Count')
        ax1.set_title('Commit Latency Distribution')
        ax1.axvline(latency_stats['mean'], color='red',
                    linestyle='--', label=f"Mean: {latency_stats['mean']:.1f}ms")
        ax1.axvline(latency_stats['p95'], color='orange',
                    linestyle='--', label=f"P95: {latency_stats['p95']:.1f}ms")
        ax1.legend()

        # Plot 2: Timeline of commit completions
        completion_times = [c['completion_time'] for c in self.completed_commits]
        ax2.scatter(completion_times, latencies_ms, alpha=0.6, s=10)
        ax2.set_xlabel('Time (seconds)')
        ax2.set_ylabel('Commit Latency (ms)')
        ax2.set_title('Latency Over Time')

        # Plot 3: Queue depth over time
        if self.queue_depth_samples:
            times = [s['time'] for s in self.queue_depth_samples]
            pending = [s['pending_commits'] for s in self.queue_depth_samples]
            in_flight = [s['in_flight_requests'] for s in self.queue_depth_samples]
            total_unack = [s['total_unacknowledged'] for s in self.queue_depth_samples]
            ax3.plot(times, pending, label='Pending Commits', alpha=0.8)
            ax3.plot(times, in_flight, label='In-Flight Requests', alpha=0.8)
            ax3.plot(times, total_unack, label='Total Unacknowledged', alpha=0.8)
            ax3.axhline(self.max_in_flight_requests, color='red', linestyle='--',
                        label=f'Max In-Flight Limit ({self.max_in_flight_requests})')
            ax3.set_xlabel('Time (seconds)')
            ax3.set_ylabel('Count')
            ax3.set_title('Queue Depths Over Time')
            ax3.legend()

        # Plot 4: Batch size distribution
        if self.batch_metrics:
            batch_sizes = [b['commit_count'] for b in self.batch_metrics]
            ax4.hist(batch_sizes, bins=20, alpha=0.7, edgecolor='black')
            ax4.set_xlabel('Commits per Batch')
            ax4.set_ylabel('Count')
            ax4.set_title('Batch Size Distribution')

        plt.tight_layout()
        if save_path:
            plt.savefig(save_path, dpi=300, bbox_inches='tight')
            print(f"Plots saved to {save_path}")
        else:
            plt.show()
def print_results(results: Dict):
    """Pretty print simulation results.

    Writes a human-readable report to stdout with one section each for the
    configuration, commit latency, batching, retries and queue depths.

    Args:
        results: Result dictionary. The 'simulation_config',
            'commit_metrics' and 'batch_metrics' entries are required and
            must hold the keys read below. 'retry_analysis' and
            'queue_depth_analysis' are optional: when missing or empty the
            corresponding section is omitted instead of raising KeyError,
            matching how the script's driver reads 'queue_depth_analysis'.
    """
    print("=" * 80)
    print("PERSISTENCE THREAD SIMULATION RESULTS")
    print("=" * 80)

    # Configuration
    config = results['simulation_config']
    print("\nConfiguration:")
    print(f" Duration: {config['duration_sec']}s")
    print(f" Arrival Rate: {config['arrival_rate_per_sec']} commits/sec")
    print(f" Batch Timeout: {config['batch_timeout_ms']}ms")
    print(f" Batch Size Threshold: {config['batch_size_threshold']:,} bytes")
    print(f" Max In-Flight: {config['max_in_flight_requests']}")
    print(f" S3 Latency: {config['s3_latency_params']}")
    print(f" S3 Failure Rate: {config['s3_failure_rate']:.1%}")

    # Commit metrics
    commit_metrics = results['commit_metrics']
    latency = commit_metrics['latency_ms']
    print("\nCommit Performance:")
    print(f" Total Commits: {commit_metrics['total_commits']:,}")
    print(f" Latency Mean: {latency['mean']:.2f}ms")
    print(f" Latency Median: {latency['median']:.2f}ms")
    print(f" Latency P95: {latency['p95']:.2f}ms")
    print(f" Latency P99: {latency['p99']:.2f}ms")
    print(f" Latency Std: {latency['std']:.2f}ms")
    print(f" Latency Range: {latency['min']:.2f}ms - {latency['max']:.2f}ms")

    # Batch metrics
    batch_metrics = results['batch_metrics']
    print("\nBatching Performance:")
    print(f" Total Batches: {batch_metrics['total_batches']:,}")
    print(f" Avg Commits/Batch: {batch_metrics['avg_commits_per_batch']:.1f}")
    # Sizes are reported in bytes; convert to KB for display.
    print(f" Avg Batch Size: {batch_metrics['avg_batch_size_bytes']/1024:.1f}KB")
    print(f" Avg Batch Latency: {batch_metrics['avg_batch_latency_ms']:.2f}ms")

    # Retry analysis (optional section)
    retry_analysis = results.get('retry_analysis')
    if retry_analysis:
        print("\nRetry Analysis:")
        for retry_count, occurrences in retry_analysis.items():
            print(f" {occurrences:,} batches required {retry_count} retries")

    # Queue depth analysis (optional section); each sub-entry is itself
    # optional and printed only when present.
    queue_analysis = results.get('queue_depth_analysis')
    if queue_analysis:
        print("\nQueue Depth Analysis:")
        queue_sections = (
            ('pending_commits', 'Pending Commits'),
            ('in_flight_requests', 'In-Flight Requests'),
            ('total_unacknowledged', 'Total Unacknowledged'),
        )
        for key, label in queue_sections:
            if key in queue_analysis:
                stats = queue_analysis[key]
                print(f" {label} - Mean: {stats['mean']:.1f}, Max: {stats['max']}, P95: {stats['p95']:.1f}")
if __name__ == "__main__":
    # Script entry point: sweep several persistence-thread configurations,
    # print per-run key metrics and a summary table, then plot the run with
    # the lowest P95 commit latency.
    print("Running Persistence Thread Configuration Analysis")
    print("S3 Latency Modeling: Gamma distribution (shape=2.0, scale=25ms)")
    print("Testing different configurations to optimize latency...")
    print()

    # Test configurations with different max_in_flight values
    configs = [
        {"name": "Baseline (max_in_flight=5)", "max_in_flight_requests": 5},
        {"name": "Higher Parallelism (max_in_flight=10)", "max_in_flight_requests": 10},
        {"name": "Much Higher (max_in_flight=20)", "max_in_flight_requests": 20},
        {"name": "Lower Timeout (max_in_flight=10, timeout=2ms)", "max_in_flight_requests": 10, "batch_timeout_ms": 2.0},
        {"name": "Higher Timeout (max_in_flight=10, timeout=10ms)", "max_in_flight_requests": 10, "batch_timeout_ms": 10.0},
    ]

    results_comparison = []
    # Keep every simulation object next to its results so the plots for the
    # winning configuration are drawn from the same run whose numbers were
    # reported. Re-running costs another full simulation and is not
    # guaranteed to reproduce the reported metrics.
    simulations_by_name = {}

    for config in configs:
        print(f"\n{'='*60}")
        print(f"Testing: {config['name']}")
        print(f"{'='*60}")

        sim = PersistenceSimulation(
            arrival_rate_per_sec=1000.0,
            simulation_duration_sec=30.0,
            s3_latency_shape=2.0,
            s3_latency_scale=25.0,
            s3_failure_rate=0.01,
            max_in_flight_requests=config.get("max_in_flight_requests", 5),
            batch_timeout_ms=config.get("batch_timeout_ms", 5.0)
        )
        results = sim.run_simulation()
        results["config_name"] = config["name"]
        results_comparison.append(results)
        simulations_by_name[config["name"]] = sim

        # Print key metrics for quick comparison
        commit_metrics = results['commit_metrics']
        batch_metrics = results['batch_metrics']
        queue_metrics = results.get('queue_depth_analysis', {})
        print("\nKey Metrics:")
        print(f" Mean Latency: {commit_metrics['latency_ms']['mean']:.1f}ms")
        print(f" P95 Latency: {commit_metrics['latency_ms']['p95']:.1f}ms")
        print(f" P99 Latency: {commit_metrics['latency_ms']['p99']:.1f}ms")
        print(f" Avg Commits/Batch: {batch_metrics['avg_commits_per_batch']:.1f}")
        print(f" Avg Batch Size: {batch_metrics['avg_batch_size_bytes']/1024:.1f}KB")
        if queue_metrics:
            print(f" Avg Queue Depth: {queue_metrics.get('total_unacknowledged', {}).get('mean', 0):.1f}")
            print(f" Max Queue Depth: {queue_metrics.get('total_unacknowledged', {}).get('max', 0)}")

    # Summary comparison
    print(f"\n{'='*80}")
    print("CONFIGURATION COMPARISON SUMMARY")
    print(f"{'='*80}")
    print(f"{'Configuration':<40} {'Mean':<8} {'P95':<8} {'P99':<8} {'AvgQueue':<10}")
    print(f"{'-'*80}")
    for result in results_comparison:
        name = result["config_name"]
        commit_metrics = result['commit_metrics']
        queue_metrics = result.get('queue_depth_analysis', {})
        mean_lat = commit_metrics['latency_ms']['mean']
        p95_lat = commit_metrics['latency_ms']['p95']
        p99_lat = commit_metrics['latency_ms']['p99']
        avg_queue = queue_metrics.get('total_unacknowledged', {}).get('mean', 0)
        print(f"{name:<40} {mean_lat:<8.1f} {p95_lat:<8.1f} {p99_lat:<8.1f} {avg_queue:<10.1f}")

    print("\nRecommendation: Choose config with lowest P95/P99 latencies")
    print("Note: Higher in-flight allows more parallelism but may increase queue variability")

    # Generate plots for best configuration (lowest P95 commit latency)
    best_config = min(results_comparison, key=lambda r: r['commit_metrics']['latency_ms']['p95'])
    print(f"\nGenerating plots for best configuration: {best_config['config_name']}")
    try:
        # Plot with the simulation object that produced best_config, so the
        # mean/P95 markers and the plotted samples describe the same run.
        sim_best = simulations_by_name[best_config['config_name']]
        sim_best.plot_results(best_config, 'persistence_optimization_results.png')
    except Exception as e:
        # Best-effort visualization: report the failure and finish normally.
        print(f"\nCould not generate plots: {e}")
        print("Install matplotlib and numpy to enable visualization")

111
persistence.md Normal file
View File

@@ -0,0 +1,111 @@
# Persistence Thread Design
## Overview
The persistence thread receives commit batches from the main processing pipeline and uploads them to S3. It uses a single-threaded design with connection pooling and batching for optimal performance.
## Architecture
**Input**: Commits arrive via `ThreadPipeline` interface from upstream processing
**Output**: Batched commits uploaded to S3 persistence backend
**Transport**: Single-threaded TCP client with connection pooling
**Protocol**: Higher layers handle HTTP, authentication, and S3-specific details
## Batching Strategy
The persistence thread collects commits into batches using two trigger conditions:
1. **Time Trigger**: `batch_timeout_ms` elapsed since batch collection started
2. **Size Trigger**: `batch_size_threshold` bytes of commit data collected (can be exceeded by final commit)
**Flow Control**: When `max_in_flight_requests` reached, block until responses received.
## Main Processing Loop
### 1. Batch Collection
**No In-Flight Requests**:
- Use blocking acquire to get first commit batch
- Process immediately (no batching delay)
**With In-Flight Requests**:
- Check flow control: if at `max_in_flight_requests`, block for responses
- Collect commits using non-blocking acquire until trigger condition:
  - Check for available commits (non-blocking)
  - If `batch_size_threshold` reached → process batch immediately
  - If below threshold → use `epoll_wait(batch_timeout_ms)` for I/O and timeout
  - On timeout → process collected commits
- If no commits available and no in-flight requests → switch to blocking acquire
### 2. Connection Management
- Acquire healthy connection from pool
- Create new connections if pool below `target_pool_size`
- If no healthy connections available, block until one becomes available
- Maintain automatic pool replenishment
### 3. Data Transmission
- Write batch data to S3 connection using appropriate protocol
- Publish accepted transactions to subscriber system
- Track request as in-flight for flow control
### 4. I/O Event Processing
- Handle epoll events for all in-flight connections
- Monitor connection health via heartbeats
- Process incoming responses and detect connection failures
### 5. Response Handling
- **Ordered Acknowledgment**: Only acknowledge batch after all prior batches are durable
- Release batch via `StageGuard` destructor (publishes to next pipeline stage)
- Publish durability events to subscriber system
- Return healthy connection to pool
### 6. Failure Handling
- Remove failed connection from pool
- Retry batch with exponential backoff (up to `max_retry_attempts`)
- Backoff delays only affect the specific failing batch
- If retries exhausted, abort process or escalate error
- Initiate pool replenishment if below target
## Connection Pool
**Target Size**: `target_pool_size` connections (recommended: 2x `max_in_flight_requests`)
**Replenishment**: Automatic creation when below target
**Health Monitoring**: Heartbeat-based connection validation
**Sizing Rationale**: 2x multiplier ensures availability during peak load and connection replacement
## Key Design Properties
**Batch Ordering**: Batches may be retried out-of-order for performance, but acknowledgment to next pipeline stage maintains strict ordering.
**Backpressure**: Retry delays for failing batches create natural backpressure that eventually blocks the persistence thread when in-flight limits are reached.
**Graceful Shutdown**: On shutdown signal, drain all in-flight batches to completion before terminating.
## Configuration Parameters
| Parameter | Default | Description |
|-----------|---------|-------------|
| `batch_timeout_ms` | 1ms | Maximum time to wait collecting commits for batching |
| `batch_size_threshold` | 1MB | Threshold for triggering batch processing |
| `max_in_flight_requests` | 50 | Maximum concurrent requests to persistence backend |
| `target_pool_size` | 2x in-flight | Target number of connections to maintain |
| `max_retry_attempts` | 3 | Maximum retries for failed batches before aborting |
| `retry_base_delay_ms` | 100ms | Base delay for exponential backoff retries |
## Configuration Validation
**Required Constraints**:
- `batch_size_threshold` > 0 (must process at least one commit per batch)
- `max_in_flight_requests` > 0 (must allow at least one concurrent request)
- `target_pool_size` >= `max_in_flight_requests` (pool must accommodate all in-flight requests)
- `batch_timeout_ms` > 0 (timeout must be positive)
- `max_retry_attempts` >= 0 (zero disables retries)
- `retry_base_delay_ms` > 0 (delay must be positive if retries enabled)
**Performance Recommendations**:
- `target_pool_size` <= 2x `max_in_flight_requests` (optimal for performance)