#!/usr/bin/env python3
"""
Persistence Thread Simulation

Simulates the persistence thread design from persistence.md to analyze
commit latency distributions with Poisson arrival times and realistic
S3 response characteristics.

Key metrics tracked:
- End-to-end commit latency (arrival to acknowledgment)
- Batch processing latencies
- Queue depths and flow control behavior
- Retry patterns and failure handling
"""

import heapq
import itertools
import random
import statistics
from collections import defaultdict, deque
from dataclasses import dataclass, field
from typing import Dict, List, Optional

import matplotlib.pyplot as plt
import numpy as np


@dataclass
class Commit:
    """Represents a single commit request"""

    commit_id: int
    arrival_time: float
    size_bytes: int = 1024  # Default 1KB per commit


@dataclass
class Batch:
    """Represents a batch of commits being processed"""

    batch_id: int
    commits: List[Commit]
    created_time: float
    size_bytes: int = field(init=False)
    retry_count: int = 0

    def __post_init__(self):
        self.size_bytes = sum(c.size_bytes for c in self.commits)


@dataclass
class InFlightRequest:
    """Tracks an in-flight S3 request"""

    batch: Batch
    start_time: float
    expected_completion: float
    connection_id: int


class PersistenceSimulation:
    """
    Simulates the persistence thread behavior described in persistence.md

    For S3 latency, we use a Gamma distribution, a common choice for
    modeling network service response times because:
    - It has a natural lower bound (minimum network RTT)
    - It can model the right-skewed tail typical of network services
    - It captures both typical fast responses and occasional slow responses
    - The shape parameter controls the heaviness of the tail
    """
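    # Back-of-envelope on the default latency parameters (standard Gamma
    # properties, not a claim from persistence.md): Gamma(shape=k, scale=θ)
    # has mean k·θ and std √k·θ, so Gamma(2.0, 25.0) contributes ~50ms mean
    # and ~35ms std of variable latency on top of the fixed minimum RTT.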

    def __init__(
        self,
        # Configuration from persistence.md defaults
        batch_timeout_ms: float = 5.0,
        batch_size_threshold: int = 1024 * 1024,  # 1MB
        max_in_flight_requests: int = 5,
        max_retry_attempts: int = 3,
        retry_base_delay_ms: float = 100.0,
        # S3 latency modeling (Gamma distribution parameters)
        s3_latency_shape: float = 2.0,  # Shape parameter (α)
        s3_latency_scale: float = 25.0,  # Scale parameter (β)
        s3_failure_rate: float = 0.01,  # 1% failure rate
        # Arrival rate modeling
        arrival_rate_per_sec: float = 1000.0,  # Lambda for Poisson
        # Simulation parameters
        simulation_duration_sec: float = 60.0,
    ):
        # Configuration
        self.batch_timeout_ms = batch_timeout_ms
        self.batch_size_threshold = batch_size_threshold
        self.max_in_flight_requests = max_in_flight_requests
        self.max_retry_attempts = max_retry_attempts
        self.retry_base_delay_ms = retry_base_delay_ms

        # S3 modeling parameters
        self.s3_latency_shape = s3_latency_shape
        self.s3_latency_scale = s3_latency_scale
        self.s3_failure_rate = s3_failure_rate

        # Arrival modeling
        self.arrival_rate_per_sec = arrival_rate_per_sec
        self.simulation_duration_sec = simulation_duration_sec

        # Simulation state
        self.current_time = 0.0
        # Priority queue of (time, seq, event_type, event_data); the
        # monotonically increasing seq breaks timestamp ties so heapq never
        # falls back to comparing event payloads, which are not orderable.
        self.event_queue = []
        self._event_seq = itertools.count()
        self.pending_commits = deque()
        self.current_batch = []
        self.batch_start_time = None  # Set when the first commit joins a batch
        self.in_flight_requests: Dict[int, InFlightRequest] = {}
        self.next_batch_id = 0
        self.next_connection_id = 0
        self.next_commit_id = 0

        # Metrics collection
        self.completed_commits = []
        self.batch_metrics = []
        self.retry_counts = defaultdict(int)
        self.queue_depth_samples = []
        self.timeline_events = []  # For debugging/visualization

        # Random number generators (fixed seeds for reproducible results)
        self.rng = random.Random(42)
        self.np_rng = np.random.RandomState(42)

    def sample_s3_latency(self, batch_size_bytes: int = 0) -> float:
        """
        Sample S3 response latency using a Gamma distribution with
        size-dependent scaling.

        The Gamma distribution works well for S3 latency because:
        - Shape=2.0, Scale=25.0 (the defaults) give ~50ms of variable latency
          on average
        - A 30ms minimum RTT prevents unrealistic sub-network responses
        - The right-skewed tail captures occasional slow responses
        - It models the reality that most responses are fast but some are slow

        Size-dependent scaling:
        - Base latency: 30ms RTT + Gamma(2.0, 25.0) = ~80ms mean
        - Linear scaling with size: +20ms per MB
        - Large requests (1MB) average ~100ms with similar tail behavior
        """
        # Minimum network RTT (realistic for cloud storage)
        min_rtt = 30.0  # 30ms minimum round-trip time

        # Variable latency component from the Gamma distribution
        variable_latency = self.np_rng.gamma(
            self.s3_latency_shape, self.s3_latency_scale
        )

        # Size-dependent scaling: +20ms per MB
        size_mb = batch_size_bytes / (1024 * 1024)
        size_penalty = size_mb * 20.0  # 20ms per MB

        return min_rtt + variable_latency + size_penalty

    def sample_inter_arrival_time(self) -> float:
        """Sample time between commit arrivals using Poisson process"""
        return self.np_rng.exponential(1.0 / self.arrival_rate_per_sec)
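    # A Poisson process has exponential inter-arrival gaps with mean 1/λ, so
    # at the default 1000 commits/sec the average gap is 1ms and a 5ms batch
    # window collects ~5 commits on average (an expectation, not a guarantee).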

    def sample_commit_size(self) -> int:
        """
        Sample commit size with realistic distribution including large commits.

        Distribution:
        - 70% small commits: 500B - 10KB (typical operations)
        - 25% medium commits: 10KB - 100KB (batch operations, large documents)
        - 5% large commits: 100KB - 1MB (bulk data, file uploads)

        This creates a realistic mix where most commits are small but some
        can trigger size-based batching or become single-commit batches.
        """
        rand = self.rng.random()

        if rand < 0.70:  # 70% small commits
            return self.rng.randint(500, 10 * 1024)  # 500B - 10KB
        elif rand < 0.95:  # 25% medium commits
            return self.rng.randint(10 * 1024, 100 * 1024)  # 10KB - 100KB
        else:  # 5% large commits
            return self.rng.randint(100 * 1024, 1024 * 1024)  # 100KB - 1MB
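    # Rough expected commit size under this mixture (simple arithmetic on the
    # uniform ranges above): 0.70*~5.2KB + 0.25*~55KB + 0.05*~562KB ≈ 46KB,
    # so at 1000 commits/sec the offered load is on the order of 46MB/s.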

    def schedule_event(self, event_time: float, event_type: str, data=None):
        """Add an event to the priority queue.

        A monotonically increasing sequence number is included as a tiebreaker
        so that two events with the same timestamp never force heapq to
        compare their payloads.
        """
        heapq.heappush(
            self.event_queue, (event_time, next(self._event_seq), event_type, data)
        )

    def should_process_batch(self) -> bool:
        """Check if current batch should be processed based on triggers"""
        if not self.current_batch:
            return False

        # Size trigger
        batch_size = sum(c.size_bytes for c in self.current_batch)
        if batch_size >= self.batch_size_threshold:
            return True

        # Time trigger - only if we have a valid batch start time
        if self.batch_start_time is not None:
            if (self.current_time - self.batch_start_time) >= (
                self.batch_timeout_ms / 1000.0
            ):
                return True

        return False
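    # Back-of-envelope at the defaults (ignoring flow-control stalls): ~5
    # commits arrive within the 5ms window, totalling ~230KB on average given
    # the ~46KB mean commit size, so the time trigger normally fires well
    # before the 1MB size trigger.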

    def can_start_new_request(self) -> bool:
        """Check if we can start a new request based on flow control"""
        return len(self.in_flight_requests) < self.max_in_flight_requests

    def process_current_batch(self):
        """Process the current batch if conditions are met"""
        if not self.current_batch or not self.can_start_new_request():
            return

        if self.should_process_batch():
            batch = Batch(
                batch_id=self.next_batch_id,
                commits=self.current_batch.copy(),
                created_time=self.current_time,
            )
            self.next_batch_id += 1

            # Clear current batch
            self.current_batch.clear()
            self.batch_start_time = None  # Reset; set again when the next batch starts

            self.send_batch_to_s3(batch)

    def send_batch_to_s3(self, batch: Batch, is_retry: bool = False):
        """Send batch to S3 and track as in-flight request"""
        if not self.can_start_new_request():
            # This shouldn't happen due to flow control, but handle gracefully
            self.schedule_event(self.current_time + 0.001, "retry_batch", batch)
            return

        # Sample S3 response characteristics (pass batch size for latency modeling)
        s3_latency = (
            self.sample_s3_latency(batch.size_bytes) / 1000.0
        )  # Convert ms to seconds
        will_fail = self.rng.random() < self.s3_failure_rate

        if will_fail:
            s3_latency *= 2  # Failed requests typically take longer

        completion_time = self.current_time + s3_latency
        connection_id = self.next_connection_id
        self.next_connection_id += 1

        # Track in-flight request
        in_flight = InFlightRequest(
            batch=batch,
            start_time=self.current_time,
            expected_completion=completion_time,
            connection_id=connection_id,
        )
        self.in_flight_requests[connection_id] = in_flight

        # Schedule completion event
        if will_fail:
            self.schedule_event(completion_time, "batch_failed", connection_id)
        else:
            self.schedule_event(completion_time, "batch_completed", connection_id)

        # Log event for analysis
        self.timeline_events.append(
            {
                "time": self.current_time,
                "event": "batch_sent",
                "batch_id": batch.batch_id,
                "batch_size": batch.size_bytes,
                "commit_count": len(batch.commits),
                "retry_count": batch.retry_count,
                "is_retry": is_retry,
            }
        )

    def handle_batch_completed(self, connection_id: int):
        """Handle successful batch completion"""
        if connection_id not in self.in_flight_requests:
            return

        in_flight = self.in_flight_requests.pop(connection_id)
        batch = in_flight.batch

        # Calculate metrics
        batch_latency = self.current_time - batch.created_time
        self.batch_metrics.append(
            {
                "batch_id": batch.batch_id,
                "latency": batch_latency,
                "size_bytes": batch.size_bytes,
                "commit_count": len(batch.commits),
                "retry_count": batch.retry_count,
            }
        )

        # Mark commits as completed and calculate end-to-end latency
        for commit in batch.commits:
            commit_latency = self.current_time - commit.arrival_time
            self.completed_commits.append(
                {
                    "commit_id": commit.commit_id,
                    "arrival_time": commit.arrival_time,
                    "completion_time": self.current_time,
                    "latency": commit_latency,
                    "batch_id": batch.batch_id,
                    "retry_count": batch.retry_count,
                }
            )

        self.timeline_events.append(
            {
                "time": self.current_time,
                "event": "batch_completed",
                "batch_id": batch.batch_id,
                "latency": batch_latency,
                "retry_count": batch.retry_count,
            }
        )

        # Try to process any pending work now that we have capacity
        self.process_pending_work()

    def handle_batch_failed(self, connection_id: int):
        """Handle batch failure with retry logic"""
        if connection_id not in self.in_flight_requests:
            return

        in_flight = self.in_flight_requests.pop(connection_id)
        batch = in_flight.batch

        self.timeline_events.append(
            {
                "time": self.current_time,
                "event": "batch_failed",
                "batch_id": batch.batch_id,
                "retry_count": batch.retry_count,
            }
        )

        if batch.retry_count < self.max_retry_attempts:
            # Exponential backoff retry
            batch.retry_count += 1
            backoff_delay = (self.retry_base_delay_ms / 1000.0) * (
                2 ** (batch.retry_count - 1)
            )
            retry_time = self.current_time + backoff_delay

            self.schedule_event(retry_time, "retry_batch", batch)
            self.retry_counts[batch.retry_count] += 1
        else:
            # Max retries exhausted - this would be a fatal error in a real system
            self.timeline_events.append(
                {
                    "time": self.current_time,
                    "event": "batch_abandoned",
                    "batch_id": batch.batch_id,
                    "retry_count": batch.retry_count,
                }
            )
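    # With the 100ms base delay and doubling backoff, retry attempts 1..3 are
    # delayed 100ms, 200ms, and 400ms respectively, so an abandoned batch has
    # spent at least 700ms in backoff on top of the failed request latencies.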

    def handle_retry_batch(self, batch: Batch):
        """Handle batch retry"""
        self.send_batch_to_s3(batch, is_retry=True)

    def process_pending_work(self):
        """Process any pending commits that can now be batched"""
        # Move pending commits to current batch
        while self.pending_commits and self.can_start_new_request():
            if not self.current_batch:
                self.batch_start_time = self.current_time
                # Schedule a timeout for the batch started here; without it, a
                # batch formed from pending commits would only be flushed by
                # the size trigger or a later arrival.
                self.schedule_event(
                    self.current_time + self.batch_timeout_ms / 1000.0,
                    "batch_timeout",
                    None,
                )

            commit = self.pending_commits.popleft()
            self.current_batch.append(commit)

            # Check if we should process this batch immediately
            if self.should_process_batch():
                self.process_current_batch()
                break

        # If we have commits but no in-flight capacity, they stay in
        # current_batch and are processed when capacity becomes available

    def handle_commit_arrival(self, commit: Commit):
        """Handle new commit arrival"""
        # If we have in-flight capacity, try to add to current batch
        if self.can_start_new_request():
            if not self.current_batch:
                self.batch_start_time = self.current_time
            self.current_batch.append(commit)
            self.process_current_batch()  # Check if we should process now
        else:
            # Add to pending queue due to flow control
            self.pending_commits.append(commit)

        # Schedule a timeout for the current batch if this commit started it.
        # The pending_commits guard skips the case where commits are queued
        # behind flow control, since the batch's timeout was already scheduled
        # when its first commit arrived.
        if len(self.current_batch) == 1 and not self.pending_commits:
            timeout_time = self.current_time + (self.batch_timeout_ms / 1000.0)
            self.schedule_event(timeout_time, "batch_timeout", None)

    def handle_batch_timeout(self):
        """Handle batch timeout trigger"""
        if self.current_batch and self.can_start_new_request():
            self.process_current_batch()

    def sample_queue_depth(self):
        """Sample current queue depths for analysis"""
        pending_count = len(self.pending_commits)
        current_batch_count = len(self.current_batch)
        in_flight_count = len(self.in_flight_requests)

        self.queue_depth_samples.append(
            {
                "time": self.current_time,
                "pending_commits": pending_count,
                "current_batch_size": current_batch_count,
                "in_flight_requests": in_flight_count,
                "total_unacknowledged": pending_count
                + current_batch_count
                + sum(
                    len(req.batch.commits) for req in self.in_flight_requests.values()
                ),
            }
        )

    def generate_arrivals(self):
        """Generate Poisson arrival events for the simulation"""
        current_time = 0.0

        while current_time < self.simulation_duration_sec:
            # Sample next arrival time
            inter_arrival = self.sample_inter_arrival_time()
            current_time += inter_arrival

            if current_time >= self.simulation_duration_sec:
                break

            # Create commit
            commit = Commit(
                commit_id=self.next_commit_id,
                arrival_time=current_time,
                size_bytes=self.sample_commit_size(),  # Realistic size distribution
            )
            self.next_commit_id += 1

            # Schedule arrival event
            self.schedule_event(current_time, "commit_arrival", commit)

        # Schedule periodic queue depth sampling
        sample_time = 0.0
        while sample_time < self.simulation_duration_sec:
            self.schedule_event(sample_time, "sample_queue_depth", None)
            sample_time += 0.1  # Sample every 100ms

    def run_simulation(self):
        """Run the complete simulation"""
        print("Starting persistence simulation...")
        print(f"Arrival rate: {self.arrival_rate_per_sec} commits/sec")
        print(f"Duration: {self.simulation_duration_sec} seconds")
        print(
            f"Expected commits: ~{int(self.arrival_rate_per_sec * self.simulation_duration_sec)}"
        )
        print()

        # Generate all arrival events
        self.generate_arrivals()

        # Process events in time order
        events_processed = 0
        while self.event_queue and events_processed < 1_000_000:  # Safety limit
            event_time, _seq, event_type, data = heapq.heappop(self.event_queue)
            self.current_time = event_time

            if event_time > self.simulation_duration_sec:
                break

            if event_type == "commit_arrival":
                self.handle_commit_arrival(data)
            elif event_type == "batch_completed":
                self.handle_batch_completed(data)
            elif event_type == "batch_failed":
                self.handle_batch_failed(data)
            elif event_type == "retry_batch":
                self.handle_retry_batch(data)
            elif event_type == "batch_timeout":
                self.handle_batch_timeout()
            elif event_type == "sample_queue_depth":
                self.sample_queue_depth()

            events_processed += 1

        print(f"Simulation completed. Processed {events_processed} events.")
        return self.analyze_results()
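    # Note: with the fixed seeds set in __init__, a given configuration
    # replays the same event sequence on every run, so configurations can be
    # compared directly without averaging over repeated trials.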

    def analyze_results(self) -> Dict:
        """Analyze simulation results and return metrics"""
        if not self.completed_commits:
            return {"error": "No commits completed during simulation"}

        # Calculate latency statistics
        latencies = [
            c["latency"] * 1000 for c in self.completed_commits
        ]  # Convert to ms

        results = {
            "simulation_config": {
                "duration_sec": self.simulation_duration_sec,
                "arrival_rate_per_sec": self.arrival_rate_per_sec,
                "batch_timeout_ms": self.batch_timeout_ms,
                "batch_size_threshold": self.batch_size_threshold,
                "max_in_flight_requests": self.max_in_flight_requests,
                "s3_latency_params": f"Gamma(shape={self.s3_latency_shape}, scale={self.s3_latency_scale})",
                "s3_failure_rate": self.s3_failure_rate,
            },
            "commit_metrics": {
                "total_commits": len(self.completed_commits),
                "latency_ms": {
                    "mean": statistics.mean(latencies),
                    "median": statistics.median(latencies),
                    "std": statistics.stdev(latencies) if len(latencies) > 1 else 0,
                    "min": min(latencies),
                    "max": max(latencies),
                    "p95": np.percentile(latencies, 95),
                    "p99": np.percentile(latencies, 99),
                },
            },
            "batch_metrics": {
                "total_batches": len(self.batch_metrics),
                "avg_commits_per_batch": statistics.mean(
                    [b["commit_count"] for b in self.batch_metrics]
                ),
                "avg_batch_size_bytes": statistics.mean(
                    [b["size_bytes"] for b in self.batch_metrics]
                ),
                "avg_batch_latency_ms": statistics.mean(
                    [b["latency"] * 1000 for b in self.batch_metrics]
                ),
            },
            "retry_analysis": dict(self.retry_counts),
            "queue_depth_analysis": self._analyze_queue_depths(),
        }

        return results

    def _analyze_queue_depths(self) -> Dict:
        """Analyze queue depth patterns"""
        if not self.queue_depth_samples:
            return {}

        pending = [s["pending_commits"] for s in self.queue_depth_samples]
        in_flight = [s["in_flight_requests"] for s in self.queue_depth_samples]
        total_unack = [s["total_unacknowledged"] for s in self.queue_depth_samples]

        return {
            "pending_commits": {
                "mean": statistics.mean(pending),
                "max": max(pending),
                "p95": np.percentile(pending, 95),
            },
            "in_flight_requests": {
                "mean": statistics.mean(in_flight),
                "max": max(in_flight),
                "p95": np.percentile(in_flight, 95),
            },
            "total_unacknowledged": {
                "mean": statistics.mean(total_unack),
                "max": max(total_unack),
                "p95": np.percentile(total_unack, 95),
            },
        }

    def plot_results(self, results: Dict, save_path: Optional[str] = None):
        """Generate visualization plots of simulation results"""
        if not self.completed_commits:
            print("No data to plot")
            return

        fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(15, 12))
        fig.suptitle("Persistence Thread Simulation Results", fontsize=16)

        # Plot 1: Commit latency histogram
        latencies_ms = [c["latency"] * 1000 for c in self.completed_commits]
        ax1.hist(latencies_ms, bins=50, alpha=0.7, edgecolor="black")
        ax1.set_xlabel("Commit Latency (ms)")
        ax1.set_ylabel("Count")
        ax1.set_title("Commit Latency Distribution")
        ax1.axvline(
            results["commit_metrics"]["latency_ms"]["mean"],
            color="red",
            linestyle="--",
            label=f"Mean: {results['commit_metrics']['latency_ms']['mean']:.1f}ms",
        )
        ax1.axvline(
            results["commit_metrics"]["latency_ms"]["p95"],
            color="orange",
            linestyle="--",
            label=f"P95: {results['commit_metrics']['latency_ms']['p95']:.1f}ms",
        )
        ax1.legend()

        # Plot 2: Timeline of commit completions
        completion_times = [c["completion_time"] for c in self.completed_commits]
        completion_latencies = [c["latency"] * 1000 for c in self.completed_commits]
        ax2.scatter(completion_times, completion_latencies, alpha=0.6, s=10)
        ax2.set_xlabel("Time (seconds)")
        ax2.set_ylabel("Commit Latency (ms)")
        ax2.set_title("Latency Over Time")

        # Plot 3: Queue depth over time
        if self.queue_depth_samples:
            times = [s["time"] for s in self.queue_depth_samples]
            pending = [s["pending_commits"] for s in self.queue_depth_samples]
            in_flight = [s["in_flight_requests"] for s in self.queue_depth_samples]
            total_unack = [s["total_unacknowledged"] for s in self.queue_depth_samples]

            ax3.plot(times, pending, label="Pending Commits", alpha=0.8)
            ax3.plot(times, in_flight, label="In-Flight Requests", alpha=0.8)
            ax3.plot(times, total_unack, label="Total Unacknowledged", alpha=0.8)
            ax3.axhline(
                self.max_in_flight_requests,
                color="red",
                linestyle="--",
                label=f"Max In-Flight Limit ({self.max_in_flight_requests})",
            )
            ax3.set_xlabel("Time (seconds)")
            ax3.set_ylabel("Count")
            ax3.set_title("Queue Depths Over Time")
            ax3.legend()

        # Plot 4: Batch size distribution
        if self.batch_metrics:
            batch_sizes = [b["commit_count"] for b in self.batch_metrics]
            ax4.hist(batch_sizes, bins=20, alpha=0.7, edgecolor="black")
            ax4.set_xlabel("Commits per Batch")
            ax4.set_ylabel("Count")
            ax4.set_title("Batch Size Distribution")

        plt.tight_layout()

        if save_path:
            plt.savefig(save_path, dpi=300, bbox_inches="tight")
            print(f"Plots saved to {save_path}")
        else:
            plt.show()
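# plot_results() calls plt.show() when no save_path is given; in a headless
# environment, pass save_path or select a non-interactive backend (e.g.
# matplotlib.use("Agg")) before pyplot is first used.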


def print_results(results: Dict):
    """Pretty print simulation results"""
    print("=" * 80)
    print("PERSISTENCE THREAD SIMULATION RESULTS")
    print("=" * 80)

    # Configuration
    config = results["simulation_config"]
    print("\nConfiguration:")
    print(f"  Duration: {config['duration_sec']}s")
    print(f"  Arrival Rate: {config['arrival_rate_per_sec']} commits/sec")
    print(f"  Batch Timeout: {config['batch_timeout_ms']}ms")
    print(f"  Batch Size Threshold: {config['batch_size_threshold']:,} bytes")
    print(f"  Max In-Flight: {config['max_in_flight_requests']}")
    print(f"  S3 Latency: {config['s3_latency_params']}")
    print(f"  S3 Failure Rate: {config['s3_failure_rate']:.1%}")

    # Commit metrics
    commit_metrics = results["commit_metrics"]
    latency = commit_metrics["latency_ms"]
    print("\nCommit Performance:")
    print(f"  Total Commits: {commit_metrics['total_commits']:,}")
    print(f"  Latency Mean: {latency['mean']:.2f}ms")
    print(f"  Latency Median: {latency['median']:.2f}ms")
    print(f"  Latency P95: {latency['p95']:.2f}ms")
    print(f"  Latency P99: {latency['p99']:.2f}ms")
    print(f"  Latency Std: {latency['std']:.2f}ms")
    print(f"  Latency Range: {latency['min']:.2f}ms - {latency['max']:.2f}ms")

    # Batch metrics
    batch_metrics = results["batch_metrics"]
    print("\nBatching Performance:")
    print(f"  Total Batches: {batch_metrics['total_batches']:,}")
    print(f"  Avg Commits/Batch: {batch_metrics['avg_commits_per_batch']:.1f}")
    print(f"  Avg Batch Size: {batch_metrics['avg_batch_size_bytes']/1024:.1f}KB")
    print(f"  Avg Batch Latency: {batch_metrics['avg_batch_latency_ms']:.2f}ms")

    # Retry analysis (retry_counts[k] counts batches that reached attempt k)
    if results["retry_analysis"]:
        print("\nRetry Analysis:")
        for retry_count, occurrences in sorted(results["retry_analysis"].items()):
            print(f"  {occurrences:,} batches reached retry attempt {retry_count}")

    # Queue depth analysis
    if results["queue_depth_analysis"]:
        queue_analysis = results["queue_depth_analysis"]
        print("\nQueue Depth Analysis:")
        if "pending_commits" in queue_analysis:
            pending = queue_analysis["pending_commits"]
            print(
                f"  Pending Commits - Mean: {pending['mean']:.1f}, Max: {pending['max']}, P95: {pending['p95']:.1f}"
            )
        if "in_flight_requests" in queue_analysis:
            in_flight = queue_analysis["in_flight_requests"]
            print(
                f"  In-Flight Requests - Mean: {in_flight['mean']:.1f}, Max: {in_flight['max']}, P95: {in_flight['p95']:.1f}"
            )
        if "total_unacknowledged" in queue_analysis:
            total = queue_analysis["total_unacknowledged"]
            print(
                f"  Total Unacknowledged - Mean: {total['mean']:.1f}, Max: {total['max']}, P95: {total['p95']:.1f}"
            )


if __name__ == "__main__":
    print("Running Persistence Thread Configuration Analysis")
    print("S3 Latency Modeling: Gamma distribution (shape=2.0, scale=25ms)")
    print("Testing different configurations to optimize latency...")
    print()
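    # Minimal single-run usage (a sketch; the parameter value and file name
    # here are arbitrary examples, not tuned recommendations):
    #   sim = PersistenceSimulation(arrival_rate_per_sec=500.0)
    #   results = sim.run_simulation()
    #   print_results(results)
    #   sim.plot_results(results, save_path="single_run.png")  # omit to show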

    # Test configurations with different max_in_flight values
    configs = [
        {"name": "Baseline (max_in_flight=5)", "max_in_flight_requests": 5},
        {"name": "Higher Parallelism (max_in_flight=10)", "max_in_flight_requests": 10},
        {"name": "Much Higher (max_in_flight=20)", "max_in_flight_requests": 20},
        {
            "name": "Lower Timeout (max_in_flight=10, timeout=2ms)",
            "max_in_flight_requests": 10,
            "batch_timeout_ms": 2.0,
        },
        {
            "name": "Higher Timeout (max_in_flight=10, timeout=10ms)",
            "max_in_flight_requests": 10,
            "batch_timeout_ms": 10.0,
        },
    ]

    results_comparison = []

    for config in configs:
        print(f"\n{'='*60}")
        print(f"Testing: {config['name']}")
        print(f"{'='*60}")

        sim = PersistenceSimulation(
            arrival_rate_per_sec=1000.0,
            simulation_duration_sec=30.0,
            s3_latency_shape=2.0,
            s3_latency_scale=25.0,
            s3_failure_rate=0.01,
            max_in_flight_requests=config.get("max_in_flight_requests", 5),
            batch_timeout_ms=config.get("batch_timeout_ms", 5.0),
        )

        results = sim.run_simulation()
        results["config_name"] = config["name"]
        results_comparison.append(results)

        # Print key metrics for quick comparison
        commit_metrics = results["commit_metrics"]
        batch_metrics = results["batch_metrics"]
        queue_metrics = results.get("queue_depth_analysis", {})

        print("\nKey Metrics:")
        print(f"  Mean Latency: {commit_metrics['latency_ms']['mean']:.1f}ms")
        print(f"  P95 Latency: {commit_metrics['latency_ms']['p95']:.1f}ms")
        print(f"  P99 Latency: {commit_metrics['latency_ms']['p99']:.1f}ms")
        print(f"  Avg Commits/Batch: {batch_metrics['avg_commits_per_batch']:.1f}")
        print(f"  Avg Batch Size: {batch_metrics['avg_batch_size_bytes']/1024:.1f}KB")
        if queue_metrics:
            print(
                f"  Avg Queue Depth: {queue_metrics.get('total_unacknowledged', {}).get('mean', 0):.1f}"
            )
            print(
                f"  Max Queue Depth: {queue_metrics.get('total_unacknowledged', {}).get('max', 0)}"
            )

    # Summary comparison
    print(f"\n{'='*80}")
    print("CONFIGURATION COMPARISON SUMMARY")
    print(f"{'='*80}")
    print(f"{'Configuration':<40} {'Mean':<8} {'P95':<8} {'P99':<8} {'AvgQueue':<10}")
    print(f"{'-'*80}")

    for result in results_comparison:
        name = result["config_name"]
        commit_metrics = result["commit_metrics"]
        queue_metrics = result.get("queue_depth_analysis", {})
        mean_lat = commit_metrics["latency_ms"]["mean"]
        p95_lat = commit_metrics["latency_ms"]["p95"]
        p99_lat = commit_metrics["latency_ms"]["p99"]
        avg_queue = queue_metrics.get("total_unacknowledged", {}).get("mean", 0)

        print(
            f"{name:<40} {mean_lat:<8.1f} {p95_lat:<8.1f} {p99_lat:<8.1f} {avg_queue:<10.1f}"
        )

    print("\nRecommendation: Choose the config with the lowest P95/P99 latencies")
    print(
        "Note: Higher in-flight limits allow more parallelism but may increase queue variability"
    )

    # Generate plots for best configuration
    best_config = min(
        results_comparison, key=lambda r: r["commit_metrics"]["latency_ms"]["p95"]
    )
    print(f"\nGenerating plots for best configuration: {best_config['config_name']}")

    try:
        # Re-run best config to get a simulation object for plotting
        best_params = next(
            c for c in configs if c["name"] == best_config["config_name"]
        )
        sim_best = PersistenceSimulation(
            arrival_rate_per_sec=1000.0,
            simulation_duration_sec=30.0,
            s3_latency_shape=2.0,
            s3_latency_scale=25.0,
            s3_failure_rate=0.01,
            max_in_flight_requests=best_params.get("max_in_flight_requests", 5),
            batch_timeout_ms=best_params.get("batch_timeout_ms", 5.0),
        )
        sim_best.run_simulation()
        sim_best.plot_results(best_config, "persistence_optimization_results.png")
    except Exception as e:
        print(f"\nCould not generate plots: {e}")
        print("Plotting requires a working matplotlib backend (e.g. Agg for headless runs)")