#!/usr/bin/env python3
"""
Database Persistence Pattern Comparison

Compares WeaselDB's batched S3 persistence approach against traditional
database persistence patterns to understand trade-offs in latency, throughput,
consistency, and operational complexity.

Simulated approaches:
1. WeaselDB S3: Batched async S3 persistence with optimistic concurrency
2. WeaselDB EBS: The same batching logic, persisting to an EBS volume
3. Traditional WAL: Write-ahead log with periodic sync to disk
4. Synchronous: Immediate disk sync per transaction
   (like PostgreSQL with synchronous_commit=on)

Group Commit and Async Replication are defined as pattern names below but are
not yet exercised by the comparison.
"""

import heapq
import statistics
from abc import ABC, abstractmethod
from collections import defaultdict
from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict, List, Optional

import matplotlib.pyplot as plt
import numpy as np


class PersistencePattern(Enum):
    WEASELDB_S3 = "WeaselDB S3 Batched"
    WEASELDB_EBS = "WeaselDB EBS Batched"
    TRADITIONAL_WAL = "Traditional WAL"
    SYNCHRONOUS = "Synchronous Disk"
    GROUP_COMMIT = "Group Commit"
    ASYNC_REPLICATION = "Async Replication"


@dataclass
class Transaction:
    """Represents a database transaction/commit"""

    txn_id: int
    arrival_time: float
    size_bytes: int = 1024
    requires_durability: bool = True  # Some txns can be async


@dataclass
class PersistenceMetrics:
    """Metrics for a persistence approach"""

    pattern: PersistencePattern
    total_transactions: int
    completed_transactions: int

    # Latency metrics (milliseconds)
    min_latency: float
    mean_latency: float
    median_latency: float
    p95_latency: float
    p99_latency: float
    max_latency: float

    # Throughput metrics
    avg_throughput_tps: float
    peak_throughput_tps: float

    # Resource metrics
    avg_disk_iops: float
    avg_network_mbps: float
    storage_efficiency: float  # GB stored / GB logical data

    # Consistency metrics
    durability_guarantee: str
    consistency_model: str
    recovery_time_estimate: float

    # Operational metrics
    operational_complexity: int  # 1-10 scale
    infrastructure_cost: float  # Relative cost per transaction


class PersistenceSimulator(ABC):
    """Abstract base class for persistence pattern simulators"""

    def __init__(
        self,
        simulation_duration: float = 60.0,
        arrival_rate_per_sec: float = 1000.0,
    ):
        self.simulation_duration = simulation_duration
        self.arrival_rate_per_sec = arrival_rate_per_sec

        # Simulation state
        self.current_time = 0.0
        self.event_queue = []
        self.completed_transactions = []
        self.next_txn_id = 0

        # Metrics tracking
        self.disk_iops = []
        self.network_usage = []
        self.throughput_samples = []

        # Random number generator
        self.rng = np.random.RandomState(42)

    def generate_transaction(self, arrival_time: float) -> Transaction:
        """Generate a transaction with realistic characteristics"""
        # Size distribution: mostly small, some large.
        # A single draw keeps the fractions at the documented 80% / 15% / 5%.
        r = self.rng.random()
        if r < 0.80:  # 80% small transactions
            size = self.rng.randint(100, 2048)  # 100B - 2KB
        elif r < 0.95:  # 15% medium
            size = self.rng.randint(2048, 20480)  # 2KB - 20KB
        else:  # 5% large transactions
            size = self.rng.randint(20480, 102400)  # 20KB - 100KB

        return Transaction(
            txn_id=self.next_txn_id,
            arrival_time=arrival_time,
            size_bytes=size,
            requires_durability=True,
        )

    @abstractmethod
    def process_transaction(self, txn: Transaction) -> None:
        """Process a transaction according to the persistence pattern"""
        pass

    @abstractmethod
    def get_pattern_name(self) -> PersistencePattern:
        """Return the persistence pattern this simulator implements"""
        pass

    def run_simulation(self) -> PersistenceMetrics:
        """Run the simulation and return metrics"""
        # Generate arrival events
        self._generate_arrivals()

        # Process events
        while self.event_queue and self.current_time < self.simulation_duration:
            time, event_type, data = heapq.heappop(self.event_queue)
            self.current_time = time

            if event_type == "transaction_arrival":
                self.process_transaction(data)
            elif event_type == "custom":
                self._handle_custom_event(data)

        return self._calculate_metrics()

    def _generate_arrivals(self):
        """Generate Poisson arrival events"""
        time = 0.0
        while time < self.simulation_duration:
            inter_arrival = self.rng.exponential(1.0 / self.arrival_rate_per_sec)
            time += inter_arrival
            if time >= self.simulation_duration:
                break

            txn = self.generate_transaction(time)
            self.next_txn_id += 1
            heapq.heappush(self.event_queue, (time, "transaction_arrival", txn))

    def _handle_custom_event(self, data):
        """Handle custom events - override in subclasses"""
        pass

    def schedule_event(self, time: float, event_type: str, data: Any):
        """Schedule a custom event"""
        heapq.heappush(self.event_queue, (time, event_type, data))

    def _calculate_metrics(self) -> PersistenceMetrics:
        """Calculate performance metrics from completed transactions"""
        if not self.completed_transactions:
            raise ValueError("No transactions completed")

        latencies = [txn["latency_ms"] for txn in self.completed_transactions]

        return PersistenceMetrics(
            pattern=self.get_pattern_name(),
            total_transactions=self.next_txn_id,
            completed_transactions=len(self.completed_transactions),
            # Latency metrics
            min_latency=min(latencies),
            mean_latency=statistics.mean(latencies),
            median_latency=statistics.median(latencies),
            p95_latency=np.percentile(latencies, 95),
            p99_latency=np.percentile(latencies, 99),
            max_latency=max(latencies),
            # Throughput metrics
            avg_throughput_tps=len(self.completed_transactions)
            / self.simulation_duration,
            peak_throughput_tps=self._calculate_peak_throughput(),
            # Resource metrics - implemented by subclasses
            avg_disk_iops=statistics.mean(self.disk_iops) if self.disk_iops else 0,
            avg_network_mbps=(
                statistics.mean(self.network_usage) if self.network_usage else 0
            ),
            storage_efficiency=self._calculate_storage_efficiency(),
            # Pattern-specific characteristics
            durability_guarantee=self._get_durability_guarantee(),
            consistency_model=self._get_consistency_model(),
            recovery_time_estimate=self._get_recovery_time(),
            operational_complexity=self._get_operational_complexity(),
            infrastructure_cost=self._get_infrastructure_cost(),
        )

    def _calculate_peak_throughput(self) -> float:
        """Calculate peak throughput in 1-second windows"""
        if not self.completed_transactions:
            return 0.0

        # Group transactions by second
        throughput_by_second = defaultdict(int)
        for txn in self.completed_transactions:
            second = int(txn["completion_time"])
            throughput_by_second[second] += 1

        return max(throughput_by_second.values()) if throughput_by_second else 0

    # Abstract methods for pattern-specific characteristics
    @abstractmethod
    def _calculate_storage_efficiency(self) -> float:
        pass

    @abstractmethod
    def _get_durability_guarantee(self) -> str:
        pass

    @abstractmethod
    def _get_consistency_model(self) -> str:
        pass

    @abstractmethod
    def _get_recovery_time(self) -> float:
        pass

    @abstractmethod
    def _get_operational_complexity(self) -> int:
        pass

    @abstractmethod
    def _get_infrastructure_cost(self) -> float:
        pass
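
# Illustrative sketch only (not used by the comparison below): the smallest
# possible PersistenceSimulator subclass, showing which hooks a new persistence
# pattern must implement. The fixed 1 ms completion delay and the reuse of the
# ASYNC_REPLICATION enum value are assumptions made for this example, not a
# model of any real system.
class MinimalExampleSimulator(PersistenceSimulator):
    """Completes every transaction after a fixed 1 ms delay (example only)."""

    def get_pattern_name(self) -> PersistencePattern:
        return PersistencePattern.ASYNC_REPLICATION  # placeholder for the example

    def process_transaction(self, txn: Transaction) -> None:
        # Schedule a completion event 1 ms after arrival.
        self.schedule_event(
            self.current_time + 0.001, "custom", {"type": "done", "transaction": txn}
        )

    def _handle_custom_event(self, data):
        if data["type"] == "done":
            txn = data["transaction"]
            self.completed_transactions.append(
                {
                    "txn_id": txn.txn_id,
                    "arrival_time": txn.arrival_time,
                    "completion_time": self.current_time,
                    "latency_ms": (self.current_time - txn.arrival_time) * 1000,
                    "size_bytes": txn.size_bytes,
                }
            )

    def _calculate_storage_efficiency(self) -> float:
        return 1.0

    def _get_durability_guarantee(self) -> str:
        return "None (example only)"

    def _get_consistency_model(self) -> str:
        return "N/A (example only)"

    def _get_recovery_time(self) -> float:
        return 0.0

    def _get_operational_complexity(self) -> int:
        return 1

    def _get_infrastructure_cost(self) -> float:
        return 0.0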

class WeaselDBSimulator(PersistenceSimulator):
    """WeaselDB's batched S3 persistence simulation"""

    def __init__(
        self,
        batch_timeout_ms: float = 1.0,
        batch_size_threshold: int = 800000,
        max_in_flight: int = 50,
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.batch_timeout_ms = batch_timeout_ms
        self.batch_size_threshold = batch_size_threshold
        self.max_in_flight = max_in_flight

        # State
        self.current_batch = []
        self.batch_start_time = None
        self.in_flight_batches = {}
        self.next_batch_id = 0

        # S3 characteristics
        self.s3_base_latency_ms = 60.0  # From our simulation
        self.s3_size_penalty_per_mb = 20.0

    def get_pattern_name(self) -> PersistencePattern:
        return PersistencePattern.WEASELDB_S3

    def process_transaction(self, txn: Transaction):
        """Process transaction using WeaselDB batching logic"""
        # Add to current batch
        if not self.current_batch:
            self.batch_start_time = self.current_time
        self.current_batch.append(txn)

        # Check if we should send batch
        if (
            self._should_send_batch()
            and len(self.in_flight_batches) < self.max_in_flight
        ):
            self._send_current_batch()

    def _should_send_batch(self) -> bool:
        """Check batch triggers"""
        if not self.current_batch:
            return False

        # Size trigger
        batch_size = sum(txn.size_bytes for txn in self.current_batch)
        if batch_size >= self.batch_size_threshold:
            return True

        # Time trigger
        if self.batch_start_time and (self.current_time - self.batch_start_time) >= (
            self.batch_timeout_ms / 1000.0
        ):
            return True

        return False

    def _send_current_batch(self):
        """Send current batch to S3"""
        if not self.current_batch:
            return

        batch_id = self.next_batch_id
        self.next_batch_id += 1
        batch_size = sum(txn.size_bytes for txn in self.current_batch)

        # Sample S3 latency
        s3_latency = self._sample_s3_latency(batch_size) / 1000.0
        completion_time = self.current_time + s3_latency

        # Track batch
        self.in_flight_batches[batch_id] = {
            "transactions": self.current_batch.copy(),
            "sent_time": self.current_time,
            "completion_time": completion_time,
            "size_bytes": batch_size,
        }

        # Schedule completion
        self.schedule_event(
            completion_time, "custom", {"type": "batch_complete", "batch_id": batch_id}
        )

        # Track network usage
        self.network_usage.append(batch_size / (1024 * 1024))  # MB

        # Clear batch
        self.current_batch.clear()
        self.batch_start_time = None

    def _sample_s3_latency(self, batch_size_bytes: int) -> float:
        """Sample S3 latency with size scaling"""
        base = self.s3_base_latency_ms
        size_penalty = (batch_size_bytes / (1024 * 1024)) * self.s3_size_penalty_per_mb
        variable = self.rng.gamma(2.0, 15.0)  # Variable component
        return 30.0 + variable + size_penalty  # 30ms RTT + variable + size

    def _handle_custom_event(self, data):
        """Handle batch completion events"""
        if data["type"] == "batch_complete":
            batch_id = data["batch_id"]
            if batch_id in self.in_flight_batches:
                batch = self.in_flight_batches.pop(batch_id)

                # Mark transactions complete
                for txn in batch["transactions"]:
                    latency_ms = (self.current_time - txn.arrival_time) * 1000
                    self.completed_transactions.append(
                        {
                            "txn_id": txn.txn_id,
                            "arrival_time": txn.arrival_time,
                            "completion_time": self.current_time,
                            "latency_ms": latency_ms,
                            "size_bytes": txn.size_bytes,
                        }
                    )

    def _calculate_storage_efficiency(self) -> float:
        return 1.0  # S3 has no storage overhead

    def _get_durability_guarantee(self) -> str:
        return "Eventually durable (S3 11 9's)"

    def _get_consistency_model(self) -> str:
        return "Optimistic concurrency, eventual consistency"

    def _get_recovery_time(self) -> float:
        return 0.1  # Fast recovery, just reconnect to S3

    def _get_operational_complexity(self) -> int:
        return 3  # Moderate - S3 managed, but need batching logic

    def _get_infrastructure_cost(self) -> float:
        return 1.0  # Baseline cost

class TraditionalWALSimulator(PersistenceSimulator):
    """Traditional Write-Ahead Log with periodic sync"""

    def __init__(
        self,
        wal_sync_interval_ms: float = 10.0,
        checkpoint_interval_sec: float = 30.0,
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.wal_sync_interval_ms = wal_sync_interval_ms
        self.checkpoint_interval_sec = checkpoint_interval_sec

        # WAL state
        self.wal_buffer = []
        self.last_sync_time = 0.0
        self.pending_transactions = {}  # Waiting for sync

        # EBS characteristics
        self.disk_latency_ms = 1.0  # EBS base latency
        self.disk_iops_limit = 10000  # Typical SSD

        # Schedule periodic syncs
        self._schedule_periodic_syncs()

    def get_pattern_name(self) -> PersistencePattern:
        return PersistencePattern.TRADITIONAL_WAL

    def process_transaction(self, txn: Transaction):
        """Write to WAL buffer, wait for sync"""
        # Write to WAL buffer (immediate)
        self.wal_buffer.append(txn)
        self.pending_transactions[txn.txn_id] = txn

        # Track disk IOPS (write to WAL)
        self.disk_iops.append(1.0)

    def _schedule_periodic_syncs(self):
        """Schedule periodic WAL sync events"""
        sync_time = self.wal_sync_interval_ms / 1000.0
        while sync_time < self.simulation_duration:
            self.schedule_event(sync_time, "custom", {"type": "wal_sync"})
            sync_time += self.wal_sync_interval_ms / 1000.0

    def _perform_wal_sync(self):
        """Sync WAL buffer to disk"""
        if not self.wal_buffer:
            return

        # Calculate sync latency based on buffer size
        buffer_size = sum(txn.size_bytes for txn in self.wal_buffer)
        sync_latency = self._calculate_disk_sync_latency(buffer_size)
        completion_time = self.current_time + sync_latency / 1000.0

        # Schedule sync completion
        syncing_txns = list(self.wal_buffer)
        self.schedule_event(
            completion_time,
            "custom",
            {"type": "sync_complete", "transactions": syncing_txns},
        )

        # Track IOPS for sync operation
        self.disk_iops.append(len(self.wal_buffer))

        # Clear buffer
        self.wal_buffer.clear()
        self.last_sync_time = self.current_time

    def _calculate_disk_sync_latency(self, size_bytes: int) -> float:
        """Calculate WAL sync latency with realistic fsync modeling (no directory sync needed)"""
        # Data write to page cache
        write_latency = 0.1  # Fast write to page cache

        # EBS sequential write throughput
        throughput_mbps = 1000.0  # EBS gp3 throughput
        size_mb = size_bytes / (1024 * 1024)
        transfer_latency = size_mb * (1000.0 / throughput_mbps)

        # WAL file fsync latency on EBS
        # fdatasync on EBS with network replication
        file_fsync_base = 0.8  # Higher base latency on EBS
        file_fsync_variable = self.rng.gamma(2.2, 0.4)  # More variable due to network

        # Size penalty for large WAL syncs
        size_penalty = min(size_mb * 0.05, 1.0)  # Smaller penalty than batched writes

        file_fsync_latency = file_fsync_base + file_fsync_variable + size_penalty

        # WAL typically appends to existing files, so no directory fsync needed
        # Directory fsync only required when creating new WAL segments
        return write_latency + transfer_latency + file_fsync_latency

    def _handle_custom_event(self, data):
        """Handle WAL sync and sync-completion events"""
        if data["type"] == "wal_sync":
            self._perform_wal_sync()
        elif data["type"] == "sync_complete":
            # Mark transactions as durable
            for txn in data["transactions"]:
                if txn.txn_id in self.pending_transactions:
                    del self.pending_transactions[txn.txn_id]

                    latency_ms = (self.current_time - txn.arrival_time) * 1000
                    self.completed_transactions.append(
                        {
                            "txn_id": txn.txn_id,
                            "arrival_time": txn.arrival_time,
                            "completion_time": self.current_time,
                            "latency_ms": latency_ms,
                            "size_bytes": txn.size_bytes,
                        }
                    )

    def _calculate_storage_efficiency(self) -> float:
        return 2.0  # WAL + main storage

    def _get_durability_guarantee(self) -> str:
        return "ACID durable after WAL sync"
    def _get_consistency_model(self) -> str:
        return "Strict ACID consistency"

    def _get_recovery_time(self) -> float:
        return 30.0  # WAL replay time

    def _get_operational_complexity(self) -> int:
        return 7  # Complex - WAL management, checkpoints, recovery

    def _get_infrastructure_cost(self) -> float:
        return 1.5  # Higher due to disk I/O overhead


class SynchronousSimulator(PersistenceSimulator):
    """Synchronous disk persistence (like PostgreSQL with synchronous_commit=on)"""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.disk_latency_ms = 1.0  # EBS base latency

    def get_pattern_name(self) -> PersistencePattern:
        return PersistencePattern.SYNCHRONOUS

    def process_transaction(self, txn: Transaction):
        """Immediately sync transaction to disk"""
        # Calculate disk write latency
        disk_latency = self._calculate_disk_latency(txn.size_bytes) / 1000.0
        completion_time = self.current_time + disk_latency

        # Schedule completion
        self.schedule_event(
            completion_time,
            "custom",
            {"type": "disk_write_complete", "transaction": txn},
        )

        # Track disk IOPS
        self.disk_iops.append(1.0)

    def _calculate_disk_latency(self, size_bytes: int) -> float:
        """Calculate per-transaction disk write latency with fsync"""
        # Write to page cache (fast)
        write_latency = 0.1

        # FSYNC latency for immediate durability on EBS
        # Each transaction requires its own fsync - expensive on network storage!
        fsync_base = 1.5  # Much higher base latency for individual fsyncs on EBS
        fsync_variable = self.rng.gamma(2.3, 0.5)  # More variable due to network

        # Small write penalty - less efficient than batched writes
        if size_bytes < 4096:
            penalty = 0.5  # Individual small writes are less efficient
        else:
            penalty = 0.0

        fsync_latency = fsync_base + fsync_variable + penalty

        # Synchronous commits to existing database files don't need directory fsync
        # Directory fsync only needed when creating new database files/tablespaces
        return write_latency + fsync_latency

    def _handle_custom_event(self, data):
        """Handle disk write completion"""
        if data["type"] == "disk_write_complete":
            txn = data["transaction"]
            latency_ms = (self.current_time - txn.arrival_time) * 1000
            self.completed_transactions.append(
                {
                    "txn_id": txn.txn_id,
                    "arrival_time": txn.arrival_time,
                    "completion_time": self.current_time,
                    "latency_ms": latency_ms,
                    "size_bytes": txn.size_bytes,
                }
            )

    def _calculate_storage_efficiency(self) -> float:
        return 1.5  # Some overhead for metadata

    def _get_durability_guarantee(self) -> str:
        return "Immediate ACID durability"

    def _get_consistency_model(self) -> str:
        return "Strict ACID with immediate consistency"

    def _get_recovery_time(self) -> float:
        return 5.0  # Fast recovery, data already on disk

    def _get_operational_complexity(self) -> int:
        return 5  # Moderate - standard database operations

    def _get_infrastructure_cost(self) -> float:
        return 2.0  # High due to disk I/O per transaction


class WeaselDBDiskSimulator(PersistenceSimulator):
    """WeaselDB's batched disk persistence simulation"""

    def __init__(
        self,
        batch_timeout_ms: float = 1.0,
        batch_size_threshold: int = 800000,
        max_in_flight: int = 50,
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.batch_timeout_ms = batch_timeout_ms
        self.batch_size_threshold = batch_size_threshold
        self.max_in_flight = max_in_flight

        # State
        self.current_batch = []
        self.batch_start_time = None
        self.in_flight_batches = {}
        self.next_batch_id = 0

        # EBS characteristics (gp3 or io2 volumes)
        self.disk_base_latency_ms = 0.5  # EBS has higher base latency than local NVMe
        self.disk_throughput_mbps = 1000.0  # EBS gp3 max throughput
    def get_pattern_name(self) -> PersistencePattern:
        return PersistencePattern.WEASELDB_EBS  # Dedicated enum member for the disk variant

    def process_transaction(self, txn: Transaction):
        """Process transaction using WeaselDB batching logic"""
        # Add to current batch
        if not self.current_batch:
            self.batch_start_time = self.current_time
        self.current_batch.append(txn)

        # Check if we should send batch
        if (
            self._should_send_batch()
            and len(self.in_flight_batches) < self.max_in_flight
        ):
            self._send_current_batch()

    def _should_send_batch(self) -> bool:
        """Check batch triggers"""
        if not self.current_batch:
            return False

        # Size trigger
        batch_size = sum(txn.size_bytes for txn in self.current_batch)
        if batch_size >= self.batch_size_threshold:
            return True

        # Time trigger
        if self.batch_start_time and (self.current_time - self.batch_start_time) >= (
            self.batch_timeout_ms / 1000.0
        ):
            return True

        return False

    def _send_current_batch(self):
        """Send current batch to disk"""
        if not self.current_batch:
            return

        batch_id = self.next_batch_id
        self.next_batch_id += 1
        batch_size = sum(txn.size_bytes for txn in self.current_batch)

        # Sample disk write latency
        disk_latency = self._sample_disk_latency(batch_size) / 1000.0
        completion_time = self.current_time + disk_latency

        # Track batch
        self.in_flight_batches[batch_id] = {
            "transactions": self.current_batch.copy(),
            "sent_time": self.current_time,
            "completion_time": completion_time,
            "size_bytes": batch_size,
        }

        # Schedule completion
        self.schedule_event(
            completion_time, "custom", {"type": "batch_complete", "batch_id": batch_id}
        )

        # Track disk IOPS (one write operation per batch)
        self.disk_iops.append(1.0)

        # Clear batch
        self.current_batch.clear()
        self.batch_start_time = None

    def _sample_disk_latency(self, batch_size_bytes: int) -> float:
        """Sample disk write latency with realistic fsync modeling including directory sync"""
        # Base latency for the write command (data goes to page cache)
        write_latency = self.disk_base_latency_ms

        # Throughput-based latency for data transfer to page cache
        size_mb = batch_size_bytes / (1024 * 1024)
        transfer_latency = (
            size_mb / self.disk_throughput_mbps
        ) * 1000.0  # Convert to ms

        # FSYNC latency for EBS - forces write to replicated storage
        # EBS has higher fsync latency due to network replication
        file_fsync_base = 1.0  # Higher base fsync latency for EBS
        file_fsync_variable = self.rng.gamma(2.5, 0.6)  # More variable due to network

        # Size-dependent fsync penalty for large batches
        size_penalty = min(size_mb * 0.1, 2.0)  # Max 2ms penalty

        file_fsync_latency = file_fsync_base + file_fsync_variable + size_penalty

        # Directory fsync latency - required for WeaselDB batch file creation on EBS
        # EBS directory metadata sync is also network-replicated
        dir_fsync_base = 0.5  # Higher directory metadata sync latency on EBS
        dir_fsync_variable = self.rng.gamma(1.8, 0.3)  # More variable due to network
        dir_fsync_latency = dir_fsync_base + dir_fsync_variable

        # Total latency: write + file fsync + directory fsync
        # WeaselDB needs directory fsync for batch file durability guarantees
        return write_latency + transfer_latency + file_fsync_latency + dir_fsync_latency

    def _handle_custom_event(self, data):
        """Handle batch completion events"""
        if data["type"] == "batch_complete":
            batch_id = data["batch_id"]
            if batch_id in self.in_flight_batches:
                batch = self.in_flight_batches.pop(batch_id)

                # Mark transactions complete
                for txn in batch["transactions"]:
                    latency_ms = (self.current_time - txn.arrival_time) * 1000
                    self.completed_transactions.append(
                        {
                            "txn_id": txn.txn_id,
                            "arrival_time": txn.arrival_time,
                            "completion_time": self.current_time,
                            "latency_ms": latency_ms,
                            "size_bytes": txn.size_bytes,
                        }
                    )
    def _calculate_storage_efficiency(self) -> float:
        return 1.2  # Some overhead for batching metadata

    def _get_durability_guarantee(self) -> str:
        return "ACID durable after EBS replication"

    def _get_consistency_model(self) -> str:
        return "Strict serializable with optimistic concurrency"

    def _get_recovery_time(self) -> float:
        return 10.0  # EBS volume attachment + recovery

    def _get_operational_complexity(self) -> int:
        return 4  # Moderate - batching logic + disk management

    def _get_infrastructure_cost(self) -> float:
        return 1.4  # Higher than S3 due to EBS provisioned storage + replication


class DatabaseComparisonFramework:
    """Framework for comparing different database persistence patterns"""

    def __init__(self, simulation_duration: float = 30.0):
        self.simulation_duration = simulation_duration

    def run_comparison(
        self, arrival_rates: List[float] = [100, 500, 1000, 2000]
    ) -> Dict[str, List[PersistenceMetrics]]:
        """Run comparison across multiple arrival rates"""
        results = defaultdict(list)

        for rate in arrival_rates:
            print(f"\nTesting at {rate} TPS...")

            # WeaselDB S3 (optimized config)
            weasel_s3 = WeaselDBSimulator(
                batch_timeout_ms=1.0,
                batch_size_threshold=800000,
                max_in_flight=50,
                simulation_duration=self.simulation_duration,
                arrival_rate_per_sec=rate,
            )
            weasel_s3_metrics = weasel_s3.run_simulation()
            results["WeaselDB S3"].append(weasel_s3_metrics)
            print(f"  WeaselDB S3 P95: {weasel_s3_metrics.p95_latency:.1f}ms")

            # WeaselDB EBS (same batching, EBS storage)
            weasel_ebs = WeaselDBDiskSimulator(
                batch_timeout_ms=1.0,
                batch_size_threshold=800000,
                max_in_flight=50,
                simulation_duration=self.simulation_duration,
                arrival_rate_per_sec=rate,
            )
            weasel_ebs_metrics = weasel_ebs.run_simulation()
            results["WeaselDB EBS"].append(weasel_ebs_metrics)
            print(f"  WeaselDB EBS P95: {weasel_ebs_metrics.p95_latency:.1f}ms")

            # Traditional WAL
            wal = TraditionalWALSimulator(
                wal_sync_interval_ms=10.0,
                simulation_duration=self.simulation_duration,
                arrival_rate_per_sec=rate,
            )
            wal_metrics = wal.run_simulation()
            results["Traditional WAL"].append(wal_metrics)
            print(f"  WAL P95: {wal_metrics.p95_latency:.1f}ms")

            # Synchronous
            sync = SynchronousSimulator(
                simulation_duration=self.simulation_duration, arrival_rate_per_sec=rate
            )
            sync_metrics = sync.run_simulation()
            results["Synchronous"].append(sync_metrics)
            print(f"  Synchronous P95: {sync_metrics.p95_latency:.1f}ms")

        return dict(results)

    def print_comparison_report(self, results: Dict[str, List[PersistenceMetrics]]):
        """Print comprehensive comparison report"""
        print("\n" + "=" * 80)
        print("DATABASE PERSISTENCE PATTERN COMPARISON")
        print("=" * 80)

        # Get arrival rates for headers
        arrival_rates = [100, 500, 1000, 2000]

        # Latency comparison
        print(f"\nP95 LATENCY COMPARISON (ms)")
        print(f"{'Pattern':<20}", end="")
        for rate in arrival_rates:
            print(f"{rate:>8} TPS", end="")
        print()
        print("-" * 60)

        for pattern_name, metrics_list in results.items():
            print(f"{pattern_name:<20}", end="")
            for metrics in metrics_list:
                print(f"{metrics.p95_latency:>8.1f}", end="")
            print()

        # Throughput comparison
        print(f"\nTHROUGHPUT ACHIEVED (TPS)")
        print(f"{'Pattern':<20}", end="")
        for rate in arrival_rates:
            print(f"{rate:>8} TPS", end="")
        print()
        print("-" * 60)

        for pattern_name, metrics_list in results.items():
            print(f"{pattern_name:<20}", end="")
            for metrics in metrics_list:
                print(f"{metrics.avg_throughput_tps:>8.1f}", end="")
            print()
        # Characteristics comparison
        print(f"\nSYSTEM CHARACTERISTICS")
        print(
            f"{'Pattern':<20} {'Durability':<25} {'Consistency':<20} {'OpComplx':<8} {'Cost':<6}"
        )
        print("-" * 85)

        for pattern_name, metrics_list in results.items():
            metrics = metrics_list[0]  # Use first metrics for characteristics
            print(
                f"{pattern_name:<20} {metrics.durability_guarantee:<25} "
                f"{metrics.consistency_model:<20} {metrics.operational_complexity:<8} "
                f"{metrics.infrastructure_cost:<6.1f}"
            )

        # Performance sweet spots
        print(f"\nPERFORMANCE SWEET SPOTS")
        print("-" * 40)

        for rate in arrival_rates:
            print(f"\nAt {rate} TPS:")
            rate_results = [
                (name, metrics_list[arrival_rates.index(rate)])
                for name, metrics_list in results.items()
            ]
            # Sort by P95 latency
            rate_results.sort(key=lambda x: x[1].p95_latency)

            for i, (name, metrics) in enumerate(rate_results):
                rank_symbol = (
                    "🥇" if i == 0 else "🥈" if i == 1 else "🥉" if i == 2 else " "
                )
                print(
                    f" {rank_symbol} {name}: {metrics.p95_latency:.1f}ms P95, "
                    f"{metrics.avg_throughput_tps:.0f} TPS achieved"
                )

    def plot_comparison_results(
        self,
        results: Dict[str, List[PersistenceMetrics]],
        save_path: Optional[str] = None,
    ):
        """Plot comparison results"""
        try:
            arrival_rates = [100, 500, 1000, 2000]

            fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(15, 12))
            fig.suptitle("Database Persistence Pattern Comparison", fontsize=16)

            # Plot 1: P95 Latency vs Load
            for pattern_name, metrics_list in results.items():
                p95_latencies = [m.p95_latency for m in metrics_list]
                ax1.plot(
                    arrival_rates,
                    p95_latencies,
                    marker="o",
                    linewidth=2,
                    label=pattern_name,
                )
            ax1.set_xlabel("Arrival Rate (TPS)")
            ax1.set_ylabel("P95 Latency (ms)")
            ax1.set_title("P95 Latency vs Load")
            ax1.legend()
            ax1.grid(True, alpha=0.3)
            ax1.set_yscale("log")

            # Plot 2: Throughput Achieved
            for pattern_name, metrics_list in results.items():
                throughputs = [m.avg_throughput_tps for m in metrics_list]
                ax2.plot(
                    arrival_rates,
                    throughputs,
                    marker="s",
                    linewidth=2,
                    label=pattern_name,
                )
            # Perfect throughput line
            ax2.plot(
                arrival_rates,
                arrival_rates,
                "k--",
                alpha=0.5,
                label="Perfect (no loss)",
            )
            ax2.set_xlabel("Target Rate (TPS)")
            ax2.set_ylabel("Achieved Throughput (TPS)")
            ax2.set_title("Throughput: Target vs Achieved")
            ax2.legend()
            ax2.grid(True, alpha=0.3)

            # Plot 3: Latency Distribution at 1000 TPS
            rate_idx = 2  # 1000 TPS
            for pattern_name, metrics_list in results.items():
                metrics = metrics_list[rate_idx]
                # Plot latency percentiles
                percentiles = [50, 95, 99]
                values = [
                    metrics.median_latency,
                    metrics.p95_latency,
                    metrics.p99_latency,
                ]
                ax3.bar(
                    [f"{pattern_name}\nP{p}" for p in percentiles],
                    values,
                    alpha=0.7,
                    label=pattern_name,
                )
            ax3.set_ylabel("Latency (ms)")
            ax3.set_title("Latency Percentiles at 1000 TPS")
            ax3.set_yscale("log")
            ax3.grid(True, alpha=0.3)

            # Plot 4: Cost vs Performance
            for pattern_name, metrics_list in results.items():
                costs = [m.infrastructure_cost for m in metrics_list]
                p95s = [m.p95_latency for m in metrics_list]
                # Use different markers for different patterns
                markers = {"WeaselDB": "o", "Traditional WAL": "s", "Synchronous": "^"}
                marker = markers.get(pattern_name, "o")
                ax4.scatter(
                    costs, p95s, s=100, marker=marker, alpha=0.7, label=pattern_name
                )
            ax4.set_xlabel("Infrastructure Cost (relative)")
            ax4.set_ylabel("P95 Latency (ms)")
            ax4.set_title("Cost vs Performance Trade-off")
            ax4.legend()
            ax4.grid(True, alpha=0.3)
            ax4.set_yscale("log")

            plt.tight_layout()

            if save_path:
                plt.savefig(save_path, dpi=300, bbox_inches="tight")
                print(f"Comparison plots saved to {save_path}")
            else:
                plt.show()

        except Exception as e:
            print(f"Could not generate plots: {e}")

def main():
    """Run database persistence pattern comparison"""
    print("Database Persistence Pattern Comparison")
    print("Comparing WeaselDB vs Traditional Database Approaches")
    print()

    comparison = DatabaseComparisonFramework(simulation_duration=20.0)
    results = comparison.run_comparison()
    comparison.print_comparison_report(results)

    try:
        comparison.plot_comparison_results(results, "database_comparison.png")
    except Exception as e:
        print(f"Could not generate plots: {e}")


if __name__ == "__main__":
    main()
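
# Illustrative usage sketch (assumption: you want to exercise a single pattern
# rather than the full comparison). It reuses the classes defined above with
# small, made-up parameter values so it finishes quickly; it is not called by
# main().
def run_single_pattern_demo() -> None:
    """Run just the WeaselDB S3 simulator at a modest load and print a summary."""
    sim = WeaselDBSimulator(
        batch_timeout_ms=1.0,
        batch_size_threshold=800000,
        max_in_flight=50,
        simulation_duration=5.0,
        arrival_rate_per_sec=200.0,
    )
    metrics = sim.run_simulation()
    print(
        f"{metrics.pattern.value}: "
        f"P50={metrics.median_latency:.1f}ms  "
        f"P95={metrics.p95_latency:.1f}ms  "
        f"throughput={metrics.avg_throughput_tps:.0f} TPS"
    )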