Add latency sim. Not reviewed

latency_sim/persistence_simulation.py (new file, +748 lines)

@@ -0,0 +1,748 @@
#!/usr/bin/env python3
"""
Persistence Thread Simulation

Simulates the persistence thread design from persistence.md to analyze
commit latency distributions with Poisson arrival times and realistic
S3 response characteristics.

Key metrics tracked:
- End-to-end commit latency (arrival to acknowledgment)
- Batch processing latencies
- Queue depths and flow control behavior
- Retry patterns and failure handling
"""

import heapq
import random
import statistics
from collections import defaultdict, deque
from dataclasses import dataclass, field
from typing import Dict, List, Optional

import matplotlib.pyplot as plt
import numpy as np
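
# Illustrative sketch (added for exposition; not used by the simulation):
# with Poisson arrivals at rate lambda, inter-arrival gaps are exponential
# with mean 1/lambda, so a window of length T sees ~lambda * T arrivals.
# This mirrors PersistenceSimulation.sample_inter_arrival_time() below.
def _sketch_poisson_arrivals(rate_per_sec: float = 1000.0,
                             duration_sec: float = 60.0,
                             seed: int = 0) -> int:
    """Count arrivals generated from exponential inter-arrival gaps."""
    rng = np.random.RandomState(seed)
    t, count = 0.0, 0
    while True:
        t += rng.exponential(1.0 / rate_per_sec)  # next inter-arrival gap
        if t >= duration_sec:
            return count  # expect ~rate_per_sec * duration_sec (~60,000 here)
        count += 1
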
@dataclass
class Commit:
    """Represents a single commit request"""
    commit_id: int
    arrival_time: float
    size_bytes: int = 1024  # Default 1KB per commit


@dataclass
class Batch:
    """Represents a batch of commits being processed"""
    batch_id: int
    commits: List[Commit]
    created_time: float
    size_bytes: int = field(init=False)
    retry_count: int = 0

    def __post_init__(self):
        self.size_bytes = sum(c.size_bytes for c in self.commits)


@dataclass
class InFlightRequest:
    """Tracks an in-flight S3 request"""
    batch: Batch
    start_time: float
    expected_completion: float
    connection_id: int

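
# Back-of-envelope check (added for exposition): the S3 latency model used
# by PersistenceSimulation below is min_rtt + Gamma(shape, scale), whose
# mean is min_rtt + shape * scale. With the defaults (30ms RTT, shape=2.0,
# scale=25.0) that is 30 + 2.0 * 25.0 = 80ms, with a right-skewed tail.
def _sketch_gamma_latency(shape: float = 2.0, scale: float = 25.0,
                          min_rtt_ms: float = 30.0, n: int = 100_000) -> dict:
    """Sample the latency model and report mean/p50/p99 in milliseconds."""
    rng = np.random.RandomState(0)
    samples = min_rtt_ms + rng.gamma(shape, scale, size=n)
    return {
        'mean': float(samples.mean()),             # ~80ms at the defaults
        'p50': float(np.percentile(samples, 50)),  # below the mean (right skew)
        'p99': float(np.percentile(samples, 99)),  # tail well past the mean
    }
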
class PersistenceSimulation:
    """
    Simulates the persistence thread behavior described in persistence.md

    For S3 latency, we use a Gamma distribution, which is recommended for
    modeling network service response times because:
    - It has a natural lower bound (minimum network RTT)
    - It can model the right-skewed tail typical of network services
    - It captures both typical fast responses and occasional slow responses
    - The shape parameter controls the heaviness of the tail
    """

    def __init__(self,
                 # Configuration from persistence.md defaults
                 batch_timeout_ms: float = 5.0,
                 batch_size_threshold: int = 1024 * 1024,  # 1MB
                 max_in_flight_requests: int = 5,
                 max_retry_attempts: int = 3,
                 retry_base_delay_ms: float = 100.0,

                 # S3 latency modeling (Gamma distribution parameters)
                 s3_latency_shape: float = 2.0,   # Shape parameter (α)
                 s3_latency_scale: float = 25.0,  # Scale parameter (θ)
                 s3_failure_rate: float = 0.01,   # 1% failure rate

                 # Arrival rate modeling
                 arrival_rate_per_sec: float = 1000.0,  # Lambda for Poisson

                 # Simulation parameters
                 simulation_duration_sec: float = 60.0):

        # Configuration
        self.batch_timeout_ms = batch_timeout_ms
        self.batch_size_threshold = batch_size_threshold
        self.max_in_flight_requests = max_in_flight_requests
        self.max_retry_attempts = max_retry_attempts
        self.retry_base_delay_ms = retry_base_delay_ms

        # S3 modeling parameters
        self.s3_latency_shape = s3_latency_shape
        self.s3_latency_scale = s3_latency_scale
        self.s3_failure_rate = s3_failure_rate

        # Arrival modeling
        self.arrival_rate_per_sec = arrival_rate_per_sec
        self.simulation_duration_sec = simulation_duration_sec

        # Simulation state
        self.event_queue = []  # Priority queue of (time, seq, event_type, event_data)
        self.event_counter = 0  # Monotonic tie-breaker so heapq never compares payloads
        self.current_time = 0.0
        self.pending_commits = deque()
        self.current_batch = []
        self.batch_start_time = None  # Will be set when first commit added to batch
        self.in_flight_requests: Dict[int, InFlightRequest] = {}
        self.next_batch_id = 0
        self.next_connection_id = 0
        self.next_commit_id = 0

        # Metrics collection
        self.completed_commits = []
        self.batch_metrics = []
        self.retry_counts = defaultdict(int)
        self.queue_depth_samples = []
        self.timeline_events = []  # For debugging/visualization

        # Random number generators (fixed seeds for reproducible results)
        self.rng = random.Random(42)
        self.np_rng = np.random.RandomState(42)

    def sample_s3_latency(self, batch_size_bytes: int = 0) -> float:
        """
        Sample S3 response latency using a Gamma distribution with
        size-dependent scaling.

        The Gamma distribution is well suited to S3 latency because:
        - The defaults (shape=2.0, scale=25.0) give variable latency around the mean
        - A minimum 30ms RTT prevents unrealistic sub-network responses
        - The right-skewed tail captures occasional slow responses
        - It models the reality that most responses are fast but some are slow

        Size-dependent scaling:
        - Base latency: 30ms RTT + Gamma(2.0, 25.0) = ~80ms mean
        - Linear scaling with size: +20ms per MB
        - Large requests (1MB) average ~100ms with similar tail behavior
        """
        # Minimum network RTT (realistic for cloud storage)
        min_rtt = 30.0  # 30ms minimum round-trip time

        # Variable latency component from the Gamma distribution
        variable_latency = self.np_rng.gamma(self.s3_latency_shape, self.s3_latency_scale)

        # Size-dependent scaling: +20ms per MB
        size_mb = batch_size_bytes / (1024 * 1024)
        size_penalty = size_mb * 20.0  # 20ms per MB

        return min_rtt + variable_latency + size_penalty

    def sample_inter_arrival_time(self) -> float:
        """Sample time between commit arrivals using a Poisson process"""
        return self.np_rng.exponential(1.0 / self.arrival_rate_per_sec)

    def sample_commit_size(self) -> int:
        """
        Sample commit size with a realistic distribution including large commits.

        Distribution:
        - 70% small commits: 500B - 10KB (typical operations)
        - 25% medium commits: 10KB - 100KB (batch operations, large documents)
        - 5% large commits: 100KB - 1MB (bulk data, file uploads)

        This creates a realistic mix where most commits are small but some
        can trigger size-based batching or become single-commit batches.
        """
        rand = self.rng.random()

        if rand < 0.70:  # 70% small commits
            return self.rng.randint(500, 10 * 1024)  # 500B - 10KB
        elif rand < 0.95:  # 25% medium commits
            return self.rng.randint(10 * 1024, 100 * 1024)  # 10KB - 100KB
        else:  # 5% large commits
            return self.rng.randint(100 * 1024, 1024 * 1024)  # 100KB - 1MB
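
    # Back-of-envelope (added note): the mixture above gives a mean commit
    # size of roughly 0.70 * 5.4KB + 0.25 * 55KB + 0.05 * 562KB ~= 46KB. At
    # 1000 commits/sec that is ~46MB/sec, so filling the 1MB size threshold
    # takes ~22ms; with the default 5ms batch timeout, the time trigger
    # therefore usually fires first under typical load.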

    def schedule_event(self, time: float, event_type: str, data=None):
        """Add event to priority queue"""
        # The monotonic counter breaks timestamp ties so heapq never falls
        # through to comparing event payloads (Commit, Batch, and None are
        # not mutually orderable).
        heapq.heappush(self.event_queue, (time, self.event_counter, event_type, data))
        self.event_counter += 1

    def should_process_batch(self) -> bool:
        """Check if current batch should be processed based on triggers"""
        if not self.current_batch:
            return False

        # Size trigger
        batch_size = sum(c.size_bytes for c in self.current_batch)
        if batch_size >= self.batch_size_threshold:
            return True

        # Time trigger - only if we have a valid batch start time
        if self.batch_start_time is not None:
            if (self.current_time - self.batch_start_time) >= (self.batch_timeout_ms / 1000.0):
                return True

        return False

    def can_start_new_request(self) -> bool:
        """Check if we can start a new request based on flow control"""
        return len(self.in_flight_requests) < self.max_in_flight_requests

    def process_current_batch(self):
        """Process the current batch if conditions are met"""
        if not self.current_batch or not self.can_start_new_request():
            return

        if self.should_process_batch():
            batch = Batch(
                batch_id=self.next_batch_id,
                commits=self.current_batch.copy(),
                created_time=self.current_time
            )
            self.next_batch_id += 1

            # Clear current batch
            self.current_batch.clear()
            self.batch_start_time = None  # Reset to None, will be set on next batch

            self.send_batch_to_s3(batch)

    def send_batch_to_s3(self, batch: Batch, is_retry: bool = False):
        """Send batch to S3 and track as in-flight request"""
        if not self.can_start_new_request():
            # This shouldn't happen due to flow control, but handle gracefully
            self.schedule_event(self.current_time + 0.001, 'retry_batch', batch)
            return

        # Sample S3 response characteristics (pass batch size for latency modeling)
        s3_latency = self.sample_s3_latency(batch.size_bytes) / 1000.0  # Convert ms to seconds
        will_fail = self.rng.random() < self.s3_failure_rate

        if will_fail:
            s3_latency *= 2  # Failed requests typically take longer

        completion_time = self.current_time + s3_latency
        connection_id = self.next_connection_id
        self.next_connection_id += 1

        # Track in-flight request
        in_flight = InFlightRequest(
            batch=batch,
            start_time=self.current_time,
            expected_completion=completion_time,
            connection_id=connection_id
        )
        self.in_flight_requests[connection_id] = in_flight

        # Schedule completion event
        if will_fail:
            self.schedule_event(completion_time, 'batch_failed', connection_id)
        else:
            self.schedule_event(completion_time, 'batch_completed', connection_id)

        # Log event for analysis
        self.timeline_events.append({
            'time': self.current_time,
            'event': 'batch_sent',
            'batch_id': batch.batch_id,
            'batch_size': batch.size_bytes,
            'commit_count': len(batch.commits),
            'retry_count': batch.retry_count,
            'is_retry': is_retry
        })
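
    # Note (added): a failed attempt costs twice the sampled S3 latency
    # before its 'batch_failed' event fires, so each retry adds both that
    # elongated attempt and the backoff delay to the latency of every
    # commit in the batch.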

    def handle_batch_completed(self, connection_id: int):
        """Handle successful batch completion"""
        if connection_id not in self.in_flight_requests:
            return

        in_flight = self.in_flight_requests.pop(connection_id)
        batch = in_flight.batch

        # Calculate metrics
        batch_latency = self.current_time - batch.created_time
        self.batch_metrics.append({
            'batch_id': batch.batch_id,
            'latency': batch_latency,
            'size_bytes': batch.size_bytes,
            'commit_count': len(batch.commits),
            'retry_count': batch.retry_count
        })

        # Mark commits as completed and calculate end-to-end latency
        for commit in batch.commits:
            commit_latency = self.current_time - commit.arrival_time
            self.completed_commits.append({
                'commit_id': commit.commit_id,
                'arrival_time': commit.arrival_time,
                'completion_time': self.current_time,
                'latency': commit_latency,
                'batch_id': batch.batch_id,
                'retry_count': batch.retry_count
            })

        self.timeline_events.append({
            'time': self.current_time,
            'event': 'batch_completed',
            'batch_id': batch.batch_id,
            'latency': batch_latency,
            'retry_count': batch.retry_count
        })

        # Try to process any pending work now that we have capacity
        self.process_pending_work()

    def handle_batch_failed(self, connection_id: int):
        """Handle batch failure with retry logic"""
        if connection_id not in self.in_flight_requests:
            return

        in_flight = self.in_flight_requests.pop(connection_id)
        batch = in_flight.batch

        self.timeline_events.append({
            'time': self.current_time,
            'event': 'batch_failed',
            'batch_id': batch.batch_id,
            'retry_count': batch.retry_count
        })

        if batch.retry_count < self.max_retry_attempts:
            # Exponential backoff retry
            batch.retry_count += 1
            backoff_delay = (self.retry_base_delay_ms / 1000.0) * (2 ** (batch.retry_count - 1))
            retry_time = self.current_time + backoff_delay

            self.schedule_event(retry_time, 'retry_batch', batch)
            self.retry_counts[batch.retry_count] += 1
        else:
            # Max retries exhausted - this would be a fatal error in a real system
            self.timeline_events.append({
                'time': self.current_time,
                'event': 'batch_abandoned',
                'batch_id': batch.batch_id,
                'retry_count': batch.retry_count
            })
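
    # Worked example (added note): with retry_base_delay_ms = 100 and the
    # doubling above, retry attempts are delayed by 100ms, 200ms, then
    # 400ms; after max_retry_attempts = 3 the batch is abandoned.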

    def handle_retry_batch(self, batch: Batch):
        """Handle batch retry"""
        self.send_batch_to_s3(batch, is_retry=True)

    def process_pending_work(self):
        """Process any pending commits that can now be batched"""
        # Move pending commits to current batch
        while self.pending_commits and self.can_start_new_request():
            if not self.current_batch:
                self.batch_start_time = self.current_time
                # Arm the timeout for the batch started here: the arrival
                # path only schedules timeouts for batches it starts itself,
                # so without this a batch seeded from the pending queue could
                # sit until the next arrival.
                self.schedule_event(
                    self.current_time + (self.batch_timeout_ms / 1000.0),
                    'batch_timeout', None)

            commit = self.pending_commits.popleft()
            self.current_batch.append(commit)

            # Check if we should process this batch immediately
            if self.should_process_batch():
                self.process_current_batch()
                break

        # If we have commits but no in-flight capacity, they stay in current_batch
        # and will be processed when capacity becomes available

    def handle_commit_arrival(self, commit: Commit):
        """Handle new commit arrival"""
        # If we have in-flight capacity, try to add to current batch
        if self.can_start_new_request():
            if not self.current_batch:
                self.batch_start_time = self.current_time

            self.current_batch.append(commit)
            self.process_current_batch()  # Check if we should process now
        else:
            # Add to pending queue due to flow control
            self.pending_commits.append(commit)

        # Schedule timeout event for current batch if it's the first commit
        if len(self.current_batch) == 1 and not self.pending_commits:
            timeout_time = self.current_time + (self.batch_timeout_ms / 1000.0)
            self.schedule_event(timeout_time, 'batch_timeout', None)

    def handle_batch_timeout(self):
        """Handle batch timeout trigger"""
        if self.current_batch and self.can_start_new_request():
            self.process_current_batch()

    def sample_queue_depth(self):
        """Sample current queue depths for analysis"""
        pending_count = len(self.pending_commits)
        current_batch_count = len(self.current_batch)
        in_flight_count = len(self.in_flight_requests)

        self.queue_depth_samples.append({
            'time': self.current_time,
            'pending_commits': pending_count,
            'current_batch_size': current_batch_count,
            'in_flight_requests': in_flight_count,
            'total_unacknowledged': pending_count + current_batch_count +
                sum(len(req.batch.commits) for req in self.in_flight_requests.values())
        })
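
    # Sanity check via Little's law (added note): in steady state, the mean
    # number of unacknowledged commits should be roughly the arrival rate
    # times the mean end-to-end commit latency. At 1000 commits/sec and
    # ~80ms latency that is ~80 commits, a useful yardstick when reading
    # the queue-depth samples collected above.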

    def generate_arrivals(self):
        """Generate Poisson arrival events for the simulation"""
        current_time = 0.0

        while current_time < self.simulation_duration_sec:
            # Sample next arrival time
            inter_arrival = self.sample_inter_arrival_time()
            current_time += inter_arrival

            if current_time >= self.simulation_duration_sec:
                break

            # Create commit
            commit = Commit(
                commit_id=self.next_commit_id,
                arrival_time=current_time,
                size_bytes=self.sample_commit_size()  # Realistic size distribution
            )
            self.next_commit_id += 1

            # Schedule arrival event
            self.schedule_event(current_time, 'commit_arrival', commit)

        # Schedule periodic queue depth sampling
        sample_time = 0.0
        while sample_time < self.simulation_duration_sec:
            self.schedule_event(sample_time, 'sample_queue_depth', None)
            sample_time += 0.1  # Sample every 100ms

    def run_simulation(self):
        """Run the complete simulation"""
        print("Starting persistence simulation...")
        print(f"Arrival rate: {self.arrival_rate_per_sec} commits/sec")
        print(f"Duration: {self.simulation_duration_sec} seconds")
        print(f"Expected commits: ~{int(self.arrival_rate_per_sec * self.simulation_duration_sec)}")
        print()

        # Generate all arrival events
        self.generate_arrivals()

        # Process events in time order (the seq element only breaks ties)
        events_processed = 0
        while self.event_queue and events_processed < 1000000:  # Safety limit
            time, _seq, event_type, data = heapq.heappop(self.event_queue)
            self.current_time = time

            if time > self.simulation_duration_sec:
                break

            if event_type == 'commit_arrival':
                self.handle_commit_arrival(data)
            elif event_type == 'batch_completed':
                self.handle_batch_completed(data)
            elif event_type == 'batch_failed':
                self.handle_batch_failed(data)
            elif event_type == 'retry_batch':
                self.handle_retry_batch(data)
            elif event_type == 'batch_timeout':
                self.handle_batch_timeout()
            elif event_type == 'sample_queue_depth':
                self.sample_queue_depth()

            events_processed += 1

        print(f"Simulation completed. Processed {events_processed} events.")
        return self.analyze_results()

    def analyze_results(self) -> Dict:
        """Analyze simulation results and return metrics"""
        if not self.completed_commits:
            return {"error": "No commits completed during simulation"}

        # Calculate latency statistics
        latencies = [c['latency'] * 1000 for c in self.completed_commits]  # Convert to ms

        results = {
            'simulation_config': {
                'duration_sec': self.simulation_duration_sec,
                'arrival_rate_per_sec': self.arrival_rate_per_sec,
                'batch_timeout_ms': self.batch_timeout_ms,
                'batch_size_threshold': self.batch_size_threshold,
                'max_in_flight_requests': self.max_in_flight_requests,
                's3_latency_params': f"Gamma(shape={self.s3_latency_shape}, scale={self.s3_latency_scale})",
                's3_failure_rate': self.s3_failure_rate
            },
            'commit_metrics': {
                'total_commits': len(self.completed_commits),
                'latency_ms': {
                    'mean': statistics.mean(latencies),
                    'median': statistics.median(latencies),
                    'std': statistics.stdev(latencies) if len(latencies) > 1 else 0,
                    'min': min(latencies),
                    'max': max(latencies),
                    'p95': np.percentile(latencies, 95),
                    'p99': np.percentile(latencies, 99)
                }
            },
            'batch_metrics': {
                'total_batches': len(self.batch_metrics),
                'avg_commits_per_batch': statistics.mean([b['commit_count'] for b in self.batch_metrics]),
                'avg_batch_size_bytes': statistics.mean([b['size_bytes'] for b in self.batch_metrics]),
                'avg_batch_latency_ms': statistics.mean([b['latency'] * 1000 for b in self.batch_metrics])
            },
            'retry_analysis': dict(self.retry_counts),
            'queue_depth_analysis': self._analyze_queue_depths()
        }

        return results

    def _analyze_queue_depths(self) -> Dict:
        """Analyze queue depth patterns"""
        if not self.queue_depth_samples:
            return {}

        pending = [s['pending_commits'] for s in self.queue_depth_samples]
        in_flight = [s['in_flight_requests'] for s in self.queue_depth_samples]
        total_unack = [s['total_unacknowledged'] for s in self.queue_depth_samples]

        return {
            'pending_commits': {
                'mean': statistics.mean(pending),
                'max': max(pending),
                'p95': np.percentile(pending, 95)
            },
            'in_flight_requests': {
                'mean': statistics.mean(in_flight),
                'max': max(in_flight),
                'p95': np.percentile(in_flight, 95)
            },
            'total_unacknowledged': {
                'mean': statistics.mean(total_unack),
                'max': max(total_unack),
                'p95': np.percentile(total_unack, 95)
            }
        }

    def plot_results(self, results: Dict, save_path: Optional[str] = None):
        """Generate visualization plots of simulation results"""
        if not self.completed_commits:
            print("No data to plot")
            return

        fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(15, 12))
        fig.suptitle('Persistence Thread Simulation Results', fontsize=16)

        # Plot 1: Commit latency histogram
        latencies_ms = [c['latency'] * 1000 for c in self.completed_commits]
        ax1.hist(latencies_ms, bins=50, alpha=0.7, edgecolor='black')
        ax1.set_xlabel('Commit Latency (ms)')
        ax1.set_ylabel('Count')
        ax1.set_title('Commit Latency Distribution')
        ax1.axvline(results['commit_metrics']['latency_ms']['mean'], color='red',
                    linestyle='--', label=f"Mean: {results['commit_metrics']['latency_ms']['mean']:.1f}ms")
        ax1.axvline(results['commit_metrics']['latency_ms']['p95'], color='orange',
                    linestyle='--', label=f"P95: {results['commit_metrics']['latency_ms']['p95']:.1f}ms")
        ax1.legend()

        # Plot 2: Timeline of commit completions
        completion_times = [c['completion_time'] for c in self.completed_commits]
        completion_latencies = [c['latency'] * 1000 for c in self.completed_commits]
        ax2.scatter(completion_times, completion_latencies, alpha=0.6, s=10)
        ax2.set_xlabel('Time (seconds)')
        ax2.set_ylabel('Commit Latency (ms)')
        ax2.set_title('Latency Over Time')

        # Plot 3: Queue depth over time
        if self.queue_depth_samples:
            times = [s['time'] for s in self.queue_depth_samples]
            pending = [s['pending_commits'] for s in self.queue_depth_samples]
            in_flight = [s['in_flight_requests'] for s in self.queue_depth_samples]
            total_unack = [s['total_unacknowledged'] for s in self.queue_depth_samples]

            ax3.plot(times, pending, label='Pending Commits', alpha=0.8)
            ax3.plot(times, in_flight, label='In-Flight Requests', alpha=0.8)
            ax3.plot(times, total_unack, label='Total Unacknowledged', alpha=0.8)
            ax3.axhline(self.max_in_flight_requests, color='red', linestyle='--',
                        label=f'Max In-Flight Limit ({self.max_in_flight_requests})')
            ax3.set_xlabel('Time (seconds)')
            ax3.set_ylabel('Count')
            ax3.set_title('Queue Depths Over Time')
            ax3.legend()

        # Plot 4: Batch size distribution
        if self.batch_metrics:
            batch_sizes = [b['commit_count'] for b in self.batch_metrics]
            ax4.hist(batch_sizes, bins=20, alpha=0.7, edgecolor='black')
            ax4.set_xlabel('Commits per Batch')
            ax4.set_ylabel('Count')
            ax4.set_title('Batch Size Distribution')

        plt.tight_layout()

        if save_path:
            plt.savefig(save_path, dpi=300, bbox_inches='tight')
            print(f"Plots saved to {save_path}")
        else:
            plt.show()


def print_results(results: Dict):
    """Pretty-print simulation results"""
    print("=" * 80)
    print("PERSISTENCE THREAD SIMULATION RESULTS")
    print("=" * 80)

    # Configuration
    config = results['simulation_config']
    print("\nConfiguration:")
    print(f"  Duration: {config['duration_sec']}s")
    print(f"  Arrival Rate: {config['arrival_rate_per_sec']} commits/sec")
    print(f"  Batch Timeout: {config['batch_timeout_ms']}ms")
    print(f"  Batch Size Threshold: {config['batch_size_threshold']:,} bytes")
    print(f"  Max In-Flight: {config['max_in_flight_requests']}")
    print(f"  S3 Latency: {config['s3_latency_params']}")
    print(f"  S3 Failure Rate: {config['s3_failure_rate']:.1%}")

    # Commit metrics
    commit_metrics = results['commit_metrics']
    latency = commit_metrics['latency_ms']
    print("\nCommit Performance:")
    print(f"  Total Commits: {commit_metrics['total_commits']:,}")
    print(f"  Latency Mean: {latency['mean']:.2f}ms")
    print(f"  Latency Median: {latency['median']:.2f}ms")
    print(f"  Latency P95: {latency['p95']:.2f}ms")
    print(f"  Latency P99: {latency['p99']:.2f}ms")
    print(f"  Latency Std: {latency['std']:.2f}ms")
    print(f"  Latency Range: {latency['min']:.2f}ms - {latency['max']:.2f}ms")

    # Batch metrics
    batch_metrics = results['batch_metrics']
    print("\nBatching Performance:")
    print(f"  Total Batches: {batch_metrics['total_batches']:,}")
    print(f"  Avg Commits/Batch: {batch_metrics['avg_commits_per_batch']:.1f}")
    print(f"  Avg Batch Size: {batch_metrics['avg_batch_size_bytes']/1024:.1f}KB")
    print(f"  Avg Batch Latency: {batch_metrics['avg_batch_latency_ms']:.2f}ms")

    # Retry analysis (counts are cumulative: a batch that retried three
    # times is counted once at each of retry attempts 1, 2, and 3)
    if results['retry_analysis']:
        print("\nRetry Analysis:")
        for retry_count, occurrences in sorted(results['retry_analysis'].items()):
            print(f"  {occurrences:,} batches reached retry attempt {retry_count}")

    # Queue depth analysis
    if results['queue_depth_analysis']:
        queue_analysis = results['queue_depth_analysis']
        print("\nQueue Depth Analysis:")
        if 'pending_commits' in queue_analysis:
            pending = queue_analysis['pending_commits']
            print(f"  Pending Commits - Mean: {pending['mean']:.1f}, Max: {pending['max']}, P95: {pending['p95']:.1f}")
        if 'in_flight_requests' in queue_analysis:
            in_flight = queue_analysis['in_flight_requests']
            print(f"  In-Flight Requests - Mean: {in_flight['mean']:.1f}, Max: {in_flight['max']}, P95: {in_flight['p95']:.1f}")
        if 'total_unacknowledged' in queue_analysis:
            total = queue_analysis['total_unacknowledged']
            print(f"  Total Unacknowledged - Mean: {total['mean']:.1f}, Max: {total['max']}, P95: {total['p95']:.1f}")

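
# Minimal usage sketch (added for exposition; the parameter values here are
# illustrative, not from persistence.md): run one simulation and pretty-print
# its metrics. The harness under __main__ below does the same across several
# configurations.
def _example_single_run():
    sim = PersistenceSimulation(arrival_rate_per_sec=500.0,
                                simulation_duration_sec=10.0)
    results = sim.run_simulation()
    print_results(results)
    # Optionally: sim.plot_results(results, save_path='single_run.png')
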

if __name__ == "__main__":
    print("Running Persistence Thread Configuration Analysis")
    print("S3 Latency Modeling: Gamma distribution (shape=2.0, scale=25ms)")
    print("Testing different configurations to optimize latency...")
    print()

    # Test configurations with different max_in_flight values
    configs = [
        {"name": "Baseline (max_in_flight=5)", "max_in_flight_requests": 5},
        {"name": "Higher Parallelism (max_in_flight=10)", "max_in_flight_requests": 10},
        {"name": "Much Higher (max_in_flight=20)", "max_in_flight_requests": 20},
        {"name": "Lower Timeout (max_in_flight=10, timeout=2ms)", "max_in_flight_requests": 10, "batch_timeout_ms": 2.0},
        {"name": "Higher Timeout (max_in_flight=10, timeout=10ms)", "max_in_flight_requests": 10, "batch_timeout_ms": 10.0},
    ]

    results_comparison = []

    for config in configs:
        print(f"\n{'='*60}")
        print(f"Testing: {config['name']}")
        print(f"{'='*60}")

        sim = PersistenceSimulation(
            arrival_rate_per_sec=1000.0,
            simulation_duration_sec=30.0,
            s3_latency_shape=2.0,
            s3_latency_scale=25.0,
            s3_failure_rate=0.01,
            max_in_flight_requests=config.get("max_in_flight_requests", 5),
            batch_timeout_ms=config.get("batch_timeout_ms", 5.0)
        )

        results = sim.run_simulation()
        results["config_name"] = config["name"]
        results_comparison.append(results)

        # Print key metrics for quick comparison
        commit_metrics = results['commit_metrics']
        batch_metrics = results['batch_metrics']
        queue_metrics = results.get('queue_depth_analysis', {})

        print("\nKey Metrics:")
        print(f"  Mean Latency: {commit_metrics['latency_ms']['mean']:.1f}ms")
        print(f"  P95 Latency: {commit_metrics['latency_ms']['p95']:.1f}ms")
        print(f"  P99 Latency: {commit_metrics['latency_ms']['p99']:.1f}ms")
        print(f"  Avg Commits/Batch: {batch_metrics['avg_commits_per_batch']:.1f}")
        print(f"  Avg Batch Size: {batch_metrics['avg_batch_size_bytes']/1024:.1f}KB")
        if queue_metrics:
            print(f"  Avg Queue Depth: {queue_metrics.get('total_unacknowledged', {}).get('mean', 0):.1f}")
            print(f"  Max Queue Depth: {queue_metrics.get('total_unacknowledged', {}).get('max', 0)}")

    # Summary comparison
    print(f"\n{'='*80}")
    print("CONFIGURATION COMPARISON SUMMARY")
    print(f"{'='*80}")
    print(f"{'Configuration':<40} {'Mean':<8} {'P95':<8} {'P99':<8} {'AvgQueue':<10}")
    print(f"{'-'*80}")

    for result in results_comparison:
        name = result["config_name"]
        commit_metrics = result['commit_metrics']
        queue_metrics = result.get('queue_depth_analysis', {})
        mean_lat = commit_metrics['latency_ms']['mean']
        p95_lat = commit_metrics['latency_ms']['p95']
        p99_lat = commit_metrics['latency_ms']['p99']
        avg_queue = queue_metrics.get('total_unacknowledged', {}).get('mean', 0)

        print(f"{name:<40} {mean_lat:<8.1f} {p95_lat:<8.1f} {p99_lat:<8.1f} {avg_queue:<10.1f}")

    print("\nRecommendation: choose the config with the lowest P95/P99 latencies")
    print("Note: higher in-flight allows more parallelism but may increase queue variability")

    # Generate plots for the best configuration
    best_config = min(results_comparison, key=lambda r: r['commit_metrics']['latency_ms']['p95'])
    print(f"\nGenerating plots for best configuration: {best_config['config_name']}")

    try:
        # Re-run the best config to get a simulation object for plotting
        best_params = next(c for c in configs if c['name'] == best_config['config_name'])
        sim_best = PersistenceSimulation(
            arrival_rate_per_sec=1000.0,
            simulation_duration_sec=30.0,
            s3_latency_shape=2.0,
            s3_latency_scale=25.0,
            s3_failure_rate=0.01,
            max_in_flight_requests=best_params.get("max_in_flight_requests", 5),
            batch_timeout_ms=best_params.get("batch_timeout_ms", 5.0)
        )
        sim_best.run_simulation()
        sim_best.plot_results(best_config, 'persistence_optimization_results.png')
    except Exception as e:
        print(f"\nCould not generate plots: {e}")
        print("Install matplotlib and numpy to enable visualization")