diff --git a/latency_sim/database_comparison.py b/tools/latency_sim/database_comparison.py similarity index 76% rename from latency_sim/database_comparison.py rename to tools/latency_sim/database_comparison.py index 312ed30..8254c42 100644 --- a/latency_sim/database_comparison.py +++ b/tools/latency_sim/database_comparison.py @@ -38,6 +38,7 @@ class PersistencePattern(Enum): @dataclass class Transaction: """Represents a database transaction/commit""" + txn_id: int arrival_time: float size_bytes: int = 1024 @@ -47,6 +48,7 @@ class Transaction: @dataclass class PersistenceMetrics: """Metrics for a persistence approach""" + pattern: PersistencePattern total_transactions: int completed_transactions: int @@ -81,9 +83,9 @@ class PersistenceMetrics: class PersistenceSimulator(ABC): """Abstract base class for persistence pattern simulators""" - def __init__(self, - simulation_duration: float = 60.0, - arrival_rate_per_sec: float = 1000.0): + def __init__( + self, simulation_duration: float = 60.0, arrival_rate_per_sec: float = 1000.0 + ): self.simulation_duration = simulation_duration self.arrival_rate_per_sec = arrival_rate_per_sec @@ -115,7 +117,7 @@ class PersistenceSimulator(ABC): txn_id=self.next_txn_id, arrival_time=arrival_time, size_bytes=size, - requires_durability=True + requires_durability=True, ) @abstractmethod @@ -138,9 +140,9 @@ class PersistenceSimulator(ABC): time, event_type, data = heapq.heappop(self.event_queue) self.current_time = time - if event_type == 'transaction_arrival': + if event_type == "transaction_arrival": self.process_transaction(data) - elif event_type == 'custom': + elif event_type == "custom": self._handle_custom_event(data) return self._calculate_metrics() @@ -158,7 +160,7 @@ class PersistenceSimulator(ABC): txn = self.generate_transaction(time) self.next_txn_id += 1 - heapq.heappush(self.event_queue, (time, 'transaction_arrival', txn)) + heapq.heappush(self.event_queue, (time, "transaction_arrival", txn)) def _handle_custom_event(self, data): """Handle custom events - override in subclasses""" @@ -173,13 +175,12 @@ class PersistenceSimulator(ABC): if not self.completed_transactions: raise ValueError("No transactions completed") - latencies = [txn['latency_ms'] for txn in self.completed_transactions] + latencies = [txn["latency_ms"] for txn in self.completed_transactions] return PersistenceMetrics( pattern=self.get_pattern_name(), total_transactions=self.next_txn_id, completed_transactions=len(self.completed_transactions), - # Latency metrics min_latency=min(latencies), mean_latency=statistics.mean(latencies), @@ -187,22 +188,22 @@ class PersistenceSimulator(ABC): p95_latency=np.percentile(latencies, 95), p99_latency=np.percentile(latencies, 99), max_latency=max(latencies), - # Throughput metrics - avg_throughput_tps=len(self.completed_transactions) / self.simulation_duration, + avg_throughput_tps=len(self.completed_transactions) + / self.simulation_duration, peak_throughput_tps=self._calculate_peak_throughput(), - # Resource metrics - implemented by subclasses avg_disk_iops=statistics.mean(self.disk_iops) if self.disk_iops else 0, - avg_network_mbps=statistics.mean(self.network_usage) if self.network_usage else 0, + avg_network_mbps=( + statistics.mean(self.network_usage) if self.network_usage else 0 + ), storage_efficiency=self._calculate_storage_efficiency(), - # Pattern-specific characteristics durability_guarantee=self._get_durability_guarantee(), consistency_model=self._get_consistency_model(), recovery_time_estimate=self._get_recovery_time(), 
operational_complexity=self._get_operational_complexity(), - infrastructure_cost=self._get_infrastructure_cost() + infrastructure_cost=self._get_infrastructure_cost(), ) def _calculate_peak_throughput(self) -> float: @@ -213,7 +214,7 @@ class PersistenceSimulator(ABC): # Group transactions by second throughput_by_second = defaultdict(int) for txn in self.completed_transactions: - second = int(txn['completion_time']) + second = int(txn["completion_time"]) throughput_by_second[second] += 1 return max(throughput_by_second.values()) if throughput_by_second else 0 @@ -247,11 +248,13 @@ class PersistenceSimulator(ABC): class WeaselDBSimulator(PersistenceSimulator): """WeaselDB's batched S3 persistence simulation""" - def __init__(self, - batch_timeout_ms: float = 1.0, - batch_size_threshold: int = 800000, - max_in_flight: int = 50, - **kwargs): + def __init__( + self, + batch_timeout_ms: float = 1.0, + batch_size_threshold: int = 800000, + max_in_flight: int = 50, + **kwargs, + ): super().__init__(**kwargs) self.batch_timeout_ms = batch_timeout_ms @@ -280,7 +283,10 @@ class WeaselDBSimulator(PersistenceSimulator): self.current_batch.append(txn) # Check if we should send batch - if self._should_send_batch() and len(self.in_flight_batches) < self.max_in_flight: + if ( + self._should_send_batch() + and len(self.in_flight_batches) < self.max_in_flight + ): self._send_current_batch() def _should_send_batch(self) -> bool: @@ -294,7 +300,9 @@ class WeaselDBSimulator(PersistenceSimulator): return True # Time trigger - if self.batch_start_time and (self.current_time - self.batch_start_time) >= (self.batch_timeout_ms / 1000.0): + if self.batch_start_time and (self.current_time - self.batch_start_time) >= ( + self.batch_timeout_ms / 1000.0 + ): return True return False @@ -315,14 +323,16 @@ class WeaselDBSimulator(PersistenceSimulator): # Track batch self.in_flight_batches[batch_id] = { - 'transactions': self.current_batch.copy(), - 'sent_time': self.current_time, - 'completion_time': completion_time, - 'size_bytes': batch_size + "transactions": self.current_batch.copy(), + "sent_time": self.current_time, + "completion_time": completion_time, + "size_bytes": batch_size, } # Schedule completion - self.schedule_event(completion_time, 'custom', {'type': 'batch_complete', 'batch_id': batch_id}) + self.schedule_event( + completion_time, "custom", {"type": "batch_complete", "batch_id": batch_id} + ) # Track network usage self.network_usage.append(batch_size / (1024 * 1024)) # MB @@ -340,21 +350,23 @@ class WeaselDBSimulator(PersistenceSimulator): def _handle_custom_event(self, data): """Handle batch completion events""" - if data['type'] == 'batch_complete': - batch_id = data['batch_id'] + if data["type"] == "batch_complete": + batch_id = data["batch_id"] if batch_id in self.in_flight_batches: batch = self.in_flight_batches.pop(batch_id) # Mark transactions complete - for txn in batch['transactions']: + for txn in batch["transactions"]: latency_ms = (self.current_time - txn.arrival_time) * 1000 - self.completed_transactions.append({ - 'txn_id': txn.txn_id, - 'arrival_time': txn.arrival_time, - 'completion_time': self.current_time, - 'latency_ms': latency_ms, - 'size_bytes': txn.size_bytes - }) + self.completed_transactions.append( + { + "txn_id": txn.txn_id, + "arrival_time": txn.arrival_time, + "completion_time": self.current_time, + "latency_ms": latency_ms, + "size_bytes": txn.size_bytes, + } + ) def _calculate_storage_efficiency(self) -> float: return 1.0 # S3 has no storage overhead @@ -378,10 +390,12 @@ 
class WeaselDBSimulator(PersistenceSimulator): class TraditionalWALSimulator(PersistenceSimulator): """Traditional Write-Ahead Log with periodic sync""" - def __init__(self, - wal_sync_interval_ms: float = 10.0, - checkpoint_interval_sec: float = 30.0, - **kwargs): + def __init__( + self, + wal_sync_interval_ms: float = 10.0, + checkpoint_interval_sec: float = 30.0, + **kwargs, + ): super().__init__(**kwargs) self.wal_sync_interval_ms = wal_sync_interval_ms @@ -415,12 +429,12 @@ class TraditionalWALSimulator(PersistenceSimulator): """Schedule periodic WAL sync events""" sync_time = self.wal_sync_interval_ms / 1000.0 while sync_time < self.simulation_duration: - self.schedule_event(sync_time, 'custom', {'type': 'wal_sync'}) + self.schedule_event(sync_time, "custom", {"type": "wal_sync"}) sync_time += self.wal_sync_interval_ms / 1000.0 def _handle_custom_event(self, data): """Handle WAL sync events""" - if data['type'] == 'wal_sync': + if data["type"] == "wal_sync": self._perform_wal_sync() def _perform_wal_sync(self): @@ -436,10 +450,11 @@ class TraditionalWALSimulator(PersistenceSimulator): # Schedule sync completion syncing_txns = list(self.wal_buffer) - self.schedule_event(completion_time, 'custom', { - 'type': 'sync_complete', - 'transactions': syncing_txns - }) + self.schedule_event( + completion_time, + "custom", + {"type": "sync_complete", "transactions": syncing_txns}, + ) # Track IOPS for sync operation self.disk_iops.append(len(self.wal_buffer)) @@ -474,22 +489,24 @@ class TraditionalWALSimulator(PersistenceSimulator): def _handle_custom_event(self, data): """Handle sync completion""" - if data['type'] == 'wal_sync': + if data["type"] == "wal_sync": self._perform_wal_sync() - elif data['type'] == 'sync_complete': + elif data["type"] == "sync_complete": # Mark transactions as durable - for txn in data['transactions']: + for txn in data["transactions"]: if txn.txn_id in self.pending_transactions: del self.pending_transactions[txn.txn_id] latency_ms = (self.current_time - txn.arrival_time) * 1000 - self.completed_transactions.append({ - 'txn_id': txn.txn_id, - 'arrival_time': txn.arrival_time, - 'completion_time': self.current_time, - 'latency_ms': latency_ms, - 'size_bytes': txn.size_bytes - }) + self.completed_transactions.append( + { + "txn_id": txn.txn_id, + "arrival_time": txn.arrival_time, + "completion_time": self.current_time, + "latency_ms": latency_ms, + "size_bytes": txn.size_bytes, + } + ) def _calculate_storage_efficiency(self) -> float: return 2.0 # WAL + main storage @@ -527,10 +544,11 @@ class SynchronousSimulator(PersistenceSimulator): completion_time = self.current_time + disk_latency # Schedule completion - self.schedule_event(completion_time, 'custom', { - 'type': 'disk_write_complete', - 'transaction': txn - }) + self.schedule_event( + completion_time, + "custom", + {"type": "disk_write_complete", "transaction": txn}, + ) # Track disk IOPS self.disk_iops.append(1.0) @@ -559,17 +577,19 @@ class SynchronousSimulator(PersistenceSimulator): def _handle_custom_event(self, data): """Handle disk write completion""" - if data['type'] == 'disk_write_complete': - txn = data['transaction'] + if data["type"] == "disk_write_complete": + txn = data["transaction"] latency_ms = (self.current_time - txn.arrival_time) * 1000 - self.completed_transactions.append({ - 'txn_id': txn.txn_id, - 'arrival_time': txn.arrival_time, - 'completion_time': self.current_time, - 'latency_ms': latency_ms, - 'size_bytes': txn.size_bytes - }) + self.completed_transactions.append( + { + "txn_id": 
txn.txn_id, + "arrival_time": txn.arrival_time, + "completion_time": self.current_time, + "latency_ms": latency_ms, + "size_bytes": txn.size_bytes, + } + ) def _calculate_storage_efficiency(self) -> float: return 1.5 # Some overhead for metadata @@ -593,11 +613,13 @@ class SynchronousSimulator(PersistenceSimulator): class WeaselDBDiskSimulator(PersistenceSimulator): """WeaselDB's batched disk persistence simulation""" - def __init__(self, - batch_timeout_ms: float = 1.0, - batch_size_threshold: int = 800000, - max_in_flight: int = 50, - **kwargs): + def __init__( + self, + batch_timeout_ms: float = 1.0, + batch_size_threshold: int = 800000, + max_in_flight: int = 50, + **kwargs, + ): super().__init__(**kwargs) self.batch_timeout_ms = batch_timeout_ms @@ -626,7 +648,10 @@ class WeaselDBDiskSimulator(PersistenceSimulator): self.current_batch.append(txn) # Check if we should send batch - if self._should_send_batch() and len(self.in_flight_batches) < self.max_in_flight: + if ( + self._should_send_batch() + and len(self.in_flight_batches) < self.max_in_flight + ): self._send_current_batch() def _should_send_batch(self) -> bool: @@ -640,7 +665,9 @@ class WeaselDBDiskSimulator(PersistenceSimulator): return True # Time trigger - if self.batch_start_time and (self.current_time - self.batch_start_time) >= (self.batch_timeout_ms / 1000.0): + if self.batch_start_time and (self.current_time - self.batch_start_time) >= ( + self.batch_timeout_ms / 1000.0 + ): return True return False @@ -661,14 +688,16 @@ class WeaselDBDiskSimulator(PersistenceSimulator): # Track batch self.in_flight_batches[batch_id] = { - 'transactions': self.current_batch.copy(), - 'sent_time': self.current_time, - 'completion_time': completion_time, - 'size_bytes': batch_size + "transactions": self.current_batch.copy(), + "sent_time": self.current_time, + "completion_time": completion_time, + "size_bytes": batch_size, } # Schedule completion - self.schedule_event(completion_time, 'custom', {'type': 'batch_complete', 'batch_id': batch_id}) + self.schedule_event( + completion_time, "custom", {"type": "batch_complete", "batch_id": batch_id} + ) # Track disk IOPS (one write operation per batch) self.disk_iops.append(1.0) @@ -684,7 +713,9 @@ class WeaselDBDiskSimulator(PersistenceSimulator): # Throughput-based latency for data transfer to page cache size_mb = batch_size_bytes / (1024 * 1024) - transfer_latency = (size_mb / self.disk_throughput_mbps) * 1000.0 # Convert to ms + transfer_latency = ( + size_mb / self.disk_throughput_mbps + ) * 1000.0 # Convert to ms # FSYNC latency for EBS - forces write to replicated storage # EBS has higher fsync latency due to network replication @@ -709,21 +740,23 @@ class WeaselDBDiskSimulator(PersistenceSimulator): def _handle_custom_event(self, data): """Handle batch completion events""" - if data['type'] == 'batch_complete': - batch_id = data['batch_id'] + if data["type"] == "batch_complete": + batch_id = data["batch_id"] if batch_id in self.in_flight_batches: batch = self.in_flight_batches.pop(batch_id) # Mark transactions complete - for txn in batch['transactions']: + for txn in batch["transactions"]: latency_ms = (self.current_time - txn.arrival_time) * 1000 - self.completed_transactions.append({ - 'txn_id': txn.txn_id, - 'arrival_time': txn.arrival_time, - 'completion_time': self.current_time, - 'latency_ms': latency_ms, - 'size_bytes': txn.size_bytes - }) + self.completed_transactions.append( + { + "txn_id": txn.txn_id, + "arrival_time": txn.arrival_time, + "completion_time": self.current_time, + 
"latency_ms": latency_ms, + "size_bytes": txn.size_bytes, + } + ) def _calculate_storage_efficiency(self) -> float: return 1.2 # Some overhead for batching metadata @@ -750,8 +783,9 @@ class DatabaseComparisonFramework: def __init__(self, simulation_duration: float = 30.0): self.simulation_duration = simulation_duration - def run_comparison(self, - arrival_rates: List[float] = [100, 500, 1000, 2000]) -> Dict[str, List[PersistenceMetrics]]: + def run_comparison( + self, arrival_rates: List[float] = [100, 500, 1000, 2000] + ) -> Dict[str, List[PersistenceMetrics]]: """Run comparison across multiple arrival rates""" results = defaultdict(list) @@ -765,10 +799,10 @@ class DatabaseComparisonFramework: batch_size_threshold=800000, max_in_flight=50, simulation_duration=self.simulation_duration, - arrival_rate_per_sec=rate + arrival_rate_per_sec=rate, ) weasel_s3_metrics = weasel_s3.run_simulation() - results['WeaselDB S3'].append(weasel_s3_metrics) + results["WeaselDB S3"].append(weasel_s3_metrics) print(f" WeaselDB S3 P95: {weasel_s3_metrics.p95_latency:.1f}ms") # WeaselDB EBS (same batching, EBS storage) @@ -777,38 +811,37 @@ class DatabaseComparisonFramework: batch_size_threshold=800000, max_in_flight=50, simulation_duration=self.simulation_duration, - arrival_rate_per_sec=rate + arrival_rate_per_sec=rate, ) weasel_ebs_metrics = weasel_ebs.run_simulation() - results['WeaselDB EBS'].append(weasel_ebs_metrics) + results["WeaselDB EBS"].append(weasel_ebs_metrics) print(f" WeaselDB EBS P95: {weasel_ebs_metrics.p95_latency:.1f}ms") # Traditional WAL wal = TraditionalWALSimulator( wal_sync_interval_ms=10.0, simulation_duration=self.simulation_duration, - arrival_rate_per_sec=rate + arrival_rate_per_sec=rate, ) wal_metrics = wal.run_simulation() - results['Traditional WAL'].append(wal_metrics) + results["Traditional WAL"].append(wal_metrics) print(f" WAL P95: {wal_metrics.p95_latency:.1f}ms") # Synchronous sync = SynchronousSimulator( - simulation_duration=self.simulation_duration, - arrival_rate_per_sec=rate + simulation_duration=self.simulation_duration, arrival_rate_per_sec=rate ) sync_metrics = sync.run_simulation() - results['Synchronous'].append(sync_metrics) + results["Synchronous"].append(sync_metrics) print(f" Synchronous P95: {sync_metrics.p95_latency:.1f}ms") return dict(results) def print_comparison_report(self, results: Dict[str, List[PersistenceMetrics]]): """Print comprehensive comparison report""" - print("\n" + "="*80) + print("\n" + "=" * 80) print("DATABASE PERSISTENCE PATTERN COMPARISON") - print("="*80) + print("=" * 80) # Get arrival rates for headers arrival_rates = [100, 500, 1000, 2000] @@ -843,14 +876,18 @@ class DatabaseComparisonFramework: # Characteristics comparison print(f"\nSYSTEM CHARACTERISTICS") - print(f"{'Pattern':<20} {'Durability':<25} {'Consistency':<20} {'OpComplx':<8} {'Cost':<6}") + print( + f"{'Pattern':<20} {'Durability':<25} {'Consistency':<20} {'OpComplx':<8} {'Cost':<6}" + ) print("-" * 85) for pattern_name, metrics_list in results.items(): metrics = metrics_list[0] # Use first metrics for characteristics - print(f"{pattern_name:<20} {metrics.durability_guarantee:<25} " - f"{metrics.consistency_model:<20} {metrics.operational_complexity:<8} " - f"{metrics.infrastructure_cost:<6.1f}") + print( + f"{pattern_name:<20} {metrics.durability_guarantee:<25} " + f"{metrics.consistency_model:<20} {metrics.operational_complexity:<8} " + f"{metrics.infrastructure_cost:<6.1f}" + ) # Performance sweet spots print(f"\nPERFORMANCE SWEET SPOTS") @@ -858,49 +895,76 @@ 
class DatabaseComparisonFramework: for rate in arrival_rates: print(f"\nAt {rate} TPS:") - rate_results = [(name, metrics_list[arrival_rates.index(rate)]) - for name, metrics_list in results.items()] + rate_results = [ + (name, metrics_list[arrival_rates.index(rate)]) + for name, metrics_list in results.items() + ] # Sort by P95 latency rate_results.sort(key=lambda x: x[1].p95_latency) for i, (name, metrics) in enumerate(rate_results): - rank_symbol = "🥇" if i == 0 else "🥈" if i == 1 else "🥉" if i == 2 else " " - print(f" {rank_symbol} {name}: {metrics.p95_latency:.1f}ms P95, " - f"{metrics.avg_throughput_tps:.0f} TPS achieved") + rank_symbol = ( + "🥇" if i == 0 else "🥈" if i == 1 else "🥉" if i == 2 else " " + ) + print( + f" {rank_symbol} {name}: {metrics.p95_latency:.1f}ms P95, " + f"{metrics.avg_throughput_tps:.0f} TPS achieved" + ) - def plot_comparison_results(self, results: Dict[str, List[PersistenceMetrics]], - save_path: Optional[str] = None): + def plot_comparison_results( + self, + results: Dict[str, List[PersistenceMetrics]], + save_path: Optional[str] = None, + ): """Plot comparison results""" try: arrival_rates = [100, 500, 1000, 2000] fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(15, 12)) - fig.suptitle('Database Persistence Pattern Comparison', fontsize=16) + fig.suptitle("Database Persistence Pattern Comparison", fontsize=16) # Plot 1: P95 Latency vs Load for pattern_name, metrics_list in results.items(): p95_latencies = [m.p95_latency for m in metrics_list] - ax1.plot(arrival_rates, p95_latencies, marker='o', linewidth=2, label=pattern_name) + ax1.plot( + arrival_rates, + p95_latencies, + marker="o", + linewidth=2, + label=pattern_name, + ) - ax1.set_xlabel('Arrival Rate (TPS)') - ax1.set_ylabel('P95 Latency (ms)') - ax1.set_title('P95 Latency vs Load') + ax1.set_xlabel("Arrival Rate (TPS)") + ax1.set_ylabel("P95 Latency (ms)") + ax1.set_title("P95 Latency vs Load") ax1.legend() ax1.grid(True, alpha=0.3) - ax1.set_yscale('log') + ax1.set_yscale("log") # Plot 2: Throughput Achieved for pattern_name, metrics_list in results.items(): throughputs = [m.avg_throughput_tps for m in metrics_list] - ax2.plot(arrival_rates, throughputs, marker='s', linewidth=2, label=pattern_name) + ax2.plot( + arrival_rates, + throughputs, + marker="s", + linewidth=2, + label=pattern_name, + ) # Perfect throughput line - ax2.plot(arrival_rates, arrival_rates, 'k--', alpha=0.5, label='Perfect (no loss)') + ax2.plot( + arrival_rates, + arrival_rates, + "k--", + alpha=0.5, + label="Perfect (no loss)", + ) - ax2.set_xlabel('Target Rate (TPS)') - ax2.set_ylabel('Achieved Throughput (TPS)') - ax2.set_title('Throughput: Target vs Achieved') + ax2.set_xlabel("Target Rate (TPS)") + ax2.set_ylabel("Achieved Throughput (TPS)") + ax2.set_title("Throughput: Target vs Achieved") ax2.legend() ax2.grid(True, alpha=0.3) @@ -910,13 +974,21 @@ class DatabaseComparisonFramework: metrics = metrics_list[rate_idx] # Plot latency percentiles percentiles = [50, 95, 99] - values = [metrics.median_latency, metrics.p95_latency, metrics.p99_latency] - ax3.bar([f"{pattern_name}\nP{p}" for p in percentiles], values, - alpha=0.7, label=pattern_name) + values = [ + metrics.median_latency, + metrics.p95_latency, + metrics.p99_latency, + ] + ax3.bar( + [f"{pattern_name}\nP{p}" for p in percentiles], + values, + alpha=0.7, + label=pattern_name, + ) - ax3.set_ylabel('Latency (ms)') - ax3.set_title('Latency Percentiles at 1000 TPS') - ax3.set_yscale('log') + ax3.set_ylabel("Latency (ms)") + ax3.set_title("Latency 
Percentiles at 1000 TPS") + ax3.set_yscale("log") ax3.grid(True, alpha=0.3) # Plot 4: Cost vs Performance @@ -925,22 +997,24 @@ class DatabaseComparisonFramework: p95s = [m.p95_latency for m in metrics_list] # Use different markers for different patterns - markers = {'WeaselDB': 'o', 'Traditional WAL': 's', 'Synchronous': '^'} - marker = markers.get(pattern_name, 'o') + markers = {"WeaselDB": "o", "Traditional WAL": "s", "Synchronous": "^"} + marker = markers.get(pattern_name, "o") - ax4.scatter(costs, p95s, s=100, marker=marker, alpha=0.7, label=pattern_name) + ax4.scatter( + costs, p95s, s=100, marker=marker, alpha=0.7, label=pattern_name + ) - ax4.set_xlabel('Infrastructure Cost (relative)') - ax4.set_ylabel('P95 Latency (ms)') - ax4.set_title('Cost vs Performance Trade-off') + ax4.set_xlabel("Infrastructure Cost (relative)") + ax4.set_ylabel("P95 Latency (ms)") + ax4.set_title("Cost vs Performance Trade-off") ax4.legend() ax4.grid(True, alpha=0.3) - ax4.set_yscale('log') + ax4.set_yscale("log") plt.tight_layout() if save_path: - plt.savefig(save_path, dpi=300, bbox_inches='tight') + plt.savefig(save_path, dpi=300, bbox_inches="tight") print(f"Comparison plots saved to {save_path}") else: plt.show() @@ -961,10 +1035,10 @@ def main(): comparison.print_comparison_report(results) try: - comparison.plot_comparison_results(results, 'database_comparison.png') + comparison.plot_comparison_results(results, "database_comparison.png") except Exception as e: print(f"Could not generate plots: {e}") if __name__ == "__main__": - main() \ No newline at end of file + main() diff --git a/latency_sim/persistence_optimizer.py b/tools/latency_sim/persistence_optimizer.py similarity index 71% rename from latency_sim/persistence_optimizer.py rename to tools/latency_sim/persistence_optimizer.py index dac4e86..6d68027 100644 --- a/latency_sim/persistence_optimizer.py +++ b/tools/latency_sim/persistence_optimizer.py @@ -25,6 +25,7 @@ try: from skopt.utils import use_named_args from skopt.plots import plot_convergence, plot_objective import matplotlib.pyplot as plt + OPTIMIZE_AVAILABLE = True except ImportError: print("scikit-optimize not available. Install with: pip install scikit-optimize") @@ -40,12 +41,14 @@ class PersistenceOptimizer: by intelligently exploring the parameter space using Gaussian Process models. 
""" - def __init__(self, - optimization_budget: int = 50, - simulation_duration: float = 20.0, - arrival_rate: float = 1000.0, - objective_metric: str = "p95_latency", - random_seed: int = 42): + def __init__( + self, + optimization_budget: int = 50, + simulation_duration: float = 20.0, + arrival_rate: float = 1000.0, + objective_metric: str = "p95_latency", + random_seed: int = 42, + ): self.optimization_budget = optimization_budget self.simulation_duration = simulation_duration @@ -56,7 +59,7 @@ class PersistenceOptimizer: # Track optimization history self.optimization_history = [] self.best_params = None - self.best_score = float('inf') + self.best_score = float("inf") # Define parameter search space self.parameter_space = self._define_search_space() @@ -71,31 +74,35 @@ class PersistenceOptimizer: """ return [ # Core batching parameters - Real(1.0, 50.0, name='batch_timeout_ms', - prior='log-uniform'), # Log scale since small changes matter - Integer(64 * 1024, 4 * 1024 * 1024, name='batch_size_threshold', # 64KB - 4MB - prior='log-uniform'), - + Real( + 1.0, 50.0, name="batch_timeout_ms", prior="log-uniform" + ), # Log scale since small changes matter + Integer( + 64 * 1024, + 4 * 1024 * 1024, + name="batch_size_threshold", # 64KB - 4MB + prior="log-uniform", + ), # Flow control parameters - likely the most impactful - Integer(1, 50, name='max_in_flight_requests'), + Integer(1, 50, name="max_in_flight_requests"), ] def _run_simulation_with_params(self, params: Dict[str, float]) -> Dict: """Run simulation with given parameters and return results""" try: sim = PersistenceSimulation( - batch_timeout_ms=params['batch_timeout_ms'], - batch_size_threshold=int(params['batch_size_threshold']), - max_in_flight_requests=int(params['max_in_flight_requests']), + batch_timeout_ms=params["batch_timeout_ms"], + batch_size_threshold=int(params["batch_size_threshold"]), + max_in_flight_requests=int(params["max_in_flight_requests"]), # Retry parameters fixed since S3 is 100% reliable - max_retry_attempts=0, # No retries needed - retry_base_delay_ms=100.0, # Irrelevant but needs a value + max_retry_attempts=0, # No retries needed + retry_base_delay_ms=100.0, # Irrelevant but needs a value # S3 parameters kept fixed - 100% reliable for optimization focus - s3_latency_shape=2.0, # Fixed Gamma shape - s3_latency_scale=15.0, # Fixed Gamma scale (30ms RTT + ~30ms variable = ~60ms mean) - s3_failure_rate=0.0, # 100% reliable S3 + s3_latency_shape=2.0, # Fixed Gamma shape + s3_latency_scale=15.0, # Fixed Gamma scale (30ms RTT + ~30ms variable = ~60ms mean) + s3_failure_rate=0.0, # 100% reliable S3 arrival_rate_per_sec=self.arrival_rate, - simulation_duration_sec=self.simulation_duration + simulation_duration_sec=self.simulation_duration, ) return sim.run_simulation() @@ -104,34 +111,32 @@ class PersistenceOptimizer: print(f"Simulation failed with params {params}: {e}") # Return a high penalty score for failed simulations return { - 'commit_metrics': { - 'latency_ms': { - 'mean': 10000, - 'p95': 10000, - 'p99': 10000 - } + "commit_metrics": { + "latency_ms": {"mean": 10000, "p95": 10000, "p99": 10000} }, - 'error': str(e) + "error": str(e), } def _extract_objective_value(self, results: Dict) -> float: """Extract the objective value to minimize from simulation results""" try: - commit_metrics = results['commit_metrics']['latency_ms'] + commit_metrics = results["commit_metrics"]["latency_ms"] if self.objective_metric == "mean_latency": - return commit_metrics['mean'] + return commit_metrics["mean"] elif 
self.objective_metric == "p95_latency": - return commit_metrics['p95'] + return commit_metrics["p95"] elif self.objective_metric == "p99_latency": - return commit_metrics['p99'] + return commit_metrics["p99"] elif self.objective_metric == "weighted_latency": # Weighted combination emphasizing tail latencies - return (0.3 * commit_metrics['mean'] + - 0.5 * commit_metrics['p95'] + - 0.2 * commit_metrics['p99']) + return ( + 0.3 * commit_metrics["mean"] + + 0.5 * commit_metrics["p95"] + + 0.2 * commit_metrics["p99"] + ) else: - return commit_metrics['p95'] # Default to P95 + return commit_metrics["p95"] # Default to P95 except KeyError as e: print(f"Failed to extract objective from results: {e}") @@ -147,7 +152,9 @@ class PersistenceOptimizer: if not OPTIMIZE_AVAILABLE: return self.optimize_with_grid_search() - print(f"Starting Bayesian Optimization with {self.optimization_budget} evaluations") + print( + f"Starting Bayesian Optimization with {self.optimization_budget} evaluations" + ) print(f"Objective: Minimize {self.objective_metric}") print(f"Parameter space: {len(self.parameter_space)} dimensions") print() @@ -165,11 +172,11 @@ class PersistenceOptimizer: # Track optimization history history_entry = { - 'params': params.copy(), - 'objective_value': objective_value, - 'results': results, - 'eval_time': eval_time, - 'iteration': len(self.optimization_history) + 1 + "params": params.copy(), + "objective_value": objective_value, + "results": results, + "eval_time": eval_time, + "iteration": len(self.optimization_history) + 1, } self.optimization_history.append(history_entry) @@ -177,7 +184,9 @@ class PersistenceOptimizer: if objective_value < self.best_score: self.best_score = objective_value self.best_params = params.copy() - print(f"✓ NEW BEST: {objective_value:.2f}ms (evaluation {history_entry['iteration']})") + print( + f"✓ NEW BEST: {objective_value:.2f}ms (evaluation {history_entry['iteration']})" + ) else: print(f" Score: {objective_value:.2f}ms") @@ -192,8 +201,8 @@ class PersistenceOptimizer: dimensions=self.parameter_space, n_calls=self.optimization_budget, n_initial_points=10, # Random exploration first - acq_func='EI', # Expected Improvement acquisition - random_state=self.random_seed + acq_func="EI", # Expected Improvement acquisition + random_state=self.random_seed, ) # Extract best parameters @@ -205,32 +214,34 @@ class PersistenceOptimizer: def optimize_with_grid_search(self) -> Tuple[Dict, float]: """Fallback grid search optimization if scikit-optimize not available""" - print("Using grid search optimization (install scikit-optimize for better results)") + print( + "Using grid search optimization (install scikit-optimize for better results)" + ) print() # Define a smaller grid for key parameters grid_configs = [ # Vary max_in_flight and batch_timeout - {'max_in_flight_requests': 5, 'batch_timeout_ms': 5.0}, - {'max_in_flight_requests': 10, 'batch_timeout_ms': 5.0}, - {'max_in_flight_requests': 20, 'batch_timeout_ms': 5.0}, - {'max_in_flight_requests': 10, 'batch_timeout_ms': 2.0}, - {'max_in_flight_requests': 10, 'batch_timeout_ms': 10.0}, - {'max_in_flight_requests': 15, 'batch_timeout_ms': 3.0}, - {'max_in_flight_requests': 25, 'batch_timeout_ms': 7.0}, + {"max_in_flight_requests": 5, "batch_timeout_ms": 5.0}, + {"max_in_flight_requests": 10, "batch_timeout_ms": 5.0}, + {"max_in_flight_requests": 20, "batch_timeout_ms": 5.0}, + {"max_in_flight_requests": 10, "batch_timeout_ms": 2.0}, + {"max_in_flight_requests": 10, "batch_timeout_ms": 10.0}, + {"max_in_flight_requests": 
15, "batch_timeout_ms": 3.0}, + {"max_in_flight_requests": 25, "batch_timeout_ms": 7.0}, ] best_params = None - best_score = float('inf') + best_score = float("inf") for i, config in enumerate(grid_configs): print(f"Evaluating config {i+1}/{len(grid_configs)}: {config}") # Use default values for unspecified parameters full_params = { - 'batch_timeout_ms': 5.0, - 'batch_size_threshold': 1024 * 1024, - 'max_in_flight_requests': 5 + "batch_timeout_ms": 5.0, + "batch_size_threshold": 1024 * 1024, + "max_in_flight_requests": 5, } full_params.update(config) @@ -261,8 +272,8 @@ class PersistenceOptimizer: objectives = [] for entry in self.optimization_history: - objectives.append(entry['objective_value']) - for param_name, param_value in entry['params'].items(): + objectives.append(entry["objective_value"]) + for param_name, param_value in entry["params"].items(): if param_name not in param_data: param_data[param_name] = [] param_data[param_name].append(param_value) @@ -289,12 +300,12 @@ class PersistenceOptimizer: if not OPTIMIZE_AVAILABLE or not self.optimization_history: return - iterations = [entry['iteration'] for entry in self.optimization_history] - objectives = [entry['objective_value'] for entry in self.optimization_history] + iterations = [entry["iteration"] for entry in self.optimization_history] + objectives = [entry["objective_value"] for entry in self.optimization_history] # Calculate running minimum (best so far) running_min = [] - current_min = float('inf') + current_min = float("inf") for obj in objectives: current_min = min(current_min, obj) running_min.append(current_min) @@ -304,34 +315,38 @@ class PersistenceOptimizer: # Plot 1: Objective value over iterations plt.subplot(2, 2, 1) plt.scatter(iterations, objectives, alpha=0.6, s=30) - plt.plot(iterations, running_min, 'r-', linewidth=2, label='Best so far') - plt.xlabel('Iteration') - plt.ylabel(f'{self.objective_metric} (ms)') - plt.title('Optimization Progress') + plt.plot(iterations, running_min, "r-", linewidth=2, label="Best so far") + plt.xlabel("Iteration") + plt.ylabel(f"{self.objective_metric} (ms)") + plt.title("Optimization Progress") plt.legend() plt.grid(True, alpha=0.3) # Plot 2: Parameter evolution for key parameters plt.subplot(2, 2, 2) - key_params = ['max_in_flight_requests', 'batch_timeout_ms'] + key_params = ["max_in_flight_requests", "batch_timeout_ms"] for param in key_params: - if param in self.optimization_history[0]['params']: - values = [entry['params'][param] for entry in self.optimization_history] + if param in self.optimization_history[0]["params"]: + values = [entry["params"][param] for entry in self.optimization_history] plt.scatter(iterations, values, alpha=0.6, label=param, s=30) - plt.xlabel('Iteration') - plt.ylabel('Parameter Value') - plt.title('Key Parameter Evolution') + plt.xlabel("Iteration") + plt.ylabel("Parameter Value") + plt.title("Key Parameter Evolution") plt.legend() plt.grid(True, alpha=0.3) # Plot 3: Objective distribution plt.subplot(2, 2, 3) - plt.hist(objectives, bins=20, alpha=0.7, edgecolor='black') - plt.axvline(self.best_score, color='red', linestyle='--', - label=f'Best: {self.best_score:.1f}ms') - plt.xlabel(f'{self.objective_metric} (ms)') - plt.ylabel('Count') - plt.title('Objective Value Distribution') + plt.hist(objectives, bins=20, alpha=0.7, edgecolor="black") + plt.axvline( + self.best_score, + color="red", + linestyle="--", + label=f"Best: {self.best_score:.1f}ms", + ) + plt.xlabel(f"{self.objective_metric} (ms)") + plt.ylabel("Count") + plt.title("Objective 
Value Distribution") plt.legend() plt.grid(True, alpha=0.3) @@ -342,21 +357,21 @@ class PersistenceOptimizer: if i == 0: improvements.append(0) else: - prev_best = running_min[i-1] + prev_best = running_min[i - 1] curr_best = running_min[i] improvement = prev_best - curr_best improvements.append(improvement) - plt.plot(iterations, improvements, 'g-', marker='o', markersize=3) - plt.xlabel('Iteration') - plt.ylabel('Improvement (ms)') - plt.title('Per-Iteration Improvement') + plt.plot(iterations, improvements, "g-", marker="o", markersize=3) + plt.xlabel("Iteration") + plt.ylabel("Improvement (ms)") + plt.title("Per-Iteration Improvement") plt.grid(True, alpha=0.3) plt.tight_layout() if save_path: - plt.savefig(save_path, dpi=300, bbox_inches='tight') + plt.savefig(save_path, dpi=300, bbox_inches="tight") print(f"Optimization plots saved to {save_path}") else: plt.show() @@ -379,12 +394,12 @@ class PersistenceOptimizer: # Prepare optimization summary optimization_summary = { - 'best_parameters': best_params, - 'best_objective_value': best_score, - 'optimization_time': total_time, - 'evaluations_performed': len(self.optimization_history), - 'final_simulation_results': final_results, - 'optimization_history': self.optimization_history + "best_parameters": best_params, + "best_objective_value": best_score, + "optimization_time": total_time, + "evaluations_performed": len(self.optimization_history), + "final_simulation_results": final_results, + "optimization_history": self.optimization_history, } return optimization_summary @@ -402,9 +417,9 @@ class PersistenceOptimizer: print("OPTIMAL PARAMETERS:") print("-" * 40) - for param, value in summary['best_parameters'].items(): + for param, value in summary["best_parameters"].items(): if isinstance(value, float): - if param.endswith('_rate'): + if param.endswith("_rate"): print(f" {param:<25}: {value:.4f}") else: print(f" {param:<25}: {value:.2f}") @@ -413,7 +428,7 @@ class PersistenceOptimizer: print("\nDETAILED PERFORMANCE WITH OPTIMAL PARAMETERS:") print("-" * 50) - final_results = summary['final_simulation_results'] + final_results = summary["final_simulation_results"] print_results(final_results) print("\nPARAMETER IMPACT ANALYSIS:") @@ -440,7 +455,7 @@ def main(): simulation_duration=15.0, # Shorter sims for faster optimization arrival_rate=1000.0, objective_metric=objective, - random_seed=42 + random_seed=42, ) # Run optimization @@ -449,13 +464,13 @@ def main(): # Generate plots try: - optimizer.plot_optimization_progress(f'optimization_{objective}.png') + optimizer.plot_optimization_progress(f"optimization_{objective}.png") except Exception as e: print(f"Could not generate plots: {e}") print(f"\nOptimization for {objective} completed!") - print("="*80) + print("=" * 80) if __name__ == "__main__": - main() \ No newline at end of file + main() diff --git a/latency_sim/persistence_simulation.py b/tools/latency_sim/persistence_simulation.py similarity index 62% rename from latency_sim/persistence_simulation.py rename to tools/latency_sim/persistence_simulation.py index d718d69..45011d1 100644 --- a/latency_sim/persistence_simulation.py +++ b/tools/latency_sim/persistence_simulation.py @@ -27,6 +27,7 @@ import time @dataclass class Commit: """Represents a single commit request""" + commit_id: int arrival_time: float size_bytes: int = 1024 # Default 1KB per commit @@ -35,6 +36,7 @@ class Commit: @dataclass class Batch: """Represents a batch of commits being processed""" + batch_id: int commits: List[Commit] created_time: float @@ -48,6 +50,7 @@ 
class Batch: @dataclass class InFlightRequest: """Tracks an in-flight S3 request""" + batch: Batch start_time: float expected_completion: float @@ -66,24 +69,23 @@ class PersistenceSimulation: - Shape parameter controls the heaviness of the tail """ - def __init__(self, - # Configuration from persistence.md defaults - batch_timeout_ms: float = 5.0, - batch_size_threshold: int = 1024 * 1024, # 1MB - max_in_flight_requests: int = 5, - max_retry_attempts: int = 3, - retry_base_delay_ms: float = 100.0, - - # S3 latency modeling (Gamma distribution parameters) - s3_latency_shape: float = 2.0, # Shape parameter (α) - s3_latency_scale: float = 25.0, # Scale parameter (β) - s3_failure_rate: float = 0.01, # 1% failure rate - - # Arrival rate modeling - arrival_rate_per_sec: float = 1000.0, # Lambda for Poisson - - # Simulation parameters - simulation_duration_sec: float = 60.0): + def __init__( + self, + # Configuration from persistence.md defaults + batch_timeout_ms: float = 5.0, + batch_size_threshold: int = 1024 * 1024, # 1MB + max_in_flight_requests: int = 5, + max_retry_attempts: int = 3, + retry_base_delay_ms: float = 100.0, + # S3 latency modeling (Gamma distribution parameters) + s3_latency_shape: float = 2.0, # Shape parameter (α) + s3_latency_scale: float = 25.0, # Scale parameter (β) + s3_failure_rate: float = 0.01, # 1% failure rate + # Arrival rate modeling + arrival_rate_per_sec: float = 1000.0, # Lambda for Poisson + # Simulation parameters + simulation_duration_sec: float = 60.0, + ): # Configuration self.batch_timeout_ms = batch_timeout_ms @@ -142,7 +144,9 @@ class PersistenceSimulation: min_rtt = 30.0 # 30ms minimum round-trip time # Variable latency component from Gamma distribution - variable_latency = self.np_rng.gamma(self.s3_latency_shape, self.s3_latency_scale) + variable_latency = self.np_rng.gamma( + self.s3_latency_shape, self.s3_latency_scale + ) # Size-dependent scaling: +20ms per MB size_mb = batch_size_bytes / (1024 * 1024) @@ -191,7 +195,9 @@ class PersistenceSimulation: # Time trigger - only if we have a valid batch start time if self.batch_start_time is not None: - if (self.current_time - self.batch_start_time) >= (self.batch_timeout_ms / 1000.0): + if (self.current_time - self.batch_start_time) >= ( + self.batch_timeout_ms / 1000.0 + ): return True return False @@ -209,7 +215,7 @@ class PersistenceSimulation: batch = Batch( batch_id=self.next_batch_id, commits=self.current_batch.copy(), - created_time=self.current_time + created_time=self.current_time, ) self.next_batch_id += 1 @@ -223,11 +229,13 @@ class PersistenceSimulation: """Send batch to S3 and track as in-flight request""" if not self.can_start_new_request(): # This shouldn't happen due to flow control, but handle gracefully - self.schedule_event(self.current_time + 0.001, 'retry_batch', batch) + self.schedule_event(self.current_time + 0.001, "retry_batch", batch) return # Sample S3 response characteristics (pass batch size for latency modeling) - s3_latency = self.sample_s3_latency(batch.size_bytes) / 1000.0 # Convert ms to seconds + s3_latency = ( + self.sample_s3_latency(batch.size_bytes) / 1000.0 + ) # Convert ms to seconds will_fail = self.rng.random() < self.s3_failure_rate if will_fail: @@ -242,26 +250,28 @@ class PersistenceSimulation: batch=batch, start_time=self.current_time, expected_completion=completion_time, - connection_id=connection_id + connection_id=connection_id, ) self.in_flight_requests[connection_id] = in_flight # Schedule completion event if will_fail: - 
self.schedule_event(completion_time, 'batch_failed', connection_id) + self.schedule_event(completion_time, "batch_failed", connection_id) else: - self.schedule_event(completion_time, 'batch_completed', connection_id) + self.schedule_event(completion_time, "batch_completed", connection_id) # Log event for analysis - self.timeline_events.append({ - 'time': self.current_time, - 'event': 'batch_sent', - 'batch_id': batch.batch_id, - 'batch_size': batch.size_bytes, - 'commit_count': len(batch.commits), - 'retry_count': batch.retry_count, - 'is_retry': is_retry - }) + self.timeline_events.append( + { + "time": self.current_time, + "event": "batch_sent", + "batch_id": batch.batch_id, + "batch_size": batch.size_bytes, + "commit_count": len(batch.commits), + "retry_count": batch.retry_count, + "is_retry": is_retry, + } + ) def handle_batch_completed(self, connection_id: int): """Handle successful batch completion""" @@ -273,33 +283,39 @@ class PersistenceSimulation: # Calculate metrics batch_latency = self.current_time - batch.created_time - self.batch_metrics.append({ - 'batch_id': batch.batch_id, - 'latency': batch_latency, - 'size_bytes': batch.size_bytes, - 'commit_count': len(batch.commits), - 'retry_count': batch.retry_count - }) + self.batch_metrics.append( + { + "batch_id": batch.batch_id, + "latency": batch_latency, + "size_bytes": batch.size_bytes, + "commit_count": len(batch.commits), + "retry_count": batch.retry_count, + } + ) # Mark commits as completed and calculate end-to-end latency for commit in batch.commits: commit_latency = self.current_time - commit.arrival_time - self.completed_commits.append({ - 'commit_id': commit.commit_id, - 'arrival_time': commit.arrival_time, - 'completion_time': self.current_time, - 'latency': commit_latency, - 'batch_id': batch.batch_id, - 'retry_count': batch.retry_count - }) + self.completed_commits.append( + { + "commit_id": commit.commit_id, + "arrival_time": commit.arrival_time, + "completion_time": self.current_time, + "latency": commit_latency, + "batch_id": batch.batch_id, + "retry_count": batch.retry_count, + } + ) - self.timeline_events.append({ - 'time': self.current_time, - 'event': 'batch_completed', - 'batch_id': batch.batch_id, - 'latency': batch_latency, - 'retry_count': batch.retry_count - }) + self.timeline_events.append( + { + "time": self.current_time, + "event": "batch_completed", + "batch_id": batch.batch_id, + "latency": batch_latency, + "retry_count": batch.retry_count, + } + ) # Try to process any pending work now that we have capacity self.process_pending_work() @@ -312,29 +328,35 @@ class PersistenceSimulation: in_flight = self.in_flight_requests.pop(connection_id) batch = in_flight.batch - self.timeline_events.append({ - 'time': self.current_time, - 'event': 'batch_failed', - 'batch_id': batch.batch_id, - 'retry_count': batch.retry_count - }) + self.timeline_events.append( + { + "time": self.current_time, + "event": "batch_failed", + "batch_id": batch.batch_id, + "retry_count": batch.retry_count, + } + ) if batch.retry_count < self.max_retry_attempts: # Exponential backoff retry batch.retry_count += 1 - backoff_delay = (self.retry_base_delay_ms / 1000.0) * (2 ** (batch.retry_count - 1)) + backoff_delay = (self.retry_base_delay_ms / 1000.0) * ( + 2 ** (batch.retry_count - 1) + ) retry_time = self.current_time + backoff_delay - self.schedule_event(retry_time, 'retry_batch', batch) + self.schedule_event(retry_time, "retry_batch", batch) self.retry_counts[batch.retry_count] += 1 else: # Max retries exhausted - this would be a 
fatal error in real system - self.timeline_events.append({ - 'time': self.current_time, - 'event': 'batch_abandoned', - 'batch_id': batch.batch_id, - 'retry_count': batch.retry_count - }) + self.timeline_events.append( + { + "time": self.current_time, + "event": "batch_abandoned", + "batch_id": batch.batch_id, + "retry_count": batch.retry_count, + } + ) def handle_retry_batch(self, batch: Batch): """Handle batch retry""" @@ -374,7 +396,7 @@ class PersistenceSimulation: # Schedule timeout event for current batch if it's the first commit if len(self.current_batch) == 1 and not self.pending_commits: timeout_time = self.current_time + (self.batch_timeout_ms / 1000.0) - self.schedule_event(timeout_time, 'batch_timeout', None) + self.schedule_event(timeout_time, "batch_timeout", None) def handle_batch_timeout(self): """Handle batch timeout trigger""" @@ -387,14 +409,19 @@ class PersistenceSimulation: current_batch_count = len(self.current_batch) in_flight_count = len(self.in_flight_requests) - self.queue_depth_samples.append({ - 'time': self.current_time, - 'pending_commits': pending_count, - 'current_batch_size': current_batch_count, - 'in_flight_requests': in_flight_count, - 'total_unacknowledged': pending_count + current_batch_count + - sum(len(req.batch.commits) for req in self.in_flight_requests.values()) - }) + self.queue_depth_samples.append( + { + "time": self.current_time, + "pending_commits": pending_count, + "current_batch_size": current_batch_count, + "in_flight_requests": in_flight_count, + "total_unacknowledged": pending_count + + current_batch_count + + sum( + len(req.batch.commits) for req in self.in_flight_requests.values() + ), + } + ) def generate_arrivals(self): """Generate Poisson arrival events for the simulation""" @@ -412,17 +439,17 @@ class PersistenceSimulation: commit = Commit( commit_id=self.next_commit_id, arrival_time=current_time, - size_bytes=self.sample_commit_size() # Realistic size distribution + size_bytes=self.sample_commit_size(), # Realistic size distribution ) self.next_commit_id += 1 # Schedule arrival event - self.schedule_event(current_time, 'commit_arrival', commit) + self.schedule_event(current_time, "commit_arrival", commit) # Schedule periodic queue depth sampling sample_time = 0.0 while sample_time < self.simulation_duration_sec: - self.schedule_event(sample_time, 'sample_queue_depth', None) + self.schedule_event(sample_time, "sample_queue_depth", None) sample_time += 0.1 # Sample every 100ms def run_simulation(self): @@ -430,7 +457,9 @@ class PersistenceSimulation: print(f"Starting persistence simulation...") print(f"Arrival rate: {self.arrival_rate_per_sec} commits/sec") print(f"Duration: {self.simulation_duration_sec} seconds") - print(f"Expected commits: ~{int(self.arrival_rate_per_sec * self.simulation_duration_sec)}") + print( + f"Expected commits: ~{int(self.arrival_rate_per_sec * self.simulation_duration_sec)}" + ) print() # Generate all arrival events @@ -445,17 +474,17 @@ class PersistenceSimulation: if time > self.simulation_duration_sec: break - if event_type == 'commit_arrival': + if event_type == "commit_arrival": self.handle_commit_arrival(data) - elif event_type == 'batch_completed': + elif event_type == "batch_completed": self.handle_batch_completed(data) - elif event_type == 'batch_failed': + elif event_type == "batch_failed": self.handle_batch_failed(data) - elif event_type == 'retry_batch': + elif event_type == "retry_batch": self.handle_retry_batch(data) - elif event_type == 'batch_timeout': + elif event_type == 
"batch_timeout": self.handle_batch_timeout() - elif event_type == 'sample_queue_depth': + elif event_type == "sample_queue_depth": self.sample_queue_depth() events_processed += 1 @@ -469,38 +498,46 @@ class PersistenceSimulation: return {"error": "No commits completed during simulation"} # Calculate latency statistics - latencies = [c['latency'] * 1000 for c in self.completed_commits] # Convert to ms + latencies = [ + c["latency"] * 1000 for c in self.completed_commits + ] # Convert to ms results = { - 'simulation_config': { - 'duration_sec': self.simulation_duration_sec, - 'arrival_rate_per_sec': self.arrival_rate_per_sec, - 'batch_timeout_ms': self.batch_timeout_ms, - 'batch_size_threshold': self.batch_size_threshold, - 'max_in_flight_requests': self.max_in_flight_requests, - 's3_latency_params': f"Gamma(shape={self.s3_latency_shape}, scale={self.s3_latency_scale})", - 's3_failure_rate': self.s3_failure_rate + "simulation_config": { + "duration_sec": self.simulation_duration_sec, + "arrival_rate_per_sec": self.arrival_rate_per_sec, + "batch_timeout_ms": self.batch_timeout_ms, + "batch_size_threshold": self.batch_size_threshold, + "max_in_flight_requests": self.max_in_flight_requests, + "s3_latency_params": f"Gamma(shape={self.s3_latency_shape}, scale={self.s3_latency_scale})", + "s3_failure_rate": self.s3_failure_rate, }, - 'commit_metrics': { - 'total_commits': len(self.completed_commits), - 'latency_ms': { - 'mean': statistics.mean(latencies), - 'median': statistics.median(latencies), - 'std': statistics.stdev(latencies) if len(latencies) > 1 else 0, - 'min': min(latencies), - 'max': max(latencies), - 'p95': np.percentile(latencies, 95), - 'p99': np.percentile(latencies, 99) - } + "commit_metrics": { + "total_commits": len(self.completed_commits), + "latency_ms": { + "mean": statistics.mean(latencies), + "median": statistics.median(latencies), + "std": statistics.stdev(latencies) if len(latencies) > 1 else 0, + "min": min(latencies), + "max": max(latencies), + "p95": np.percentile(latencies, 95), + "p99": np.percentile(latencies, 99), + }, }, - 'batch_metrics': { - 'total_batches': len(self.batch_metrics), - 'avg_commits_per_batch': statistics.mean([b['commit_count'] for b in self.batch_metrics]), - 'avg_batch_size_bytes': statistics.mean([b['size_bytes'] for b in self.batch_metrics]), - 'avg_batch_latency_ms': statistics.mean([b['latency'] * 1000 for b in self.batch_metrics]) + "batch_metrics": { + "total_batches": len(self.batch_metrics), + "avg_commits_per_batch": statistics.mean( + [b["commit_count"] for b in self.batch_metrics] + ), + "avg_batch_size_bytes": statistics.mean( + [b["size_bytes"] for b in self.batch_metrics] + ), + "avg_batch_latency_ms": statistics.mean( + [b["latency"] * 1000 for b in self.batch_metrics] + ), }, - 'retry_analysis': dict(self.retry_counts), - 'queue_depth_analysis': self._analyze_queue_depths() + "retry_analysis": dict(self.retry_counts), + "queue_depth_analysis": self._analyze_queue_depths(), } return results @@ -510,26 +547,26 @@ class PersistenceSimulation: if not self.queue_depth_samples: return {} - pending = [s['pending_commits'] for s in self.queue_depth_samples] - in_flight = [s['in_flight_requests'] for s in self.queue_depth_samples] - total_unack = [s['total_unacknowledged'] for s in self.queue_depth_samples] + pending = [s["pending_commits"] for s in self.queue_depth_samples] + in_flight = [s["in_flight_requests"] for s in self.queue_depth_samples] + total_unack = [s["total_unacknowledged"] for s in self.queue_depth_samples] return { - 
'pending_commits': { - 'mean': statistics.mean(pending), - 'max': max(pending), - 'p95': np.percentile(pending, 95) + "pending_commits": { + "mean": statistics.mean(pending), + "max": max(pending), + "p95": np.percentile(pending, 95), }, - 'in_flight_requests': { - 'mean': statistics.mean(in_flight), - 'max': max(in_flight), - 'p95': np.percentile(in_flight, 95) + "in_flight_requests": { + "mean": statistics.mean(in_flight), + "max": max(in_flight), + "p95": np.percentile(in_flight, 95), + }, + "total_unacknowledged": { + "mean": statistics.mean(total_unack), + "max": max(total_unack), + "p95": np.percentile(total_unack, 95), }, - 'total_unacknowledged': { - 'mean': statistics.mean(total_unack), - 'max': max(total_unack), - 'p95': np.percentile(total_unack, 95) - } } def plot_results(self, results: Dict, save_path: Optional[str] = None): @@ -539,57 +576,69 @@ class PersistenceSimulation: return fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(15, 12)) - fig.suptitle('Persistence Thread Simulation Results', fontsize=16) + fig.suptitle("Persistence Thread Simulation Results", fontsize=16) # Plot 1: Commit latency histogram - latencies_ms = [c['latency'] * 1000 for c in self.completed_commits] - ax1.hist(latencies_ms, bins=50, alpha=0.7, edgecolor='black') - ax1.set_xlabel('Commit Latency (ms)') - ax1.set_ylabel('Count') - ax1.set_title('Commit Latency Distribution') - ax1.axvline(results['commit_metrics']['latency_ms']['mean'], color='red', - linestyle='--', label=f"Mean: {results['commit_metrics']['latency_ms']['mean']:.1f}ms") - ax1.axvline(results['commit_metrics']['latency_ms']['p95'], color='orange', - linestyle='--', label=f"P95: {results['commit_metrics']['latency_ms']['p95']:.1f}ms") + latencies_ms = [c["latency"] * 1000 for c in self.completed_commits] + ax1.hist(latencies_ms, bins=50, alpha=0.7, edgecolor="black") + ax1.set_xlabel("Commit Latency (ms)") + ax1.set_ylabel("Count") + ax1.set_title("Commit Latency Distribution") + ax1.axvline( + results["commit_metrics"]["latency_ms"]["mean"], + color="red", + linestyle="--", + label=f"Mean: {results['commit_metrics']['latency_ms']['mean']:.1f}ms", + ) + ax1.axvline( + results["commit_metrics"]["latency_ms"]["p95"], + color="orange", + linestyle="--", + label=f"P95: {results['commit_metrics']['latency_ms']['p95']:.1f}ms", + ) ax1.legend() # Plot 2: Timeline of commit completions - completion_times = [c['completion_time'] for c in self.completed_commits] - completion_latencies = [c['latency'] * 1000 for c in self.completed_commits] + completion_times = [c["completion_time"] for c in self.completed_commits] + completion_latencies = [c["latency"] * 1000 for c in self.completed_commits] ax2.scatter(completion_times, completion_latencies, alpha=0.6, s=10) - ax2.set_xlabel('Time (seconds)') - ax2.set_ylabel('Commit Latency (ms)') - ax2.set_title('Latency Over Time') + ax2.set_xlabel("Time (seconds)") + ax2.set_ylabel("Commit Latency (ms)") + ax2.set_title("Latency Over Time") # Plot 3: Queue depth over time if self.queue_depth_samples: - times = [s['time'] for s in self.queue_depth_samples] - pending = [s['pending_commits'] for s in self.queue_depth_samples] - in_flight = [s['in_flight_requests'] for s in self.queue_depth_samples] - total_unack = [s['total_unacknowledged'] for s in self.queue_depth_samples] + times = [s["time"] for s in self.queue_depth_samples] + pending = [s["pending_commits"] for s in self.queue_depth_samples] + in_flight = [s["in_flight_requests"] for s in self.queue_depth_samples] + total_unack = 
[s["total_unacknowledged"] for s in self.queue_depth_samples] - ax3.plot(times, pending, label='Pending Commits', alpha=0.8) - ax3.plot(times, in_flight, label='In-Flight Requests', alpha=0.8) - ax3.plot(times, total_unack, label='Total Unacknowledged', alpha=0.8) - ax3.axhline(self.max_in_flight_requests, color='red', linestyle='--', - label=f'Max In-Flight Limit ({self.max_in_flight_requests})') - ax3.set_xlabel('Time (seconds)') - ax3.set_ylabel('Count') - ax3.set_title('Queue Depths Over Time') + ax3.plot(times, pending, label="Pending Commits", alpha=0.8) + ax3.plot(times, in_flight, label="In-Flight Requests", alpha=0.8) + ax3.plot(times, total_unack, label="Total Unacknowledged", alpha=0.8) + ax3.axhline( + self.max_in_flight_requests, + color="red", + linestyle="--", + label=f"Max In-Flight Limit ({self.max_in_flight_requests})", + ) + ax3.set_xlabel("Time (seconds)") + ax3.set_ylabel("Count") + ax3.set_title("Queue Depths Over Time") ax3.legend() # Plot 4: Batch size distribution if self.batch_metrics: - batch_sizes = [b['commit_count'] for b in self.batch_metrics] - ax4.hist(batch_sizes, bins=20, alpha=0.7, edgecolor='black') - ax4.set_xlabel('Commits per Batch') - ax4.set_ylabel('Count') - ax4.set_title('Batch Size Distribution') + batch_sizes = [b["commit_count"] for b in self.batch_metrics] + ax4.hist(batch_sizes, bins=20, alpha=0.7, edgecolor="black") + ax4.set_xlabel("Commits per Batch") + ax4.set_ylabel("Count") + ax4.set_title("Batch Size Distribution") plt.tight_layout() if save_path: - plt.savefig(save_path, dpi=300, bbox_inches='tight') + plt.savefig(save_path, dpi=300, bbox_inches="tight") print(f"Plots saved to {save_path}") else: plt.show() @@ -602,7 +651,7 @@ def print_results(results: Dict): print("=" * 80) # Configuration - config = results['simulation_config'] + config = results["simulation_config"] print(f"\nConfiguration:") print(f" Duration: {config['duration_sec']}s") print(f" Arrival Rate: {config['arrival_rate_per_sec']} commits/sec") @@ -613,8 +662,8 @@ def print_results(results: Dict): print(f" S3 Failure Rate: {config['s3_failure_rate']:.1%}") # Commit metrics - commit_metrics = results['commit_metrics'] - latency = commit_metrics['latency_ms'] + commit_metrics = results["commit_metrics"] + latency = commit_metrics["latency_ms"] print(f"\nCommit Performance:") print(f" Total Commits: {commit_metrics['total_commits']:,}") print(f" Latency Mean: {latency['mean']:.2f}ms") @@ -625,7 +674,7 @@ def print_results(results: Dict): print(f" Latency Range: {latency['min']:.2f}ms - {latency['max']:.2f}ms") # Batch metrics - batch_metrics = results['batch_metrics'] + batch_metrics = results["batch_metrics"] print(f"\nBatching Performance:") print(f" Total Batches: {batch_metrics['total_batches']:,}") print(f" Avg Commits/Batch: {batch_metrics['avg_commits_per_batch']:.1f}") @@ -633,24 +682,30 @@ def print_results(results: Dict): print(f" Avg Batch Latency: {batch_metrics['avg_batch_latency_ms']:.2f}ms") # Retry analysis - if results['retry_analysis']: + if results["retry_analysis"]: print(f"\nRetry Analysis:") - for retry_count, occurrences in results['retry_analysis'].items(): + for retry_count, occurrences in results["retry_analysis"].items(): print(f" {occurrences:,} batches required {retry_count} retries") # Queue depth analysis - if results['queue_depth_analysis']: - queue_analysis = results['queue_depth_analysis'] + if results["queue_depth_analysis"]: + queue_analysis = results["queue_depth_analysis"] print(f"\nQueue Depth Analysis:") - if 'pending_commits' in 
queue_analysis: - pending = queue_analysis['pending_commits'] - print(f" Pending Commits - Mean: {pending['mean']:.1f}, Max: {pending['max']}, P95: {pending['p95']:.1f}") - if 'in_flight_requests' in queue_analysis: - in_flight = queue_analysis['in_flight_requests'] - print(f" In-Flight Requests - Mean: {in_flight['mean']:.1f}, Max: {in_flight['max']}, P95: {in_flight['p95']:.1f}") - if 'total_unacknowledged' in queue_analysis: - total = queue_analysis['total_unacknowledged'] - print(f" Total Unacknowledged - Mean: {total['mean']:.1f}, Max: {total['max']}, P95: {total['p95']:.1f}") + if "pending_commits" in queue_analysis: + pending = queue_analysis["pending_commits"] + print( + f" Pending Commits - Mean: {pending['mean']:.1f}, Max: {pending['max']}, P95: {pending['p95']:.1f}" + ) + if "in_flight_requests" in queue_analysis: + in_flight = queue_analysis["in_flight_requests"] + print( + f" In-Flight Requests - Mean: {in_flight['mean']:.1f}, Max: {in_flight['max']}, P95: {in_flight['p95']:.1f}" + ) + if "total_unacknowledged" in queue_analysis: + total = queue_analysis["total_unacknowledged"] + print( + f" Total Unacknowledged - Mean: {total['mean']:.1f}, Max: {total['max']}, P95: {total['p95']:.1f}" + ) if __name__ == "__main__": @@ -664,8 +719,16 @@ if __name__ == "__main__": {"name": "Baseline (max_in_flight=5)", "max_in_flight_requests": 5}, {"name": "Higher Parallelism (max_in_flight=10)", "max_in_flight_requests": 10}, {"name": "Much Higher (max_in_flight=20)", "max_in_flight_requests": 20}, - {"name": "Lower Timeout (max_in_flight=10, timeout=2ms)", "max_in_flight_requests": 10, "batch_timeout_ms": 2.0}, - {"name": "Higher Timeout (max_in_flight=10, timeout=10ms)", "max_in_flight_requests": 10, "batch_timeout_ms": 10.0}, + { + "name": "Lower Timeout (max_in_flight=10, timeout=2ms)", + "max_in_flight_requests": 10, + "batch_timeout_ms": 2.0, + }, + { + "name": "Higher Timeout (max_in_flight=10, timeout=10ms)", + "max_in_flight_requests": 10, + "batch_timeout_ms": 10.0, + }, ] results_comparison = [] @@ -682,7 +745,7 @@ if __name__ == "__main__": s3_latency_scale=25.0, s3_failure_rate=0.01, max_in_flight_requests=config.get("max_in_flight_requests", 5), - batch_timeout_ms=config.get("batch_timeout_ms", 5.0) + batch_timeout_ms=config.get("batch_timeout_ms", 5.0), ) results = sim.run_simulation() @@ -690,9 +753,9 @@ if __name__ == "__main__": results_comparison.append(results) # Print key metrics for quick comparison - commit_metrics = results['commit_metrics'] - batch_metrics = results['batch_metrics'] - queue_metrics = results.get('queue_depth_analysis', {}) + commit_metrics = results["commit_metrics"] + batch_metrics = results["batch_metrics"] + queue_metrics = results.get("queue_depth_analysis", {}) print(f"\nKey Metrics:") print(f" Mean Latency: {commit_metrics['latency_ms']['mean']:.1f}ms") @@ -701,8 +764,12 @@ if __name__ == "__main__": print(f" Avg Commits/Batch: {batch_metrics['avg_commits_per_batch']:.1f}") print(f" Avg Batch Size: {batch_metrics['avg_batch_size_bytes']/1024:.1f}KB") if queue_metrics: - print(f" Avg Queue Depth: {queue_metrics.get('total_unacknowledged', {}).get('mean', 0):.1f}") - print(f" Max Queue Depth: {queue_metrics.get('total_unacknowledged', {}).get('max', 0)}") + print( + f" Avg Queue Depth: {queue_metrics.get('total_unacknowledged', {}).get('mean', 0):.1f}" + ) + print( + f" Max Queue Depth: {queue_metrics.get('total_unacknowledged', {}).get('max', 0)}" + ) # Summary comparison print(f"\n{'='*80}") @@ -713,25 +780,33 @@ if __name__ == "__main__": for 
result in results_comparison: name = result["config_name"] - commit_metrics = result['commit_metrics'] - queue_metrics = result.get('queue_depth_analysis', {}) - mean_lat = commit_metrics['latency_ms']['mean'] - p95_lat = commit_metrics['latency_ms']['p95'] - p99_lat = commit_metrics['latency_ms']['p99'] - avg_queue = queue_metrics.get('total_unacknowledged', {}).get('mean', 0) + commit_metrics = result["commit_metrics"] + queue_metrics = result.get("queue_depth_analysis", {}) + mean_lat = commit_metrics["latency_ms"]["mean"] + p95_lat = commit_metrics["latency_ms"]["p95"] + p99_lat = commit_metrics["latency_ms"]["p99"] + avg_queue = queue_metrics.get("total_unacknowledged", {}).get("mean", 0) - print(f"{name:<40} {mean_lat:<8.1f} {p95_lat:<8.1f} {p99_lat:<8.1f} {avg_queue:<10.1f}") + print( + f"{name:<40} {mean_lat:<8.1f} {p95_lat:<8.1f} {p99_lat:<8.1f} {avg_queue:<10.1f}" + ) print(f"\nRecommendation: Choose config with lowest P95/P99 latencies") - print(f"Note: Higher in-flight allows more parallelism but may increase queue variability") + print( + f"Note: Higher in-flight allows more parallelism but may increase queue variability" + ) # Generate plots for best configuration - best_config = min(results_comparison, key=lambda r: r['commit_metrics']['latency_ms']['p95']) + best_config = min( + results_comparison, key=lambda r: r["commit_metrics"]["latency_ms"]["p95"] + ) print(f"\nGenerating plots for best configuration: {best_config['config_name']}") try: # Re-run best config to get simulation object for plotting - best_params = next(c for c in configs if c['name'] == best_config['config_name']) + best_params = next( + c for c in configs if c["name"] == best_config["config_name"] + ) sim_best = PersistenceSimulation( arrival_rate_per_sec=1000.0, simulation_duration_sec=30.0, @@ -739,10 +814,10 @@ if __name__ == "__main__": s3_latency_scale=25.0, s3_failure_rate=0.01, max_in_flight_requests=best_params.get("max_in_flight_requests", 5), - batch_timeout_ms=best_params.get("batch_timeout_ms", 5.0) + batch_timeout_ms=best_params.get("batch_timeout_ms", 5.0), ) sim_best.run_simulation() - sim_best.plot_results(best_config, f'persistence_optimization_results.png') + sim_best.plot_results(best_config, f"persistence_optimization_results.png") except Exception as e: print(f"\nCould not generate plots: {e}") - print("Install matplotlib and numpy to enable visualization") \ No newline at end of file + print("Install matplotlib and numpy to enable visualization")