#!/usr/bin/env python3
"""
Persistence Thread Simulation
Simulates the persistence thread design from persistence.md to analyze
commit latency distributions with Poisson arrival times and realistic
S3 response characteristics.
Key metrics tracked:
- End-to-end commit latency (arrival to acknowledgment)
- Batch processing latencies
- Queue depths and flow control behavior
- Retry patterns and failure handling
"""
import heapq
import random
import statistics
from dataclasses import dataclass, field
from typing import List, Optional, Dict, Tuple
import numpy as np
from collections import defaultdict, deque
@dataclass
class Commit:
"""Represents a single commit request"""
commit_id: int
arrival_time: float
size_bytes: int = 1024 # Default 1KB per commit
@dataclass
class Batch:
"""Represents a batch of commits being processed"""
batch_id: int
commits: List[Commit]
created_time: float
size_bytes: int = field(init=False)
retry_count: int = 0
def __post_init__(self):
self.size_bytes = sum(c.size_bytes for c in self.commits)
@dataclass
class InFlightRequest:
"""Tracks an in-flight S3 request"""
batch: Batch
start_time: float
expected_completion: float
connection_id: int
class PersistenceSimulation:
"""
Simulates the persistence thread behavior described in persistence.md
    For S3 latency, we use a Gamma distribution, which is well suited to
    modeling network service response times because:
- It has a natural lower bound (minimum network RTT)
- It can model the right-skewed tail typical of network services
- It captures both typical fast responses and occasional slow responses
- Shape parameter controls the heaviness of the tail
"""
def __init__(
self,
# Configuration from persistence.md defaults
batch_timeout_ms: float = 5.0,
batch_size_threshold: int = 1024 * 1024, # 1MB
max_in_flight_requests: int = 5,
max_retry_attempts: int = 3,
retry_base_delay_ms: float = 100.0,
# S3 latency modeling (Gamma distribution parameters)
s3_latency_shape: float = 2.0, # Shape parameter (α)
s3_latency_scale: float = 25.0, # Scale parameter (β)
s3_failure_rate: float = 0.01, # 1% failure rate
# Arrival rate modeling
arrival_rate_per_sec: float = 1000.0, # Lambda for Poisson
# Simulation parameters
simulation_duration_sec: float = 60.0,
):
# Configuration
self.batch_timeout_ms = batch_timeout_ms
self.batch_size_threshold = batch_size_threshold
self.max_in_flight_requests = max_in_flight_requests
self.max_retry_attempts = max_retry_attempts
self.retry_base_delay_ms = retry_base_delay_ms
# S3 modeling parameters
self.s3_latency_shape = s3_latency_shape
self.s3_latency_scale = s3_latency_scale
self.s3_failure_rate = s3_failure_rate
# Arrival modeling
self.arrival_rate_per_sec = arrival_rate_per_sec
self.simulation_duration_sec = simulation_duration_sec
# Simulation state
self.current_time = 0.0
self.event_queue = [] # Priority queue of (time, event_type, event_data)
self.pending_commits = deque()
self.current_batch = []
self.batch_start_time = None # Will be set when first commit added to batch
self.in_flight_requests: Dict[int, InFlightRequest] = {}
self.next_batch_id = 0
self.next_connection_id = 0
self.next_commit_id = 0
# Metrics collection
self.completed_commits = []
self.batch_metrics = []
self.retry_counts = defaultdict(int)
self.queue_depth_samples = []
self.timeline_events = [] # For debugging/visualization
# Random number generators
self.rng = random.Random(42) # Reproducible results
self.np_rng = np.random.RandomState(42)
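    # Event types pushed onto event_queue: "commit_arrival", "batch_timeout",
    # "batch_completed", "batch_failed", "retry_batch", "sample_queue_depth".
    # See run_simulation() for the dispatch loop that consumes them.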
def sample_s3_latency(self, batch_size_bytes: int = 0) -> float:
"""
Sample S3 response latency using Gamma distribution with size-dependent scaling.
        Gamma distribution is a good fit for S3 latency because:
        - The defaults (shape=2.0, scale=25.0) give a ~50ms mean variable component
        - A minimum 30ms RTT prevents unrealistic sub-network responses
        - The right-skewed tail captures occasional slow responses
        - It models the reality that most responses are fast but some are slow
        Size-dependent scaling:
        - Base latency: 30ms RTT + Gamma(2.0, 25.0) = ~80ms mean
        - Linear scaling with size: +20ms per MB
        - Large requests (1MB) average ~100ms with similar tail behavior
"""
# Minimum network RTT (realistic for cloud storage)
min_rtt = 30.0 # 30ms minimum round-trip time
# Variable latency component from Gamma distribution
variable_latency = self.np_rng.gamma(
self.s3_latency_shape, self.s3_latency_scale
)
# Size-dependent scaling: +20ms per MB
size_mb = batch_size_bytes / (1024 * 1024)
size_penalty = size_mb * 20.0 # 20ms per MB
return min_rtt + variable_latency + size_penalty
def sample_inter_arrival_time(self) -> float:
"""Sample time between commit arrivals using Poisson process"""
return self.np_rng.exponential(1.0 / self.arrival_rate_per_sec)
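    # Back-of-the-envelope check (assuming the default rate of 1000 commits/sec):
    # the mean gap between arrivals is 1/1000 s = 1 ms, so a 5 ms batch timeout
    # collects roughly 5 commits per batch when the size trigger doesn't fire.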
def sample_commit_size(self) -> int:
"""
Sample commit size with realistic distribution including large commits.
Distribution:
- 70% small commits: 500B - 10KB (typical operations)
- 25% medium commits: 10KB - 100KB (batch operations, large documents)
- 5% large commits: 100KB - 1MB (bulk data, file uploads)
This creates a realistic mix where most commits are small but some
can trigger size-based batching or become single-commit batches.
"""
rand = self.rng.random()
if rand < 0.70: # 70% small commits
return self.rng.randint(500, 10 * 1024) # 500B - 10KB
elif rand < 0.95: # 25% medium commits
return self.rng.randint(10 * 1024, 100 * 1024) # 10KB - 100KB
else: # 5% large commits
return self.rng.randint(100 * 1024, 1024 * 1024) # 100KB - 1MB
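    # Approximate expected commit size under this mix (using range midpoints,
    # so only a rough figure):
    #   0.70 * ~5 KB + 0.25 * ~55 KB + 0.05 * ~560 KB ~= 46 KB per commit
    # At ~5 commits per timeout-triggered batch that is ~230 KB per batch, well
    # under the 1 MB size threshold, so the 5 ms timeout is usually the trigger.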
def schedule_event(self, time: float, event_type: str, data=None):
"""Add event to priority queue"""
heapq.heappush(self.event_queue, (time, event_type, data))
def should_process_batch(self) -> bool:
"""Check if current batch should be processed based on triggers"""
if not self.current_batch:
return False
# Size trigger
batch_size = sum(c.size_bytes for c in self.current_batch)
if batch_size >= self.batch_size_threshold:
return True
# Time trigger - only if we have a valid batch start time
if self.batch_start_time is not None:
if (self.current_time - self.batch_start_time) >= (
self.batch_timeout_ms / 1000.0
):
return True
return False
def can_start_new_request(self) -> bool:
"""Check if we can start a new request based on flow control"""
return len(self.in_flight_requests) < self.max_in_flight_requests
def process_current_batch(self):
"""Process the current batch if conditions are met"""
if not self.current_batch or not self.can_start_new_request():
return
if self.should_process_batch():
batch = Batch(
batch_id=self.next_batch_id,
commits=self.current_batch.copy(),
created_time=self.current_time,
)
self.next_batch_id += 1
# Clear current batch
self.current_batch.clear()
self.batch_start_time = None # Reset to None, will be set on next batch
self.send_batch_to_s3(batch)
def send_batch_to_s3(self, batch: Batch, is_retry: bool = False):
"""Send batch to S3 and track as in-flight request"""
if not self.can_start_new_request():
# This shouldn't happen due to flow control, but handle gracefully
self.schedule_event(self.current_time + 0.001, "retry_batch", batch)
return
# Sample S3 response characteristics (pass batch size for latency modeling)
s3_latency = (
self.sample_s3_latency(batch.size_bytes) / 1000.0
) # Convert ms to seconds
will_fail = self.rng.random() < self.s3_failure_rate
if will_fail:
s3_latency *= 2 # Failed requests typically take longer
completion_time = self.current_time + s3_latency
connection_id = self.next_connection_id
self.next_connection_id += 1
# Track in-flight request
in_flight = InFlightRequest(
batch=batch,
start_time=self.current_time,
expected_completion=completion_time,
connection_id=connection_id,
)
self.in_flight_requests[connection_id] = in_flight
# Schedule completion event
if will_fail:
self.schedule_event(completion_time, "batch_failed", connection_id)
else:
self.schedule_event(completion_time, "batch_completed", connection_id)
# Log event for analysis
self.timeline_events.append(
{
"time": self.current_time,
"event": "batch_sent",
"batch_id": batch.batch_id,
"batch_size": batch.size_bytes,
"commit_count": len(batch.commits),
"retry_count": batch.retry_count,
"is_retry": is_retry,
}
)
def handle_batch_completed(self, connection_id: int):
"""Handle successful batch completion"""
if connection_id not in self.in_flight_requests:
return
in_flight = self.in_flight_requests.pop(connection_id)
batch = in_flight.batch
# Calculate metrics
batch_latency = self.current_time - batch.created_time
self.batch_metrics.append(
{
"batch_id": batch.batch_id,
"latency": batch_latency,
"size_bytes": batch.size_bytes,
"commit_count": len(batch.commits),
"retry_count": batch.retry_count,
}
)
# Mark commits as completed and calculate end-to-end latency
for commit in batch.commits:
commit_latency = self.current_time - commit.arrival_time
self.completed_commits.append(
{
"commit_id": commit.commit_id,
"arrival_time": commit.arrival_time,
"completion_time": self.current_time,
"latency": commit_latency,
"batch_id": batch.batch_id,
"retry_count": batch.retry_count,
}
)
self.timeline_events.append(
{
"time": self.current_time,
"event": "batch_completed",
"batch_id": batch.batch_id,
"latency": batch_latency,
"retry_count": batch.retry_count,
}
)
# Try to process any pending work now that we have capacity
self.process_pending_work()
def handle_batch_failed(self, connection_id: int):
"""Handle batch failure with retry logic"""
if connection_id not in self.in_flight_requests:
return
in_flight = self.in_flight_requests.pop(connection_id)
batch = in_flight.batch
self.timeline_events.append(
{
"time": self.current_time,
"event": "batch_failed",
"batch_id": batch.batch_id,
"retry_count": batch.retry_count,
}
)
if batch.retry_count < self.max_retry_attempts:
# Exponential backoff retry
batch.retry_count += 1
backoff_delay = (self.retry_base_delay_ms / 1000.0) * (
2 ** (batch.retry_count - 1)
)
retry_time = self.current_time + backoff_delay
self.schedule_event(retry_time, "retry_batch", batch)
self.retry_counts[batch.retry_count] += 1
else:
            # Max retries exhausted - this would be a fatal error in a real system
self.timeline_events.append(
{
"time": self.current_time,
"event": "batch_abandoned",
"batch_id": batch.batch_id,
"retry_count": batch.retry_count,
}
)
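    # With the defaults (retry_base_delay_ms=100, max_retry_attempts=3) the
    # backoff schedule is 100 ms, 200 ms, 400 ms, so a batch that fails every
    # attempt spends roughly 700 ms in backoff, on top of the (doubled) S3
    # latencies, before it is abandoned.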
def handle_retry_batch(self, batch: Batch):
"""Handle batch retry"""
self.send_batch_to_s3(batch, is_retry=True)
def process_pending_work(self):
"""Process any pending commits that can now be batched"""
# Move pending commits to current batch
while self.pending_commits and self.can_start_new_request():
if not self.current_batch:
self.batch_start_time = self.current_time
commit = self.pending_commits.popleft()
self.current_batch.append(commit)
# Check if we should process this batch immediately
if self.should_process_batch():
self.process_current_batch()
break
# If we have commits but no in-flight capacity, they stay in current_batch
# and will be processed when capacity becomes available
def handle_commit_arrival(self, commit: Commit):
"""Handle new commit arrival"""
# If we have in-flight capacity, try to add to current batch
if self.can_start_new_request():
if not self.current_batch:
self.batch_start_time = self.current_time
self.current_batch.append(commit)
self.process_current_batch() # Check if we should process now
else:
# Add to pending queue due to flow control
self.pending_commits.append(commit)
# Schedule timeout event for current batch if it's the first commit
if len(self.current_batch) == 1 and not self.pending_commits:
timeout_time = self.current_time + (self.batch_timeout_ms / 1000.0)
self.schedule_event(timeout_time, "batch_timeout", None)
def handle_batch_timeout(self):
"""Handle batch timeout trigger"""
if self.current_batch and self.can_start_new_request():
self.process_current_batch()
def sample_queue_depth(self):
"""Sample current queue depths for analysis"""
pending_count = len(self.pending_commits)
current_batch_count = len(self.current_batch)
in_flight_count = len(self.in_flight_requests)
self.queue_depth_samples.append(
{
"time": self.current_time,
"pending_commits": pending_count,
"current_batch_size": current_batch_count,
"in_flight_requests": in_flight_count,
"total_unacknowledged": pending_count
+ current_batch_count
+ sum(
len(req.batch.commits) for req in self.in_flight_requests.values()
),
}
)
def generate_arrivals(self):
"""Generate Poisson arrival events for the simulation"""
current_time = 0.0
while current_time < self.simulation_duration_sec:
# Sample next arrival time
inter_arrival = self.sample_inter_arrival_time()
current_time += inter_arrival
if current_time >= self.simulation_duration_sec:
break
# Create commit
commit = Commit(
commit_id=self.next_commit_id,
arrival_time=current_time,
size_bytes=self.sample_commit_size(), # Realistic size distribution
)
self.next_commit_id += 1
# Schedule arrival event
self.schedule_event(current_time, "commit_arrival", commit)
# Schedule periodic queue depth sampling
sample_time = 0.0
while sample_time < self.simulation_duration_sec:
self.schedule_event(sample_time, "sample_queue_depth", None)
sample_time += 0.1 # Sample every 100ms
def run_simulation(self):
"""Run the complete simulation"""
print(f"Starting persistence simulation...")
print(f"Arrival rate: {self.arrival_rate_per_sec} commits/sec")
print(f"Duration: {self.simulation_duration_sec} seconds")
print(
f"Expected commits: ~{int(self.arrival_rate_per_sec * self.simulation_duration_sec)}"
)
print()
# Generate all arrival events
self.generate_arrivals()
# Process events in time order
events_processed = 0
while self.event_queue and events_processed < 1000000: # Safety limit
time, event_type, data = heapq.heappop(self.event_queue)
self.current_time = time
if time > self.simulation_duration_sec:
break
if event_type == "commit_arrival":
self.handle_commit_arrival(data)
elif event_type == "batch_completed":
self.handle_batch_completed(data)
elif event_type == "batch_failed":
self.handle_batch_failed(data)
elif event_type == "retry_batch":
self.handle_retry_batch(data)
elif event_type == "batch_timeout":
self.handle_batch_timeout()
elif event_type == "sample_queue_depth":
self.sample_queue_depth()
events_processed += 1
print(f"Simulation completed. Processed {events_processed} events.")
return self.analyze_results()
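    # Rough event budget for the 30 s / 1000 commits-per-second runs below:
    # ~30,000 arrivals + ~6,000 timeouts + ~6,000 completions + 300 queue-depth
    # samples, comfortably under the 1,000,000 safety limit.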
def analyze_results(self) -> Dict:
"""Analyze simulation results and return metrics"""
if not self.completed_commits:
return {"error": "No commits completed during simulation"}
# Calculate latency statistics
latencies = [
c["latency"] * 1000 for c in self.completed_commits
] # Convert to ms
results = {
"simulation_config": {
"duration_sec": self.simulation_duration_sec,
"arrival_rate_per_sec": self.arrival_rate_per_sec,
"batch_timeout_ms": self.batch_timeout_ms,
"batch_size_threshold": self.batch_size_threshold,
"max_in_flight_requests": self.max_in_flight_requests,
"s3_latency_params": f"Gamma(shape={self.s3_latency_shape}, scale={self.s3_latency_scale})",
"s3_failure_rate": self.s3_failure_rate,
},
"commit_metrics": {
"total_commits": len(self.completed_commits),
"latency_ms": {
"mean": statistics.mean(latencies),
"median": statistics.median(latencies),
"std": statistics.stdev(latencies) if len(latencies) > 1 else 0,
"min": min(latencies),
"max": max(latencies),
"p95": np.percentile(latencies, 95),
"p99": np.percentile(latencies, 99),
},
},
"batch_metrics": {
"total_batches": len(self.batch_metrics),
"avg_commits_per_batch": statistics.mean(
[b["commit_count"] for b in self.batch_metrics]
),
"avg_batch_size_bytes": statistics.mean(
[b["size_bytes"] for b in self.batch_metrics]
),
"avg_batch_latency_ms": statistics.mean(
[b["latency"] * 1000 for b in self.batch_metrics]
),
},
"retry_analysis": dict(self.retry_counts),
"queue_depth_analysis": self._analyze_queue_depths(),
}
return results
def _analyze_queue_depths(self) -> Dict:
"""Analyze queue depth patterns"""
if not self.queue_depth_samples:
return {}
pending = [s["pending_commits"] for s in self.queue_depth_samples]
in_flight = [s["in_flight_requests"] for s in self.queue_depth_samples]
total_unack = [s["total_unacknowledged"] for s in self.queue_depth_samples]
return {
"pending_commits": {
"mean": statistics.mean(pending),
"max": max(pending),
"p95": np.percentile(pending, 95),
},
"in_flight_requests": {
"mean": statistics.mean(in_flight),
"max": max(in_flight),
"p95": np.percentile(in_flight, 95),
},
"total_unacknowledged": {
"mean": statistics.mean(total_unack),
"max": max(total_unack),
"p95": np.percentile(total_unack, 95),
},
}
def plot_results(self, results: Dict, save_path: Optional[str] = None):
"""Generate visualization plots of simulation results"""
if not self.completed_commits:
print("No data to plot")
return
fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(15, 12))
fig.suptitle("Persistence Thread Simulation Results", fontsize=16)
# Plot 1: Commit latency histogram
latencies_ms = [c["latency"] * 1000 for c in self.completed_commits]
ax1.hist(latencies_ms, bins=50, alpha=0.7, edgecolor="black")
ax1.set_xlabel("Commit Latency (ms)")
ax1.set_ylabel("Count")
ax1.set_title("Commit Latency Distribution")
ax1.axvline(
results["commit_metrics"]["latency_ms"]["mean"],
color="red",
linestyle="--",
label=f"Mean: {results['commit_metrics']['latency_ms']['mean']:.1f}ms",
)
ax1.axvline(
results["commit_metrics"]["latency_ms"]["p95"],
color="orange",
linestyle="--",
label=f"P95: {results['commit_metrics']['latency_ms']['p95']:.1f}ms",
)
ax1.legend()
# Plot 2: Timeline of commit completions
completion_times = [c["completion_time"] for c in self.completed_commits]
completion_latencies = [c["latency"] * 1000 for c in self.completed_commits]
ax2.scatter(completion_times, completion_latencies, alpha=0.6, s=10)
ax2.set_xlabel("Time (seconds)")
ax2.set_ylabel("Commit Latency (ms)")
ax2.set_title("Latency Over Time")
# Plot 3: Queue depth over time
if self.queue_depth_samples:
times = [s["time"] for s in self.queue_depth_samples]
pending = [s["pending_commits"] for s in self.queue_depth_samples]
in_flight = [s["in_flight_requests"] for s in self.queue_depth_samples]
total_unack = [s["total_unacknowledged"] for s in self.queue_depth_samples]
ax3.plot(times, pending, label="Pending Commits", alpha=0.8)
ax3.plot(times, in_flight, label="In-Flight Requests", alpha=0.8)
ax3.plot(times, total_unack, label="Total Unacknowledged", alpha=0.8)
ax3.axhline(
self.max_in_flight_requests,
color="red",
linestyle="--",
label=f"Max In-Flight Limit ({self.max_in_flight_requests})",
)
ax3.set_xlabel("Time (seconds)")
ax3.set_ylabel("Count")
ax3.set_title("Queue Depths Over Time")
ax3.legend()
# Plot 4: Batch size distribution
if self.batch_metrics:
batch_sizes = [b["commit_count"] for b in self.batch_metrics]
ax4.hist(batch_sizes, bins=20, alpha=0.7, edgecolor="black")
ax4.set_xlabel("Commits per Batch")
ax4.set_ylabel("Count")
ax4.set_title("Batch Size Distribution")
plt.tight_layout()
if save_path:
plt.savefig(save_path, dpi=300, bbox_inches="tight")
print(f"Plots saved to {save_path}")
else:
plt.show()
def print_results(results: Dict):
"""Pretty print simulation results"""
print("=" * 80)
print("PERSISTENCE THREAD SIMULATION RESULTS")
print("=" * 80)
# Configuration
config = results["simulation_config"]
print(f"\nConfiguration:")
print(f" Duration: {config['duration_sec']}s")
print(f" Arrival Rate: {config['arrival_rate_per_sec']} commits/sec")
print(f" Batch Timeout: {config['batch_timeout_ms']}ms")
print(f" Batch Size Threshold: {config['batch_size_threshold']:,} bytes")
print(f" Max In-Flight: {config['max_in_flight_requests']}")
print(f" S3 Latency: {config['s3_latency_params']}")
print(f" S3 Failure Rate: {config['s3_failure_rate']:.1%}")
# Commit metrics
commit_metrics = results["commit_metrics"]
latency = commit_metrics["latency_ms"]
print(f"\nCommit Performance:")
print(f" Total Commits: {commit_metrics['total_commits']:,}")
print(f" Latency Mean: {latency['mean']:.2f}ms")
print(f" Latency Median: {latency['median']:.2f}ms")
print(f" Latency P95: {latency['p95']:.2f}ms")
print(f" Latency P99: {latency['p99']:.2f}ms")
print(f" Latency Std: {latency['std']:.2f}ms")
print(f" Latency Range: {latency['min']:.2f}ms - {latency['max']:.2f}ms")
# Batch metrics
batch_metrics = results["batch_metrics"]
print(f"\nBatching Performance:")
print(f" Total Batches: {batch_metrics['total_batches']:,}")
print(f" Avg Commits/Batch: {batch_metrics['avg_commits_per_batch']:.1f}")
print(f" Avg Batch Size: {batch_metrics['avg_batch_size_bytes']/1024:.1f}KB")
print(f" Avg Batch Latency: {batch_metrics['avg_batch_latency_ms']:.2f}ms")
# Retry analysis
if results["retry_analysis"]:
print(f"\nRetry Analysis:")
for retry_count, occurrences in results["retry_analysis"].items():
print(f" {occurrences:,} batches required {retry_count} retries")
# Queue depth analysis
if results["queue_depth_analysis"]:
queue_analysis = results["queue_depth_analysis"]
print(f"\nQueue Depth Analysis:")
if "pending_commits" in queue_analysis:
pending = queue_analysis["pending_commits"]
print(
f" Pending Commits - Mean: {pending['mean']:.1f}, Max: {pending['max']}, P95: {pending['p95']:.1f}"
)
if "in_flight_requests" in queue_analysis:
in_flight = queue_analysis["in_flight_requests"]
print(
f" In-Flight Requests - Mean: {in_flight['mean']:.1f}, Max: {in_flight['max']}, P95: {in_flight['p95']:.1f}"
)
if "total_unacknowledged" in queue_analysis:
total = queue_analysis["total_unacknowledged"]
print(
f" Total Unacknowledged - Mean: {total['mean']:.1f}, Max: {total['max']}, P95: {total['p95']:.1f}"
)
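

# Minimal usage sketch for a single run (the helper name is illustrative and
# is not referenced elsewhere in this file): build a simulation with mostly
# default parameters, run it, and print the summary via print_results().
def run_single_simulation_example():
    sim = PersistenceSimulation(
        arrival_rate_per_sec=500.0,  # example rate, not a tuned recommendation
        simulation_duration_sec=10.0,
    )
    results = sim.run_simulation()
    print_results(results)
    return results
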
if __name__ == "__main__":
print("Running Persistence Thread Configuration Analysis")
print("S3 Latency Modeling: Gamma distribution (shape=2.0, scale=25ms)")
print("Testing different configurations to optimize latency...")
print()
# Test configurations with different max_in_flight values
configs = [
{"name": "Baseline (max_in_flight=5)", "max_in_flight_requests": 5},
{"name": "Higher Parallelism (max_in_flight=10)", "max_in_flight_requests": 10},
{"name": "Much Higher (max_in_flight=20)", "max_in_flight_requests": 20},
{
"name": "Lower Timeout (max_in_flight=10, timeout=2ms)",
"max_in_flight_requests": 10,
"batch_timeout_ms": 2.0,
},
{
"name": "Higher Timeout (max_in_flight=10, timeout=10ms)",
"max_in_flight_requests": 10,
"batch_timeout_ms": 10.0,
},
]
results_comparison = []
for config in configs:
print(f"\n{'='*60}")
print(f"Testing: {config['name']}")
print(f"{'='*60}")
sim = PersistenceSimulation(
arrival_rate_per_sec=1000.0,
simulation_duration_sec=30.0,
s3_latency_shape=2.0,
s3_latency_scale=25.0,
s3_failure_rate=0.01,
max_in_flight_requests=config.get("max_in_flight_requests", 5),
batch_timeout_ms=config.get("batch_timeout_ms", 5.0),
)
results = sim.run_simulation()
results["config_name"] = config["name"]
results_comparison.append(results)
# Print key metrics for quick comparison
commit_metrics = results["commit_metrics"]
batch_metrics = results["batch_metrics"]
queue_metrics = results.get("queue_depth_analysis", {})
print(f"\nKey Metrics:")
print(f" Mean Latency: {commit_metrics['latency_ms']['mean']:.1f}ms")
print(f" P95 Latency: {commit_metrics['latency_ms']['p95']:.1f}ms")
print(f" P99 Latency: {commit_metrics['latency_ms']['p99']:.1f}ms")
print(f" Avg Commits/Batch: {batch_metrics['avg_commits_per_batch']:.1f}")
print(f" Avg Batch Size: {batch_metrics['avg_batch_size_bytes']/1024:.1f}KB")
if queue_metrics:
print(
f" Avg Queue Depth: {queue_metrics.get('total_unacknowledged', {}).get('mean', 0):.1f}"
)
print(
f" Max Queue Depth: {queue_metrics.get('total_unacknowledged', {}).get('max', 0)}"
)
# Summary comparison
print(f"\n{'='*80}")
print("CONFIGURATION COMPARISON SUMMARY")
print(f"{'='*80}")
print(f"{'Configuration':<40} {'Mean':<8} {'P95':<8} {'P99':<8} {'AvgQueue':<10}")
print(f"{'-'*80}")
for result in results_comparison:
name = result["config_name"]
commit_metrics = result["commit_metrics"]
queue_metrics = result.get("queue_depth_analysis", {})
mean_lat = commit_metrics["latency_ms"]["mean"]
p95_lat = commit_metrics["latency_ms"]["p95"]
p99_lat = commit_metrics["latency_ms"]["p99"]
avg_queue = queue_metrics.get("total_unacknowledged", {}).get("mean", 0)
print(
f"{name:<40} {mean_lat:<8.1f} {p95_lat:<8.1f} {p99_lat:<8.1f} {avg_queue:<10.1f}"
)
print(f"\nRecommendation: Choose config with lowest P95/P99 latencies")
print(
f"Note: Higher in-flight allows more parallelism but may increase queue variability"
)
# Generate plots for best configuration
best_config = min(
results_comparison, key=lambda r: r["commit_metrics"]["latency_ms"]["p95"]
)
print(f"\nGenerating plots for best configuration: {best_config['config_name']}")
try:
# Re-run best config to get simulation object for plotting
best_params = next(
c for c in configs if c["name"] == best_config["config_name"]
)
sim_best = PersistenceSimulation(
arrival_rate_per_sec=1000.0,
simulation_duration_sec=30.0,
s3_latency_shape=2.0,
s3_latency_scale=25.0,
s3_failure_rate=0.01,
max_in_flight_requests=best_params.get("max_in_flight_requests", 5),
batch_timeout_ms=best_params.get("batch_timeout_ms", 5.0),
)
sim_best.run_simulation()
sim_best.plot_results(best_config, f"persistence_optimization_results.png")
except Exception as e:
print(f"\nCould not generate plots: {e}")
print("Install matplotlib and numpy to enable visualization")