#!/usr/bin/env python3 """ Persistence Thread Parameter Optimization Uses Bayesian Optimization to automatically find the optimal configuration parameters that minimize commit latency. This is much more efficient than grid search since it uses a probabilistic model to guide parameter exploration. Key advantages: - Efficiently explores high-dimensional parameter spaces - Uses previous simulation results to guide future parameter choices - Handles expensive objective function evaluations (our simulation) - Provides uncertainty estimates for parameter importance """ import numpy as np from typing import Dict, List, Tuple, Optional import time from persistence_simulation import PersistenceSimulation, print_results # Try to import scikit-optimize for Bayesian optimization try: from skopt import gp_minimize, forest_minimize from skopt.space import Real, Integer from skopt.utils import use_named_args from skopt.plots import plot_convergence, plot_objective import matplotlib.pyplot as plt OPTIMIZE_AVAILABLE = True except ImportError: print("scikit-optimize not available. Install with: pip install scikit-optimize") print("Falling back to grid search...") OPTIMIZE_AVAILABLE = False class PersistenceOptimizer: """ Automated parameter optimization for the persistence thread using Bayesian optimization. This class finds the optimal configuration parameters to minimize commit latency by intelligently exploring the parameter space using Gaussian Process models. """ def __init__(self, optimization_budget: int = 50, simulation_duration: float = 20.0, arrival_rate: float = 1000.0, objective_metric: str = "p95_latency", random_seed: int = 42): self.optimization_budget = optimization_budget self.simulation_duration = simulation_duration self.arrival_rate = arrival_rate self.objective_metric = objective_metric self.random_seed = random_seed # Track optimization history self.optimization_history = [] self.best_params = None self.best_score = float('inf') # Define parameter search space self.parameter_space = self._define_search_space() self.parameter_names = [dim.name for dim in self.parameter_space] def _define_search_space(self) -> List: """ Define the parameter search space for optimization. Focus on the 3 core parameters that matter for persistence thread performance with 100% reliable S3. Retry parameters removed since S3 never fails. """ return [ # Core batching parameters Real(1.0, 50.0, name='batch_timeout_ms', prior='log-uniform'), # Log scale since small changes matter Integer(64 * 1024, 4 * 1024 * 1024, name='batch_size_threshold', # 64KB - 4MB prior='log-uniform'), # Flow control parameters - likely the most impactful Integer(1, 50, name='max_in_flight_requests'), ] def _run_simulation_with_params(self, params: Dict[str, float]) -> Dict: """Run simulation with given parameters and return results""" try: sim = PersistenceSimulation( batch_timeout_ms=params['batch_timeout_ms'], batch_size_threshold=int(params['batch_size_threshold']), max_in_flight_requests=int(params['max_in_flight_requests']), # Retry parameters fixed since S3 is 100% reliable max_retry_attempts=0, # No retries needed retry_base_delay_ms=100.0, # Irrelevant but needs a value # S3 parameters kept fixed - 100% reliable for optimization focus s3_latency_shape=2.0, # Fixed Gamma shape s3_latency_scale=15.0, # Fixed Gamma scale (30ms RTT + ~30ms variable = ~60ms mean) s3_failure_rate=0.0, # 100% reliable S3 arrival_rate_per_sec=self.arrival_rate, simulation_duration_sec=self.simulation_duration ) return sim.run_simulation() except Exception as e: print(f"Simulation failed with params {params}: {e}") # Return a high penalty score for failed simulations return { 'commit_metrics': { 'latency_ms': { 'mean': 10000, 'p95': 10000, 'p99': 10000 } }, 'error': str(e) } def _extract_objective_value(self, results: Dict) -> float: """Extract the objective value to minimize from simulation results""" try: commit_metrics = results['commit_metrics']['latency_ms'] if self.objective_metric == "mean_latency": return commit_metrics['mean'] elif self.objective_metric == "p95_latency": return commit_metrics['p95'] elif self.objective_metric == "p99_latency": return commit_metrics['p99'] elif self.objective_metric == "weighted_latency": # Weighted combination emphasizing tail latencies return (0.3 * commit_metrics['mean'] + 0.5 * commit_metrics['p95'] + 0.2 * commit_metrics['p99']) else: return commit_metrics['p95'] # Default to P95 except KeyError as e: print(f"Failed to extract objective from results: {e}") return 10000 # High penalty for invalid results def optimize_with_bayesian(self) -> Tuple[Dict, float]: """ Use Bayesian Optimization to find optimal parameters. This uses Gaussian Process models to build a probabilistic model of the objective function and intelligently choose where to sample next. """ if not OPTIMIZE_AVAILABLE: return self.optimize_with_grid_search() print(f"Starting Bayesian Optimization with {self.optimization_budget} evaluations") print(f"Objective: Minimize {self.objective_metric}") print(f"Parameter space: {len(self.parameter_space)} dimensions") print() @use_named_args(self.parameter_space) def objective(**params): """Objective function for Bayesian optimization""" print(f"Evaluating: {params}") start_time = time.time() results = self._run_simulation_with_params(params) eval_time = time.time() - start_time objective_value = self._extract_objective_value(results) # Track optimization history history_entry = { 'params': params.copy(), 'objective_value': objective_value, 'results': results, 'eval_time': eval_time, 'iteration': len(self.optimization_history) + 1 } self.optimization_history.append(history_entry) # Update best if improved if objective_value < self.best_score: self.best_score = objective_value self.best_params = params.copy() print(f"✓ NEW BEST: {objective_value:.2f}ms (evaluation {history_entry['iteration']})") else: print(f" Score: {objective_value:.2f}ms") print(f" Time: {eval_time:.1f}s") print() return objective_value # Run Bayesian optimization result = gp_minimize( func=objective, dimensions=self.parameter_space, n_calls=self.optimization_budget, n_initial_points=10, # Random exploration first acq_func='EI', # Expected Improvement acquisition random_state=self.random_seed ) # Extract best parameters best_params_list = result.x best_params_dict = dict(zip(self.parameter_names, best_params_list)) best_objective = result.fun return best_params_dict, best_objective def optimize_with_grid_search(self) -> Tuple[Dict, float]: """Fallback grid search optimization if scikit-optimize not available""" print("Using grid search optimization (install scikit-optimize for better results)") print() # Define a smaller grid for key parameters grid_configs = [ # Vary max_in_flight and batch_timeout {'max_in_flight_requests': 5, 'batch_timeout_ms': 5.0}, {'max_in_flight_requests': 10, 'batch_timeout_ms': 5.0}, {'max_in_flight_requests': 20, 'batch_timeout_ms': 5.0}, {'max_in_flight_requests': 10, 'batch_timeout_ms': 2.0}, {'max_in_flight_requests': 10, 'batch_timeout_ms': 10.0}, {'max_in_flight_requests': 15, 'batch_timeout_ms': 3.0}, {'max_in_flight_requests': 25, 'batch_timeout_ms': 7.0}, ] best_params = None best_score = float('inf') for i, config in enumerate(grid_configs): print(f"Evaluating config {i+1}/{len(grid_configs)}: {config}") # Use default values for unspecified parameters full_params = { 'batch_timeout_ms': 5.0, 'batch_size_threshold': 1024 * 1024, 'max_in_flight_requests': 5 } full_params.update(config) results = self._run_simulation_with_params(full_params) objective_value = self._extract_objective_value(results) if objective_value < best_score: best_score = objective_value best_params = full_params.copy() print(f"✓ NEW BEST: {objective_value:.2f}ms") else: print(f" Score: {objective_value:.2f}ms") print() return best_params, best_score def analyze_parameter_importance(self): """Analyze which parameters have the most impact on performance""" if not self.optimization_history: print("No optimization history available") return print("Parameter Importance Analysis") print("=" * 50) # Extract parameter values and objectives param_data = {} objectives = [] for entry in self.optimization_history: objectives.append(entry['objective_value']) for param_name, param_value in entry['params'].items(): if param_name not in param_data: param_data[param_name] = [] param_data[param_name].append(param_value) objectives = np.array(objectives) # Simple correlation analysis print("Parameter correlations with objective (lower is better):") correlations = [] for param_name, values in param_data.items(): correlation = np.corrcoef(values, objectives)[0, 1] correlations.append((param_name, correlation)) print(f" {param_name:<25}: {correlation:+.3f}") print("\nMost impactful parameters (by absolute correlation):") correlations.sort(key=lambda x: abs(x[1]), reverse=True) for param_name, correlation in correlations[:5]: impact = "reduces latency" if correlation < 0 else "increases latency" print(f" {param_name:<25}: {impact} (r={correlation:+.3f})") def plot_optimization_progress(self, save_path: Optional[str] = None): """Plot optimization convergence""" if not OPTIMIZE_AVAILABLE or not self.optimization_history: return iterations = [entry['iteration'] for entry in self.optimization_history] objectives = [entry['objective_value'] for entry in self.optimization_history] # Calculate running minimum (best so far) running_min = [] current_min = float('inf') for obj in objectives: current_min = min(current_min, obj) running_min.append(current_min) plt.figure(figsize=(12, 8)) # Plot 1: Objective value over iterations plt.subplot(2, 2, 1) plt.scatter(iterations, objectives, alpha=0.6, s=30) plt.plot(iterations, running_min, 'r-', linewidth=2, label='Best so far') plt.xlabel('Iteration') plt.ylabel(f'{self.objective_metric} (ms)') plt.title('Optimization Progress') plt.legend() plt.grid(True, alpha=0.3) # Plot 2: Parameter evolution for key parameters plt.subplot(2, 2, 2) key_params = ['max_in_flight_requests', 'batch_timeout_ms'] for param in key_params: if param in self.optimization_history[0]['params']: values = [entry['params'][param] for entry in self.optimization_history] plt.scatter(iterations, values, alpha=0.6, label=param, s=30) plt.xlabel('Iteration') plt.ylabel('Parameter Value') plt.title('Key Parameter Evolution') plt.legend() plt.grid(True, alpha=0.3) # Plot 3: Objective distribution plt.subplot(2, 2, 3) plt.hist(objectives, bins=20, alpha=0.7, edgecolor='black') plt.axvline(self.best_score, color='red', linestyle='--', label=f'Best: {self.best_score:.1f}ms') plt.xlabel(f'{self.objective_metric} (ms)') plt.ylabel('Count') plt.title('Objective Value Distribution') plt.legend() plt.grid(True, alpha=0.3) # Plot 4: Convergence rate plt.subplot(2, 2, 4) improvements = [] for i, entry in enumerate(self.optimization_history): if i == 0: improvements.append(0) else: prev_best = running_min[i-1] curr_best = running_min[i] improvement = prev_best - curr_best improvements.append(improvement) plt.plot(iterations, improvements, 'g-', marker='o', markersize=3) plt.xlabel('Iteration') plt.ylabel('Improvement (ms)') plt.title('Per-Iteration Improvement') plt.grid(True, alpha=0.3) plt.tight_layout() if save_path: plt.savefig(save_path, dpi=300, bbox_inches='tight') print(f"Optimization plots saved to {save_path}") else: plt.show() def run_optimization(self) -> Dict: """Run the full optimization process and return results""" start_time = time.time() # Run optimization if OPTIMIZE_AVAILABLE: best_params, best_score = self.optimize_with_bayesian() else: best_params, best_score = self.optimize_with_grid_search() total_time = time.time() - start_time # Run final simulation with best parameters for detailed results print("Running final simulation with optimal parameters...") final_results = self._run_simulation_with_params(best_params) # Prepare optimization summary optimization_summary = { 'best_parameters': best_params, 'best_objective_value': best_score, 'optimization_time': total_time, 'evaluations_performed': len(self.optimization_history), 'final_simulation_results': final_results, 'optimization_history': self.optimization_history } return optimization_summary def print_optimization_summary(self, summary: Dict): """Print a comprehensive summary of optimization results""" print("=" * 80) print("BAYESIAN OPTIMIZATION RESULTS") print("=" * 80) print(f"Optimization completed in {summary['optimization_time']:.1f} seconds") print(f"Performed {summary['evaluations_performed']} parameter evaluations") print(f"Best {self.objective_metric}: {summary['best_objective_value']:.2f}ms") print() print("OPTIMAL PARAMETERS:") print("-" * 40) for param, value in summary['best_parameters'].items(): if isinstance(value, float): if param.endswith('_rate'): print(f" {param:<25}: {value:.4f}") else: print(f" {param:<25}: {value:.2f}") else: print(f" {param:<25}: {value}") print("\nDETAILED PERFORMANCE WITH OPTIMAL PARAMETERS:") print("-" * 50) final_results = summary['final_simulation_results'] print_results(final_results) print("\nPARAMETER IMPACT ANALYSIS:") print("-" * 30) self.analyze_parameter_importance() def main(): """Main optimization workflow""" print("Persistence Thread Parameter Optimization") print("Using Bayesian Optimization for intelligent parameter search") print() # Create optimizer with different objective functions to test objectives_to_test = ["p95_latency", "weighted_latency"] for objective in objectives_to_test: print(f"\n{'='*80}") print(f"OPTIMIZING FOR: {objective.upper()}") print(f"{'='*80}") optimizer = PersistenceOptimizer( optimization_budget=30, # Reasonable for demo simulation_duration=15.0, # Shorter sims for faster optimization arrival_rate=1000.0, objective_metric=objective, random_seed=42 ) # Run optimization summary = optimizer.run_optimization() optimizer.print_optimization_summary(summary) # Generate plots try: optimizer.plot_optimization_progress(f'optimization_{objective}.png') except Exception as e: print(f"Could not generate plots: {e}") print(f"\nOptimization for {objective} completed!") print("="*80) if __name__ == "__main__": main()