Compare commits
5 commits: ee721c7753 ... 1a4e8d5761
(1a4e8d5761, 506bbbb528, da69a99cf4, 333148bb5a, f54d1e0dc1)

latency_sim/database_comparison.py (new file, 970 lines)
@@ -0,0 +1,970 @@
#!/usr/bin/env python3
"""
Database Persistence Pattern Comparison

Compares WeaselDB's batched S3 persistence approach against traditional
database persistence patterns to understand trade-offs in latency,
throughput, consistency, and operational complexity.

Simulated approaches:
1. WeaselDB: Batched async S3 persistence with optimistic concurrency
2. Traditional WAL: Write-ahead log with periodic sync to disk
3. Synchronous: Immediate disk sync per transaction (like PostgreSQL with
   synchronous_commit=on)
4. Group Commit: Batched disk writes with configurable group size
5. Async Replication: Immediate response with async durability

Note: the comparison framework below currently exercises approaches 1-3 (plus
an EBS-backed variant of WeaselDB); Group Commit and Async Replication exist
only as enum placeholders so far.
"""

import heapq
import random
import statistics
from dataclasses import dataclass, field
from typing import List, Optional, Dict, Tuple, Any
import numpy as np
import matplotlib.pyplot as plt
from collections import defaultdict, deque
from abc import ABC, abstractmethod
from enum import Enum
import time


class PersistencePattern(Enum):
    WEASELDB_S3 = "WeaselDB S3 Batched"
    TRADITIONAL_WAL = "Traditional WAL"
    SYNCHRONOUS = "Synchronous Disk"
    GROUP_COMMIT = "Group Commit"
    ASYNC_REPLICATION = "Async Replication"


@dataclass
class Transaction:
    """Represents a database transaction/commit"""
    txn_id: int
    arrival_time: float
    size_bytes: int = 1024
    requires_durability: bool = True  # Some txns can be async


@dataclass
class PersistenceMetrics:
    """Metrics for a persistence approach"""
    pattern: PersistencePattern
    total_transactions: int
    completed_transactions: int

    # Latency metrics (milliseconds)
    min_latency: float
    mean_latency: float
    median_latency: float
    p95_latency: float
    p99_latency: float
    max_latency: float

    # Throughput metrics
    avg_throughput_tps: float
    peak_throughput_tps: float

    # Resource metrics
    avg_disk_iops: float
    avg_network_mbps: float
    storage_efficiency: float  # GB stored / GB logical data

    # Consistency metrics
    durability_guarantee: str
    consistency_model: str
    recovery_time_estimate: float

    # Operational metrics
    operational_complexity: int  # 1-10 scale
    infrastructure_cost: float  # Relative cost per transaction


class PersistenceSimulator(ABC):
    """Abstract base class for persistence pattern simulators"""

    def __init__(self,
                 simulation_duration: float = 60.0,
                 arrival_rate_per_sec: float = 1000.0):
        self.simulation_duration = simulation_duration
        self.arrival_rate_per_sec = arrival_rate_per_sec

        # Simulation state
        self.current_time = 0.0
        self.event_queue = []
        self.completed_transactions = []
        self.next_txn_id = 0
        self._event_seq = 0  # Monotonic tie-breaker for heap events

        # Metrics tracking
        self.disk_iops = []
        self.network_usage = []
        self.throughput_samples = []

        # Random number generator
        self.rng = np.random.RandomState(42)

    def generate_transaction(self, arrival_time: float) -> Transaction:
        """Generate a transaction with realistic characteristics"""
        # Size distribution: mostly small, some large. Draw once so the split
        # is the documented 80/15/5 (chained independent random() calls would
        # skew it to roughly 80/19/1).
        r = self.rng.random()
        if r < 0.80:  # 80% small transactions
            size = self.rng.randint(100, 2048)  # 100B - 2KB
        elif r < 0.95:  # 15% medium
            size = self.rng.randint(2048, 20480)  # 2KB - 20KB
        else:  # 5% large transactions
            size = self.rng.randint(20480, 102400)  # 20KB - 100KB

        return Transaction(
            txn_id=self.next_txn_id,
            arrival_time=arrival_time,
            size_bytes=size,
            requires_durability=True
        )

    @abstractmethod
    def process_transaction(self, txn: Transaction) -> None:
        """Process a transaction according to the persistence pattern"""
        pass

    @abstractmethod
    def get_pattern_name(self) -> PersistencePattern:
        """Return the persistence pattern this simulator implements"""
        pass

    def run_simulation(self) -> PersistenceMetrics:
        """Run the simulation and return metrics"""
        # Generate arrival events
        self._generate_arrivals()

        # Process events (the sequence number only breaks timestamp ties)
        while self.event_queue and self.current_time < self.simulation_duration:
            event_time, _, event_type, data = heapq.heappop(self.event_queue)
            self.current_time = event_time

            if event_type == 'transaction_arrival':
                self.process_transaction(data)
            elif event_type == 'custom':
                self._handle_custom_event(data)

        return self._calculate_metrics()

    def _generate_arrivals(self):
        """Generate Poisson arrival events (exponential inter-arrival gaps)"""
        arrival_time = 0.0
        while arrival_time < self.simulation_duration:
            inter_arrival = self.rng.exponential(1.0 / self.arrival_rate_per_sec)
            arrival_time += inter_arrival

            if arrival_time >= self.simulation_duration:
                break

            txn = self.generate_transaction(arrival_time)
            self.next_txn_id += 1

            self.schedule_event(arrival_time, 'transaction_arrival', txn)
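
    # Note on arrival rates: Poisson arrivals have mean inter-arrival gap
    # 1/rate, so at 1000 TPS commits land ~1 ms apart on average. That is why
    # the 1 ms batch timeout used by the WeaselDB simulators below still
    # produces multi-transaction batches at the higher tested rates.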

    def _handle_custom_event(self, data):
        """Handle custom events - override in subclasses"""
        pass

    def schedule_event(self, event_time: float, event_type: str, data: Any):
        """Schedule a custom event. The monotonic sequence number breaks
        timestamp ties so heapq never falls back to comparing event payloads
        (dicts and Transactions are not orderable)."""
        heapq.heappush(self.event_queue,
                       (event_time, self._event_seq, event_type, data))
        self._event_seq += 1

    def _calculate_metrics(self) -> PersistenceMetrics:
        """Calculate performance metrics from completed transactions"""
        if not self.completed_transactions:
            raise ValueError("No transactions completed")

        latencies = [txn['latency_ms'] for txn in self.completed_transactions]

        return PersistenceMetrics(
            pattern=self.get_pattern_name(),
            total_transactions=self.next_txn_id,
            completed_transactions=len(self.completed_transactions),

            # Latency metrics
            min_latency=min(latencies),
            mean_latency=statistics.mean(latencies),
            median_latency=statistics.median(latencies),
            p95_latency=np.percentile(latencies, 95),
            p99_latency=np.percentile(latencies, 99),
            max_latency=max(latencies),

            # Throughput metrics
            avg_throughput_tps=len(self.completed_transactions) / self.simulation_duration,
            peak_throughput_tps=self._calculate_peak_throughput(),

            # Resource metrics - populated by subclasses (network samples are
            # MB per batch, so this is a relative measure rather than true Mbps)
            avg_disk_iops=statistics.mean(self.disk_iops) if self.disk_iops else 0,
            avg_network_mbps=statistics.mean(self.network_usage) if self.network_usage else 0,
            storage_efficiency=self._calculate_storage_efficiency(),

            # Pattern-specific characteristics
            durability_guarantee=self._get_durability_guarantee(),
            consistency_model=self._get_consistency_model(),
            recovery_time_estimate=self._get_recovery_time(),
            operational_complexity=self._get_operational_complexity(),
            infrastructure_cost=self._get_infrastructure_cost()
        )

    def _calculate_peak_throughput(self) -> float:
        """Calculate peak throughput in 1-second windows"""
        if not self.completed_transactions:
            return 0.0

        # Group transactions by second
        throughput_by_second = defaultdict(int)
        for txn in self.completed_transactions:
            second = int(txn['completion_time'])
            throughput_by_second[second] += 1

        return max(throughput_by_second.values()) if throughput_by_second else 0

    # Abstract methods for pattern-specific characteristics
    @abstractmethod
    def _calculate_storage_efficiency(self) -> float:
        pass

    @abstractmethod
    def _get_durability_guarantee(self) -> str:
        pass

    @abstractmethod
    def _get_consistency_model(self) -> str:
        pass

    @abstractmethod
    def _get_recovery_time(self) -> float:
        pass

    @abstractmethod
    def _get_operational_complexity(self) -> int:
        pass

    @abstractmethod
    def _get_infrastructure_cost(self) -> float:
        pass


class WeaselDBSimulator(PersistenceSimulator):
    """WeaselDB's batched S3 persistence simulation"""

    def __init__(self,
                 batch_timeout_ms: float = 1.0,
                 batch_size_threshold: int = 800000,
                 max_in_flight: int = 50,
                 **kwargs):
        super().__init__(**kwargs)

        self.batch_timeout_ms = batch_timeout_ms
        self.batch_size_threshold = batch_size_threshold
        self.max_in_flight = max_in_flight

        # State
        self.current_batch = []
        self.batch_start_time = None
        self.in_flight_batches = {}
        self.next_batch_id = 0

        # S3 characteristics
        self.s3_base_latency_ms = 60.0  # Mean latency, from our simulation
        self.s3_size_penalty_per_mb = 20.0

    def get_pattern_name(self) -> PersistencePattern:
        return PersistencePattern.WEASELDB_S3

    def process_transaction(self, txn: Transaction):
        """Process transaction using WeaselDB batching logic"""
        # Add to current batch
        if not self.current_batch:
            self.batch_start_time = self.current_time

        self.current_batch.append(txn)

        # Check if we should send batch
        if self._should_send_batch() and len(self.in_flight_batches) < self.max_in_flight:
            self._send_current_batch()

    def _should_send_batch(self) -> bool:
        """Check batch triggers"""
        if not self.current_batch:
            return False

        # Size trigger
        batch_size = sum(txn.size_bytes for txn in self.current_batch)
        if batch_size >= self.batch_size_threshold:
            return True

        # Time trigger
        if self.batch_start_time and (self.current_time - self.batch_start_time) >= (self.batch_timeout_ms / 1000.0):
            return True

        return False

    def _send_current_batch(self):
        """Send current batch to S3"""
        if not self.current_batch:
            return

        batch_id = self.next_batch_id
        self.next_batch_id += 1

        batch_size = sum(txn.size_bytes for txn in self.current_batch)

        # Sample S3 latency
        s3_latency = self._sample_s3_latency(batch_size) / 1000.0
        completion_time = self.current_time + s3_latency

        # Track batch
        self.in_flight_batches[batch_id] = {
            'transactions': self.current_batch.copy(),
            'sent_time': self.current_time,
            'completion_time': completion_time,
            'size_bytes': batch_size
        }

        # Schedule completion
        self.schedule_event(completion_time, 'custom', {'type': 'batch_complete', 'batch_id': batch_id})

        # Track network usage
        self.network_usage.append(batch_size / (1024 * 1024))  # MB

        # Clear batch
        self.current_batch.clear()
        self.batch_start_time = None

    def _sample_s3_latency(self, batch_size_bytes: int) -> float:
        """Sample S3 latency with size scaling"""
        size_penalty = (batch_size_bytes / (1024 * 1024)) * self.s3_size_penalty_per_mb
        variable = self.rng.gamma(2.0, 15.0)  # Variable component
        return 30.0 + variable + size_penalty  # 30ms RTT + variable + size
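
    # Sanity check on the sampled distribution (assuming NumPy's
    # gamma(shape, scale) parameterization): Gamma(2.0, 15.0) has mean
    # shape * scale = 30 ms, so a small batch averages ~30 ms RTT + ~30 ms
    # variable ≈ 60 ms, consistent with s3_base_latency_ms above.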

    def _handle_custom_event(self, data):
        """Handle batch completion events"""
        if data['type'] == 'batch_complete':
            batch_id = data['batch_id']
            if batch_id in self.in_flight_batches:
                batch = self.in_flight_batches.pop(batch_id)

                # Mark transactions complete
                for txn in batch['transactions']:
                    latency_ms = (self.current_time - txn.arrival_time) * 1000
                    self.completed_transactions.append({
                        'txn_id': txn.txn_id,
                        'arrival_time': txn.arrival_time,
                        'completion_time': self.current_time,
                        'latency_ms': latency_ms,
                        'size_bytes': txn.size_bytes
                    })

    def _calculate_storage_efficiency(self) -> float:
        return 1.0  # S3 has no storage overhead

    def _get_durability_guarantee(self) -> str:
        return "Eventually durable (S3 11 9's)"

    def _get_consistency_model(self) -> str:
        return "Optimistic concurrency, eventual consistency"

    def _get_recovery_time(self) -> float:
        return 0.1  # Fast recovery, just reconnect to S3

    def _get_operational_complexity(self) -> int:
        return 3  # Moderate - S3 managed, but need batching logic

    def _get_infrastructure_cost(self) -> float:
        return 1.0  # Baseline cost
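
# Minimal standalone-usage sketch (not part of the comparison framework below;
# parameter values are illustrative):
#
#   sim = WeaselDBSimulator(simulation_duration=10.0, arrival_rate_per_sec=500.0)
#   m = sim.run_simulation()
#   print(f"{m.pattern.value}: P95 {m.p95_latency:.1f} ms, "
#         f"{m.completed_transactions} txns completed")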


class TraditionalWALSimulator(PersistenceSimulator):
    """Traditional Write-Ahead Log with periodic sync"""

    def __init__(self,
                 wal_sync_interval_ms: float = 10.0,
                 checkpoint_interval_sec: float = 30.0,
                 **kwargs):
        super().__init__(**kwargs)

        self.wal_sync_interval_ms = wal_sync_interval_ms
        self.checkpoint_interval_sec = checkpoint_interval_sec

        # WAL state
        self.wal_buffer = []
        self.last_sync_time = 0.0
        self.pending_transactions = {}  # Waiting for sync

        # EBS characteristics
        self.disk_latency_ms = 1.0  # EBS base latency
        self.disk_iops_limit = 10000  # Typical SSD

        # Schedule periodic syncs
        self._schedule_periodic_syncs()

    def get_pattern_name(self) -> PersistencePattern:
        return PersistencePattern.TRADITIONAL_WAL

    def process_transaction(self, txn: Transaction):
        """Write to WAL buffer, wait for sync"""
        # Write to WAL buffer (immediate)
        self.wal_buffer.append(txn)
        self.pending_transactions[txn.txn_id] = txn

        # Track disk IOPS (write to WAL)
        self.disk_iops.append(1.0)

    def _schedule_periodic_syncs(self):
        """Schedule periodic WAL sync events"""
        sync_time = self.wal_sync_interval_ms / 1000.0
        while sync_time < self.simulation_duration:
            self.schedule_event(sync_time, 'custom', {'type': 'wal_sync'})
            sync_time += self.wal_sync_interval_ms / 1000.0

    def _perform_wal_sync(self):
        """Sync WAL buffer to disk"""
        if not self.wal_buffer:
            return

        # Calculate sync latency based on buffer size
        buffer_size = sum(txn.size_bytes for txn in self.wal_buffer)
        sync_latency = self._calculate_disk_sync_latency(buffer_size)

        completion_time = self.current_time + sync_latency / 1000.0

        # Schedule sync completion
        syncing_txns = list(self.wal_buffer)
        self.schedule_event(completion_time, 'custom', {
            'type': 'sync_complete',
            'transactions': syncing_txns
        })

        # Track IOPS for sync operation
        self.disk_iops.append(len(self.wal_buffer))

        # Clear buffer
        self.wal_buffer.clear()
        self.last_sync_time = self.current_time

    def _calculate_disk_sync_latency(self, size_bytes: int) -> float:
        """Calculate disk sync latency with realistic fsync modeling"""
        # Data write to page cache
        write_latency = 0.1  # Fast write to page cache

        # EBS sequential write throughput
        throughput_mbps = 1000.0  # EBS gp3 throughput (MB/s)
        size_mb = size_bytes / (1024 * 1024)
        transfer_latency = size_mb * (1000.0 / throughput_mbps)

        # WAL file fsync latency on EBS:
        # fdatasync on EBS pays for network replication
        file_fsync_base = 0.8  # Higher base latency on EBS
        file_fsync_variable = self.rng.gamma(2.2, 0.4)  # More variable due to network

        # Size penalty for large WAL syncs
        size_penalty = min(size_mb * 0.05, 1.0)  # Smaller penalty than batched writes

        file_fsync_latency = file_fsync_base + file_fsync_variable + size_penalty

        # WAL typically appends to existing files, so no directory fsync is
        # needed; directory fsync is only required when creating new WAL segments.
        return write_latency + transfer_latency + file_fsync_latency

    def _handle_custom_event(self, data):
        """Handle periodic WAL syncs and sync completions"""
        if data['type'] == 'wal_sync':
            self._perform_wal_sync()
        elif data['type'] == 'sync_complete':
            # Mark transactions as durable
            for txn in data['transactions']:
                if txn.txn_id in self.pending_transactions:
                    del self.pending_transactions[txn.txn_id]

                    latency_ms = (self.current_time - txn.arrival_time) * 1000
                    self.completed_transactions.append({
                        'txn_id': txn.txn_id,
                        'arrival_time': txn.arrival_time,
                        'completion_time': self.current_time,
                        'latency_ms': latency_ms,
                        'size_bytes': txn.size_bytes
                    })

    def _calculate_storage_efficiency(self) -> float:
        return 2.0  # WAL + main storage

    def _get_durability_guarantee(self) -> str:
        return "ACID durable after WAL sync"

    def _get_consistency_model(self) -> str:
        return "Strict ACID consistency"

    def _get_recovery_time(self) -> float:
        return 30.0  # WAL replay time

    def _get_operational_complexity(self) -> int:
        return 7  # Complex - WAL management, checkpoints, recovery

    def _get_infrastructure_cost(self) -> float:
        return 1.5  # Higher due to disk I/O overhead


class SynchronousSimulator(PersistenceSimulator):
    """Synchronous disk persistence (like PostgreSQL with synchronous_commit=on)"""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.disk_latency_ms = 1.0  # EBS base latency

    def get_pattern_name(self) -> PersistencePattern:
        return PersistencePattern.SYNCHRONOUS

    def process_transaction(self, txn: Transaction):
        """Immediately sync transaction to disk"""
        # Calculate disk write latency
        disk_latency = self._calculate_disk_latency(txn.size_bytes) / 1000.0
        completion_time = self.current_time + disk_latency

        # Schedule completion
        self.schedule_event(completion_time, 'custom', {
            'type': 'disk_write_complete',
            'transaction': txn
        })

        # Track disk IOPS
        self.disk_iops.append(1.0)

    def _calculate_disk_latency(self, size_bytes: int) -> float:
        """Calculate per-transaction disk write latency with fsync"""
        # Write to page cache (fast)
        write_latency = 0.1

        # fsync latency for immediate durability on EBS.
        # Each transaction requires its own fsync - expensive on network storage!
        fsync_base = 1.5  # Much higher base latency for individual fsyncs on EBS
        fsync_variable = self.rng.gamma(2.3, 0.5)  # More variable due to network

        # Small write penalty - less efficient than batched writes
        if size_bytes < 4096:
            penalty = 0.5  # Individual small writes are less efficient
        else:
            penalty = 0.0

        fsync_latency = fsync_base + fsync_variable + penalty

        # Synchronous commits to existing database files don't need directory
        # fsync; that is only needed when creating new files/tablespaces.
        return write_latency + fsync_latency
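
    # Rough arithmetic for the model above: gamma(2.3, 0.5) has mean ~1.15 ms,
    # so a typical small commit pays ~0.1 + 1.5 + 1.15 + 0.5 ≈ 3.3 ms. Note
    # the simulator completes these writes independently (no disk queue is
    # modeled), so throughput is not capped at 1/latency.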

    def _handle_custom_event(self, data):
        """Handle disk write completion"""
        if data['type'] == 'disk_write_complete':
            txn = data['transaction']
            latency_ms = (self.current_time - txn.arrival_time) * 1000

            self.completed_transactions.append({
                'txn_id': txn.txn_id,
                'arrival_time': txn.arrival_time,
                'completion_time': self.current_time,
                'latency_ms': latency_ms,
                'size_bytes': txn.size_bytes
            })

    def _calculate_storage_efficiency(self) -> float:
        return 1.5  # Some overhead for metadata

    def _get_durability_guarantee(self) -> str:
        return "Immediate ACID durability"

    def _get_consistency_model(self) -> str:
        return "Strict ACID with immediate consistency"

    def _get_recovery_time(self) -> float:
        return 5.0  # Fast recovery, data already on disk

    def _get_operational_complexity(self) -> int:
        return 5  # Moderate - standard database operations

    def _get_infrastructure_cost(self) -> float:
        return 2.0  # High due to disk I/O per transaction


class WeaselDBDiskSimulator(PersistenceSimulator):
    """WeaselDB's batched disk (EBS) persistence simulation"""

    def __init__(self,
                 batch_timeout_ms: float = 1.0,
                 batch_size_threshold: int = 800000,
                 max_in_flight: int = 50,
                 **kwargs):
        super().__init__(**kwargs)

        self.batch_timeout_ms = batch_timeout_ms
        self.batch_size_threshold = batch_size_threshold
        self.max_in_flight = max_in_flight

        # State
        self.current_batch = []
        self.batch_start_time = None
        self.in_flight_batches = {}
        self.next_batch_id = 0

        # EBS characteristics (gp3 or io2 volumes)
        self.disk_base_latency_ms = 0.5  # EBS has higher base latency than local NVMe
        self.disk_throughput_mbps = 1000.0  # EBS gp3 max throughput (MB/s)

    def get_pattern_name(self) -> PersistencePattern:
        # Reuses the S3 enum value even though this variant is disk-backed;
        # run_comparison() below distinguishes the two via its result keys.
        return PersistencePattern.WEASELDB_S3

    def process_transaction(self, txn: Transaction):
        """Process transaction using WeaselDB batching logic"""
        # Add to current batch
        if not self.current_batch:
            self.batch_start_time = self.current_time

        self.current_batch.append(txn)

        # Check if we should send batch
        if self._should_send_batch() and len(self.in_flight_batches) < self.max_in_flight:
            self._send_current_batch()

    def _should_send_batch(self) -> bool:
        """Check batch triggers"""
        if not self.current_batch:
            return False

        # Size trigger
        batch_size = sum(txn.size_bytes for txn in self.current_batch)
        if batch_size >= self.batch_size_threshold:
            return True

        # Time trigger
        if self.batch_start_time and (self.current_time - self.batch_start_time) >= (self.batch_timeout_ms / 1000.0):
            return True

        return False

    def _send_current_batch(self):
        """Send current batch to disk"""
        if not self.current_batch:
            return

        batch_id = self.next_batch_id
        self.next_batch_id += 1

        batch_size = sum(txn.size_bytes for txn in self.current_batch)

        # Sample disk write latency
        disk_latency = self._sample_disk_latency(batch_size) / 1000.0
        completion_time = self.current_time + disk_latency

        # Track batch
        self.in_flight_batches[batch_id] = {
            'transactions': self.current_batch.copy(),
            'sent_time': self.current_time,
            'completion_time': completion_time,
            'size_bytes': batch_size
        }

        # Schedule completion
        self.schedule_event(completion_time, 'custom', {'type': 'batch_complete', 'batch_id': batch_id})

        # Track disk IOPS (one write operation per batch)
        self.disk_iops.append(1.0)

        # Clear batch
        self.current_batch.clear()
        self.batch_start_time = None

    def _sample_disk_latency(self, batch_size_bytes: int) -> float:
        """Sample disk write latency with fsync modeling including directory sync"""
        # Base latency for the write command (data goes to page cache)
        write_latency = self.disk_base_latency_ms

        # Throughput-based latency for data transfer to page cache
        size_mb = batch_size_bytes / (1024 * 1024)
        transfer_latency = (size_mb / self.disk_throughput_mbps) * 1000.0  # Convert to ms

        # fsync latency for EBS - forces write to replicated storage;
        # EBS has higher fsync latency due to network replication
        file_fsync_base = 1.0  # Higher base fsync latency for EBS
        file_fsync_variable = self.rng.gamma(2.5, 0.6)  # More variable due to network

        # Size-dependent fsync penalty for large batches
        size_penalty = min(size_mb * 0.1, 2.0)  # Max 2ms penalty

        file_fsync_latency = file_fsync_base + file_fsync_variable + size_penalty

        # Directory fsync latency - WeaselDB creates a new batch file per
        # write, and EBS directory metadata sync is also network-replicated
        dir_fsync_base = 0.5  # Higher directory metadata sync latency on EBS
        dir_fsync_variable = self.rng.gamma(1.8, 0.3)  # More variable due to network

        dir_fsync_latency = dir_fsync_base + dir_fsync_variable

        # Total latency: write + transfer + file fsync + directory fsync.
        # WeaselDB needs the directory fsync for batch file durability guarantees.
        return write_latency + transfer_latency + file_fsync_latency + dir_fsync_latency

    def _handle_custom_event(self, data):
        """Handle batch completion events"""
        if data['type'] == 'batch_complete':
            batch_id = data['batch_id']
            if batch_id in self.in_flight_batches:
                batch = self.in_flight_batches.pop(batch_id)

                # Mark transactions complete
                for txn in batch['transactions']:
                    latency_ms = (self.current_time - txn.arrival_time) * 1000
                    self.completed_transactions.append({
                        'txn_id': txn.txn_id,
                        'arrival_time': txn.arrival_time,
                        'completion_time': self.current_time,
                        'latency_ms': latency_ms,
                        'size_bytes': txn.size_bytes
                    })

    def _calculate_storage_efficiency(self) -> float:
        return 1.2  # Some overhead for batching metadata

    def _get_durability_guarantee(self) -> str:
        return "ACID durable after EBS replication"

    def _get_consistency_model(self) -> str:
        return "Strict serializable with optimistic concurrency"

    def _get_recovery_time(self) -> float:
        return 10.0  # EBS volume attachment + recovery

    def _get_operational_complexity(self) -> int:
        return 4  # Moderate - batching logic + disk management

    def _get_infrastructure_cost(self) -> float:
        return 1.4  # Higher than S3 due to EBS provisioned storage + replication


class DatabaseComparisonFramework:
    """Framework for comparing different database persistence patterns"""

    def __init__(self, simulation_duration: float = 30.0):
        self.simulation_duration = simulation_duration

    def run_comparison(self,
                       arrival_rates: List[float] = [100, 500, 1000, 2000]) -> Dict[str, List[PersistenceMetrics]]:
        """Run comparison across multiple arrival rates"""

        results = defaultdict(list)

        for rate in arrival_rates:
            print(f"\nTesting at {rate} TPS...")

            # WeaselDB S3 (optimized config)
            weasel_s3 = WeaselDBSimulator(
                batch_timeout_ms=1.0,
                batch_size_threshold=800000,
                max_in_flight=50,
                simulation_duration=self.simulation_duration,
                arrival_rate_per_sec=rate
            )
            weasel_s3_metrics = weasel_s3.run_simulation()
            results['WeaselDB S3'].append(weasel_s3_metrics)
            print(f"  WeaselDB S3 P95: {weasel_s3_metrics.p95_latency:.1f}ms")

            # WeaselDB EBS (same batching, EBS storage)
            weasel_ebs = WeaselDBDiskSimulator(
                batch_timeout_ms=1.0,
                batch_size_threshold=800000,
                max_in_flight=50,
                simulation_duration=self.simulation_duration,
                arrival_rate_per_sec=rate
            )
            weasel_ebs_metrics = weasel_ebs.run_simulation()
            results['WeaselDB EBS'].append(weasel_ebs_metrics)
            print(f"  WeaselDB EBS P95: {weasel_ebs_metrics.p95_latency:.1f}ms")

            # Traditional WAL
            wal = TraditionalWALSimulator(
                wal_sync_interval_ms=10.0,
                simulation_duration=self.simulation_duration,
                arrival_rate_per_sec=rate
            )
            wal_metrics = wal.run_simulation()
            results['Traditional WAL'].append(wal_metrics)
            print(f"  WAL P95: {wal_metrics.p95_latency:.1f}ms")

            # Synchronous
            sync = SynchronousSimulator(
                simulation_duration=self.simulation_duration,
                arrival_rate_per_sec=rate
            )
            sync_metrics = sync.run_simulation()
            results['Synchronous'].append(sync_metrics)
            print(f"  Synchronous P95: {sync_metrics.p95_latency:.1f}ms")

        return dict(results)

    def print_comparison_report(self, results: Dict[str, List[PersistenceMetrics]]):
        """Print comprehensive comparison report"""
        print("\n" + "="*80)
        print("DATABASE PERSISTENCE PATTERN COMPARISON")
        print("="*80)

        # Arrival rates for headers (must match those passed to run_comparison)
        arrival_rates = [100, 500, 1000, 2000]

        # Latency comparison
        print(f"\nP95 LATENCY COMPARISON (ms)")
        print(f"{'Pattern':<20}", end="")
        for rate in arrival_rates:
            print(f"{rate:>8} TPS", end="")
        print()
        print("-" * 60)

        for pattern_name, metrics_list in results.items():
            print(f"{pattern_name:<20}", end="")
            for metrics in metrics_list:
                print(f"{metrics.p95_latency:>8.1f}", end="")
            print()

        # Throughput comparison
        print(f"\nTHROUGHPUT ACHIEVED (TPS)")
        print(f"{'Pattern':<20}", end="")
        for rate in arrival_rates:
            print(f"{rate:>8} TPS", end="")
        print()
        print("-" * 60)

        for pattern_name, metrics_list in results.items():
            print(f"{pattern_name:<20}", end="")
            for metrics in metrics_list:
                print(f"{metrics.avg_throughput_tps:>8.1f}", end="")
            print()

        # Characteristics comparison
        print(f"\nSYSTEM CHARACTERISTICS")
        print(f"{'Pattern':<20} {'Durability':<25} {'Consistency':<20} {'OpComplx':<8} {'Cost':<6}")
        print("-" * 85)

        for pattern_name, metrics_list in results.items():
            metrics = metrics_list[0]  # Use first metrics for characteristics
            print(f"{pattern_name:<20} {metrics.durability_guarantee:<25} "
                  f"{metrics.consistency_model:<20} {metrics.operational_complexity:<8} "
                  f"{metrics.infrastructure_cost:<6.1f}")

        # Performance sweet spots
        print(f"\nPERFORMANCE SWEET SPOTS")
        print("-" * 40)

        for rate in arrival_rates:
            print(f"\nAt {rate} TPS:")
            rate_results = [(name, metrics_list[arrival_rates.index(rate)])
                            for name, metrics_list in results.items()]

            # Sort by P95 latency
            rate_results.sort(key=lambda x: x[1].p95_latency)

            for i, (name, metrics) in enumerate(rate_results):
                rank_symbol = "🥇" if i == 0 else "🥈" if i == 1 else "🥉" if i == 2 else " "
                print(f"  {rank_symbol} {name}: {metrics.p95_latency:.1f}ms P95, "
                      f"{metrics.avg_throughput_tps:.0f} TPS achieved")

    def plot_comparison_results(self, results: Dict[str, List[PersistenceMetrics]],
                                save_path: Optional[str] = None):
        """Plot comparison results"""
        try:
            arrival_rates = [100, 500, 1000, 2000]

            fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(15, 12))
            fig.suptitle('Database Persistence Pattern Comparison', fontsize=16)

            # Plot 1: P95 Latency vs Load
            for pattern_name, metrics_list in results.items():
                p95_latencies = [m.p95_latency for m in metrics_list]
                ax1.plot(arrival_rates, p95_latencies, marker='o', linewidth=2, label=pattern_name)

            ax1.set_xlabel('Arrival Rate (TPS)')
            ax1.set_ylabel('P95 Latency (ms)')
            ax1.set_title('P95 Latency vs Load')
            ax1.legend()
            ax1.grid(True, alpha=0.3)
            ax1.set_yscale('log')

            # Plot 2: Throughput Achieved
            for pattern_name, metrics_list in results.items():
                throughputs = [m.avg_throughput_tps for m in metrics_list]
                ax2.plot(arrival_rates, throughputs, marker='s', linewidth=2, label=pattern_name)

            # Perfect throughput line
            ax2.plot(arrival_rates, arrival_rates, 'k--', alpha=0.5, label='Perfect (no loss)')

            ax2.set_xlabel('Target Rate (TPS)')
            ax2.set_ylabel('Achieved Throughput (TPS)')
            ax2.set_title('Throughput: Target vs Achieved')
            ax2.legend()
            ax2.grid(True, alpha=0.3)

            # Plot 3: Latency Distribution at 1000 TPS
            rate_idx = 2  # 1000 TPS
            for pattern_name, metrics_list in results.items():
                metrics = metrics_list[rate_idx]
                # Plot latency percentiles
                percentiles = [50, 95, 99]
                values = [metrics.median_latency, metrics.p95_latency, metrics.p99_latency]
                ax3.bar([f"{pattern_name}\nP{p}" for p in percentiles], values,
                        alpha=0.7, label=pattern_name)

            ax3.set_ylabel('Latency (ms)')
            ax3.set_title('Latency Percentiles at 1000 TPS')
            ax3.set_yscale('log')
            ax3.grid(True, alpha=0.3)

            # Plot 4: Cost vs Performance
            for pattern_name, metrics_list in results.items():
                costs = [m.infrastructure_cost for m in metrics_list]
                p95s = [m.p95_latency for m in metrics_list]

                # Markers keyed by the result names used in run_comparison()
                markers = {'WeaselDB S3': 'o', 'WeaselDB EBS': 'D',
                           'Traditional WAL': 's', 'Synchronous': '^'}
                marker = markers.get(pattern_name, 'o')

                ax4.scatter(costs, p95s, s=100, marker=marker, alpha=0.7, label=pattern_name)

            ax4.set_xlabel('Infrastructure Cost (relative)')
            ax4.set_ylabel('P95 Latency (ms)')
            ax4.set_title('Cost vs Performance Trade-off')
            ax4.legend()
            ax4.grid(True, alpha=0.3)
            ax4.set_yscale('log')

            plt.tight_layout()

            if save_path:
                plt.savefig(save_path, dpi=300, bbox_inches='tight')
                print(f"Comparison plots saved to {save_path}")
            else:
                plt.show()

        except Exception as e:
            print(f"Could not generate plots: {e}")


def main():
    """Run database persistence pattern comparison"""
    print("Database Persistence Pattern Comparison")
    print("Comparing WeaselDB vs Traditional Database Approaches")
    print()

    comparison = DatabaseComparisonFramework(simulation_duration=20.0)
    results = comparison.run_comparison()

    comparison.print_comparison_report(results)

    try:
        comparison.plot_comparison_results(results, 'database_comparison.png')
    except Exception as e:
        print(f"Could not generate plots: {e}")


if __name__ == "__main__":
    main()
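
# To reproduce locally (assumed environment): install numpy and matplotlib,
# then run `python database_comparison.py`. The report prints to stdout and
# main() writes the figure to database_comparison.png.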

latency_sim/persistence_optimizer.py (new file, 461 lines)
@@ -0,0 +1,461 @@
#!/usr/bin/env python3
"""
Persistence Thread Parameter Optimization

Uses Bayesian Optimization to automatically find the optimal configuration
parameters that minimize commit latency. This is much more efficient than
grid search since it uses a probabilistic model to guide parameter exploration.

Key advantages:
- Efficiently explores high-dimensional parameter spaces
- Uses previous simulation results to guide future parameter choices
- Handles expensive objective function evaluations (our simulation)
- Provides uncertainty estimates for parameter importance
"""

import numpy as np
from typing import Dict, List, Tuple, Optional
import time
from persistence_simulation import PersistenceSimulation, print_results

# Try to import scikit-optimize for Bayesian optimization
try:
    from skopt import gp_minimize, forest_minimize
    from skopt.space import Real, Integer
    from skopt.utils import use_named_args
    from skopt.plots import plot_convergence, plot_objective
    import matplotlib.pyplot as plt
    OPTIMIZE_AVAILABLE = True
except ImportError:
    print("scikit-optimize not available. Install with: pip install scikit-optimize")
    print("Falling back to grid search...")
    OPTIMIZE_AVAILABLE = False


class PersistenceOptimizer:
    """
    Automated parameter optimization for the persistence thread using Bayesian optimization.

    This class finds the optimal configuration parameters to minimize commit latency
    by intelligently exploring the parameter space using Gaussian Process models.
    """

    def __init__(self,
                 optimization_budget: int = 50,
                 simulation_duration: float = 20.0,
                 arrival_rate: float = 1000.0,
                 objective_metric: str = "p95_latency",
                 random_seed: int = 42):

        self.optimization_budget = optimization_budget
        self.simulation_duration = simulation_duration
        self.arrival_rate = arrival_rate
        self.objective_metric = objective_metric
        self.random_seed = random_seed

        # Track optimization history
        self.optimization_history = []
        self.best_params = None
        self.best_score = float('inf')

        # Define parameter search space
        self.parameter_space = self._define_search_space()
        self.parameter_names = [dim.name for dim in self.parameter_space]

    def _define_search_space(self) -> List:
        """
        Define the parameter search space for optimization.

        Focus on the 3 core parameters that matter for persistence thread
        performance with 100% reliable S3. Retry parameters removed since
        S3 never fails in this setup.
        """
        return [
            # Core batching parameters
            Real(1.0, 50.0, name='batch_timeout_ms',
                 prior='log-uniform'),  # Log scale since small changes matter
            Integer(64 * 1024, 4 * 1024 * 1024, name='batch_size_threshold',  # 64KB - 4MB
                    prior='log-uniform'),

            # Flow control parameters - likely the most impactful
            Integer(1, 50, name='max_in_flight_requests'),
        ]
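
    # The size threshold spans 64 KB to 4 MB (a 64x range); the log-uniform
    # prior samples each doubling roughly equally often, where a uniform prior
    # would concentrate nearly all draws in the megabyte range.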

    def _run_simulation_with_params(self, params: Dict[str, float]) -> Dict:
        """Run simulation with given parameters and return results"""
        try:
            sim = PersistenceSimulation(
                batch_timeout_ms=params['batch_timeout_ms'],
                batch_size_threshold=int(params['batch_size_threshold']),
                max_in_flight_requests=int(params['max_in_flight_requests']),
                # Retry parameters fixed since S3 is 100% reliable
                max_retry_attempts=0,  # No retries needed
                retry_base_delay_ms=100.0,  # Irrelevant but needs a value
                # S3 parameters kept fixed - 100% reliable for optimization focus
                s3_latency_shape=2.0,  # Fixed Gamma shape
                s3_latency_scale=15.0,  # Fixed Gamma scale (30ms RTT + ~30ms variable = ~60ms mean)
                s3_failure_rate=0.0,  # 100% reliable S3
                arrival_rate_per_sec=self.arrival_rate,
                simulation_duration_sec=self.simulation_duration
            )

            return sim.run_simulation()

        except Exception as e:
            print(f"Simulation failed with params {params}: {e}")
            # Return a high penalty score for failed simulations
            return {
                'commit_metrics': {
                    'latency_ms': {
                        'mean': 10000,
                        'p95': 10000,
                        'p99': 10000
                    }
                },
                'error': str(e)
            }

    def _extract_objective_value(self, results: Dict) -> float:
        """Extract the objective value to minimize from simulation results"""
        try:
            commit_metrics = results['commit_metrics']['latency_ms']

            if self.objective_metric == "mean_latency":
                return commit_metrics['mean']
            elif self.objective_metric == "p95_latency":
                return commit_metrics['p95']
            elif self.objective_metric == "p99_latency":
                return commit_metrics['p99']
            elif self.objective_metric == "weighted_latency":
                # Weighted combination emphasizing tail latencies
                return (0.3 * commit_metrics['mean'] +
                        0.5 * commit_metrics['p95'] +
                        0.2 * commit_metrics['p99'])
            else:
                return commit_metrics['p95']  # Default to P95

        except KeyError as e:
            print(f"Failed to extract objective from results: {e}")
            return 10000  # High penalty for invalid results
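
    # The weighted objective is a convex combination (0.3 + 0.5 + 0.2 = 1.0)
    # biased toward P95, so minimizing it trades a little mean latency for
    # better tail behavior.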

    def optimize_with_bayesian(self) -> Tuple[Dict, float]:
        """
        Use Bayesian Optimization to find optimal parameters.

        This uses Gaussian Process models to build a probabilistic model
        of the objective function and intelligently choose where to sample next.
        """
        if not OPTIMIZE_AVAILABLE:
            return self.optimize_with_grid_search()

        print(f"Starting Bayesian Optimization with {self.optimization_budget} evaluations")
        print(f"Objective: Minimize {self.objective_metric}")
        print(f"Parameter space: {len(self.parameter_space)} dimensions")
        print()

        @use_named_args(self.parameter_space)
        def objective(**params):
            """Objective function for Bayesian optimization"""
            print(f"Evaluating: {params}")

            start_time = time.time()
            results = self._run_simulation_with_params(params)
            eval_time = time.time() - start_time

            objective_value = self._extract_objective_value(results)

            # Track optimization history
            history_entry = {
                'params': params.copy(),
                'objective_value': objective_value,
                'results': results,
                'eval_time': eval_time,
                'iteration': len(self.optimization_history) + 1
            }
            self.optimization_history.append(history_entry)

            # Update best if improved
            if objective_value < self.best_score:
                self.best_score = objective_value
                self.best_params = params.copy()
                print(f"✓ NEW BEST: {objective_value:.2f}ms (evaluation {history_entry['iteration']})")
            else:
                print(f"  Score: {objective_value:.2f}ms")

            print(f"  Time: {eval_time:.1f}s")
            print()

            return objective_value

        # Run Bayesian optimization
        result = gp_minimize(
            func=objective,
            dimensions=self.parameter_space,
            n_calls=self.optimization_budget,
            n_initial_points=10,  # Random exploration first
            acq_func='EI',  # Expected Improvement acquisition
            random_state=self.random_seed
        )

        # Extract best parameters
        best_params_list = result.x
        best_params_dict = dict(zip(self.parameter_names, best_params_list))
        best_objective = result.fun

        return best_params_dict, best_objective

    def optimize_with_grid_search(self) -> Tuple[Dict, float]:
        """Fallback grid search optimization if scikit-optimize not available"""
        print("Using grid search optimization (install scikit-optimize for better results)")
        print()

        # Define a smaller grid for key parameters
        grid_configs = [
            # Vary max_in_flight and batch_timeout
            {'max_in_flight_requests': 5, 'batch_timeout_ms': 5.0},
            {'max_in_flight_requests': 10, 'batch_timeout_ms': 5.0},
            {'max_in_flight_requests': 20, 'batch_timeout_ms': 5.0},
            {'max_in_flight_requests': 10, 'batch_timeout_ms': 2.0},
            {'max_in_flight_requests': 10, 'batch_timeout_ms': 10.0},
            {'max_in_flight_requests': 15, 'batch_timeout_ms': 3.0},
            {'max_in_flight_requests': 25, 'batch_timeout_ms': 7.0},
        ]

        best_params = None
        best_score = float('inf')

        for i, config in enumerate(grid_configs):
            print(f"Evaluating config {i+1}/{len(grid_configs)}: {config}")

            # Use default values for unspecified parameters
            full_params = {
                'batch_timeout_ms': 5.0,
                'batch_size_threshold': 1024 * 1024,
                'max_in_flight_requests': 5
            }
            full_params.update(config)

            results = self._run_simulation_with_params(full_params)
            objective_value = self._extract_objective_value(results)

            if objective_value < best_score:
                best_score = objective_value
                best_params = full_params.copy()
                print(f"✓ NEW BEST: {objective_value:.2f}ms")
            else:
                print(f"  Score: {objective_value:.2f}ms")
            print()

        return best_params, best_score

    def analyze_parameter_importance(self):
        """Analyze which parameters have the most impact on performance"""
        if not self.optimization_history:
            print("No optimization history available")
            return

        print("Parameter Importance Analysis")
        print("=" * 50)

        # Extract parameter values and objectives
        param_data = {}
        objectives = []

        for entry in self.optimization_history:
            objectives.append(entry['objective_value'])
            for param_name, param_value in entry['params'].items():
                if param_name not in param_data:
                    param_data[param_name] = []
                param_data[param_name].append(param_value)

        objectives = np.array(objectives)

        # Simple correlation analysis
        print("Parameter correlations with objective (lower is better):")
        correlations = []

        for param_name, values in param_data.items():
            correlation = np.corrcoef(values, objectives)[0, 1]
            correlations.append((param_name, correlation))
            print(f"  {param_name:<25}: {correlation:+.3f}")

        print("\nMost impactful parameters (by absolute correlation):")
        correlations.sort(key=lambda x: abs(x[1]), reverse=True)
        for param_name, correlation in correlations[:5]:
            impact = "reduces latency" if correlation < 0 else "increases latency"
            print(f"  {param_name:<25}: {impact} (r={correlation:+.3f})")

    def plot_optimization_progress(self, save_path: Optional[str] = None):
        """Plot optimization convergence"""
        if not OPTIMIZE_AVAILABLE or not self.optimization_history:
            return

        iterations = [entry['iteration'] for entry in self.optimization_history]
        objectives = [entry['objective_value'] for entry in self.optimization_history]

        # Calculate running minimum (best so far)
        running_min = []
        current_min = float('inf')
        for obj in objectives:
            current_min = min(current_min, obj)
            running_min.append(current_min)

        plt.figure(figsize=(12, 8))

        # Plot 1: Objective value over iterations
        plt.subplot(2, 2, 1)
        plt.scatter(iterations, objectives, alpha=0.6, s=30)
        plt.plot(iterations, running_min, 'r-', linewidth=2, label='Best so far')
        plt.xlabel('Iteration')
        plt.ylabel(f'{self.objective_metric} (ms)')
        plt.title('Optimization Progress')
        plt.legend()
        plt.grid(True, alpha=0.3)

        # Plot 2: Parameter evolution for key parameters
        plt.subplot(2, 2, 2)
        key_params = ['max_in_flight_requests', 'batch_timeout_ms']
        for param in key_params:
            if param in self.optimization_history[0]['params']:
                values = [entry['params'][param] for entry in self.optimization_history]
                plt.scatter(iterations, values, alpha=0.6, label=param, s=30)
        plt.xlabel('Iteration')
        plt.ylabel('Parameter Value')
        plt.title('Key Parameter Evolution')
        plt.legend()
        plt.grid(True, alpha=0.3)

        # Plot 3: Objective distribution
        plt.subplot(2, 2, 3)
        plt.hist(objectives, bins=20, alpha=0.7, edgecolor='black')
        plt.axvline(self.best_score, color='red', linestyle='--',
                    label=f'Best: {self.best_score:.1f}ms')
        plt.xlabel(f'{self.objective_metric} (ms)')
        plt.ylabel('Count')
        plt.title('Objective Value Distribution')
        plt.legend()
        plt.grid(True, alpha=0.3)

        # Plot 4: Convergence rate
        plt.subplot(2, 2, 4)
        improvements = []
        for i, entry in enumerate(self.optimization_history):
            if i == 0:
                improvements.append(0)
            else:
                prev_best = running_min[i-1]
                curr_best = running_min[i]
                improvement = prev_best - curr_best
                improvements.append(improvement)

        plt.plot(iterations, improvements, 'g-', marker='o', markersize=3)
        plt.xlabel('Iteration')
        plt.ylabel('Improvement (ms)')
        plt.title('Per-Iteration Improvement')
        plt.grid(True, alpha=0.3)

        plt.tight_layout()

        if save_path:
            plt.savefig(save_path, dpi=300, bbox_inches='tight')
            print(f"Optimization plots saved to {save_path}")
        else:
            plt.show()

    def run_optimization(self) -> Dict:
        """Run the full optimization process and return results"""
        start_time = time.time()

        # Run optimization
        if OPTIMIZE_AVAILABLE:
            best_params, best_score = self.optimize_with_bayesian()
        else:
            best_params, best_score = self.optimize_with_grid_search()

        total_time = time.time() - start_time

        # Run final simulation with best parameters for detailed results
        print("Running final simulation with optimal parameters...")
        final_results = self._run_simulation_with_params(best_params)

        # Prepare optimization summary
        optimization_summary = {
            'best_parameters': best_params,
            'best_objective_value': best_score,
            'optimization_time': total_time,
            'evaluations_performed': len(self.optimization_history),
            'final_simulation_results': final_results,
            'optimization_history': self.optimization_history
        }

        return optimization_summary

    def print_optimization_summary(self, summary: Dict):
        """Print a comprehensive summary of optimization results"""
        print("=" * 80)
        print("BAYESIAN OPTIMIZATION RESULTS")
        print("=" * 80)

        print(f"Optimization completed in {summary['optimization_time']:.1f} seconds")
        print(f"Performed {summary['evaluations_performed']} parameter evaluations")
        print(f"Best {self.objective_metric}: {summary['best_objective_value']:.2f}ms")
        print()

        print("OPTIMAL PARAMETERS:")
        print("-" * 40)
        for param, value in summary['best_parameters'].items():
            if isinstance(value, float):
                if param.endswith('_rate'):
                    print(f"  {param:<25}: {value:.4f}")
                else:
                    print(f"  {param:<25}: {value:.2f}")
            else:
                print(f"  {param:<25}: {value}")

        print("\nDETAILED PERFORMANCE WITH OPTIMAL PARAMETERS:")
        print("-" * 50)
        final_results = summary['final_simulation_results']
        print_results(final_results)

        print("\nPARAMETER IMPACT ANALYSIS:")
        print("-" * 30)
        self.analyze_parameter_importance()
|
||||||
|
|
||||||
|
def main():
|
||||||
|
"""Main optimization workflow"""
|
||||||
|
print("Persistence Thread Parameter Optimization")
|
||||||
|
print("Using Bayesian Optimization for intelligent parameter search")
|
||||||
|
print()
|
||||||
|
|
||||||
|
# Create optimizer with different objective functions to test
|
||||||
|
objectives_to_test = ["p95_latency", "weighted_latency"]
|
||||||
|
|
||||||
|
for objective in objectives_to_test:
|
||||||
|
print(f"\n{'='*80}")
|
||||||
|
print(f"OPTIMIZING FOR: {objective.upper()}")
|
||||||
|
print(f"{'='*80}")
|
||||||
|
|
||||||
|
optimizer = PersistenceOptimizer(
|
||||||
|
optimization_budget=30, # Reasonable for demo
|
||||||
|
simulation_duration=15.0, # Shorter sims for faster optimization
|
||||||
|
arrival_rate=1000.0,
|
||||||
|
objective_metric=objective,
|
||||||
|
random_seed=42
|
||||||
|
)
|
||||||
|
|
||||||
|
# Run optimization
|
||||||
|
summary = optimizer.run_optimization()
|
||||||
|
optimizer.print_optimization_summary(summary)
|
||||||
|
|
||||||
|
# Generate plots
|
||||||
|
try:
|
||||||
|
optimizer.plot_optimization_progress(f'optimization_{objective}.png')
|
||||||
|
except Exception as e:
|
||||||
|
print(f"Could not generate plots: {e}")
|
||||||
|
|
||||||
|
print(f"\nOptimization for {objective} completed!")
|
||||||
|
print("="*80)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
748
latency_sim/persistence_simulation.py
Normal file
@@ -0,0 +1,748 @@
#!/usr/bin/env python3
"""
Persistence Thread Simulation

Simulates the persistence thread design from persistence.md to analyze
commit latency distributions with Poisson arrival times and realistic
S3 response characteristics.

Key metrics tracked:
- End-to-end commit latency (arrival to acknowledgment)
- Batch processing latencies
- Queue depths and flow control behavior
- Retry patterns and failure handling
"""

import heapq
import random
import statistics
from dataclasses import dataclass, field
from typing import List, Optional, Dict, Tuple
import numpy as np
import matplotlib.pyplot as plt
from collections import defaultdict, deque
import time


@dataclass
class Commit:
    """Represents a single commit request"""
    commit_id: int
    arrival_time: float
    size_bytes: int = 1024  # Default 1KB per commit


@dataclass
class Batch:
    """Represents a batch of commits being processed"""
    batch_id: int
    commits: List[Commit]
    created_time: float
    size_bytes: int = field(init=False)
    retry_count: int = 0

    def __post_init__(self):
        self.size_bytes = sum(c.size_bytes for c in self.commits)


@dataclass
class InFlightRequest:
    """Tracks an in-flight S3 request"""
    batch: Batch
    start_time: float
    expected_completion: float
    connection_id: int


class PersistenceSimulation:
    """
    Simulates the persistence thread behavior described in persistence.md

    For S3 latency, we use a Gamma distribution, which is well suited to
    modeling network service response times because:
    - It has a natural lower bound (minimum network RTT)
    - It can model the right-skewed tail typical of network services
    - It captures both typical fast responses and occasional slow responses
    - The shape parameter controls the heaviness of the tail
    """

    def __init__(self,
                 # Configuration (simulation defaults; see persistence.md)
                 batch_timeout_ms: float = 5.0,
                 batch_size_threshold: int = 1024 * 1024,  # 1MB
                 max_in_flight_requests: int = 5,
                 max_retry_attempts: int = 3,
                 retry_base_delay_ms: float = 100.0,

                 # S3 latency modeling (Gamma distribution parameters)
                 s3_latency_shape: float = 2.0,   # Shape parameter (α)
                 s3_latency_scale: float = 25.0,  # Scale parameter (β)
                 s3_failure_rate: float = 0.01,   # 1% failure rate

                 # Arrival rate modeling
                 arrival_rate_per_sec: float = 1000.0,  # Lambda for Poisson

                 # Simulation parameters
                 simulation_duration_sec: float = 60.0):

        # Configuration
        self.batch_timeout_ms = batch_timeout_ms
        self.batch_size_threshold = batch_size_threshold
        self.max_in_flight_requests = max_in_flight_requests
        self.max_retry_attempts = max_retry_attempts
        self.retry_base_delay_ms = retry_base_delay_ms

        # S3 modeling parameters
        self.s3_latency_shape = s3_latency_shape
        self.s3_latency_scale = s3_latency_scale
        self.s3_failure_rate = s3_failure_rate

        # Arrival modeling
        self.arrival_rate_per_sec = arrival_rate_per_sec
        self.simulation_duration_sec = simulation_duration_sec

        # Simulation state
        self.current_time = 0.0
        self.event_queue = []  # Priority queue of (time, seq, event_type, event_data)
        self._event_seq = 0    # Tie-breaker so event payloads are never compared
        self.pending_commits = deque()
        self.current_batch = []
        self.batch_start_time = None  # Set when first commit is added to batch
        self.in_flight_requests: Dict[int, InFlightRequest] = {}
        self.next_batch_id = 0
        self.next_connection_id = 0
        self.next_commit_id = 0

        # Metrics collection
        self.completed_commits = []
        self.batch_metrics = []
        self.retry_counts = defaultdict(int)
        self.queue_depth_samples = []
        self.timeline_events = []  # For debugging/visualization

        # Random number generators
        self.rng = random.Random(42)  # Reproducible results
        self.np_rng = np.random.RandomState(42)

    def sample_s3_latency(self, batch_size_bytes: int = 0) -> float:
        """
        Sample S3 response latency using a Gamma distribution with
        size-dependent scaling.

        The Gamma distribution fits S3 latency well because:
        - A 30ms RTT floor prevents unrealistic sub-network responses
        - The right-skewed tail captures occasional slow responses
        - It models the reality that most responses are fast but some are slow

        Size-dependent scaling:
        - Base latency: 30ms RTT + Gamma(2.0, 25.0) = ~80ms mean at the defaults
        - Linear scaling with size: +20ms per MB
        - Large requests (1MB) average ~100ms with similar tail behavior
        """
        # Minimum network RTT (realistic for cloud storage)
        min_rtt = 30.0  # 30ms minimum round-trip time

        # Variable latency component from Gamma distribution
        variable_latency = self.np_rng.gamma(self.s3_latency_shape, self.s3_latency_scale)

        # Size-dependent scaling: +20ms per MB
        size_mb = batch_size_bytes / (1024 * 1024)
        size_penalty = size_mb * 20.0  # 20ms per MB

        return min_rtt + variable_latency + size_penalty
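
    # Back-of-envelope check at the defaults above: a Gamma(2.0, 25.0) draw has
    # mean shape * scale = 50ms, so a small batch averages 30 + 50 = 80ms and a
    # 1MB batch averages ~100ms once the +20ms/MB size penalty is added.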

    def sample_inter_arrival_time(self) -> float:
        """Sample time between commit arrivals using Poisson process"""
        return self.np_rng.exponential(1.0 / self.arrival_rate_per_sec)
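
    # Poisson arrivals imply exponentially distributed inter-arrival gaps; at
    # the default rate of 1000 commits/sec the mean gap is 1/lambda = 1ms.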

    def sample_commit_size(self) -> int:
        """
        Sample commit size with realistic distribution including large commits.

        Distribution:
        - 70% small commits: 500B - 10KB (typical operations)
        - 25% medium commits: 10KB - 100KB (batch operations, large documents)
        - 5% large commits: 100KB - 1MB (bulk data, file uploads)

        This creates a realistic mix where most commits are small but some
        can trigger size-based batching or become single-commit batches.
        """
        rand = self.rng.random()

        if rand < 0.70:    # 70% small commits
            return self.rng.randint(500, 10 * 1024)           # 500B - 10KB
        elif rand < 0.95:  # 25% medium commits
            return self.rng.randint(10 * 1024, 100 * 1024)    # 10KB - 100KB
        else:              # 5% large commits
            return self.rng.randint(100 * 1024, 1024 * 1024)  # 100KB - 1MB
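
    # The mixture's expected size is roughly 0.70*5.2KB + 0.25*55KB + 0.05*562KB
    # ≈ 46KB, so the 1MB size trigger needs about 22 average commits to fire.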

    def schedule_event(self, time: float, event_type: str, data=None):
        """Add event to the priority queue; seq breaks ties so payloads are never compared"""
        heapq.heappush(self.event_queue, (time, self._event_seq, event_type, data))
        self._event_seq += 1

    def should_process_batch(self) -> bool:
        """Check if current batch should be processed based on triggers"""
        if not self.current_batch:
            return False

        # Size trigger
        batch_size = sum(c.size_bytes for c in self.current_batch)
        if batch_size >= self.batch_size_threshold:
            return True

        # Time trigger - only if we have a valid batch start time
        if self.batch_start_time is not None:
            if (self.current_time - self.batch_start_time) >= (self.batch_timeout_ms / 1000.0):
                return True

        return False

    def can_start_new_request(self) -> bool:
        """Check if we can start a new request based on flow control"""
        return len(self.in_flight_requests) < self.max_in_flight_requests

    def process_current_batch(self):
        """Process the current batch if conditions are met"""
        if not self.current_batch or not self.can_start_new_request():
            return

        if self.should_process_batch():
            batch = Batch(
                batch_id=self.next_batch_id,
                commits=self.current_batch.copy(),
                created_time=self.current_time
            )
            self.next_batch_id += 1

            # Clear current batch
            self.current_batch.clear()
            self.batch_start_time = None  # Reset to None, will be set on next batch

            self.send_batch_to_s3(batch)

    def send_batch_to_s3(self, batch: Batch, is_retry: bool = False):
        """Send batch to S3 and track as in-flight request"""
        if not self.can_start_new_request():
            # This shouldn't happen due to flow control, but handle gracefully
            self.schedule_event(self.current_time + 0.001, 'retry_batch', batch)
            return

        # Sample S3 response characteristics (pass batch size for latency modeling)
        s3_latency = self.sample_s3_latency(batch.size_bytes) / 1000.0  # Convert ms to seconds
        will_fail = self.rng.random() < self.s3_failure_rate

        if will_fail:
            s3_latency *= 2  # Failed requests typically take longer

        completion_time = self.current_time + s3_latency
        connection_id = self.next_connection_id
        self.next_connection_id += 1

        # Track in-flight request
        in_flight = InFlightRequest(
            batch=batch,
            start_time=self.current_time,
            expected_completion=completion_time,
            connection_id=connection_id
        )
        self.in_flight_requests[connection_id] = in_flight

        # Schedule completion event
        if will_fail:
            self.schedule_event(completion_time, 'batch_failed', connection_id)
        else:
            self.schedule_event(completion_time, 'batch_completed', connection_id)

        # Log event for analysis
        self.timeline_events.append({
            'time': self.current_time,
            'event': 'batch_sent',
            'batch_id': batch.batch_id,
            'batch_size': batch.size_bytes,
            'commit_count': len(batch.commits),
            'retry_count': batch.retry_count,
            'is_retry': is_retry
        })

    def handle_batch_completed(self, connection_id: int):
        """Handle successful batch completion"""
        if connection_id not in self.in_flight_requests:
            return

        in_flight = self.in_flight_requests.pop(connection_id)
        batch = in_flight.batch

        # Calculate metrics
        batch_latency = self.current_time - batch.created_time
        self.batch_metrics.append({
            'batch_id': batch.batch_id,
            'latency': batch_latency,
            'size_bytes': batch.size_bytes,
            'commit_count': len(batch.commits),
            'retry_count': batch.retry_count
        })

        # Mark commits as completed and calculate end-to-end latency
        for commit in batch.commits:
            commit_latency = self.current_time - commit.arrival_time
            self.completed_commits.append({
                'commit_id': commit.commit_id,
                'arrival_time': commit.arrival_time,
                'completion_time': self.current_time,
                'latency': commit_latency,
                'batch_id': batch.batch_id,
                'retry_count': batch.retry_count
            })

        self.timeline_events.append({
            'time': self.current_time,
            'event': 'batch_completed',
            'batch_id': batch.batch_id,
            'latency': batch_latency,
            'retry_count': batch.retry_count
        })

        # Try to process any pending work now that we have capacity
        self.process_pending_work()

    def handle_batch_failed(self, connection_id: int):
        """Handle batch failure with retry logic"""
        if connection_id not in self.in_flight_requests:
            return

        in_flight = self.in_flight_requests.pop(connection_id)
        batch = in_flight.batch

        self.timeline_events.append({
            'time': self.current_time,
            'event': 'batch_failed',
            'batch_id': batch.batch_id,
            'retry_count': batch.retry_count
        })

        if batch.retry_count < self.max_retry_attempts:
            # Exponential backoff retry
            batch.retry_count += 1
            backoff_delay = (self.retry_base_delay_ms / 1000.0) * (2 ** (batch.retry_count - 1))
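            # With retry_base_delay_ms=100 this yields 100ms, 200ms, 400ms
            # delays for attempts 1-3.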
            retry_time = self.current_time + backoff_delay

            self.schedule_event(retry_time, 'retry_batch', batch)
            self.retry_counts[batch.retry_count] += 1
        else:
            # Max retries exhausted - this would be a fatal error in a real system
            self.timeline_events.append({
                'time': self.current_time,
                'event': 'batch_abandoned',
                'batch_id': batch.batch_id,
                'retry_count': batch.retry_count
            })

    def handle_retry_batch(self, batch: Batch):
        """Handle batch retry"""
        self.send_batch_to_s3(batch, is_retry=True)

    def process_pending_work(self):
        """Process any pending commits that can now be batched"""
        # Move pending commits to current batch
        while self.pending_commits and self.can_start_new_request():
            if not self.current_batch:
                self.batch_start_time = self.current_time

            commit = self.pending_commits.popleft()
            self.current_batch.append(commit)

            # Check if we should process this batch immediately
            if self.should_process_batch():
                self.process_current_batch()
                break

        # If we have commits but no in-flight capacity, they stay in current_batch
        # and will be processed when capacity becomes available

    def handle_commit_arrival(self, commit: Commit):
        """Handle new commit arrival"""
        # If we have in-flight capacity, try to add to current batch
        if self.can_start_new_request():
            if not self.current_batch:
                self.batch_start_time = self.current_time

            self.current_batch.append(commit)
            self.process_current_batch()  # Check if we should process now
        else:
            # Add to pending queue due to flow control
            self.pending_commits.append(commit)

        # Schedule timeout event for current batch if it's the first commit
        if len(self.current_batch) == 1 and not self.pending_commits:
            timeout_time = self.current_time + (self.batch_timeout_ms / 1000.0)
            self.schedule_event(timeout_time, 'batch_timeout', None)

    def handle_batch_timeout(self):
        """Handle batch timeout trigger"""
        if self.current_batch and self.can_start_new_request():
            self.process_current_batch()

    def sample_queue_depth(self):
        """Sample current queue depths for analysis"""
        pending_count = len(self.pending_commits)
        current_batch_count = len(self.current_batch)
        in_flight_count = len(self.in_flight_requests)

        self.queue_depth_samples.append({
            'time': self.current_time,
            'pending_commits': pending_count,
            'current_batch_size': current_batch_count,
            'in_flight_requests': in_flight_count,
            'total_unacknowledged': pending_count + current_batch_count +
                sum(len(req.batch.commits) for req in self.in_flight_requests.values())
        })

    def generate_arrivals(self):
        """Generate Poisson arrival events for the simulation"""
        current_time = 0.0

        while current_time < self.simulation_duration_sec:
            # Sample next arrival time
            inter_arrival = self.sample_inter_arrival_time()
            current_time += inter_arrival

            if current_time >= self.simulation_duration_sec:
                break

            # Create commit
            commit = Commit(
                commit_id=self.next_commit_id,
                arrival_time=current_time,
                size_bytes=self.sample_commit_size()  # Realistic size distribution
            )
            self.next_commit_id += 1

            # Schedule arrival event
            self.schedule_event(current_time, 'commit_arrival', commit)

        # Schedule periodic queue depth sampling
        sample_time = 0.0
        while sample_time < self.simulation_duration_sec:
            self.schedule_event(sample_time, 'sample_queue_depth', None)
            sample_time += 0.1  # Sample every 100ms

    def run_simulation(self):
        """Run the complete simulation"""
        print(f"Starting persistence simulation...")
        print(f"Arrival rate: {self.arrival_rate_per_sec} commits/sec")
        print(f"Duration: {self.simulation_duration_sec} seconds")
        print(f"Expected commits: ~{int(self.arrival_rate_per_sec * self.simulation_duration_sec)}")
        print()

        # Generate all arrival events
        self.generate_arrivals()

        # Process events in time order
        events_processed = 0
        while self.event_queue and events_processed < 1000000:  # Safety limit
            event_time, _seq, event_type, data = heapq.heappop(self.event_queue)
            self.current_time = event_time

            if event_time > self.simulation_duration_sec:
                break

            if event_type == 'commit_arrival':
                self.handle_commit_arrival(data)
            elif event_type == 'batch_completed':
                self.handle_batch_completed(data)
            elif event_type == 'batch_failed':
                self.handle_batch_failed(data)
            elif event_type == 'retry_batch':
                self.handle_retry_batch(data)
            elif event_type == 'batch_timeout':
                self.handle_batch_timeout()
            elif event_type == 'sample_queue_depth':
                self.sample_queue_depth()

            events_processed += 1

        print(f"Simulation completed. Processed {events_processed} events.")
        return self.analyze_results()

    def analyze_results(self) -> Dict:
        """Analyze simulation results and return metrics"""
        if not self.completed_commits:
            return {"error": "No commits completed during simulation"}

        # Calculate latency statistics
        latencies = [c['latency'] * 1000 for c in self.completed_commits]  # Convert to ms

        results = {
            'simulation_config': {
                'duration_sec': self.simulation_duration_sec,
                'arrival_rate_per_sec': self.arrival_rate_per_sec,
                'batch_timeout_ms': self.batch_timeout_ms,
                'batch_size_threshold': self.batch_size_threshold,
                'max_in_flight_requests': self.max_in_flight_requests,
                's3_latency_params': f"Gamma(shape={self.s3_latency_shape}, scale={self.s3_latency_scale})",
                's3_failure_rate': self.s3_failure_rate
            },
            'commit_metrics': {
                'total_commits': len(self.completed_commits),
                'latency_ms': {
                    'mean': statistics.mean(latencies),
                    'median': statistics.median(latencies),
                    'std': statistics.stdev(latencies) if len(latencies) > 1 else 0,
                    'min': min(latencies),
                    'max': max(latencies),
                    'p95': np.percentile(latencies, 95),
                    'p99': np.percentile(latencies, 99)
                }
            },
            'batch_metrics': {
                'total_batches': len(self.batch_metrics),
                'avg_commits_per_batch': statistics.mean([b['commit_count'] for b in self.batch_metrics]),
                'avg_batch_size_bytes': statistics.mean([b['size_bytes'] for b in self.batch_metrics]),
                'avg_batch_latency_ms': statistics.mean([b['latency'] * 1000 for b in self.batch_metrics])
            },
            'retry_analysis': dict(self.retry_counts),
            'queue_depth_analysis': self._analyze_queue_depths()
        }

        return results

    def _analyze_queue_depths(self) -> Dict:
        """Analyze queue depth patterns"""
        if not self.queue_depth_samples:
            return {}

        pending = [s['pending_commits'] for s in self.queue_depth_samples]
        in_flight = [s['in_flight_requests'] for s in self.queue_depth_samples]
        total_unack = [s['total_unacknowledged'] for s in self.queue_depth_samples]

        return {
            'pending_commits': {
                'mean': statistics.mean(pending),
                'max': max(pending),
                'p95': np.percentile(pending, 95)
            },
            'in_flight_requests': {
                'mean': statistics.mean(in_flight),
                'max': max(in_flight),
                'p95': np.percentile(in_flight, 95)
            },
            'total_unacknowledged': {
                'mean': statistics.mean(total_unack),
                'max': max(total_unack),
                'p95': np.percentile(total_unack, 95)
            }
        }

    def plot_results(self, results: Dict, save_path: Optional[str] = None):
        """Generate visualization plots of simulation results"""
        if not self.completed_commits:
            print("No data to plot")
            return

        fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(15, 12))
        fig.suptitle('Persistence Thread Simulation Results', fontsize=16)

        # Plot 1: Commit latency histogram
        latencies_ms = [c['latency'] * 1000 for c in self.completed_commits]
        ax1.hist(latencies_ms, bins=50, alpha=0.7, edgecolor='black')
        ax1.set_xlabel('Commit Latency (ms)')
        ax1.set_ylabel('Count')
        ax1.set_title('Commit Latency Distribution')
        ax1.axvline(results['commit_metrics']['latency_ms']['mean'], color='red',
                    linestyle='--', label=f"Mean: {results['commit_metrics']['latency_ms']['mean']:.1f}ms")
        ax1.axvline(results['commit_metrics']['latency_ms']['p95'], color='orange',
                    linestyle='--', label=f"P95: {results['commit_metrics']['latency_ms']['p95']:.1f}ms")
        ax1.legend()

        # Plot 2: Timeline of commit completions
        completion_times = [c['completion_time'] for c in self.completed_commits]
        completion_latencies = [c['latency'] * 1000 for c in self.completed_commits]
        ax2.scatter(completion_times, completion_latencies, alpha=0.6, s=10)
        ax2.set_xlabel('Time (seconds)')
        ax2.set_ylabel('Commit Latency (ms)')
        ax2.set_title('Latency Over Time')

        # Plot 3: Queue depth over time
        if self.queue_depth_samples:
            times = [s['time'] for s in self.queue_depth_samples]
            pending = [s['pending_commits'] for s in self.queue_depth_samples]
            in_flight = [s['in_flight_requests'] for s in self.queue_depth_samples]
            total_unack = [s['total_unacknowledged'] for s in self.queue_depth_samples]

            ax3.plot(times, pending, label='Pending Commits', alpha=0.8)
            ax3.plot(times, in_flight, label='In-Flight Requests', alpha=0.8)
            ax3.plot(times, total_unack, label='Total Unacknowledged', alpha=0.8)
            ax3.axhline(self.max_in_flight_requests, color='red', linestyle='--',
                        label=f'Max In-Flight Limit ({self.max_in_flight_requests})')
            ax3.set_xlabel('Time (seconds)')
            ax3.set_ylabel('Count')
            ax3.set_title('Queue Depths Over Time')
            ax3.legend()

        # Plot 4: Batch size distribution
        if self.batch_metrics:
            batch_sizes = [b['commit_count'] for b in self.batch_metrics]
            ax4.hist(batch_sizes, bins=20, alpha=0.7, edgecolor='black')
            ax4.set_xlabel('Commits per Batch')
            ax4.set_ylabel('Count')
            ax4.set_title('Batch Size Distribution')

        plt.tight_layout()

        if save_path:
            plt.savefig(save_path, dpi=300, bbox_inches='tight')
            print(f"Plots saved to {save_path}")
        else:
            plt.show()


def print_results(results: Dict):
    """Pretty print simulation results"""
    print("=" * 80)
    print("PERSISTENCE THREAD SIMULATION RESULTS")
    print("=" * 80)

    # Configuration
    config = results['simulation_config']
    print(f"\nConfiguration:")
    print(f"  Duration: {config['duration_sec']}s")
    print(f"  Arrival Rate: {config['arrival_rate_per_sec']} commits/sec")
    print(f"  Batch Timeout: {config['batch_timeout_ms']}ms")
    print(f"  Batch Size Threshold: {config['batch_size_threshold']:,} bytes")
    print(f"  Max In-Flight: {config['max_in_flight_requests']}")
    print(f"  S3 Latency: {config['s3_latency_params']}")
    print(f"  S3 Failure Rate: {config['s3_failure_rate']:.1%}")

    # Commit metrics
    commit_metrics = results['commit_metrics']
    latency = commit_metrics['latency_ms']
    print(f"\nCommit Performance:")
    print(f"  Total Commits: {commit_metrics['total_commits']:,}")
    print(f"  Latency Mean: {latency['mean']:.2f}ms")
    print(f"  Latency Median: {latency['median']:.2f}ms")
    print(f"  Latency P95: {latency['p95']:.2f}ms")
    print(f"  Latency P99: {latency['p99']:.2f}ms")
    print(f"  Latency Std: {latency['std']:.2f}ms")
    print(f"  Latency Range: {latency['min']:.2f}ms - {latency['max']:.2f}ms")

    # Batch metrics
    batch_metrics = results['batch_metrics']
    print(f"\nBatching Performance:")
    print(f"  Total Batches: {batch_metrics['total_batches']:,}")
    print(f"  Avg Commits/Batch: {batch_metrics['avg_commits_per_batch']:.1f}")
    print(f"  Avg Batch Size: {batch_metrics['avg_batch_size_bytes']/1024:.1f}KB")
    print(f"  Avg Batch Latency: {batch_metrics['avg_batch_latency_ms']:.2f}ms")

    # Retry analysis
    if results['retry_analysis']:
        print(f"\nRetry Analysis:")
        for retry_count, occurrences in results['retry_analysis'].items():
            print(f"  {occurrences:,} batches required {retry_count} retries")

    # Queue depth analysis
    if results['queue_depth_analysis']:
        queue_analysis = results['queue_depth_analysis']
        print(f"\nQueue Depth Analysis:")
        if 'pending_commits' in queue_analysis:
            pending = queue_analysis['pending_commits']
            print(f"  Pending Commits - Mean: {pending['mean']:.1f}, Max: {pending['max']}, P95: {pending['p95']:.1f}")
        if 'in_flight_requests' in queue_analysis:
            in_flight = queue_analysis['in_flight_requests']
            print(f"  In-Flight Requests - Mean: {in_flight['mean']:.1f}, Max: {in_flight['max']}, P95: {in_flight['p95']:.1f}")
        if 'total_unacknowledged' in queue_analysis:
            total = queue_analysis['total_unacknowledged']
            print(f"  Total Unacknowledged - Mean: {total['mean']:.1f}, Max: {total['max']}, P95: {total['p95']:.1f}")


if __name__ == "__main__":
    print("Running Persistence Thread Configuration Analysis")
    print("S3 Latency Modeling: Gamma distribution (shape=2.0, scale=25ms)")
    print("Testing different configurations to optimize latency...")
    print()

    # Test configurations with different max_in_flight values
    configs = [
        {"name": "Baseline (max_in_flight=5)", "max_in_flight_requests": 5},
        {"name": "Higher Parallelism (max_in_flight=10)", "max_in_flight_requests": 10},
        {"name": "Much Higher (max_in_flight=20)", "max_in_flight_requests": 20},
        {"name": "Lower Timeout (max_in_flight=10, timeout=2ms)", "max_in_flight_requests": 10, "batch_timeout_ms": 2.0},
        {"name": "Higher Timeout (max_in_flight=10, timeout=10ms)", "max_in_flight_requests": 10, "batch_timeout_ms": 10.0},
    ]

    results_comparison = []

    for config in configs:
        print(f"\n{'='*60}")
        print(f"Testing: {config['name']}")
        print(f"{'='*60}")

        sim = PersistenceSimulation(
            arrival_rate_per_sec=1000.0,
            simulation_duration_sec=30.0,
            s3_latency_shape=2.0,
            s3_latency_scale=25.0,
            s3_failure_rate=0.01,
            max_in_flight_requests=config.get("max_in_flight_requests", 5),
            batch_timeout_ms=config.get("batch_timeout_ms", 5.0)
        )

        results = sim.run_simulation()
        results["config_name"] = config["name"]
        results_comparison.append(results)

        # Print key metrics for quick comparison
        commit_metrics = results['commit_metrics']
        batch_metrics = results['batch_metrics']
        queue_metrics = results.get('queue_depth_analysis', {})

        print(f"\nKey Metrics:")
        print(f"  Mean Latency: {commit_metrics['latency_ms']['mean']:.1f}ms")
        print(f"  P95 Latency: {commit_metrics['latency_ms']['p95']:.1f}ms")
        print(f"  P99 Latency: {commit_metrics['latency_ms']['p99']:.1f}ms")
        print(f"  Avg Commits/Batch: {batch_metrics['avg_commits_per_batch']:.1f}")
        print(f"  Avg Batch Size: {batch_metrics['avg_batch_size_bytes']/1024:.1f}KB")
        if queue_metrics:
            print(f"  Avg Queue Depth: {queue_metrics.get('total_unacknowledged', {}).get('mean', 0):.1f}")
            print(f"  Max Queue Depth: {queue_metrics.get('total_unacknowledged', {}).get('max', 0)}")

    # Summary comparison
    print(f"\n{'='*80}")
    print("CONFIGURATION COMPARISON SUMMARY")
    print(f"{'='*80}")
    print(f"{'Configuration':<40} {'Mean':<8} {'P95':<8} {'P99':<8} {'AvgQueue':<10}")
    print(f"{'-'*80}")

    for result in results_comparison:
        name = result["config_name"]
        commit_metrics = result['commit_metrics']
        queue_metrics = result.get('queue_depth_analysis', {})
        mean_lat = commit_metrics['latency_ms']['mean']
        p95_lat = commit_metrics['latency_ms']['p95']
        p99_lat = commit_metrics['latency_ms']['p99']
        avg_queue = queue_metrics.get('total_unacknowledged', {}).get('mean', 0)

        print(f"{name:<40} {mean_lat:<8.1f} {p95_lat:<8.1f} {p99_lat:<8.1f} {avg_queue:<10.1f}")

    print(f"\nRecommendation: Choose config with lowest P95/P99 latencies")
    print(f"Note: Higher in-flight allows more parallelism but may increase queue variability")

    # Generate plots for best configuration
    best_config = min(results_comparison, key=lambda r: r['commit_metrics']['latency_ms']['p95'])
    print(f"\nGenerating plots for best configuration: {best_config['config_name']}")

    try:
        # Re-run best config to get simulation object for plotting
        best_params = next(c for c in configs if c['name'] == best_config['config_name'])
        sim_best = PersistenceSimulation(
            arrival_rate_per_sec=1000.0,
            simulation_duration_sec=30.0,
            s3_latency_shape=2.0,
            s3_latency_scale=25.0,
            s3_failure_rate=0.01,
            max_in_flight_requests=best_params.get("max_in_flight_requests", 5),
            batch_timeout_ms=best_params.get("batch_timeout_ms", 5.0)
        )
        sim_best.run_simulation()
        sim_best.plot_results(best_config, 'persistence_optimization_results.png')
    except Exception as e:
        print(f"\nCould not generate plots: {e}")
        print("Install matplotlib and numpy to enable visualization")
111
persistence.md
Normal file
@@ -0,0 +1,111 @@
# Persistence Thread Design

## Overview

The persistence thread receives commit batches from the main processing pipeline and uploads them to S3. It uses a single-threaded design with connection pooling and batching for optimal performance.

## Architecture

**Input**: Commits arrive via the `ThreadPipeline` interface from upstream processing
**Output**: Batched commits uploaded to the S3 persistence backend
**Transport**: Single-threaded TCP client with connection pooling
**Protocol**: Higher layers handle HTTP, authentication, and S3-specific details

## Batching Strategy

The persistence thread collects commits into batches using two trigger conditions:

1. **Time Trigger**: `batch_timeout_ms` has elapsed since batch collection started
2. **Size Trigger**: `batch_size_threshold` bytes collected (may be exceeded by the final commit)

**Flow Control**: When `max_in_flight_requests` is reached, block until responses are received.
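
For concreteness, a minimal sketch of the two-trigger decision in Python (the function name and arguments are illustrative; the defaults follow the parameter table below):

```python
def should_flush(batch, batch_started_at, now,
                 batch_timeout_ms=1.0, batch_size_threshold=1024 * 1024):
    """Return True when either batching trigger fires."""
    if not batch:
        return False
    # Size trigger: total bytes collected so far
    if sum(commit.size_bytes for commit in batch) >= batch_size_threshold:
        return True
    # Time trigger: timeout elapsed since collection started (times in seconds)
    return (now - batch_started_at) * 1000.0 >= batch_timeout_ms
```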

## Main Processing Loop

### 1. Batch Collection

**No In-Flight Requests**:
- Use blocking acquire to get the first commit batch
- Process immediately (no batching delay)

**With In-Flight Requests**:
- Check flow control: if at `max_in_flight_requests`, block for responses
- Collect commits using non-blocking acquire until a trigger condition fires:
  - Check for available commits (non-blocking)
  - If `batch_size_threshold` reached → process batch immediately
  - If below threshold → use `epoll_wait(batch_timeout_ms)` for I/O and timeout
  - On timeout → process collected commits
- If no commits available and no in-flight requests → switch to blocking acquire

### 2. Connection Management

- Acquire a healthy connection from the pool
- Create new connections if the pool is below `target_pool_size`
- If no healthy connections are available, block until one becomes available
- Maintain automatic pool replenishment

### 3. Data Transmission

- Write batch data to the S3 connection using the appropriate protocol
- Publish accepted transactions to the subscriber system
- Track the request as in-flight for flow control

### 4. I/O Event Processing

- Handle epoll events for all in-flight connections
- Monitor connection health via heartbeats
- Process incoming responses and detect connection failures

### 5. Response Handling

- **Ordered Acknowledgment**: Only acknowledge a batch after all prior batches are durable (see the sketch after this list)
- Release the batch via the `StageGuard` destructor (publishes to the next pipeline stage)
- Publish durability events to the subscriber system
- Return the healthy connection to the pool
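
A minimal sketch of ordered acknowledgment, assuming batches carry consecutive sequence numbers and completions can arrive out of order (the class and callback names are illustrative, not the actual `StageGuard` mechanism):

```python
import heapq

class OrderedAcker:
    """Acknowledge batches strictly in order even when S3 completes them out of order."""

    def __init__(self, ack):
        self.ack = ack        # Callback invoked once a batch may be acknowledged
        self.next_to_ack = 0  # Sequence number of the oldest unacknowledged batch
        self.durable = []     # Min-heap of batch ids already durable in S3

    def on_durable(self, batch_id: int) -> None:
        heapq.heappush(self.durable, batch_id)
        # Drain the consecutive prefix of durable batches, preserving order
        while self.durable and self.durable[0] == self.next_to_ack:
            self.ack(heapq.heappop(self.durable))
            self.next_to_ack += 1
```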

### 6. Failure Handling

- Remove the failed connection from the pool
- Retry the batch with exponential backoff (up to `max_retry_attempts`)
- Backoff delays only affect the specific failing batch
- If retries are exhausted, abort the process or escalate the error
- Initiate pool replenishment if below target

## Connection Pool

**Target Size**: `target_pool_size` connections (recommended: 2x `max_in_flight_requests`)
**Replenishment**: Automatic creation when below target
**Health Monitoring**: Heartbeat-based connection validation
**Sizing Rationale**: The 2x multiplier ensures availability during peak load and connection replacement

## Key Design Properties

**Batch Ordering**: Batches may be retried out of order for performance, but acknowledgment to the next pipeline stage maintains strict ordering.

**Backpressure**: Retry delays for failing batches create natural backpressure that eventually blocks the persistence thread when in-flight limits are reached.

**Graceful Shutdown**: On shutdown signal, drain all in-flight batches to completion before terminating.

## Configuration Parameters

| Parameter | Default | Description |
|-----------|---------|-------------|
| `batch_timeout_ms` | 1ms | Maximum time to wait collecting commits for batching |
| `batch_size_threshold` | 1MB | Threshold for triggering batch processing |
| `max_in_flight_requests` | 50 | Maximum concurrent requests to persistence backend |
| `target_pool_size` | 2x in-flight | Target number of connections to maintain |
| `max_retry_attempts` | 3 | Maximum retries for failed batches before aborting |
| `retry_base_delay_ms` | 100ms | Base delay for exponential backoff retries |

## Configuration Validation

**Required Constraints** (see the sketch after this list):
- `batch_size_threshold` > 0 (must process at least one commit per batch)
- `max_in_flight_requests` > 0 (must allow at least one concurrent request)
- `target_pool_size` >= `max_in_flight_requests` (pool must accommodate all in-flight requests)
- `batch_timeout_ms` > 0 (timeout must be positive)
- `max_retry_attempts` >= 0 (zero disables retries)
- `retry_base_delay_ms` > 0 (delay must be positive if retries enabled)
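
A minimal validation sketch enforcing the constraints above (the function name and dict-based config are illustrative; raising `ValueError` is one possible failure policy):

```python
def validate_config(cfg: dict) -> None:
    """Raise ValueError if the persistence configuration violates a required constraint."""
    checks = [
        (cfg["batch_size_threshold"] > 0,
         "batch_size_threshold must be > 0"),
        (cfg["max_in_flight_requests"] > 0,
         "max_in_flight_requests must be > 0"),
        (cfg["target_pool_size"] >= cfg["max_in_flight_requests"],
         "target_pool_size must accommodate all in-flight requests"),
        (cfg["batch_timeout_ms"] > 0,
         "batch_timeout_ms must be positive"),
        (cfg["max_retry_attempts"] >= 0,
         "max_retry_attempts must be >= 0"),
        (cfg["max_retry_attempts"] == 0 or cfg["retry_base_delay_ms"] > 0,
         "retry_base_delay_ms must be positive when retries are enabled"),
    ]
    for ok, message in checks:
        if not ok:
            raise ValueError(message)
```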

**Performance Recommendations**:
- `target_pool_size` <= 2x `max_in_flight_requests` (optimal for performance)