461 lines
18 KiB
Python
461 lines
18 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Persistence Thread Parameter Optimization
|
|
|
|
Uses Bayesian Optimization to automatically find the optimal configuration
|
|
parameters that minimize commit latency. This is much more efficient than
|
|
grid search since it uses a probabilistic model to guide parameter exploration.
|
|
|
|
Key advantages:
|
|
- Efficiently explores high-dimensional parameter spaces
|
|
- Uses previous simulation results to guide future parameter choices
|
|
- Handles expensive objective function evaluations (our simulation)
|
|
- Provides uncertainty estimates for parameter importance
|
|
"""
|
|
|
|
import numpy as np
|
|
from typing import Dict, List, Tuple, Optional
|
|
import time
|
|
from persistence_simulation import PersistenceSimulation, print_results
|
|
|
|
# Try to import scikit-optimize for Bayesian optimization
|
|
try:
|
|
from skopt import gp_minimize, forest_minimize
|
|
from skopt.space import Real, Integer
|
|
from skopt.utils import use_named_args
|
|
from skopt.plots import plot_convergence, plot_objective
|
|
import matplotlib.pyplot as plt
|
|
OPTIMIZE_AVAILABLE = True
|
|
except ImportError:
|
|
print("scikit-optimize not available. Install with: pip install scikit-optimize")
|
|
print("Falling back to grid search...")
|
|
OPTIMIZE_AVAILABLE = False
|
|
|
|
|
|
class PersistenceOptimizer:
|
|
"""
|
|
Automated parameter optimization for the persistence thread using Bayesian optimization.
|
|
|
|
This class finds the optimal configuration parameters to minimize commit latency
|
|
by intelligently exploring the parameter space using Gaussian Process models.
|
|
"""
|
|
|
|
def __init__(self,
|
|
optimization_budget: int = 50,
|
|
simulation_duration: float = 20.0,
|
|
arrival_rate: float = 1000.0,
|
|
objective_metric: str = "p95_latency",
|
|
random_seed: int = 42):
|
|
|
|
self.optimization_budget = optimization_budget
|
|
self.simulation_duration = simulation_duration
|
|
self.arrival_rate = arrival_rate
|
|
self.objective_metric = objective_metric
|
|
self.random_seed = random_seed
|
|
|
|
# Track optimization history
|
|
self.optimization_history = []
|
|
self.best_params = None
|
|
self.best_score = float('inf')
|
|
|
|
# Define parameter search space
|
|
self.parameter_space = self._define_search_space()
|
|
self.parameter_names = [dim.name for dim in self.parameter_space]
|
|
|
|
def _define_search_space(self) -> List:
|
|
"""
|
|
Define the parameter search space for optimization.
|
|
|
|
Focus on the 3 core parameters that matter for persistence thread performance
|
|
with 100% reliable S3. Retry parameters removed since S3 never fails.
|
|
"""
|
|
return [
|
|
# Core batching parameters
|
|
Real(1.0, 50.0, name='batch_timeout_ms',
|
|
prior='log-uniform'), # Log scale since small changes matter
|
|
Integer(64 * 1024, 4 * 1024 * 1024, name='batch_size_threshold', # 64KB - 4MB
|
|
prior='log-uniform'),
|
|
|
|
# Flow control parameters - likely the most impactful
|
|
Integer(1, 50, name='max_in_flight_requests'),
|
|
]
|
|
|
|
def _run_simulation_with_params(self, params: Dict[str, float]) -> Dict:
|
|
"""Run simulation with given parameters and return results"""
|
|
try:
|
|
sim = PersistenceSimulation(
|
|
batch_timeout_ms=params['batch_timeout_ms'],
|
|
batch_size_threshold=int(params['batch_size_threshold']),
|
|
max_in_flight_requests=int(params['max_in_flight_requests']),
|
|
# Retry parameters fixed since S3 is 100% reliable
|
|
max_retry_attempts=0, # No retries needed
|
|
retry_base_delay_ms=100.0, # Irrelevant but needs a value
|
|
# S3 parameters kept fixed - 100% reliable for optimization focus
|
|
s3_latency_shape=2.0, # Fixed Gamma shape
|
|
s3_latency_scale=15.0, # Fixed Gamma scale (30ms RTT + ~30ms variable = ~60ms mean)
|
|
s3_failure_rate=0.0, # 100% reliable S3
|
|
arrival_rate_per_sec=self.arrival_rate,
|
|
simulation_duration_sec=self.simulation_duration
|
|
)
|
|
|
|
return sim.run_simulation()
|
|
|
|
except Exception as e:
|
|
print(f"Simulation failed with params {params}: {e}")
|
|
# Return a high penalty score for failed simulations
|
|
return {
|
|
'commit_metrics': {
|
|
'latency_ms': {
|
|
'mean': 10000,
|
|
'p95': 10000,
|
|
'p99': 10000
|
|
}
|
|
},
|
|
'error': str(e)
|
|
}
|
|
|
|
def _extract_objective_value(self, results: Dict) -> float:
|
|
"""Extract the objective value to minimize from simulation results"""
|
|
try:
|
|
commit_metrics = results['commit_metrics']['latency_ms']
|
|
|
|
if self.objective_metric == "mean_latency":
|
|
return commit_metrics['mean']
|
|
elif self.objective_metric == "p95_latency":
|
|
return commit_metrics['p95']
|
|
elif self.objective_metric == "p99_latency":
|
|
return commit_metrics['p99']
|
|
elif self.objective_metric == "weighted_latency":
|
|
# Weighted combination emphasizing tail latencies
|
|
return (0.3 * commit_metrics['mean'] +
|
|
0.5 * commit_metrics['p95'] +
|
|
0.2 * commit_metrics['p99'])
|
|
else:
|
|
return commit_metrics['p95'] # Default to P95
|
|
|
|
except KeyError as e:
|
|
print(f"Failed to extract objective from results: {e}")
|
|
return 10000 # High penalty for invalid results
|
|
|
|
def optimize_with_bayesian(self) -> Tuple[Dict, float]:
|
|
"""
|
|
Use Bayesian Optimization to find optimal parameters.
|
|
|
|
This uses Gaussian Process models to build a probabilistic model
|
|
of the objective function and intelligently choose where to sample next.
|
|
"""
|
|
if not OPTIMIZE_AVAILABLE:
|
|
return self.optimize_with_grid_search()
|
|
|
|
print(f"Starting Bayesian Optimization with {self.optimization_budget} evaluations")
|
|
print(f"Objective: Minimize {self.objective_metric}")
|
|
print(f"Parameter space: {len(self.parameter_space)} dimensions")
|
|
print()
|
|
|
|
@use_named_args(self.parameter_space)
|
|
def objective(**params):
|
|
"""Objective function for Bayesian optimization"""
|
|
print(f"Evaluating: {params}")
|
|
|
|
start_time = time.time()
|
|
results = self._run_simulation_with_params(params)
|
|
eval_time = time.time() - start_time
|
|
|
|
objective_value = self._extract_objective_value(results)
|
|
|
|
# Track optimization history
|
|
history_entry = {
|
|
'params': params.copy(),
|
|
'objective_value': objective_value,
|
|
'results': results,
|
|
'eval_time': eval_time,
|
|
'iteration': len(self.optimization_history) + 1
|
|
}
|
|
self.optimization_history.append(history_entry)
|
|
|
|
# Update best if improved
|
|
if objective_value < self.best_score:
|
|
self.best_score = objective_value
|
|
self.best_params = params.copy()
|
|
print(f"✓ NEW BEST: {objective_value:.2f}ms (evaluation {history_entry['iteration']})")
|
|
else:
|
|
print(f" Score: {objective_value:.2f}ms")
|
|
|
|
print(f" Time: {eval_time:.1f}s")
|
|
print()
|
|
|
|
return objective_value
|
|
|
|
# Run Bayesian optimization
|
|
result = gp_minimize(
|
|
func=objective,
|
|
dimensions=self.parameter_space,
|
|
n_calls=self.optimization_budget,
|
|
n_initial_points=10, # Random exploration first
|
|
acq_func='EI', # Expected Improvement acquisition
|
|
random_state=self.random_seed
|
|
)
|
|
|
|
# Extract best parameters
|
|
best_params_list = result.x
|
|
best_params_dict = dict(zip(self.parameter_names, best_params_list))
|
|
best_objective = result.fun
|
|
|
|
return best_params_dict, best_objective
|
|
|
|
def optimize_with_grid_search(self) -> Tuple[Dict, float]:
|
|
"""Fallback grid search optimization if scikit-optimize not available"""
|
|
print("Using grid search optimization (install scikit-optimize for better results)")
|
|
print()
|
|
|
|
# Define a smaller grid for key parameters
|
|
grid_configs = [
|
|
# Vary max_in_flight and batch_timeout
|
|
{'max_in_flight_requests': 5, 'batch_timeout_ms': 5.0},
|
|
{'max_in_flight_requests': 10, 'batch_timeout_ms': 5.0},
|
|
{'max_in_flight_requests': 20, 'batch_timeout_ms': 5.0},
|
|
{'max_in_flight_requests': 10, 'batch_timeout_ms': 2.0},
|
|
{'max_in_flight_requests': 10, 'batch_timeout_ms': 10.0},
|
|
{'max_in_flight_requests': 15, 'batch_timeout_ms': 3.0},
|
|
{'max_in_flight_requests': 25, 'batch_timeout_ms': 7.0},
|
|
]
|
|
|
|
best_params = None
|
|
best_score = float('inf')
|
|
|
|
for i, config in enumerate(grid_configs):
|
|
print(f"Evaluating config {i+1}/{len(grid_configs)}: {config}")
|
|
|
|
# Use default values for unspecified parameters
|
|
full_params = {
|
|
'batch_timeout_ms': 5.0,
|
|
'batch_size_threshold': 1024 * 1024,
|
|
'max_in_flight_requests': 5
|
|
}
|
|
full_params.update(config)
|
|
|
|
results = self._run_simulation_with_params(full_params)
|
|
objective_value = self._extract_objective_value(results)
|
|
|
|
if objective_value < best_score:
|
|
best_score = objective_value
|
|
best_params = full_params.copy()
|
|
print(f"✓ NEW BEST: {objective_value:.2f}ms")
|
|
else:
|
|
print(f" Score: {objective_value:.2f}ms")
|
|
print()
|
|
|
|
return best_params, best_score
|
|
|
|
def analyze_parameter_importance(self):
|
|
"""Analyze which parameters have the most impact on performance"""
|
|
if not self.optimization_history:
|
|
print("No optimization history available")
|
|
return
|
|
|
|
print("Parameter Importance Analysis")
|
|
print("=" * 50)
|
|
|
|
# Extract parameter values and objectives
|
|
param_data = {}
|
|
objectives = []
|
|
|
|
for entry in self.optimization_history:
|
|
objectives.append(entry['objective_value'])
|
|
for param_name, param_value in entry['params'].items():
|
|
if param_name not in param_data:
|
|
param_data[param_name] = []
|
|
param_data[param_name].append(param_value)
|
|
|
|
objectives = np.array(objectives)
|
|
|
|
# Simple correlation analysis
|
|
print("Parameter correlations with objective (lower is better):")
|
|
correlations = []
|
|
|
|
for param_name, values in param_data.items():
|
|
correlation = np.corrcoef(values, objectives)[0, 1]
|
|
correlations.append((param_name, correlation))
|
|
print(f" {param_name:<25}: {correlation:+.3f}")
|
|
|
|
print("\nMost impactful parameters (by absolute correlation):")
|
|
correlations.sort(key=lambda x: abs(x[1]), reverse=True)
|
|
for param_name, correlation in correlations[:5]:
|
|
impact = "reduces latency" if correlation < 0 else "increases latency"
|
|
print(f" {param_name:<25}: {impact} (r={correlation:+.3f})")
|
|
|
|
def plot_optimization_progress(self, save_path: Optional[str] = None):
|
|
"""Plot optimization convergence"""
|
|
if not OPTIMIZE_AVAILABLE or not self.optimization_history:
|
|
return
|
|
|
|
iterations = [entry['iteration'] for entry in self.optimization_history]
|
|
objectives = [entry['objective_value'] for entry in self.optimization_history]
|
|
|
|
# Calculate running minimum (best so far)
|
|
running_min = []
|
|
current_min = float('inf')
|
|
for obj in objectives:
|
|
current_min = min(current_min, obj)
|
|
running_min.append(current_min)
|
|
|
|
plt.figure(figsize=(12, 8))
|
|
|
|
# Plot 1: Objective value over iterations
|
|
plt.subplot(2, 2, 1)
|
|
plt.scatter(iterations, objectives, alpha=0.6, s=30)
|
|
plt.plot(iterations, running_min, 'r-', linewidth=2, label='Best so far')
|
|
plt.xlabel('Iteration')
|
|
plt.ylabel(f'{self.objective_metric} (ms)')
|
|
plt.title('Optimization Progress')
|
|
plt.legend()
|
|
plt.grid(True, alpha=0.3)
|
|
|
|
# Plot 2: Parameter evolution for key parameters
|
|
plt.subplot(2, 2, 2)
|
|
key_params = ['max_in_flight_requests', 'batch_timeout_ms']
|
|
for param in key_params:
|
|
if param in self.optimization_history[0]['params']:
|
|
values = [entry['params'][param] for entry in self.optimization_history]
|
|
plt.scatter(iterations, values, alpha=0.6, label=param, s=30)
|
|
plt.xlabel('Iteration')
|
|
plt.ylabel('Parameter Value')
|
|
plt.title('Key Parameter Evolution')
|
|
plt.legend()
|
|
plt.grid(True, alpha=0.3)
|
|
|
|
# Plot 3: Objective distribution
|
|
plt.subplot(2, 2, 3)
|
|
plt.hist(objectives, bins=20, alpha=0.7, edgecolor='black')
|
|
plt.axvline(self.best_score, color='red', linestyle='--',
|
|
label=f'Best: {self.best_score:.1f}ms')
|
|
plt.xlabel(f'{self.objective_metric} (ms)')
|
|
plt.ylabel('Count')
|
|
plt.title('Objective Value Distribution')
|
|
plt.legend()
|
|
plt.grid(True, alpha=0.3)
|
|
|
|
# Plot 4: Convergence rate
|
|
plt.subplot(2, 2, 4)
|
|
improvements = []
|
|
for i, entry in enumerate(self.optimization_history):
|
|
if i == 0:
|
|
improvements.append(0)
|
|
else:
|
|
prev_best = running_min[i-1]
|
|
curr_best = running_min[i]
|
|
improvement = prev_best - curr_best
|
|
improvements.append(improvement)
|
|
|
|
plt.plot(iterations, improvements, 'g-', marker='o', markersize=3)
|
|
plt.xlabel('Iteration')
|
|
plt.ylabel('Improvement (ms)')
|
|
plt.title('Per-Iteration Improvement')
|
|
plt.grid(True, alpha=0.3)
|
|
|
|
plt.tight_layout()
|
|
|
|
if save_path:
|
|
plt.savefig(save_path, dpi=300, bbox_inches='tight')
|
|
print(f"Optimization plots saved to {save_path}")
|
|
else:
|
|
plt.show()
|
|
|
|
def run_optimization(self) -> Dict:
|
|
"""Run the full optimization process and return results"""
|
|
start_time = time.time()
|
|
|
|
# Run optimization
|
|
if OPTIMIZE_AVAILABLE:
|
|
best_params, best_score = self.optimize_with_bayesian()
|
|
else:
|
|
best_params, best_score = self.optimize_with_grid_search()
|
|
|
|
total_time = time.time() - start_time
|
|
|
|
# Run final simulation with best parameters for detailed results
|
|
print("Running final simulation with optimal parameters...")
|
|
final_results = self._run_simulation_with_params(best_params)
|
|
|
|
# Prepare optimization summary
|
|
optimization_summary = {
|
|
'best_parameters': best_params,
|
|
'best_objective_value': best_score,
|
|
'optimization_time': total_time,
|
|
'evaluations_performed': len(self.optimization_history),
|
|
'final_simulation_results': final_results,
|
|
'optimization_history': self.optimization_history
|
|
}
|
|
|
|
return optimization_summary
|
|
|
|
def print_optimization_summary(self, summary: Dict):
|
|
"""Print a comprehensive summary of optimization results"""
|
|
print("=" * 80)
|
|
print("BAYESIAN OPTIMIZATION RESULTS")
|
|
print("=" * 80)
|
|
|
|
print(f"Optimization completed in {summary['optimization_time']:.1f} seconds")
|
|
print(f"Performed {summary['evaluations_performed']} parameter evaluations")
|
|
print(f"Best {self.objective_metric}: {summary['best_objective_value']:.2f}ms")
|
|
print()
|
|
|
|
print("OPTIMAL PARAMETERS:")
|
|
print("-" * 40)
|
|
for param, value in summary['best_parameters'].items():
|
|
if isinstance(value, float):
|
|
if param.endswith('_rate'):
|
|
print(f" {param:<25}: {value:.4f}")
|
|
else:
|
|
print(f" {param:<25}: {value:.2f}")
|
|
else:
|
|
print(f" {param:<25}: {value}")
|
|
|
|
print("\nDETAILED PERFORMANCE WITH OPTIMAL PARAMETERS:")
|
|
print("-" * 50)
|
|
final_results = summary['final_simulation_results']
|
|
print_results(final_results)
|
|
|
|
print("\nPARAMETER IMPACT ANALYSIS:")
|
|
print("-" * 30)
|
|
self.analyze_parameter_importance()
|
|
|
|
|
|
def main():
|
|
"""Main optimization workflow"""
|
|
print("Persistence Thread Parameter Optimization")
|
|
print("Using Bayesian Optimization for intelligent parameter search")
|
|
print()
|
|
|
|
# Create optimizer with different objective functions to test
|
|
objectives_to_test = ["p95_latency", "weighted_latency"]
|
|
|
|
for objective in objectives_to_test:
|
|
print(f"\n{'='*80}")
|
|
print(f"OPTIMIZING FOR: {objective.upper()}")
|
|
print(f"{'='*80}")
|
|
|
|
optimizer = PersistenceOptimizer(
|
|
optimization_budget=30, # Reasonable for demo
|
|
simulation_duration=15.0, # Shorter sims for faster optimization
|
|
arrival_rate=1000.0,
|
|
objective_metric=objective,
|
|
random_seed=42
|
|
)
|
|
|
|
# Run optimization
|
|
summary = optimizer.run_optimization()
|
|
optimizer.print_optimization_summary(summary)
|
|
|
|
# Generate plots
|
|
try:
|
|
optimizer.plot_optimization_progress(f'optimization_{objective}.png')
|
|
except Exception as e:
|
|
print(f"Could not generate plots: {e}")
|
|
|
|
print(f"\nOptimization for {objective} completed!")
|
|
print("="*80)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main() |