diff --git a/latency_sim/database_comparison.py b/latency_sim/database_comparison.py
new file mode 100644
index 0000000..312ed30
--- /dev/null
+++ b/latency_sim/database_comparison.py
@@ -0,0 +1,970 @@
+#!/usr/bin/env python3
+"""
+Database Persistence Pattern Comparison
+
+Compares WeaselDB's batched S3 persistence approach against traditional
+database persistence patterns to understand trade-offs in latency,
+throughput, consistency, and operational complexity.
+
+Simulated approaches:
+1. WeaselDB S3: batched async S3 persistence with optimistic concurrency
+2. WeaselDB EBS: the same batching logic backed by an EBS volume
+3. Traditional WAL: write-ahead log with periodic sync to disk
+4. Synchronous: immediate fsync per transaction (like PostgreSQL with
+   synchronous_commit=on)
+
+Group Commit and Async Replication have enum placeholders below but no
+simulator yet.
+"""
+
+import heapq
+import statistics
+from dataclasses import dataclass
+from typing import List, Optional, Dict, Any
+import numpy as np
+import matplotlib.pyplot as plt
+from collections import defaultdict
+from abc import ABC, abstractmethod
+from enum import Enum
+
+
+class PersistencePattern(Enum):
+    WEASELDB_S3 = "WeaselDB S3 Batched"
+    TRADITIONAL_WAL = "Traditional WAL"
+    SYNCHRONOUS = "Synchronous Disk"
+    GROUP_COMMIT = "Group Commit"
+    ASYNC_REPLICATION = "Async Replication"
+
+
+@dataclass
+class Transaction:
+    """Represents a database transaction/commit"""
+    txn_id: int
+    arrival_time: float
+    size_bytes: int = 1024
+    requires_durability: bool = True  # Some txns can be async
+
+
+@dataclass
+class PersistenceMetrics:
+    """Metrics for a persistence approach"""
+    pattern: PersistencePattern
+    total_transactions: int
+    completed_transactions: int
+
+    # Latency metrics (milliseconds)
+    min_latency: float
+    mean_latency: float
+    median_latency: float
+    p95_latency: float
+    p99_latency: float
+    max_latency: float
+
+    # Throughput metrics
+    avg_throughput_tps: float
+    peak_throughput_tps: float
+
+    # Resource metrics
+    avg_disk_iops: float
+    avg_network_mbps: float
+    storage_efficiency: float  # GB stored / GB logical data
+
+    # Consistency metrics
+    durability_guarantee: str
+    consistency_model: str
+    recovery_time_estimate: float
+
+    # Operational metrics
+    operational_complexity: int  # 1-10 scale
+    infrastructure_cost: float  # Relative cost per transaction
+
+
+class PersistenceSimulator(ABC):
+    """Abstract base class for persistence pattern simulators"""
+
+    def __init__(self,
+                 simulation_duration: float = 60.0,
+                 arrival_rate_per_sec: float = 1000.0):
+        self.simulation_duration = simulation_duration
+        self.arrival_rate_per_sec = arrival_rate_per_sec
+
+        # Simulation state
+        self.current_time = 0.0
+        self.event_queue = []
+        self.completed_transactions = []
+        self.next_txn_id = 0
+
+        # Metrics tracking
+        self.disk_iops = []
+        self.network_usage = []
+        self.throughput_samples = []
+
+        # Random number generator (fixed seed for reproducible runs)
+        self.rng = np.random.RandomState(42)
+
+    def generate_transaction(self, arrival_time: float) -> Transaction:
+        """Generate a transaction with realistic characteristics"""
+        # Size distribution: mostly small, some large. Draw once so the
+        # branches get the advertised 80/15/5 split; a second rng.random()
+        # call in the elif would skew the split to roughly 80/19/1.
+        r = self.rng.random()
+        if r < 0.80:  # 80% small transactions
+            size = self.rng.randint(100, 2048)  # 100B - 2KB
+        elif r < 0.95:  # 15% medium
+            size = self.rng.randint(2048, 20480)  # 2KB - 20KB
+        else:  # 5% large transactions
+            size = self.rng.randint(20480, 102400)  # 20KB - 100KB
+
+        return Transaction(
+            txn_id=self.next_txn_id,
+            arrival_time=arrival_time,
size_bytes=size, + requires_durability=True + ) + + @abstractmethod + def process_transaction(self, txn: Transaction) -> None: + """Process a transaction according to the persistence pattern""" + pass + + @abstractmethod + def get_pattern_name(self) -> PersistencePattern: + """Return the persistence pattern this simulator implements""" + pass + + def run_simulation(self) -> PersistenceMetrics: + """Run the simulation and return metrics""" + # Generate arrival events + self._generate_arrivals() + + # Process events + while self.event_queue and self.current_time < self.simulation_duration: + time, event_type, data = heapq.heappop(self.event_queue) + self.current_time = time + + if event_type == 'transaction_arrival': + self.process_transaction(data) + elif event_type == 'custom': + self._handle_custom_event(data) + + return self._calculate_metrics() + + def _generate_arrivals(self): + """Generate Poisson arrival events""" + time = 0.0 + while time < self.simulation_duration: + inter_arrival = self.rng.exponential(1.0 / self.arrival_rate_per_sec) + time += inter_arrival + + if time >= self.simulation_duration: + break + + txn = self.generate_transaction(time) + self.next_txn_id += 1 + + heapq.heappush(self.event_queue, (time, 'transaction_arrival', txn)) + + def _handle_custom_event(self, data): + """Handle custom events - override in subclasses""" + pass + + def schedule_event(self, time: float, event_type: str, data: Any): + """Schedule a custom event""" + heapq.heappush(self.event_queue, (time, event_type, data)) + + def _calculate_metrics(self) -> PersistenceMetrics: + """Calculate performance metrics from completed transactions""" + if not self.completed_transactions: + raise ValueError("No transactions completed") + + latencies = [txn['latency_ms'] for txn in self.completed_transactions] + + return PersistenceMetrics( + pattern=self.get_pattern_name(), + total_transactions=self.next_txn_id, + completed_transactions=len(self.completed_transactions), + + # Latency metrics + min_latency=min(latencies), + mean_latency=statistics.mean(latencies), + median_latency=statistics.median(latencies), + p95_latency=np.percentile(latencies, 95), + p99_latency=np.percentile(latencies, 99), + max_latency=max(latencies), + + # Throughput metrics + avg_throughput_tps=len(self.completed_transactions) / self.simulation_duration, + peak_throughput_tps=self._calculate_peak_throughput(), + + # Resource metrics - implemented by subclasses + avg_disk_iops=statistics.mean(self.disk_iops) if self.disk_iops else 0, + avg_network_mbps=statistics.mean(self.network_usage) if self.network_usage else 0, + storage_efficiency=self._calculate_storage_efficiency(), + + # Pattern-specific characteristics + durability_guarantee=self._get_durability_guarantee(), + consistency_model=self._get_consistency_model(), + recovery_time_estimate=self._get_recovery_time(), + operational_complexity=self._get_operational_complexity(), + infrastructure_cost=self._get_infrastructure_cost() + ) + + def _calculate_peak_throughput(self) -> float: + """Calculate peak throughput in 1-second windows""" + if not self.completed_transactions: + return 0.0 + + # Group transactions by second + throughput_by_second = defaultdict(int) + for txn in self.completed_transactions: + second = int(txn['completion_time']) + throughput_by_second[second] += 1 + + return max(throughput_by_second.values()) if throughput_by_second else 0 + + # Abstract methods for pattern-specific characteristics + @abstractmethod + def _calculate_storage_efficiency(self) -> float: 
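+        """Return physical bytes stored per logical byte written (1.0 = no overhead)."""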
+ pass + + @abstractmethod + def _get_durability_guarantee(self) -> str: + pass + + @abstractmethod + def _get_consistency_model(self) -> str: + pass + + @abstractmethod + def _get_recovery_time(self) -> float: + pass + + @abstractmethod + def _get_operational_complexity(self) -> int: + pass + + @abstractmethod + def _get_infrastructure_cost(self) -> float: + pass + + +class WeaselDBSimulator(PersistenceSimulator): + """WeaselDB's batched S3 persistence simulation""" + + def __init__(self, + batch_timeout_ms: float = 1.0, + batch_size_threshold: int = 800000, + max_in_flight: int = 50, + **kwargs): + super().__init__(**kwargs) + + self.batch_timeout_ms = batch_timeout_ms + self.batch_size_threshold = batch_size_threshold + self.max_in_flight = max_in_flight + + # State + self.current_batch = [] + self.batch_start_time = None + self.in_flight_batches = {} + self.next_batch_id = 0 + + # S3 characteristics + self.s3_base_latency_ms = 60.0 # From our simulation + self.s3_size_penalty_per_mb = 20.0 + + def get_pattern_name(self) -> PersistencePattern: + return PersistencePattern.WEASELDB_S3 + + def process_transaction(self, txn: Transaction): + """Process transaction using WeaselDB batching logic""" + # Add to current batch + if not self.current_batch: + self.batch_start_time = self.current_time + + self.current_batch.append(txn) + + # Check if we should send batch + if self._should_send_batch() and len(self.in_flight_batches) < self.max_in_flight: + self._send_current_batch() + + def _should_send_batch(self) -> bool: + """Check batch triggers""" + if not self.current_batch: + return False + + # Size trigger + batch_size = sum(txn.size_bytes for txn in self.current_batch) + if batch_size >= self.batch_size_threshold: + return True + + # Time trigger + if self.batch_start_time and (self.current_time - self.batch_start_time) >= (self.batch_timeout_ms / 1000.0): + return True + + return False + + def _send_current_batch(self): + """Send current batch to S3""" + if not self.current_batch: + return + + batch_id = self.next_batch_id + self.next_batch_id += 1 + + batch_size = sum(txn.size_bytes for txn in self.current_batch) + + # Sample S3 latency + s3_latency = self._sample_s3_latency(batch_size) / 1000.0 + completion_time = self.current_time + s3_latency + + # Track batch + self.in_flight_batches[batch_id] = { + 'transactions': self.current_batch.copy(), + 'sent_time': self.current_time, + 'completion_time': completion_time, + 'size_bytes': batch_size + } + + # Schedule completion + self.schedule_event(completion_time, 'custom', {'type': 'batch_complete', 'batch_id': batch_id}) + + # Track network usage + self.network_usage.append(batch_size / (1024 * 1024)) # MB + + # Clear batch + self.current_batch.clear() + self.batch_start_time = None + + def _sample_s3_latency(self, batch_size_bytes: int) -> float: + """Sample S3 latency with size scaling""" + base = self.s3_base_latency_ms + size_penalty = (batch_size_bytes / (1024 * 1024)) * self.s3_size_penalty_per_mb + variable = self.rng.gamma(2.0, 15.0) # Variable component + return 30.0 + variable + size_penalty # 30ms RTT + variable + size + + def _handle_custom_event(self, data): + """Handle batch completion events""" + if data['type'] == 'batch_complete': + batch_id = data['batch_id'] + if batch_id in self.in_flight_batches: + batch = self.in_flight_batches.pop(batch_id) + + # Mark transactions complete + for txn in batch['transactions']: + latency_ms = (self.current_time - txn.arrival_time) * 1000 + self.completed_transactions.append({ + 'txn_id': 
txn.txn_id,
+                        'arrival_time': txn.arrival_time,
+                        'completion_time': self.current_time,
+                        'latency_ms': latency_ms,
+                        'size_bytes': txn.size_bytes
+                    })
+
+    def _calculate_storage_efficiency(self) -> float:
+        return 1.0  # S3 has no storage overhead
+
+    def _get_durability_guarantee(self) -> str:
+        return "Eventually durable (S3 11 9's)"
+
+    def _get_consistency_model(self) -> str:
+        return "Optimistic concurrency, eventual consistency"
+
+    def _get_recovery_time(self) -> float:
+        return 0.1  # Fast recovery, just reconnect to S3
+
+    def _get_operational_complexity(self) -> int:
+        return 3  # Moderate - S3 managed, but need batching logic
+
+    def _get_infrastructure_cost(self) -> float:
+        return 1.0  # Baseline cost
+
+
+class TraditionalWALSimulator(PersistenceSimulator):
+    """Traditional Write-Ahead Log with periodic sync"""
+
+    def __init__(self,
+                 wal_sync_interval_ms: float = 10.0,
+                 checkpoint_interval_sec: float = 30.0,
+                 **kwargs):
+        super().__init__(**kwargs)
+
+        self.wal_sync_interval_ms = wal_sync_interval_ms
+        self.checkpoint_interval_sec = checkpoint_interval_sec
+
+        # WAL state
+        self.wal_buffer = []
+        self.last_sync_time = 0.0
+        self.pending_transactions = {}  # Waiting for sync
+
+        # EBS characteristics
+        self.disk_latency_ms = 1.0  # EBS base latency
+        self.disk_iops_limit = 10000  # Typical SSD
+
+        # Schedule periodic syncs
+        self._schedule_periodic_syncs()
+
+    def get_pattern_name(self) -> PersistencePattern:
+        return PersistencePattern.TRADITIONAL_WAL
+
+    def process_transaction(self, txn: Transaction):
+        """Write to WAL buffer, wait for sync"""
+        # Write to WAL buffer (immediate)
+        self.wal_buffer.append(txn)
+        self.pending_transactions[txn.txn_id] = txn
+
+        # Track disk IOPS (write to WAL)
+        self.disk_iops.append(1.0)
+
+    def _schedule_periodic_syncs(self):
+        """Schedule periodic WAL sync events"""
+        sync_time = self.wal_sync_interval_ms / 1000.0
+        while sync_time < self.simulation_duration:
+            self.schedule_event(sync_time, 'custom', {'type': 'wal_sync'})
+            sync_time += self.wal_sync_interval_ms / 1000.0
+
+    def _perform_wal_sync(self):
+        """Sync WAL buffer to disk"""
+        if not self.wal_buffer:
+            return
+
+        # Calculate sync latency based on buffer size
+        buffer_size = sum(txn.size_bytes for txn in self.wal_buffer)
+        sync_latency = self._calculate_disk_sync_latency(buffer_size)
+
+        completion_time = self.current_time + sync_latency / 1000.0
+
+        # Schedule sync completion
+        syncing_txns = list(self.wal_buffer)
+        self.schedule_event(completion_time, 'custom', {
+            'type': 'sync_complete',
+            'transactions': syncing_txns
+        })
+
+        # Track IOPS for sync operation
+        self.disk_iops.append(len(self.wal_buffer))
+
+        # Clear buffer
+        self.wal_buffer.clear()
+        self.last_sync_time = self.current_time
+
+    def _calculate_disk_sync_latency(self, size_bytes: int) -> float:
+        """Calculate WAL sync latency with realistic fsync modeling (append-only, so no directory sync)"""
+        # Data write to page cache
+        write_latency = 0.1  # Fast write to page cache
+
+        # EBS sequential write throughput
+        throughput_mbps = 1000.0  # EBS gp3 throughput
+        size_mb = size_bytes / (1024 * 1024)
+        transfer_latency = size_mb * (1000.0 / throughput_mbps)
+
+        # WAL file fsync latency on EBS
+        # fdatasync on EBS with network replication
+        file_fsync_base = 0.8  # Higher base latency on EBS
+        file_fsync_variable = self.rng.gamma(2.2, 0.4)  # More variable due to network
+
+        # Size penalty for large WAL
syncs + size_penalty = min(size_mb * 0.05, 1.0) # Smaller penalty than batched writes + + file_fsync_latency = file_fsync_base + file_fsync_variable + size_penalty + + # WAL typically appends to existing files, so no directory fsync needed + # Directory fsync only required when creating new WAL segments + return write_latency + transfer_latency + file_fsync_latency + + def _handle_custom_event(self, data): + """Handle sync completion""" + if data['type'] == 'wal_sync': + self._perform_wal_sync() + elif data['type'] == 'sync_complete': + # Mark transactions as durable + for txn in data['transactions']: + if txn.txn_id in self.pending_transactions: + del self.pending_transactions[txn.txn_id] + + latency_ms = (self.current_time - txn.arrival_time) * 1000 + self.completed_transactions.append({ + 'txn_id': txn.txn_id, + 'arrival_time': txn.arrival_time, + 'completion_time': self.current_time, + 'latency_ms': latency_ms, + 'size_bytes': txn.size_bytes + }) + + def _calculate_storage_efficiency(self) -> float: + return 2.0 # WAL + main storage + + def _get_durability_guarantee(self) -> str: + return "ACID durable after WAL sync" + + def _get_consistency_model(self) -> str: + return "Strict ACID consistency" + + def _get_recovery_time(self) -> float: + return 30.0 # WAL replay time + + def _get_operational_complexity(self) -> int: + return 7 # Complex - WAL management, checkpoints, recovery + + def _get_infrastructure_cost(self) -> float: + return 1.5 # Higher due to disk I/O overhead + + +class SynchronousSimulator(PersistenceSimulator): + """Synchronous disk persistence (like PostgreSQL with synchronous_commit=on)""" + + def __init__(self, **kwargs): + super().__init__(**kwargs) + self.disk_latency_ms = 1.0 # EBS base latency + + def get_pattern_name(self) -> PersistencePattern: + return PersistencePattern.SYNCHRONOUS + + def process_transaction(self, txn: Transaction): + """Immediately sync transaction to disk""" + # Calculate disk write latency + disk_latency = self._calculate_disk_latency(txn.size_bytes) / 1000.0 + completion_time = self.current_time + disk_latency + + # Schedule completion + self.schedule_event(completion_time, 'custom', { + 'type': 'disk_write_complete', + 'transaction': txn + }) + + # Track disk IOPS + self.disk_iops.append(1.0) + + def _calculate_disk_latency(self, size_bytes: int) -> float: + """Calculate per-transaction disk write latency with fsync""" + # Write to page cache (fast) + write_latency = 0.1 + + # FSYNC latency for immediate durability on EBS + # Each transaction requires its own fsync - expensive on network storage! 
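+        # Rough arithmetic from the constants below: 0.1ms write + 1.5ms fsync base
+        # + E[Gamma(2.3, 0.5)] = 1.15ms + 0.5ms small-write penalty ~= 3.25ms mean
+        # per small commit, an order of magnitude above the per-commit share of a
+        # single batched fsync amortized over many commits.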
+ fsync_base = 1.5 # Much higher base latency for individual fsyncs on EBS + fsync_variable = self.rng.gamma(2.3, 0.5) # More variable due to network + + # Small write penalty - less efficient than batched writes + if size_bytes < 4096: + penalty = 0.5 # Individual small writes are less efficient + else: + penalty = 0.0 + + fsync_latency = fsync_base + fsync_variable + penalty + + # Synchronous commits to existing database files don't need directory fsync + # Directory fsync only needed when creating new database files/tablespaces + return write_latency + fsync_latency + + def _handle_custom_event(self, data): + """Handle disk write completion""" + if data['type'] == 'disk_write_complete': + txn = data['transaction'] + latency_ms = (self.current_time - txn.arrival_time) * 1000 + + self.completed_transactions.append({ + 'txn_id': txn.txn_id, + 'arrival_time': txn.arrival_time, + 'completion_time': self.current_time, + 'latency_ms': latency_ms, + 'size_bytes': txn.size_bytes + }) + + def _calculate_storage_efficiency(self) -> float: + return 1.5 # Some overhead for metadata + + def _get_durability_guarantee(self) -> str: + return "Immediate ACID durability" + + def _get_consistency_model(self) -> str: + return "Strict ACID with immediate consistency" + + def _get_recovery_time(self) -> float: + return 5.0 # Fast recovery, data already on disk + + def _get_operational_complexity(self) -> int: + return 5 # Moderate - standard database operations + + def _get_infrastructure_cost(self) -> float: + return 2.0 # High due to disk I/O per transaction + + +class WeaselDBDiskSimulator(PersistenceSimulator): + """WeaselDB's batched disk persistence simulation""" + + def __init__(self, + batch_timeout_ms: float = 1.0, + batch_size_threshold: int = 800000, + max_in_flight: int = 50, + **kwargs): + super().__init__(**kwargs) + + self.batch_timeout_ms = batch_timeout_ms + self.batch_size_threshold = batch_size_threshold + self.max_in_flight = max_in_flight + + # State + self.current_batch = [] + self.batch_start_time = None + self.in_flight_batches = {} + self.next_batch_id = 0 + + # EBS characteristics (gp3 or io2 volumes) + self.disk_base_latency_ms = 0.5 # EBS has higher base latency than local NVMe + self.disk_throughput_mbps = 1000.0 # EBS gp3 max throughput + + def get_pattern_name(self) -> PersistencePattern: + return PersistencePattern.WEASELDB_S3 # Reuse enum, but it's actually disk + + def process_transaction(self, txn: Transaction): + """Process transaction using WeaselDB batching logic""" + # Add to current batch + if not self.current_batch: + self.batch_start_time = self.current_time + + self.current_batch.append(txn) + + # Check if we should send batch + if self._should_send_batch() and len(self.in_flight_batches) < self.max_in_flight: + self._send_current_batch() + + def _should_send_batch(self) -> bool: + """Check batch triggers""" + if not self.current_batch: + return False + + # Size trigger + batch_size = sum(txn.size_bytes for txn in self.current_batch) + if batch_size >= self.batch_size_threshold: + return True + + # Time trigger + if self.batch_start_time and (self.current_time - self.batch_start_time) >= (self.batch_timeout_ms / 1000.0): + return True + + return False + + def _send_current_batch(self): + """Send current batch to disk""" + if not self.current_batch: + return + + batch_id = self.next_batch_id + self.next_batch_id += 1 + + batch_size = sum(txn.size_bytes for txn in self.current_batch) + + # Sample disk write latency + disk_latency = self._sample_disk_latency(batch_size) 
/ 1000.0 + completion_time = self.current_time + disk_latency + + # Track batch + self.in_flight_batches[batch_id] = { + 'transactions': self.current_batch.copy(), + 'sent_time': self.current_time, + 'completion_time': completion_time, + 'size_bytes': batch_size + } + + # Schedule completion + self.schedule_event(completion_time, 'custom', {'type': 'batch_complete', 'batch_id': batch_id}) + + # Track disk IOPS (one write operation per batch) + self.disk_iops.append(1.0) + + # Clear batch + self.current_batch.clear() + self.batch_start_time = None + + def _sample_disk_latency(self, batch_size_bytes: int) -> float: + """Sample disk write latency with realistic fsync modeling including directory sync""" + # Base latency for the write command (data goes to page cache) + write_latency = self.disk_base_latency_ms + + # Throughput-based latency for data transfer to page cache + size_mb = batch_size_bytes / (1024 * 1024) + transfer_latency = (size_mb / self.disk_throughput_mbps) * 1000.0 # Convert to ms + + # FSYNC latency for EBS - forces write to replicated storage + # EBS has higher fsync latency due to network replication + file_fsync_base = 1.0 # Higher base fsync latency for EBS + file_fsync_variable = self.rng.gamma(2.5, 0.6) # More variable due to network + + # Size-dependent fsync penalty for large batches + size_penalty = min(size_mb * 0.1, 2.0) # Max 2ms penalty + + file_fsync_latency = file_fsync_base + file_fsync_variable + size_penalty + + # Directory fsync latency - required for WeaselDB batch file creation on EBS + # EBS directory metadata sync is also network-replicated + dir_fsync_base = 0.5 # Higher directory metadata sync latency on EBS + dir_fsync_variable = self.rng.gamma(1.8, 0.3) # More variable due to network + + dir_fsync_latency = dir_fsync_base + dir_fsync_variable + + # Total latency: write + file fsync + directory fsync + # WeaselDB needs directory fsync for batch file durability guarantees + return write_latency + transfer_latency + file_fsync_latency + dir_fsync_latency + + def _handle_custom_event(self, data): + """Handle batch completion events""" + if data['type'] == 'batch_complete': + batch_id = data['batch_id'] + if batch_id in self.in_flight_batches: + batch = self.in_flight_batches.pop(batch_id) + + # Mark transactions complete + for txn in batch['transactions']: + latency_ms = (self.current_time - txn.arrival_time) * 1000 + self.completed_transactions.append({ + 'txn_id': txn.txn_id, + 'arrival_time': txn.arrival_time, + 'completion_time': self.current_time, + 'latency_ms': latency_ms, + 'size_bytes': txn.size_bytes + }) + + def _calculate_storage_efficiency(self) -> float: + return 1.2 # Some overhead for batching metadata + + def _get_durability_guarantee(self) -> str: + return "ACID durable after EBS replication" + + def _get_consistency_model(self) -> str: + return "Strict serializable with optimistic concurrency" + + def _get_recovery_time(self) -> float: + return 10.0 # EBS volume attachment + recovery + + def _get_operational_complexity(self) -> int: + return 4 # Moderate - batching logic + disk management + + def _get_infrastructure_cost(self) -> float: + return 1.4 # Higher than S3 due to EBS provisioned storage + replication + + +class DatabaseComparisonFramework: + """Framework for comparing different database persistence patterns""" + + def __init__(self, simulation_duration: float = 30.0): + self.simulation_duration = simulation_duration + + def run_comparison(self, + arrival_rates: List[float] = [100, 500, 1000, 2000]) -> Dict[str, 
List[PersistenceMetrics]]: + """Run comparison across multiple arrival rates""" + + results = defaultdict(list) + + for rate in arrival_rates: + print(f"\nTesting at {rate} TPS...") + + # WeaselDB S3 (optimized config) + weasel_s3 = WeaselDBSimulator( + batch_timeout_ms=1.0, + batch_size_threshold=800000, + max_in_flight=50, + simulation_duration=self.simulation_duration, + arrival_rate_per_sec=rate + ) + weasel_s3_metrics = weasel_s3.run_simulation() + results['WeaselDB S3'].append(weasel_s3_metrics) + print(f" WeaselDB S3 P95: {weasel_s3_metrics.p95_latency:.1f}ms") + + # WeaselDB EBS (same batching, EBS storage) + weasel_ebs = WeaselDBDiskSimulator( + batch_timeout_ms=1.0, + batch_size_threshold=800000, + max_in_flight=50, + simulation_duration=self.simulation_duration, + arrival_rate_per_sec=rate + ) + weasel_ebs_metrics = weasel_ebs.run_simulation() + results['WeaselDB EBS'].append(weasel_ebs_metrics) + print(f" WeaselDB EBS P95: {weasel_ebs_metrics.p95_latency:.1f}ms") + + # Traditional WAL + wal = TraditionalWALSimulator( + wal_sync_interval_ms=10.0, + simulation_duration=self.simulation_duration, + arrival_rate_per_sec=rate + ) + wal_metrics = wal.run_simulation() + results['Traditional WAL'].append(wal_metrics) + print(f" WAL P95: {wal_metrics.p95_latency:.1f}ms") + + # Synchronous + sync = SynchronousSimulator( + simulation_duration=self.simulation_duration, + arrival_rate_per_sec=rate + ) + sync_metrics = sync.run_simulation() + results['Synchronous'].append(sync_metrics) + print(f" Synchronous P95: {sync_metrics.p95_latency:.1f}ms") + + return dict(results) + + def print_comparison_report(self, results: Dict[str, List[PersistenceMetrics]]): + """Print comprehensive comparison report""" + print("\n" + "="*80) + print("DATABASE PERSISTENCE PATTERN COMPARISON") + print("="*80) + + # Get arrival rates for headers + arrival_rates = [100, 500, 1000, 2000] + + # Latency comparison + print(f"\nP95 LATENCY COMPARISON (ms)") + print(f"{'Pattern':<20}", end="") + for rate in arrival_rates: + print(f"{rate:>8} TPS", end="") + print() + print("-" * 60) + + for pattern_name, metrics_list in results.items(): + print(f"{pattern_name:<20}", end="") + for metrics in metrics_list: + print(f"{metrics.p95_latency:>8.1f}", end="") + print() + + # Throughput comparison + print(f"\nTHROUGHPUT ACHIEVED (TPS)") + print(f"{'Pattern':<20}", end="") + for rate in arrival_rates: + print(f"{rate:>8} TPS", end="") + print() + print("-" * 60) + + for pattern_name, metrics_list in results.items(): + print(f"{pattern_name:<20}", end="") + for metrics in metrics_list: + print(f"{metrics.avg_throughput_tps:>8.1f}", end="") + print() + + # Characteristics comparison + print(f"\nSYSTEM CHARACTERISTICS") + print(f"{'Pattern':<20} {'Durability':<25} {'Consistency':<20} {'OpComplx':<8} {'Cost':<6}") + print("-" * 85) + + for pattern_name, metrics_list in results.items(): + metrics = metrics_list[0] # Use first metrics for characteristics + print(f"{pattern_name:<20} {metrics.durability_guarantee:<25} " + f"{metrics.consistency_model:<20} {metrics.operational_complexity:<8} " + f"{metrics.infrastructure_cost:<6.1f}") + + # Performance sweet spots + print(f"\nPERFORMANCE SWEET SPOTS") + print("-" * 40) + + for rate in arrival_rates: + print(f"\nAt {rate} TPS:") + rate_results = [(name, metrics_list[arrival_rates.index(rate)]) + for name, metrics_list in results.items()] + + # Sort by P95 latency + rate_results.sort(key=lambda x: x[1].p95_latency) + + for i, (name, metrics) in enumerate(rate_results): + rank_symbol = "🥇" 
if i == 0 else "🥈" if i == 1 else "🥉" if i == 2 else " " + print(f" {rank_symbol} {name}: {metrics.p95_latency:.1f}ms P95, " + f"{metrics.avg_throughput_tps:.0f} TPS achieved") + + def plot_comparison_results(self, results: Dict[str, List[PersistenceMetrics]], + save_path: Optional[str] = None): + """Plot comparison results""" + try: + arrival_rates = [100, 500, 1000, 2000] + + fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(15, 12)) + fig.suptitle('Database Persistence Pattern Comparison', fontsize=16) + + # Plot 1: P95 Latency vs Load + for pattern_name, metrics_list in results.items(): + p95_latencies = [m.p95_latency for m in metrics_list] + ax1.plot(arrival_rates, p95_latencies, marker='o', linewidth=2, label=pattern_name) + + ax1.set_xlabel('Arrival Rate (TPS)') + ax1.set_ylabel('P95 Latency (ms)') + ax1.set_title('P95 Latency vs Load') + ax1.legend() + ax1.grid(True, alpha=0.3) + ax1.set_yscale('log') + + # Plot 2: Throughput Achieved + for pattern_name, metrics_list in results.items(): + throughputs = [m.avg_throughput_tps for m in metrics_list] + ax2.plot(arrival_rates, throughputs, marker='s', linewidth=2, label=pattern_name) + + # Perfect throughput line + ax2.plot(arrival_rates, arrival_rates, 'k--', alpha=0.5, label='Perfect (no loss)') + + ax2.set_xlabel('Target Rate (TPS)') + ax2.set_ylabel('Achieved Throughput (TPS)') + ax2.set_title('Throughput: Target vs Achieved') + ax2.legend() + ax2.grid(True, alpha=0.3) + + # Plot 3: Latency Distribution at 1000 TPS + rate_idx = 2 # 1000 TPS + for pattern_name, metrics_list in results.items(): + metrics = metrics_list[rate_idx] + # Plot latency percentiles + percentiles = [50, 95, 99] + values = [metrics.median_latency, metrics.p95_latency, metrics.p99_latency] + ax3.bar([f"{pattern_name}\nP{p}" for p in percentiles], values, + alpha=0.7, label=pattern_name) + + ax3.set_ylabel('Latency (ms)') + ax3.set_title('Latency Percentiles at 1000 TPS') + ax3.set_yscale('log') + ax3.grid(True, alpha=0.3) + + # Plot 4: Cost vs Performance + for pattern_name, metrics_list in results.items(): + costs = [m.infrastructure_cost for m in metrics_list] + p95s = [m.p95_latency for m in metrics_list] + + # Use different markers for different patterns + markers = {'WeaselDB': 'o', 'Traditional WAL': 's', 'Synchronous': '^'} + marker = markers.get(pattern_name, 'o') + + ax4.scatter(costs, p95s, s=100, marker=marker, alpha=0.7, label=pattern_name) + + ax4.set_xlabel('Infrastructure Cost (relative)') + ax4.set_ylabel('P95 Latency (ms)') + ax4.set_title('Cost vs Performance Trade-off') + ax4.legend() + ax4.grid(True, alpha=0.3) + ax4.set_yscale('log') + + plt.tight_layout() + + if save_path: + plt.savefig(save_path, dpi=300, bbox_inches='tight') + print(f"Comparison plots saved to {save_path}") + else: + plt.show() + + except Exception as e: + print(f"Could not generate plots: {e}") + + +def main(): + """Run database persistence pattern comparison""" + print("Database Persistence Pattern Comparison") + print("Comparing WeaselDB vs Traditional Database Approaches") + print() + + comparison = DatabaseComparisonFramework(simulation_duration=20.0) + results = comparison.run_comparison() + + comparison.print_comparison_report(results) + + try: + comparison.plot_comparison_results(results, 'database_comparison.png') + except Exception as e: + print(f"Could not generate plots: {e}") + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/latency_sim/persistence_optimizer.py b/latency_sim/persistence_optimizer.py new file mode 
100644 index 0000000..dac4e86 --- /dev/null +++ b/latency_sim/persistence_optimizer.py @@ -0,0 +1,461 @@ +#!/usr/bin/env python3 +""" +Persistence Thread Parameter Optimization + +Uses Bayesian Optimization to automatically find the optimal configuration +parameters that minimize commit latency. This is much more efficient than +grid search since it uses a probabilistic model to guide parameter exploration. + +Key advantages: +- Efficiently explores high-dimensional parameter spaces +- Uses previous simulation results to guide future parameter choices +- Handles expensive objective function evaluations (our simulation) +- Provides uncertainty estimates for parameter importance +""" + +import numpy as np +from typing import Dict, List, Tuple, Optional +import time +from persistence_simulation import PersistenceSimulation, print_results + +# Try to import scikit-optimize for Bayesian optimization +try: + from skopt import gp_minimize, forest_minimize + from skopt.space import Real, Integer + from skopt.utils import use_named_args + from skopt.plots import plot_convergence, plot_objective + import matplotlib.pyplot as plt + OPTIMIZE_AVAILABLE = True +except ImportError: + print("scikit-optimize not available. Install with: pip install scikit-optimize") + print("Falling back to grid search...") + OPTIMIZE_AVAILABLE = False + + +class PersistenceOptimizer: + """ + Automated parameter optimization for the persistence thread using Bayesian optimization. + + This class finds the optimal configuration parameters to minimize commit latency + by intelligently exploring the parameter space using Gaussian Process models. + """ + + def __init__(self, + optimization_budget: int = 50, + simulation_duration: float = 20.0, + arrival_rate: float = 1000.0, + objective_metric: str = "p95_latency", + random_seed: int = 42): + + self.optimization_budget = optimization_budget + self.simulation_duration = simulation_duration + self.arrival_rate = arrival_rate + self.objective_metric = objective_metric + self.random_seed = random_seed + + # Track optimization history + self.optimization_history = [] + self.best_params = None + self.best_score = float('inf') + + # Define parameter search space + self.parameter_space = self._define_search_space() + self.parameter_names = [dim.name for dim in self.parameter_space] + + def _define_search_space(self) -> List: + """ + Define the parameter search space for optimization. + + Focus on the 3 core parameters that matter for persistence thread performance + with 100% reliable S3. Retry parameters removed since S3 never fails. 
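+
+        The log-uniform priors below make the search scale-aware: stepping
+        batch_timeout_ms from 1ms to 2ms counts as the same move as stepping
+        from 20ms to 40ms, which fits parameters whose effect on latency is
+        multiplicative rather than additive.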
+ """ + return [ + # Core batching parameters + Real(1.0, 50.0, name='batch_timeout_ms', + prior='log-uniform'), # Log scale since small changes matter + Integer(64 * 1024, 4 * 1024 * 1024, name='batch_size_threshold', # 64KB - 4MB + prior='log-uniform'), + + # Flow control parameters - likely the most impactful + Integer(1, 50, name='max_in_flight_requests'), + ] + + def _run_simulation_with_params(self, params: Dict[str, float]) -> Dict: + """Run simulation with given parameters and return results""" + try: + sim = PersistenceSimulation( + batch_timeout_ms=params['batch_timeout_ms'], + batch_size_threshold=int(params['batch_size_threshold']), + max_in_flight_requests=int(params['max_in_flight_requests']), + # Retry parameters fixed since S3 is 100% reliable + max_retry_attempts=0, # No retries needed + retry_base_delay_ms=100.0, # Irrelevant but needs a value + # S3 parameters kept fixed - 100% reliable for optimization focus + s3_latency_shape=2.0, # Fixed Gamma shape + s3_latency_scale=15.0, # Fixed Gamma scale (30ms RTT + ~30ms variable = ~60ms mean) + s3_failure_rate=0.0, # 100% reliable S3 + arrival_rate_per_sec=self.arrival_rate, + simulation_duration_sec=self.simulation_duration + ) + + return sim.run_simulation() + + except Exception as e: + print(f"Simulation failed with params {params}: {e}") + # Return a high penalty score for failed simulations + return { + 'commit_metrics': { + 'latency_ms': { + 'mean': 10000, + 'p95': 10000, + 'p99': 10000 + } + }, + 'error': str(e) + } + + def _extract_objective_value(self, results: Dict) -> float: + """Extract the objective value to minimize from simulation results""" + try: + commit_metrics = results['commit_metrics']['latency_ms'] + + if self.objective_metric == "mean_latency": + return commit_metrics['mean'] + elif self.objective_metric == "p95_latency": + return commit_metrics['p95'] + elif self.objective_metric == "p99_latency": + return commit_metrics['p99'] + elif self.objective_metric == "weighted_latency": + # Weighted combination emphasizing tail latencies + return (0.3 * commit_metrics['mean'] + + 0.5 * commit_metrics['p95'] + + 0.2 * commit_metrics['p99']) + else: + return commit_metrics['p95'] # Default to P95 + + except KeyError as e: + print(f"Failed to extract objective from results: {e}") + return 10000 # High penalty for invalid results + + def optimize_with_bayesian(self) -> Tuple[Dict, float]: + """ + Use Bayesian Optimization to find optimal parameters. + + This uses Gaussian Process models to build a probabilistic model + of the objective function and intelligently choose where to sample next. 
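+
+        Minimal standalone sketch of the same API (toy quadratic objective
+        as a stand-in for the simulator; assumes scikit-optimize installed):
+
+            from skopt import gp_minimize
+            from skopt.space import Real
+
+            result = gp_minimize(lambda x: (x[0] - 3.0) ** 2,
+                                 [Real(0.0, 10.0)],
+                                 n_calls=20, acq_func='EI', random_state=42)
+            # result.x holds the best parameters, result.fun the best score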
+ """ + if not OPTIMIZE_AVAILABLE: + return self.optimize_with_grid_search() + + print(f"Starting Bayesian Optimization with {self.optimization_budget} evaluations") + print(f"Objective: Minimize {self.objective_metric}") + print(f"Parameter space: {len(self.parameter_space)} dimensions") + print() + + @use_named_args(self.parameter_space) + def objective(**params): + """Objective function for Bayesian optimization""" + print(f"Evaluating: {params}") + + start_time = time.time() + results = self._run_simulation_with_params(params) + eval_time = time.time() - start_time + + objective_value = self._extract_objective_value(results) + + # Track optimization history + history_entry = { + 'params': params.copy(), + 'objective_value': objective_value, + 'results': results, + 'eval_time': eval_time, + 'iteration': len(self.optimization_history) + 1 + } + self.optimization_history.append(history_entry) + + # Update best if improved + if objective_value < self.best_score: + self.best_score = objective_value + self.best_params = params.copy() + print(f"✓ NEW BEST: {objective_value:.2f}ms (evaluation {history_entry['iteration']})") + else: + print(f" Score: {objective_value:.2f}ms") + + print(f" Time: {eval_time:.1f}s") + print() + + return objective_value + + # Run Bayesian optimization + result = gp_minimize( + func=objective, + dimensions=self.parameter_space, + n_calls=self.optimization_budget, + n_initial_points=10, # Random exploration first + acq_func='EI', # Expected Improvement acquisition + random_state=self.random_seed + ) + + # Extract best parameters + best_params_list = result.x + best_params_dict = dict(zip(self.parameter_names, best_params_list)) + best_objective = result.fun + + return best_params_dict, best_objective + + def optimize_with_grid_search(self) -> Tuple[Dict, float]: + """Fallback grid search optimization if scikit-optimize not available""" + print("Using grid search optimization (install scikit-optimize for better results)") + print() + + # Define a smaller grid for key parameters + grid_configs = [ + # Vary max_in_flight and batch_timeout + {'max_in_flight_requests': 5, 'batch_timeout_ms': 5.0}, + {'max_in_flight_requests': 10, 'batch_timeout_ms': 5.0}, + {'max_in_flight_requests': 20, 'batch_timeout_ms': 5.0}, + {'max_in_flight_requests': 10, 'batch_timeout_ms': 2.0}, + {'max_in_flight_requests': 10, 'batch_timeout_ms': 10.0}, + {'max_in_flight_requests': 15, 'batch_timeout_ms': 3.0}, + {'max_in_flight_requests': 25, 'batch_timeout_ms': 7.0}, + ] + + best_params = None + best_score = float('inf') + + for i, config in enumerate(grid_configs): + print(f"Evaluating config {i+1}/{len(grid_configs)}: {config}") + + # Use default values for unspecified parameters + full_params = { + 'batch_timeout_ms': 5.0, + 'batch_size_threshold': 1024 * 1024, + 'max_in_flight_requests': 5 + } + full_params.update(config) + + results = self._run_simulation_with_params(full_params) + objective_value = self._extract_objective_value(results) + + if objective_value < best_score: + best_score = objective_value + best_params = full_params.copy() + print(f"✓ NEW BEST: {objective_value:.2f}ms") + else: + print(f" Score: {objective_value:.2f}ms") + print() + + return best_params, best_score + + def analyze_parameter_importance(self): + """Analyze which parameters have the most impact on performance""" + if not self.optimization_history: + print("No optimization history available") + return + + print("Parameter Importance Analysis") + print("=" * 50) + + # Extract parameter values and objectives + 
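+        # Caveat: plain Pearson correlations over Bayesian-optimization samples
+        # are biased because later samples cluster near promising regions; read
+        # the signs below as directional hints, not unbiased sensitivities.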
param_data = {} + objectives = [] + + for entry in self.optimization_history: + objectives.append(entry['objective_value']) + for param_name, param_value in entry['params'].items(): + if param_name not in param_data: + param_data[param_name] = [] + param_data[param_name].append(param_value) + + objectives = np.array(objectives) + + # Simple correlation analysis + print("Parameter correlations with objective (lower is better):") + correlations = [] + + for param_name, values in param_data.items(): + correlation = np.corrcoef(values, objectives)[0, 1] + correlations.append((param_name, correlation)) + print(f" {param_name:<25}: {correlation:+.3f}") + + print("\nMost impactful parameters (by absolute correlation):") + correlations.sort(key=lambda x: abs(x[1]), reverse=True) + for param_name, correlation in correlations[:5]: + impact = "reduces latency" if correlation < 0 else "increases latency" + print(f" {param_name:<25}: {impact} (r={correlation:+.3f})") + + def plot_optimization_progress(self, save_path: Optional[str] = None): + """Plot optimization convergence""" + if not OPTIMIZE_AVAILABLE or not self.optimization_history: + return + + iterations = [entry['iteration'] for entry in self.optimization_history] + objectives = [entry['objective_value'] for entry in self.optimization_history] + + # Calculate running minimum (best so far) + running_min = [] + current_min = float('inf') + for obj in objectives: + current_min = min(current_min, obj) + running_min.append(current_min) + + plt.figure(figsize=(12, 8)) + + # Plot 1: Objective value over iterations + plt.subplot(2, 2, 1) + plt.scatter(iterations, objectives, alpha=0.6, s=30) + plt.plot(iterations, running_min, 'r-', linewidth=2, label='Best so far') + plt.xlabel('Iteration') + plt.ylabel(f'{self.objective_metric} (ms)') + plt.title('Optimization Progress') + plt.legend() + plt.grid(True, alpha=0.3) + + # Plot 2: Parameter evolution for key parameters + plt.subplot(2, 2, 2) + key_params = ['max_in_flight_requests', 'batch_timeout_ms'] + for param in key_params: + if param in self.optimization_history[0]['params']: + values = [entry['params'][param] for entry in self.optimization_history] + plt.scatter(iterations, values, alpha=0.6, label=param, s=30) + plt.xlabel('Iteration') + plt.ylabel('Parameter Value') + plt.title('Key Parameter Evolution') + plt.legend() + plt.grid(True, alpha=0.3) + + # Plot 3: Objective distribution + plt.subplot(2, 2, 3) + plt.hist(objectives, bins=20, alpha=0.7, edgecolor='black') + plt.axvline(self.best_score, color='red', linestyle='--', + label=f'Best: {self.best_score:.1f}ms') + plt.xlabel(f'{self.objective_metric} (ms)') + plt.ylabel('Count') + plt.title('Objective Value Distribution') + plt.legend() + plt.grid(True, alpha=0.3) + + # Plot 4: Convergence rate + plt.subplot(2, 2, 4) + improvements = [] + for i, entry in enumerate(self.optimization_history): + if i == 0: + improvements.append(0) + else: + prev_best = running_min[i-1] + curr_best = running_min[i] + improvement = prev_best - curr_best + improvements.append(improvement) + + plt.plot(iterations, improvements, 'g-', marker='o', markersize=3) + plt.xlabel('Iteration') + plt.ylabel('Improvement (ms)') + plt.title('Per-Iteration Improvement') + plt.grid(True, alpha=0.3) + + plt.tight_layout() + + if save_path: + plt.savefig(save_path, dpi=300, bbox_inches='tight') + print(f"Optimization plots saved to {save_path}") + else: + plt.show() + + def run_optimization(self) -> Dict: + """Run the full optimization process and return results""" + start_time 
= time.time() + + # Run optimization + if OPTIMIZE_AVAILABLE: + best_params, best_score = self.optimize_with_bayesian() + else: + best_params, best_score = self.optimize_with_grid_search() + + total_time = time.time() - start_time + + # Run final simulation with best parameters for detailed results + print("Running final simulation with optimal parameters...") + final_results = self._run_simulation_with_params(best_params) + + # Prepare optimization summary + optimization_summary = { + 'best_parameters': best_params, + 'best_objective_value': best_score, + 'optimization_time': total_time, + 'evaluations_performed': len(self.optimization_history), + 'final_simulation_results': final_results, + 'optimization_history': self.optimization_history + } + + return optimization_summary + + def print_optimization_summary(self, summary: Dict): + """Print a comprehensive summary of optimization results""" + print("=" * 80) + print("BAYESIAN OPTIMIZATION RESULTS") + print("=" * 80) + + print(f"Optimization completed in {summary['optimization_time']:.1f} seconds") + print(f"Performed {summary['evaluations_performed']} parameter evaluations") + print(f"Best {self.objective_metric}: {summary['best_objective_value']:.2f}ms") + print() + + print("OPTIMAL PARAMETERS:") + print("-" * 40) + for param, value in summary['best_parameters'].items(): + if isinstance(value, float): + if param.endswith('_rate'): + print(f" {param:<25}: {value:.4f}") + else: + print(f" {param:<25}: {value:.2f}") + else: + print(f" {param:<25}: {value}") + + print("\nDETAILED PERFORMANCE WITH OPTIMAL PARAMETERS:") + print("-" * 50) + final_results = summary['final_simulation_results'] + print_results(final_results) + + print("\nPARAMETER IMPACT ANALYSIS:") + print("-" * 30) + self.analyze_parameter_importance() + + +def main(): + """Main optimization workflow""" + print("Persistence Thread Parameter Optimization") + print("Using Bayesian Optimization for intelligent parameter search") + print() + + # Create optimizer with different objective functions to test + objectives_to_test = ["p95_latency", "weighted_latency"] + + for objective in objectives_to_test: + print(f"\n{'='*80}") + print(f"OPTIMIZING FOR: {objective.upper()}") + print(f"{'='*80}") + + optimizer = PersistenceOptimizer( + optimization_budget=30, # Reasonable for demo + simulation_duration=15.0, # Shorter sims for faster optimization + arrival_rate=1000.0, + objective_metric=objective, + random_seed=42 + ) + + # Run optimization + summary = optimizer.run_optimization() + optimizer.print_optimization_summary(summary) + + # Generate plots + try: + optimizer.plot_optimization_progress(f'optimization_{objective}.png') + except Exception as e: + print(f"Could not generate plots: {e}") + + print(f"\nOptimization for {objective} completed!") + print("="*80) + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/latency_sim/persistence_simulation.py b/latency_sim/persistence_simulation.py new file mode 100644 index 0000000..d718d69 --- /dev/null +++ b/latency_sim/persistence_simulation.py @@ -0,0 +1,748 @@ +#!/usr/bin/env python3 +""" +Persistence Thread Simulation + +Simulates the persistence thread design from persistence.md to analyze +commit latency distributions with Poisson arrival times and realistic +S3 response characteristics. 
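+
+Typical usage (defaults match the constructor below):
+
+    sim = PersistenceSimulation(arrival_rate_per_sec=1000.0,
+                                simulation_duration_sec=60.0)
+    results = sim.run_simulation()
+    print_results(results)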
+ +Key metrics tracked: +- End-to-end commit latency (arrival to acknowledgment) +- Batch processing latencies +- Queue depths and flow control behavior +- Retry patterns and failure handling +""" + +import heapq +import random +import statistics +from dataclasses import dataclass, field +from typing import List, Optional, Dict, Tuple +import numpy as np +import matplotlib.pyplot as plt +from collections import defaultdict, deque +import time + + +@dataclass +class Commit: + """Represents a single commit request""" + commit_id: int + arrival_time: float + size_bytes: int = 1024 # Default 1KB per commit + + +@dataclass +class Batch: + """Represents a batch of commits being processed""" + batch_id: int + commits: List[Commit] + created_time: float + size_bytes: int = field(init=False) + retry_count: int = 0 + + def __post_init__(self): + self.size_bytes = sum(c.size_bytes for c in self.commits) + + +@dataclass +class InFlightRequest: + """Tracks an in-flight S3 request""" + batch: Batch + start_time: float + expected_completion: float + connection_id: int + + +class PersistenceSimulation: + """ + Simulates the persistence thread behavior described in persistence.md + + For S3 latency, we use a Gamma distribution which is recommended for + modeling network service response times because: + - It has a natural lower bound (minimum network RTT) + - It can model the right-skewed tail typical of network services + - It captures both typical fast responses and occasional slow responses + - Shape parameter controls the heaviness of the tail + """ + + def __init__(self, + # Configuration from persistence.md defaults + batch_timeout_ms: float = 5.0, + batch_size_threshold: int = 1024 * 1024, # 1MB + max_in_flight_requests: int = 5, + max_retry_attempts: int = 3, + retry_base_delay_ms: float = 100.0, + + # S3 latency modeling (Gamma distribution parameters) + s3_latency_shape: float = 2.0, # Shape parameter (α) + s3_latency_scale: float = 25.0, # Scale parameter (β) + s3_failure_rate: float = 0.01, # 1% failure rate + + # Arrival rate modeling + arrival_rate_per_sec: float = 1000.0, # Lambda for Poisson + + # Simulation parameters + simulation_duration_sec: float = 60.0): + + # Configuration + self.batch_timeout_ms = batch_timeout_ms + self.batch_size_threshold = batch_size_threshold + self.max_in_flight_requests = max_in_flight_requests + self.max_retry_attempts = max_retry_attempts + self.retry_base_delay_ms = retry_base_delay_ms + + # S3 modeling parameters + self.s3_latency_shape = s3_latency_shape + self.s3_latency_scale = s3_latency_scale + self.s3_failure_rate = s3_failure_rate + + # Arrival modeling + self.arrival_rate_per_sec = arrival_rate_per_sec + self.simulation_duration_sec = simulation_duration_sec + + # Simulation state + self.current_time = 0.0 + self.event_queue = [] # Priority queue of (time, event_type, event_data) + self.pending_commits = deque() + self.current_batch = [] + self.batch_start_time = None # Will be set when first commit added to batch + self.in_flight_requests: Dict[int, InFlightRequest] = {} + self.next_batch_id = 0 + self.next_connection_id = 0 + self.next_commit_id = 0 + + # Metrics collection + self.completed_commits = [] + self.batch_metrics = [] + self.retry_counts = defaultdict(int) + self.queue_depth_samples = [] + self.timeline_events = [] # For debugging/visualization + + # Random number generators + self.rng = random.Random(42) # Reproducible results + self.np_rng = np.random.RandomState(42) + + def sample_s3_latency(self, batch_size_bytes: int = 0) -> 
float: + """ + Sample S3 response latency using Gamma distribution with size-dependent scaling. + + Gamma distribution is ideal for S3 latency because: + - Shape=2.0, Scale=15.0 gives variable latency around the mean + - Minimum 30ms RTT prevents unrealistic sub-network responses + - Right-skewed tail captures occasional slow responses + - Models the reality that most responses are fast but some are slow + + Size-dependent scaling: + - Base latency: 30ms RTT + Gamma(2.0, 15.0) = ~60ms mean + - Linear scaling with size: +20ms per MB + - Large requests (1MB) average ~80ms with similar tail behavior + """ + # Minimum network RTT (realistic for cloud storage) + min_rtt = 30.0 # 30ms minimum round-trip time + + # Variable latency component from Gamma distribution + variable_latency = self.np_rng.gamma(self.s3_latency_shape, self.s3_latency_scale) + + # Size-dependent scaling: +20ms per MB + size_mb = batch_size_bytes / (1024 * 1024) + size_penalty = size_mb * 20.0 # 20ms per MB + + return min_rtt + variable_latency + size_penalty + + def sample_inter_arrival_time(self) -> float: + """Sample time between commit arrivals using Poisson process""" + return self.np_rng.exponential(1.0 / self.arrival_rate_per_sec) + + def sample_commit_size(self) -> int: + """ + Sample commit size with realistic distribution including large commits. + + Distribution: + - 70% small commits: 500B - 10KB (typical operations) + - 25% medium commits: 10KB - 100KB (batch operations, large documents) + - 5% large commits: 100KB - 1MB (bulk data, file uploads) + + This creates a realistic mix where most commits are small but some + can trigger size-based batching or become single-commit batches. + """ + rand = self.rng.random() + + if rand < 0.70: # 70% small commits + return self.rng.randint(500, 10 * 1024) # 500B - 10KB + elif rand < 0.95: # 25% medium commits + return self.rng.randint(10 * 1024, 100 * 1024) # 10KB - 100KB + else: # 5% large commits + return self.rng.randint(100 * 1024, 1024 * 1024) # 100KB - 1MB + + def schedule_event(self, time: float, event_type: str, data=None): + """Add event to priority queue""" + heapq.heappush(self.event_queue, (time, event_type, data)) + + def should_process_batch(self) -> bool: + """Check if current batch should be processed based on triggers""" + if not self.current_batch: + return False + + # Size trigger + batch_size = sum(c.size_bytes for c in self.current_batch) + if batch_size >= self.batch_size_threshold: + return True + + # Time trigger - only if we have a valid batch start time + if self.batch_start_time is not None: + if (self.current_time - self.batch_start_time) >= (self.batch_timeout_ms / 1000.0): + return True + + return False + + def can_start_new_request(self) -> bool: + """Check if we can start a new request based on flow control""" + return len(self.in_flight_requests) < self.max_in_flight_requests + + def process_current_batch(self): + """Process the current batch if conditions are met""" + if not self.current_batch or not self.can_start_new_request(): + return + + if self.should_process_batch(): + batch = Batch( + batch_id=self.next_batch_id, + commits=self.current_batch.copy(), + created_time=self.current_time + ) + self.next_batch_id += 1 + + # Clear current batch + self.current_batch.clear() + self.batch_start_time = None # Reset to None, will be set on next batch + + self.send_batch_to_s3(batch) + + def send_batch_to_s3(self, batch: Batch, is_retry: bool = False): + """Send batch to S3 and track as in-flight request""" + if not 
self.can_start_new_request():
+            # This shouldn't happen due to flow control, but handle gracefully
+            self.schedule_event(self.current_time + 0.001, 'retry_batch', batch)
+            return
+
+        # Sample S3 response characteristics (pass batch size for latency modeling)
+        s3_latency = self.sample_s3_latency(batch.size_bytes) / 1000.0  # Convert ms to seconds
+        will_fail = self.rng.random() < self.s3_failure_rate
+
+        if will_fail:
+            s3_latency *= 2  # Failed requests typically take longer
+
+        completion_time = self.current_time + s3_latency
+        connection_id = self.next_connection_id
+        self.next_connection_id += 1
+
+        # Track in-flight request
+        in_flight = InFlightRequest(
+            batch=batch,
+            start_time=self.current_time,
+            expected_completion=completion_time,
+            connection_id=connection_id
+        )
+        self.in_flight_requests[connection_id] = in_flight
+
+        # Schedule completion event
+        if will_fail:
+            self.schedule_event(completion_time, 'batch_failed', connection_id)
+        else:
+            self.schedule_event(completion_time, 'batch_completed', connection_id)
+
+        # Log event for analysis
+        self.timeline_events.append({
+            'time': self.current_time,
+            'event': 'batch_sent',
+            'batch_id': batch.batch_id,
+            'batch_size': batch.size_bytes,
+            'commit_count': len(batch.commits),
+            'retry_count': batch.retry_count,
+            'is_retry': is_retry
+        })
+
+    def handle_batch_completed(self, connection_id: int):
+        """Handle successful batch completion"""
+        if connection_id not in self.in_flight_requests:
+            return
+
+        in_flight = self.in_flight_requests.pop(connection_id)
+        batch = in_flight.batch
+
+        # Calculate metrics
+        batch_latency = self.current_time - batch.created_time
+        self.batch_metrics.append({
+            'batch_id': batch.batch_id,
+            'latency': batch_latency,
+            'size_bytes': batch.size_bytes,
+            'commit_count': len(batch.commits),
+            'retry_count': batch.retry_count
+        })
+
+        # Mark commits as completed and calculate end-to-end latency
+        for commit in batch.commits:
+            commit_latency = self.current_time - commit.arrival_time
+            self.completed_commits.append({
+                'commit_id': commit.commit_id,
+                'arrival_time': commit.arrival_time,
+                'completion_time': self.current_time,
+                'latency': commit_latency,
+                'batch_id': batch.batch_id,
+                'retry_count': batch.retry_count
+            })
+
+        self.timeline_events.append({
+            'time': self.current_time,
+            'event': 'batch_completed',
+            'batch_id': batch.batch_id,
+            'latency': batch_latency,
+            'retry_count': batch.retry_count
+        })
+
+        # Try to process any pending work now that we have capacity
+        self.process_pending_work()
+
+    def handle_batch_failed(self, connection_id: int):
+        """Handle batch failure with retry logic"""
+        if connection_id not in self.in_flight_requests:
+            return
+
+        in_flight = self.in_flight_requests.pop(connection_id)
+        batch = in_flight.batch
+
+        self.timeline_events.append({
+            'time': self.current_time,
+            'event': 'batch_failed',
+            'batch_id': batch.batch_id,
+            'retry_count': batch.retry_count
+        })
+
+        if batch.retry_count < self.max_retry_attempts:
+            # Exponential backoff retry: base_delay * 2^(attempt - 1)
+            batch.retry_count += 1
+            backoff_delay = (self.retry_base_delay_ms / 1000.0) * (2 ** (batch.retry_count - 1))
+            retry_time = self.current_time + backoff_delay
+
+            self.schedule_event(retry_time, 'retry_batch', batch)
+            self.retry_counts[batch.retry_count] += 1
+        else:
+            # Max retries exhausted - this would be a fatal error in a real system
+            self.timeline_events.append({
+                'time': self.current_time,
+                'event': 'batch_abandoned',
+                'batch_id': batch.batch_id,
+                'retry_count': batch.retry_count
+            })
+
+    def handle_retry_batch(self, batch: Batch):
+        """Handle batch retry"""
+        self.send_batch_to_s3(batch, is_retry=True)
+
+    def process_pending_work(self):
+        """Process any pending commits that can now be batched"""
+        # Move pending commits to current batch
+        while self.pending_commits and self.can_start_new_request():
+            if not self.current_batch:
+                self.batch_start_time = self.current_time
+
+            commit = self.pending_commits.popleft()
+            self.current_batch.append(commit)
+
+            # Check if we should process this batch immediately
+            if self.should_process_batch():
+                self.process_current_batch()
+                break
+
+        # A batch may have timed out while flow control blocked it; its timeout
+        # event has already fired, so send it now that capacity is available
+        # (otherwise it would linger until the next commit arrival)
+        if self.current_batch and self.can_start_new_request():
+            elapsed = self.current_time - self.batch_start_time
+            if elapsed >= self.batch_timeout_ms / 1000.0:
+                self.process_current_batch()
+
+    def handle_commit_arrival(self, commit: Commit):
+        """Handle new commit arrival"""
+        # If we have in-flight capacity, try to add to current batch
+        if self.can_start_new_request():
+            if not self.current_batch:
+                self.batch_start_time = self.current_time
+
+            self.current_batch.append(commit)
+            self.process_current_batch()  # Check if we should process now
+        else:
+            # Add to pending queue due to flow control
+            self.pending_commits.append(commit)
+
+        # Schedule timeout event for current batch if it's the first commit
+        if len(self.current_batch) == 1 and not self.pending_commits:
+            timeout_time = self.current_time + (self.batch_timeout_ms / 1000.0)
+            self.schedule_event(timeout_time, 'batch_timeout', None)
+
+    def handle_batch_timeout(self):
+        """Handle batch timeout trigger"""
+        # Timeout events are never cancelled, so a stale timeout from an
+        # already-sent batch may flush a newer batch slightly early; this is
+        # harmless here since it only shortens a batching window
+        if self.current_batch and self.can_start_new_request():
+            self.process_current_batch()
+
+    def sample_queue_depth(self):
+        """Sample current queue depths for analysis"""
+        pending_count = len(self.pending_commits)
+        current_batch_count = len(self.current_batch)
+        in_flight_count = len(self.in_flight_requests)
+
+        self.queue_depth_samples.append({
+            'time': self.current_time,
+            'pending_commits': pending_count,
+            'current_batch_size': current_batch_count,
+            'in_flight_requests': in_flight_count,
+            'total_unacknowledged': pending_count + current_batch_count +
+                sum(len(req.batch.commits) for req in self.in_flight_requests.values())
+        })
+
+    def generate_arrivals(self):
+        """Generate Poisson arrival events for the simulation"""
+        current_time = 0.0
+
+        while current_time < self.simulation_duration_sec:
+            # Sample next arrival time
+            inter_arrival = self.sample_inter_arrival_time()
+            current_time += inter_arrival
+
+            if current_time >= self.simulation_duration_sec:
+                break
+
+            # Create commit
+            commit = Commit(
+                commit_id=self.next_commit_id,
+                arrival_time=current_time,
+                size_bytes=self.sample_commit_size()  # Realistic size distribution
+            )
+            self.next_commit_id += 1
+
+            # Schedule arrival event
+            self.schedule_event(current_time, 'commit_arrival', commit)
+
+        # Schedule periodic queue depth sampling
+        sample_time = 0.0
+        while sample_time < self.simulation_duration_sec:
+            self.schedule_event(sample_time, 'sample_queue_depth', None)
+            sample_time += 0.1  # Sample every 100ms
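+
+    # Back-of-envelope sketch (not called by the event loop): when batches
+    # close on the timeout rather than the size threshold, a batch collects
+    # roughly arrival_rate * timeout commits. With the defaults used below
+    # (1000 commits/sec, 5ms timeout) that predicts ~5 commits per batch,
+    # a useful cross-check against the measured avg_commits_per_batch.
+    def expected_commits_per_batch(self) -> float:
+        """Analytic batch-size estimate for the timeout-bound regime."""
+        return self.arrival_rate_per_sec * (self.batch_timeout_ms / 1000.0)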
+
+    def run_simulation(self):
+        """Run the complete simulation"""
+        print("Starting persistence simulation...")
+        print(f"Arrival rate: {self.arrival_rate_per_sec} commits/sec")
+        print(f"Duration: {self.simulation_duration_sec} seconds")
+        print(f"Expected commits: ~{int(self.arrival_rate_per_sec * self.simulation_duration_sec)}")
+        print()
+
+        # Generate all arrival events
+        self.generate_arrivals()
+
+        # Process events in time order
+        events_processed = 0
+        while self.event_queue and events_processed < 1_000_000:  # Safety limit
+            time, event_type, data = heapq.heappop(self.event_queue)
+            self.current_time = time
+
+            if time > self.simulation_duration_sec:
+                break
+
+            if event_type == 'commit_arrival':
+                self.handle_commit_arrival(data)
+            elif event_type == 'batch_completed':
+                self.handle_batch_completed(data)
+            elif event_type == 'batch_failed':
+                self.handle_batch_failed(data)
+            elif event_type == 'retry_batch':
+                self.handle_retry_batch(data)
+            elif event_type == 'batch_timeout':
+                self.handle_batch_timeout()
+            elif event_type == 'sample_queue_depth':
+                self.sample_queue_depth()
+
+            events_processed += 1
+
+        print(f"Simulation completed. Processed {events_processed} events.")
+        return self.analyze_results()
+
+    def analyze_results(self) -> Dict:
+        """Analyze simulation results and return metrics"""
+        if not self.completed_commits:
+            return {"error": "No commits completed during simulation"}
+
+        # Calculate latency statistics
+        latencies = [c['latency'] * 1000 for c in self.completed_commits]  # Convert to ms
+
+        results = {
+            'simulation_config': {
+                'duration_sec': self.simulation_duration_sec,
+                'arrival_rate_per_sec': self.arrival_rate_per_sec,
+                'batch_timeout_ms': self.batch_timeout_ms,
+                'batch_size_threshold': self.batch_size_threshold,
+                'max_in_flight_requests': self.max_in_flight_requests,
+                's3_latency_params': f"Gamma(shape={self.s3_latency_shape}, scale={self.s3_latency_scale})",
+                's3_failure_rate': self.s3_failure_rate
+            },
+            'commit_metrics': {
+                'total_commits': len(self.completed_commits),
+                'latency_ms': {
+                    'mean': statistics.mean(latencies),
+                    'median': statistics.median(latencies),
+                    'std': statistics.stdev(latencies) if len(latencies) > 1 else 0,
+                    'min': min(latencies),
+                    'max': max(latencies),
+                    'p95': np.percentile(latencies, 95),
+                    'p99': np.percentile(latencies, 99)
+                }
+            },
+            'batch_metrics': {
+                'total_batches': len(self.batch_metrics),
+                'avg_commits_per_batch': statistics.mean([b['commit_count'] for b in self.batch_metrics]),
+                'avg_batch_size_bytes': statistics.mean([b['size_bytes'] for b in self.batch_metrics]),
+                'avg_batch_latency_ms': statistics.mean([b['latency'] * 1000 for b in self.batch_metrics])
+            },
+            'retry_analysis': dict(self.retry_counts),
+            'queue_depth_analysis': self._analyze_queue_depths()
+        }
+
+        return results
+
+    def _analyze_queue_depths(self) -> Dict:
+        """Analyze queue depth patterns"""
+        if not self.queue_depth_samples:
+            return {}
+
+        pending = [s['pending_commits'] for s in self.queue_depth_samples]
+        in_flight = [s['in_flight_requests'] for s in self.queue_depth_samples]
+        total_unack = [s['total_unacknowledged'] for s in self.queue_depth_samples]
+
+        return {
+            'pending_commits': {
+                'mean': statistics.mean(pending),
+                'max': max(pending),
+                'p95': np.percentile(pending, 95)
+            },
+            'in_flight_requests': {
+                'mean': statistics.mean(in_flight),
+                'max': max(in_flight),
+                'p95': np.percentile(in_flight, 95)
+            },
+            'total_unacknowledged': {
+                'mean': statistics.mean(total_unack),
+                'max': max(total_unack),
+                'p95': np.percentile(total_unack, 95)
+            }
+        }
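+
+    # Rough analytic cross-check for the simulated numbers (a sketch that
+    # ignores queueing delay and retries, so it is a lower bound rather than
+    # an exact model): a commit waits on average about half the batch timeout
+    # before its batch is sent, then one S3 round trip, whose Gamma mean is
+    # shape * scale (e.g. 2.0 * 25ms = 50ms with the defaults below).
+    def estimate_latency_floor_ms(self) -> float:
+        """Approximate lower bound on mean commit latency in milliseconds."""
+        expected_s3_ms = self.s3_latency_shape * self.s3_latency_scale
+        expected_wait_ms = self.batch_timeout_ms / 2.0
+        return expected_wait_ms + expected_s3_ms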
+
+    def plot_results(self, results: Dict, save_path: Optional[str] = None):
+        """Generate visualization plots of simulation results"""
+        if not self.completed_commits:
+            print("No data to plot")
+            return
+
+        fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(15, 12))
+        fig.suptitle('Persistence Thread Simulation Results', fontsize=16)
+
+        # Plot 1: Commit latency histogram
+        latencies_ms = [c['latency'] * 1000 for c in self.completed_commits]
+        ax1.hist(latencies_ms, bins=50, alpha=0.7, edgecolor='black')
+        ax1.set_xlabel('Commit Latency (ms)')
+        ax1.set_ylabel('Count')
+        ax1.set_title('Commit Latency Distribution')
+        ax1.axvline(results['commit_metrics']['latency_ms']['mean'], color='red',
+                    linestyle='--', label=f"Mean: {results['commit_metrics']['latency_ms']['mean']:.1f}ms")
+        ax1.axvline(results['commit_metrics']['latency_ms']['p95'], color='orange',
+                    linestyle='--', label=f"P95: {results['commit_metrics']['latency_ms']['p95']:.1f}ms")
+        ax1.legend()
+
+        # Plot 2: Timeline of commit completions
+        completion_times = [c['completion_time'] for c in self.completed_commits]
+        completion_latencies = [c['latency'] * 1000 for c in self.completed_commits]
+        ax2.scatter(completion_times, completion_latencies, alpha=0.6, s=10)
+        ax2.set_xlabel('Time (seconds)')
+        ax2.set_ylabel('Commit Latency (ms)')
+        ax2.set_title('Latency Over Time')
+
+        # Plot 3: Queue depth over time
+        if self.queue_depth_samples:
+            times = [s['time'] for s in self.queue_depth_samples]
+            pending = [s['pending_commits'] for s in self.queue_depth_samples]
+            in_flight = [s['in_flight_requests'] for s in self.queue_depth_samples]
+            total_unack = [s['total_unacknowledged'] for s in self.queue_depth_samples]
+
+            ax3.plot(times, pending, label='Pending Commits', alpha=0.8)
+            ax3.plot(times, in_flight, label='In-Flight Requests', alpha=0.8)
+            ax3.plot(times, total_unack, label='Total Unacknowledged', alpha=0.8)
+            ax3.axhline(self.max_in_flight_requests, color='red', linestyle='--',
+                        label=f'Max In-Flight Limit ({self.max_in_flight_requests})')
+            ax3.set_xlabel('Time (seconds)')
+            ax3.set_ylabel('Count')
+            ax3.set_title('Queue Depths Over Time')
+            ax3.legend()
+
+        # Plot 4: Batch size distribution
+        if self.batch_metrics:
+            batch_sizes = [b['commit_count'] for b in self.batch_metrics]
+            ax4.hist(batch_sizes, bins=20, alpha=0.7, edgecolor='black')
+            ax4.set_xlabel('Commits per Batch')
+            ax4.set_ylabel('Count')
+            ax4.set_title('Batch Size Distribution')
+
+        plt.tight_layout()
+
+        if save_path:
+            plt.savefig(save_path, dpi=300, bbox_inches='tight')
+            print(f"Plots saved to {save_path}")
+        else:
+            plt.show()
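+
+
+def check_littles_law(results: Dict) -> float:
+    """Sanity-check sketch using Little's law (L = lambda * W).
+
+    At steady state, the mean number of unacknowledged commits should be
+    close to the arrival rate times the mean commit latency; a ratio near
+    1.0 suggests the queue accounting and latency measurement agree. This
+    is a rough diagnostic (warm-up and drain effects skew it), not part of
+    the simulation itself.
+    """
+    config = results['simulation_config']
+    mean_latency_sec = results['commit_metrics']['latency_ms']['mean'] / 1000.0
+    predicted_depth = config['arrival_rate_per_sec'] * mean_latency_sec
+    observed_depth = results.get('queue_depth_analysis', {}) \
+        .get('total_unacknowledged', {}).get('mean', 0.0)
+    return observed_depth / predicted_depth if predicted_depth > 0 else float('nan')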
{batch_metrics['avg_commits_per_batch']:.1f}") + print(f" Avg Batch Size: {batch_metrics['avg_batch_size_bytes']/1024:.1f}KB") + print(f" Avg Batch Latency: {batch_metrics['avg_batch_latency_ms']:.2f}ms") + + # Retry analysis + if results['retry_analysis']: + print(f"\nRetry Analysis:") + for retry_count, occurrences in results['retry_analysis'].items(): + print(f" {occurrences:,} batches required {retry_count} retries") + + # Queue depth analysis + if results['queue_depth_analysis']: + queue_analysis = results['queue_depth_analysis'] + print(f"\nQueue Depth Analysis:") + if 'pending_commits' in queue_analysis: + pending = queue_analysis['pending_commits'] + print(f" Pending Commits - Mean: {pending['mean']:.1f}, Max: {pending['max']}, P95: {pending['p95']:.1f}") + if 'in_flight_requests' in queue_analysis: + in_flight = queue_analysis['in_flight_requests'] + print(f" In-Flight Requests - Mean: {in_flight['mean']:.1f}, Max: {in_flight['max']}, P95: {in_flight['p95']:.1f}") + if 'total_unacknowledged' in queue_analysis: + total = queue_analysis['total_unacknowledged'] + print(f" Total Unacknowledged - Mean: {total['mean']:.1f}, Max: {total['max']}, P95: {total['p95']:.1f}") + + +if __name__ == "__main__": + print("Running Persistence Thread Configuration Analysis") + print("S3 Latency Modeling: Gamma distribution (shape=2.0, scale=25ms)") + print("Testing different configurations to optimize latency...") + print() + + # Test configurations with different max_in_flight values + configs = [ + {"name": "Baseline (max_in_flight=5)", "max_in_flight_requests": 5}, + {"name": "Higher Parallelism (max_in_flight=10)", "max_in_flight_requests": 10}, + {"name": "Much Higher (max_in_flight=20)", "max_in_flight_requests": 20}, + {"name": "Lower Timeout (max_in_flight=10, timeout=2ms)", "max_in_flight_requests": 10, "batch_timeout_ms": 2.0}, + {"name": "Higher Timeout (max_in_flight=10, timeout=10ms)", "max_in_flight_requests": 10, "batch_timeout_ms": 10.0}, + ] + + results_comparison = [] + + for config in configs: + print(f"\n{'='*60}") + print(f"Testing: {config['name']}") + print(f"{'='*60}") + + sim = PersistenceSimulation( + arrival_rate_per_sec=1000.0, + simulation_duration_sec=30.0, + s3_latency_shape=2.0, + s3_latency_scale=25.0, + s3_failure_rate=0.01, + max_in_flight_requests=config.get("max_in_flight_requests", 5), + batch_timeout_ms=config.get("batch_timeout_ms", 5.0) + ) + + results = sim.run_simulation() + results["config_name"] = config["name"] + results_comparison.append(results) + + # Print key metrics for quick comparison + commit_metrics = results['commit_metrics'] + batch_metrics = results['batch_metrics'] + queue_metrics = results.get('queue_depth_analysis', {}) + + print(f"\nKey Metrics:") + print(f" Mean Latency: {commit_metrics['latency_ms']['mean']:.1f}ms") + print(f" P95 Latency: {commit_metrics['latency_ms']['p95']:.1f}ms") + print(f" P99 Latency: {commit_metrics['latency_ms']['p99']:.1f}ms") + print(f" Avg Commits/Batch: {batch_metrics['avg_commits_per_batch']:.1f}") + print(f" Avg Batch Size: {batch_metrics['avg_batch_size_bytes']/1024:.1f}KB") + if queue_metrics: + print(f" Avg Queue Depth: {queue_metrics.get('total_unacknowledged', {}).get('mean', 0):.1f}") + print(f" Max Queue Depth: {queue_metrics.get('total_unacknowledged', {}).get('max', 0)}") + + # Summary comparison + print(f"\n{'='*80}") + print("CONFIGURATION COMPARISON SUMMARY") + print(f"{'='*80}") + print(f"{'Configuration':<40} {'Mean':<8} {'P95':<8} {'P99':<8} {'AvgQueue':<10}") + print(f"{'-'*80}") + + for 
+
+
+if __name__ == "__main__":
+    print("Running Persistence Thread Configuration Analysis")
+    print("S3 Latency Modeling: Gamma distribution (shape=2.0, scale=25ms)")
+    print("Testing different configurations to optimize latency...")
+    print()
+
+    # Test configurations with different max_in_flight values
+    configs = [
+        {"name": "Baseline (max_in_flight=5)", "max_in_flight_requests": 5},
+        {"name": "Higher Parallelism (max_in_flight=10)", "max_in_flight_requests": 10},
+        {"name": "Much Higher (max_in_flight=20)", "max_in_flight_requests": 20},
+        {"name": "Lower Timeout (max_in_flight=10, timeout=2ms)", "max_in_flight_requests": 10, "batch_timeout_ms": 2.0},
+        {"name": "Higher Timeout (max_in_flight=10, timeout=10ms)", "max_in_flight_requests": 10, "batch_timeout_ms": 10.0},
+    ]
+
+    results_comparison = []
+
+    for config in configs:
+        print(f"\n{'='*60}")
+        print(f"Testing: {config['name']}")
+        print(f"{'='*60}")
+
+        sim = PersistenceSimulation(
+            arrival_rate_per_sec=1000.0,
+            simulation_duration_sec=30.0,
+            s3_latency_shape=2.0,
+            s3_latency_scale=25.0,
+            s3_failure_rate=0.01,
+            max_in_flight_requests=config.get("max_in_flight_requests", 5),
+            batch_timeout_ms=config.get("batch_timeout_ms", 5.0)
+        )
+
+        results = sim.run_simulation()
+        results["config_name"] = config["name"]
+        results_comparison.append(results)
+
+        # Print key metrics for quick comparison
+        commit_metrics = results['commit_metrics']
+        batch_metrics = results['batch_metrics']
+        queue_metrics = results.get('queue_depth_analysis', {})
+
+        print("\nKey Metrics:")
+        print(f"  Mean Latency: {commit_metrics['latency_ms']['mean']:.1f}ms")
+        print(f"  P95 Latency: {commit_metrics['latency_ms']['p95']:.1f}ms")
+        print(f"  P99 Latency: {commit_metrics['latency_ms']['p99']:.1f}ms")
+        print(f"  Avg Commits/Batch: {batch_metrics['avg_commits_per_batch']:.1f}")
+        print(f"  Avg Batch Size: {batch_metrics['avg_batch_size_bytes']/1024:.1f}KB")
+        if queue_metrics:
+            print(f"  Avg Queue Depth: {queue_metrics.get('total_unacknowledged', {}).get('mean', 0):.1f}")
+            print(f"  Max Queue Depth: {queue_metrics.get('total_unacknowledged', {}).get('max', 0)}")
+
+    # Summary comparison (name column widened to fit the longest config name)
+    print(f"\n{'='*88}")
+    print("CONFIGURATION COMPARISON SUMMARY")
+    print(f"{'='*88}")
+    print(f"{'Configuration':<48} {'Mean':<8} {'P95':<8} {'P99':<8} {'AvgQueue':<10}")
+    print(f"{'-'*88}")
+
+    for result in results_comparison:
+        name = result["config_name"]
+        commit_metrics = result['commit_metrics']
+        queue_metrics = result.get('queue_depth_analysis', {})
+        mean_lat = commit_metrics['latency_ms']['mean']
+        p95_lat = commit_metrics['latency_ms']['p95']
+        p99_lat = commit_metrics['latency_ms']['p99']
+        avg_queue = queue_metrics.get('total_unacknowledged', {}).get('mean', 0)
+
+        print(f"{name:<48} {mean_lat:<8.1f} {p95_lat:<8.1f} {p99_lat:<8.1f} {avg_queue:<10.1f}")
+
+    print("\nRecommendation: choose the configuration with the lowest P95/P99 latencies.")
+    print("Note: a higher in-flight limit allows more parallelism but may increase queue variability.")
+
+    # Generate plots for the best configuration
+    best_config = min(results_comparison, key=lambda r: r['commit_metrics']['latency_ms']['p95'])
+    print(f"\nGenerating plots for best configuration: {best_config['config_name']}")
+
+    try:
+        # Re-run the best config to get a simulation object for plotting
+        best_params = next(c for c in configs if c['name'] == best_config['config_name'])
+        sim_best = PersistenceSimulation(
+            arrival_rate_per_sec=1000.0,
+            simulation_duration_sec=30.0,
+            s3_latency_shape=2.0,
+            s3_latency_scale=25.0,
+            s3_failure_rate=0.01,
+            max_in_flight_requests=best_params.get("max_in_flight_requests", 5),
+            batch_timeout_ms=best_params.get("batch_timeout_ms", 5.0)
+        )
+        sim_best.run_simulation()
+        sim_best.plot_results(best_config, 'persistence_optimization_results.png')
+    except Exception as e:
+        print(f"\nCould not generate plots: {e}")
+        print("Check that matplotlib has a working backend for PNG output")
\ No newline at end of file