#!/usr/bin/env python3
"""
Database Persistence Pattern Comparison

Compares WeaselDB's batched S3 persistence approach against traditional
database persistence patterns to understand trade-offs in latency,
throughput, consistency, and operational complexity.

Simulated approaches:
1. WeaselDB S3: Batched async S3 persistence with optimistic concurrency
2. WeaselDB EBS: The same batching logic persisting to EBS instead of S3
3. Traditional WAL: Write-ahead log with periodic sync to disk
4. Synchronous: Immediate fsync per transaction (like PostgreSQL with
   synchronous_commit=on)

A minimal Group Commit sketch is also included for illustration; Async
Replication is listed in the PersistencePattern enum but not simulated here.
"""

import heapq
import statistics
from abc import ABC, abstractmethod
from collections import defaultdict
from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict, List, Optional

import matplotlib.pyplot as plt
import numpy as np


class PersistencePattern(Enum):
    WEASELDB_S3 = "WeaselDB S3 Batched"
    WEASELDB_EBS = "WeaselDB EBS Batched"
    TRADITIONAL_WAL = "Traditional WAL"
    SYNCHRONOUS = "Synchronous Disk"
    GROUP_COMMIT = "Group Commit"
    ASYNC_REPLICATION = "Async Replication"


@dataclass
class Transaction:
    """Represents a database transaction/commit"""

    txn_id: int
    arrival_time: float
    size_bytes: int = 1024
    requires_durability: bool = True  # Some txns can be async


@dataclass
class PersistenceMetrics:
    """Metrics for a persistence approach"""

    pattern: PersistencePattern
    total_transactions: int
    completed_transactions: int

    # Latency metrics (milliseconds)
    min_latency: float
    mean_latency: float
    median_latency: float
    p95_latency: float
    p99_latency: float
    max_latency: float

    # Throughput metrics
    avg_throughput_tps: float
    peak_throughput_tps: float

    # Resource metrics
    avg_disk_iops: float
    avg_network_mbps: float
    storage_efficiency: float  # GB stored / GB logical data

    # Consistency metrics
    durability_guarantee: str
    consistency_model: str
    recovery_time_estimate: float

    # Operational metrics
    operational_complexity: int  # 1-10 scale
    infrastructure_cost: float  # Relative cost per transaction


class PersistenceSimulator(ABC):
    """Abstract base class for persistence pattern simulators"""

    def __init__(
        self, simulation_duration: float = 60.0, arrival_rate_per_sec: float = 1000.0
    ):
        self.simulation_duration = simulation_duration
        self.arrival_rate_per_sec = arrival_rate_per_sec

        # Simulation state
        self.current_time = 0.0
        self.event_queue = []
        self.completed_transactions = []
        self.next_txn_id = 0
        self._event_seq = 0  # Heap tie-breaker; see schedule_event()

        # Metrics tracking
        self.disk_iops = []
        self.network_usage = []
        self.throughput_samples = []

        # Random number generator (fixed seed for reproducibility)
        self.rng = np.random.RandomState(42)

    def generate_transaction(self, arrival_time: float) -> Transaction:
        """Generate a transaction with realistic characteristics"""
        # Size distribution: mostly small, some large. Draw once so the
        # branch probabilities are actually 80/15/5.
        r = self.rng.random()
        if r < 0.8:  # 80% small transactions
            size = self.rng.randint(100, 2048)  # 100B - 2KB
        elif r < 0.95:  # 15% medium
            size = self.rng.randint(2048, 20480)  # 2KB - 20KB
        else:  # 5% large transactions
            size = self.rng.randint(20480, 102400)  # 20KB - 100KB

        return Transaction(
            txn_id=self.next_txn_id,
            arrival_time=arrival_time,
            size_bytes=size,
            requires_durability=True,
        )

    @abstractmethod
    def process_transaction(self, txn: Transaction) -> None:
        """Process a transaction according to the persistence pattern"""

    @abstractmethod
    def get_pattern_name(self) -> PersistencePattern:
        """Return the persistence pattern this simulator implements"""

    def run_simulation(self) -> PersistenceMetrics:
        """Run the simulation and return metrics"""
        # Generate arrival events
        self._generate_arrivals()

        # Process events in timestamp order
        while self.event_queue and self.current_time < self.simulation_duration:
            event_time, _, event_type, data = heapq.heappop(self.event_queue)
            self.current_time = event_time

            if event_type == "transaction_arrival":
                self.process_transaction(data)
            elif event_type == "custom":
                self._handle_custom_event(data)

        return self._calculate_metrics()

    def _generate_arrivals(self):
        """Generate Poisson arrival events"""
        arrival_time = 0.0
        while arrival_time < self.simulation_duration:
            inter_arrival = self.rng.exponential(1.0 / self.arrival_rate_per_sec)
            arrival_time += inter_arrival

            if arrival_time >= self.simulation_duration:
                break

            txn = self.generate_transaction(arrival_time)
            self.next_txn_id += 1

            self.schedule_event(arrival_time, "transaction_arrival", txn)
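
    # Note on the model: i.i.d. Exponential(1/rate) inter-arrival gaps make
    # this a homogeneous Poisson process, so a 20 s run at 1,000 TPS yields
    # ~20,000 arrivals with roughly sqrt(N) (~0.7%) natural count variation.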

    def _handle_custom_event(self, data):
        """Handle custom events - override in subclasses"""

    def schedule_event(self, time: float, event_type: str, data: Any):
        """Schedule an event.

        A monotonically increasing sequence number breaks timestamp ties so
        heapq never falls back to comparing the (non-comparable) payloads.
        """
        heapq.heappush(self.event_queue, (time, self._event_seq, event_type, data))
        self._event_seq += 1

    def _calculate_metrics(self) -> PersistenceMetrics:
        """Calculate performance metrics from completed transactions"""
        if not self.completed_transactions:
            raise ValueError("No transactions completed")

        latencies = [txn["latency_ms"] for txn in self.completed_transactions]

        return PersistenceMetrics(
            pattern=self.get_pattern_name(),
            total_transactions=self.next_txn_id,
            completed_transactions=len(self.completed_transactions),
            # Latency metrics
            min_latency=min(latencies),
            mean_latency=statistics.mean(latencies),
            median_latency=statistics.median(latencies),
            p95_latency=np.percentile(latencies, 95),
            p99_latency=np.percentile(latencies, 99),
            max_latency=max(latencies),
            # Throughput metrics
            avg_throughput_tps=len(self.completed_transactions)
            / self.simulation_duration,
            peak_throughput_tps=self._calculate_peak_throughput(),
            # Resource metrics - implemented by subclasses
            avg_disk_iops=statistics.mean(self.disk_iops) if self.disk_iops else 0,
            avg_network_mbps=(
                statistics.mean(self.network_usage) if self.network_usage else 0
            ),
            storage_efficiency=self._calculate_storage_efficiency(),
            # Pattern-specific characteristics
            durability_guarantee=self._get_durability_guarantee(),
            consistency_model=self._get_consistency_model(),
            recovery_time_estimate=self._get_recovery_time(),
            operational_complexity=self._get_operational_complexity(),
            infrastructure_cost=self._get_infrastructure_cost(),
        )

    def _calculate_peak_throughput(self) -> float:
        """Calculate peak throughput in 1-second windows"""
        if not self.completed_transactions:
            return 0.0

        # Group transactions by completion second
        throughput_by_second = defaultdict(int)
        for txn in self.completed_transactions:
            second = int(txn["completion_time"])
            throughput_by_second[second] += 1

        return float(max(throughput_by_second.values()))

    # Abstract methods for pattern-specific characteristics
    @abstractmethod
    def _calculate_storage_efficiency(self) -> float:
        pass

    @abstractmethod
    def _get_durability_guarantee(self) -> str:
        pass

    @abstractmethod
    def _get_consistency_model(self) -> str:
        pass

    @abstractmethod
    def _get_recovery_time(self) -> float:
        pass

    @abstractmethod
    def _get_operational_complexity(self) -> int:
        pass

    @abstractmethod
    def _get_infrastructure_cost(self) -> float:
        pass


class WeaselDBSimulator(PersistenceSimulator):
    """WeaselDB's batched S3 persistence simulation"""

    def __init__(
        self,
        batch_timeout_ms: float = 1.0,
        batch_size_threshold: int = 800000,
        max_in_flight: int = 50,
        **kwargs,
    ):
        super().__init__(**kwargs)

        self.batch_timeout_ms = batch_timeout_ms
        self.batch_size_threshold = batch_size_threshold
        self.max_in_flight = max_in_flight

        # State
        self.current_batch = []
        self.batch_start_time = None
        self.in_flight_batches = {}
        self.next_batch_id = 0

        # S3 characteristics
        self.s3_base_latency_ms = 60.0  # Mean of the base latency sampled in _sample_s3_latency
        self.s3_size_penalty_per_mb = 20.0

    def get_pattern_name(self) -> PersistencePattern:
        return PersistencePattern.WEASELDB_S3

    def process_transaction(self, txn: Transaction):
        """Process transaction using WeaselDB batching logic"""
        # Add to current batch
        if not self.current_batch:
            self.batch_start_time = self.current_time

        self.current_batch.append(txn)

        # Check if we should send the batch. Note: the timeout trigger is
        # only evaluated when the next transaction arrives; there is no
        # standalone timer event, a simplification that is reasonable at
        # the high arrival rates simulated here.
        if (
            self._should_send_batch()
            and len(self.in_flight_batches) < self.max_in_flight
        ):
            self._send_current_batch()

    def _should_send_batch(self) -> bool:
        """Check batch triggers"""
        if not self.current_batch:
            return False

        # Size trigger
        batch_size = sum(txn.size_bytes for txn in self.current_batch)
        if batch_size >= self.batch_size_threshold:
            return True

        # Time trigger
        if self.batch_start_time and (self.current_time - self.batch_start_time) >= (
            self.batch_timeout_ms / 1000.0
        ):
            return True

        return False
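
    # Worked example (illustrative arithmetic): with the ~5 KB average
    # transaction produced by generate_transaction, the 800 KB size trigger
    # needs ~160 arrivals (~0.16 s of traffic at 1,000 TPS), so the 1 ms time
    # trigger fires first and batches stay small. The size trigger only
    # dominates at multi-hundred-MB/s ingest rates.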

    def _send_current_batch(self):
        """Send current batch to S3"""
        if not self.current_batch:
            return

        batch_id = self.next_batch_id
        self.next_batch_id += 1

        batch_size = sum(txn.size_bytes for txn in self.current_batch)

        # Sample S3 latency (ms -> s)
        s3_latency = self._sample_s3_latency(batch_size) / 1000.0
        completion_time = self.current_time + s3_latency

        # Track batch
        self.in_flight_batches[batch_id] = {
            "transactions": self.current_batch.copy(),
            "sent_time": self.current_time,
            "completion_time": completion_time,
            "size_bytes": batch_size,
        }

        # Schedule completion
        self.schedule_event(
            completion_time, "custom", {"type": "batch_complete", "batch_id": batch_id}
        )

        # Track network usage
        self.network_usage.append(batch_size / (1024 * 1024))  # MB

        # Clear batch
        self.current_batch.clear()
        self.batch_start_time = None

    def _sample_s3_latency(self, batch_size_bytes: int) -> float:
        """Sample S3 PUT latency (ms) with size scaling"""
        size_penalty = (batch_size_bytes / (1024 * 1024)) * self.s3_size_penalty_per_mb
        variable = self.rng.gamma(2.0, 15.0)  # Variable component
        return 30.0 + variable + size_penalty  # 30ms RTT + variable + size
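
    # Expectation check (illustrative): Gamma(shape=2.0, scale=15.0) has mean
    # 30 ms, so a tiny batch averages ~60 ms (matching s3_base_latency_ms)
    # and a 1 MB batch ~80 ms once the 20 ms/MB penalty is added.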

    def _handle_custom_event(self, data):
        """Handle batch completion events"""
        if data["type"] == "batch_complete":
            batch_id = data["batch_id"]
            if batch_id in self.in_flight_batches:
                batch = self.in_flight_batches.pop(batch_id)

                # Mark transactions complete
                for txn in batch["transactions"]:
                    latency_ms = (self.current_time - txn.arrival_time) * 1000
                    self.completed_transactions.append(
                        {
                            "txn_id": txn.txn_id,
                            "arrival_time": txn.arrival_time,
                            "completion_time": self.current_time,
                            "latency_ms": latency_ms,
                            "size_bytes": txn.size_bytes,
                        }
                    )

    def _calculate_storage_efficiency(self) -> float:
        return 1.0  # S3 adds no extra copies at the application level

    def _get_durability_guarantee(self) -> str:
        return "Durable once S3 PUT is acked (11 nines)"

    def _get_consistency_model(self) -> str:
        return "Optimistic concurrency, eventual consistency"

    def _get_recovery_time(self) -> float:
        return 0.1  # Fast recovery, just reconnect to S3

    def _get_operational_complexity(self) -> int:
        return 3  # Moderate - S3 managed, but need batching logic

    def _get_infrastructure_cost(self) -> float:
        return 1.0  # Baseline cost


class TraditionalWALSimulator(PersistenceSimulator):
    """Traditional Write-Ahead Log with periodic sync"""

    def __init__(
        self,
        wal_sync_interval_ms: float = 10.0,
        checkpoint_interval_sec: float = 30.0,
        **kwargs,
    ):
        super().__init__(**kwargs)

        self.wal_sync_interval_ms = wal_sync_interval_ms
        self.checkpoint_interval_sec = checkpoint_interval_sec

        # WAL state
        self.wal_buffer = []
        self.last_sync_time = 0.0
        self.pending_transactions = {}  # Waiting for sync

        # EBS characteristics (nominal; latency is modeled per-sync below)
        self.disk_latency_ms = 1.0  # EBS base latency
        self.disk_iops_limit = 10000  # Typical SSD

        # Schedule periodic syncs
        self._schedule_periodic_syncs()

    def get_pattern_name(self) -> PersistencePattern:
        return PersistencePattern.TRADITIONAL_WAL

    def process_transaction(self, txn: Transaction):
        """Write to WAL buffer, wait for sync"""
        # Write to WAL buffer (immediate)
        self.wal_buffer.append(txn)
        self.pending_transactions[txn.txn_id] = txn

        # Track disk IOPS (write to WAL)
        self.disk_iops.append(1.0)

    def _schedule_periodic_syncs(self):
        """Schedule periodic WAL sync events"""
        sync_time = self.wal_sync_interval_ms / 1000.0
        while sync_time < self.simulation_duration:
            self.schedule_event(sync_time, "custom", {"type": "wal_sync"})
            sync_time += self.wal_sync_interval_ms / 1000.0
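
    # Note: with the default 10 ms interval, a 20 s run pre-schedules ~2,000
    # sync events up front; each sync then flushes whatever accumulated in
    # the WAL buffer since the previous one.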

    def _perform_wal_sync(self):
        """Sync WAL buffer to disk"""
        if not self.wal_buffer:
            return

        # Calculate sync latency based on buffer size
        buffer_size = sum(txn.size_bytes for txn in self.wal_buffer)
        sync_latency = self._calculate_disk_sync_latency(buffer_size)

        completion_time = self.current_time + sync_latency / 1000.0

        # Schedule sync completion
        syncing_txns = list(self.wal_buffer)
        self.schedule_event(
            completion_time,
            "custom",
            {"type": "sync_complete", "transactions": syncing_txns},
        )

        # Track IOPS for sync operation
        self.disk_iops.append(len(self.wal_buffer))

        # Clear buffer
        self.wal_buffer.clear()
        self.last_sync_time = self.current_time

    def _calculate_disk_sync_latency(self, size_bytes: int) -> float:
        """Calculate WAL sync latency (ms) with realistic fsync modeling"""
        # Data write to page cache
        write_latency = 0.1  # Fast write to page cache

        # EBS sequential write throughput
        throughput_mbps = 1000.0  # EBS gp3 throughput
        size_mb = size_bytes / (1024 * 1024)
        transfer_latency = size_mb * (1000.0 / throughput_mbps)

        # WAL file fsync latency on EBS:
        # fdatasync on EBS with network replication
        file_fsync_base = 0.8  # Higher base latency on EBS
        file_fsync_variable = self.rng.gamma(2.2, 0.4)  # More variable due to network

        # Size penalty for large WAL syncs
        size_penalty = min(size_mb * 0.05, 1.0)  # Smaller penalty than batched writes

        file_fsync_latency = file_fsync_base + file_fsync_variable + size_penalty

        # WAL typically appends to existing files, so no directory fsync is
        # modeled here; it would only be required when creating new WAL segments.
        return write_latency + transfer_latency + file_fsync_latency
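
    # Expectation check (illustrative): Gamma(2.2, 0.4) has mean ~0.88 ms, so
    # a typical 10 ms window at 1,000 TPS (~50 KB buffered on average) syncs
    # in roughly 0.1 + ~0.05 + 0.8 + 0.88 = ~1.8 ms, well under the interval.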

    def _handle_custom_event(self, data):
        """Handle WAL sync and sync completion events"""
        if data["type"] == "wal_sync":
            self._perform_wal_sync()
        elif data["type"] == "sync_complete":
            # Mark transactions as durable
            for txn in data["transactions"]:
                if txn.txn_id in self.pending_transactions:
                    del self.pending_transactions[txn.txn_id]

                    latency_ms = (self.current_time - txn.arrival_time) * 1000
                    self.completed_transactions.append(
                        {
                            "txn_id": txn.txn_id,
                            "arrival_time": txn.arrival_time,
                            "completion_time": self.current_time,
                            "latency_ms": latency_ms,
                            "size_bytes": txn.size_bytes,
                        }
                    )

    def _calculate_storage_efficiency(self) -> float:
        return 2.0  # WAL + main storage

    def _get_durability_guarantee(self) -> str:
        return "ACID durable after WAL sync"

    def _get_consistency_model(self) -> str:
        return "Strict ACID consistency"

    def _get_recovery_time(self) -> float:
        return 30.0  # WAL replay time

    def _get_operational_complexity(self) -> int:
        return 7  # Complex - WAL management, checkpoints, recovery

    def _get_infrastructure_cost(self) -> float:
        return 1.5  # Higher due to disk I/O overhead


class SynchronousSimulator(PersistenceSimulator):
    """Synchronous disk persistence (like PostgreSQL with synchronous_commit=on)"""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.disk_latency_ms = 1.0  # EBS base latency (nominal)

    def get_pattern_name(self) -> PersistencePattern:
        return PersistencePattern.SYNCHRONOUS

    def process_transaction(self, txn: Transaction):
        """Immediately sync transaction to disk"""
        # Calculate disk write latency (ms -> s). Note: each transaction is
        # modeled independently; fsync queueing on the device is not
        # simulated, which flatters this pattern at high rates.
        disk_latency = self._calculate_disk_latency(txn.size_bytes) / 1000.0
        completion_time = self.current_time + disk_latency

        # Schedule completion
        self.schedule_event(
            completion_time,
            "custom",
            {"type": "disk_write_complete", "transaction": txn},
        )

        # Track disk IOPS
        self.disk_iops.append(1.0)

    def _calculate_disk_latency(self, size_bytes: int) -> float:
        """Calculate per-transaction disk write latency (ms) with fsync"""
        # Write to page cache (fast)
        write_latency = 0.1

        # fsync latency for immediate durability on EBS.
        # Each transaction requires its own fsync - expensive on network storage!
        fsync_base = 1.5  # Much higher base latency for individual fsyncs on EBS
        fsync_variable = self.rng.gamma(2.3, 0.5)  # More variable due to network

        # Small-write penalty - less efficient than batched writes
        if size_bytes < 4096:
            penalty = 0.5  # Individual small writes are less efficient
        else:
            penalty = 0.0

        fsync_latency = fsync_base + fsync_variable + penalty

        # Synchronous commits to existing database files don't need directory
        # fsync; that is only required when creating new files/tablespaces.
        return write_latency + fsync_latency
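
    # Expectation check (illustrative): Gamma(2.3, 0.5) has mean 1.15 ms, so
    # a small transaction averages ~0.1 + 1.5 + 1.15 + 0.5 = ~3.3 ms per
    # commit - the per-transaction fsync cost that batching amortizes away.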

    def _handle_custom_event(self, data):
        """Handle disk write completion"""
        if data["type"] == "disk_write_complete":
            txn = data["transaction"]
            latency_ms = (self.current_time - txn.arrival_time) * 1000

            self.completed_transactions.append(
                {
                    "txn_id": txn.txn_id,
                    "arrival_time": txn.arrival_time,
                    "completion_time": self.current_time,
                    "latency_ms": latency_ms,
                    "size_bytes": txn.size_bytes,
                }
            )

    def _calculate_storage_efficiency(self) -> float:
        return 1.5  # Some overhead for metadata

    def _get_durability_guarantee(self) -> str:
        return "Immediate ACID durability"

    def _get_consistency_model(self) -> str:
        return "Strict ACID with immediate consistency"

    def _get_recovery_time(self) -> float:
        return 5.0  # Fast recovery, data already on disk

    def _get_operational_complexity(self) -> int:
        return 5  # Moderate - standard database operations

    def _get_infrastructure_cost(self) -> float:
        return 2.0  # High due to disk I/O per transaction
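

# A minimal sketch of the Group Commit pattern named in the module docstring.
# It is not wired into DatabaseComparisonFramework; group_size and the fsync
# constants below are illustrative assumptions, not measured values.
class GroupCommitSimulator(PersistenceSimulator):
    """Group commit: accumulate transactions, pay one fsync per group"""

    def __init__(self, group_size: int = 32, group_timeout_ms: float = 2.0, **kwargs):
        super().__init__(**kwargs)
        self.group_size = group_size
        self.group_timeout_ms = group_timeout_ms
        self.current_group = []
        self.group_start_time = None

    def get_pattern_name(self) -> PersistencePattern:
        return PersistencePattern.GROUP_COMMIT

    def process_transaction(self, txn: Transaction) -> None:
        if not self.current_group:
            self.group_start_time = self.current_time
        self.current_group.append(txn)
        # Like the WeaselDB simulators, the timeout is checked on arrival
        timed_out = (
            self.current_time - self.group_start_time
        ) >= self.group_timeout_ms / 1000.0
        if len(self.current_group) >= self.group_size or timed_out:
            self._flush_group()

    def _flush_group(self):
        group, self.current_group = self.current_group, []
        self.group_start_time = None
        # One fsync amortized over the whole group (assumed EBS-like latency)
        fsync_ms = 1.0 + self.rng.gamma(2.0, 0.5)
        self.disk_iops.append(1.0)
        self.schedule_event(
            self.current_time + fsync_ms / 1000.0,
            "custom",
            {"type": "group_complete", "transactions": group},
        )

    def _handle_custom_event(self, data):
        if data["type"] == "group_complete":
            for txn in data["transactions"]:
                self.completed_transactions.append(
                    {
                        "txn_id": txn.txn_id,
                        "arrival_time": txn.arrival_time,
                        "completion_time": self.current_time,
                        "latency_ms": (self.current_time - txn.arrival_time) * 1000,
                        "size_bytes": txn.size_bytes,
                    }
                )

    def _calculate_storage_efficiency(self) -> float:
        return 2.0  # WAL + main storage, as in the traditional WAL pattern

    def _get_durability_guarantee(self) -> str:
        return "ACID durable after group fsync"

    def _get_consistency_model(self) -> str:
        return "Strict ACID consistency"

    def _get_recovery_time(self) -> float:
        return 20.0  # WAL replay, assumed shorter than periodic-sync WAL

    def _get_operational_complexity(self) -> int:
        return 6  # Group sizing/timeout tuning on top of WAL management

    def _get_infrastructure_cost(self) -> float:
        return 1.3  # Assumed between batched-disk and per-txn fsync costs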


class WeaselDBDiskSimulator(PersistenceSimulator):
    """WeaselDB's batched disk (EBS) persistence simulation"""

    def __init__(
        self,
        batch_timeout_ms: float = 1.0,
        batch_size_threshold: int = 800000,
        max_in_flight: int = 50,
        **kwargs,
    ):
        super().__init__(**kwargs)

        self.batch_timeout_ms = batch_timeout_ms
        self.batch_size_threshold = batch_size_threshold
        self.max_in_flight = max_in_flight

        # State
        self.current_batch = []
        self.batch_start_time = None
        self.in_flight_batches = {}
        self.next_batch_id = 0

        # EBS characteristics (gp3 or io2 volumes)
        self.disk_base_latency_ms = 0.5  # EBS has higher base latency than local NVMe
        self.disk_throughput_mbps = 1000.0  # EBS gp3 max throughput

    def get_pattern_name(self) -> PersistencePattern:
        return PersistencePattern.WEASELDB_EBS

    def process_transaction(self, txn: Transaction):
        """Process transaction using WeaselDB batching logic"""
        # Add to current batch
        if not self.current_batch:
            self.batch_start_time = self.current_time

        self.current_batch.append(txn)

        # Check if we should send the batch (timeout evaluated on arrival,
        # as in WeaselDBSimulator)
        if (
            self._should_send_batch()
            and len(self.in_flight_batches) < self.max_in_flight
        ):
            self._send_current_batch()

    def _should_send_batch(self) -> bool:
        """Check batch triggers"""
        if not self.current_batch:
            return False

        # Size trigger
        batch_size = sum(txn.size_bytes for txn in self.current_batch)
        if batch_size >= self.batch_size_threshold:
            return True

        # Time trigger
        if self.batch_start_time and (self.current_time - self.batch_start_time) >= (
            self.batch_timeout_ms / 1000.0
        ):
            return True

        return False

    def _send_current_batch(self):
        """Send current batch to disk"""
        if not self.current_batch:
            return

        batch_id = self.next_batch_id
        self.next_batch_id += 1

        batch_size = sum(txn.size_bytes for txn in self.current_batch)

        # Sample disk write latency (ms -> s)
        disk_latency = self._sample_disk_latency(batch_size) / 1000.0
        completion_time = self.current_time + disk_latency

        # Track batch
        self.in_flight_batches[batch_id] = {
            "transactions": self.current_batch.copy(),
            "sent_time": self.current_time,
            "completion_time": completion_time,
            "size_bytes": batch_size,
        }

        # Schedule completion
        self.schedule_event(
            completion_time, "custom", {"type": "batch_complete", "batch_id": batch_id}
        )

        # Track disk IOPS (one write operation per batch)
        self.disk_iops.append(1.0)

        # Clear batch
        self.current_batch.clear()
        self.batch_start_time = None

    def _sample_disk_latency(self, batch_size_bytes: int) -> float:
        """Sample disk write latency (ms) with fsync and directory-sync modeling"""
        # Base latency for the write command (data goes to page cache)
        write_latency = self.disk_base_latency_ms

        # Throughput-based latency for data transfer to page cache
        size_mb = batch_size_bytes / (1024 * 1024)
        transfer_latency = (size_mb / self.disk_throughput_mbps) * 1000.0  # ms

        # fsync latency for EBS - forces write to replicated storage;
        # EBS has higher fsync latency due to network replication
        file_fsync_base = 1.0  # Higher base fsync latency for EBS
        file_fsync_variable = self.rng.gamma(2.5, 0.6)  # More variable due to network

        # Size-dependent fsync penalty for large batches
        size_penalty = min(size_mb * 0.1, 2.0)  # Max 2ms penalty

        file_fsync_latency = file_fsync_base + file_fsync_variable + size_penalty

        # Directory fsync latency - required because WeaselDB creates a new
        # batch file per batch, and EBS directory metadata sync is also
        # network-replicated
        dir_fsync_base = 0.5  # Higher directory metadata sync latency on EBS
        dir_fsync_variable = self.rng.gamma(1.8, 0.3)  # More variable due to network

        dir_fsync_latency = dir_fsync_base + dir_fsync_variable

        # Total: write + transfer + file fsync + directory fsync
        return write_latency + transfer_latency + file_fsync_latency + dir_fsync_latency
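
    # Expectation check (illustrative): Gamma(2.5, 0.6) has mean 1.5 ms and
    # Gamma(1.8, 0.3) mean 0.54 ms, so a small batch averages roughly
    # 0.5 + 1.0 + 1.5 + 0.5 + 0.54 = ~4 ms - far cheaper than S3's ~60 ms,
    # but paid per batch file rather than per transaction.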

    def _handle_custom_event(self, data):
        """Handle batch completion events"""
        if data["type"] == "batch_complete":
            batch_id = data["batch_id"]
            if batch_id in self.in_flight_batches:
                batch = self.in_flight_batches.pop(batch_id)

                # Mark transactions complete
                for txn in batch["transactions"]:
                    latency_ms = (self.current_time - txn.arrival_time) * 1000
                    self.completed_transactions.append(
                        {
                            "txn_id": txn.txn_id,
                            "arrival_time": txn.arrival_time,
                            "completion_time": self.current_time,
                            "latency_ms": latency_ms,
                            "size_bytes": txn.size_bytes,
                        }
                    )

    def _calculate_storage_efficiency(self) -> float:
        return 1.2  # Some overhead for batching metadata

    def _get_durability_guarantee(self) -> str:
        return "ACID durable after EBS replication"

    def _get_consistency_model(self) -> str:
        return "Strict serializable with optimistic concurrency"

    def _get_recovery_time(self) -> float:
        return 10.0  # EBS volume attachment + recovery

    def _get_operational_complexity(self) -> int:
        return 4  # Moderate - batching logic + disk management

    def _get_infrastructure_cost(self) -> float:
        return 1.4  # Higher than S3 due to EBS provisioned storage + replication


class DatabaseComparisonFramework:
    """Framework for comparing different database persistence patterns"""

    def __init__(self, simulation_duration: float = 30.0):
        self.simulation_duration = simulation_duration
        self.arrival_rates: List[float] = []

    def run_comparison(
        self, arrival_rates: List[float] = [100, 500, 1000, 2000]
    ) -> Dict[str, List[PersistenceMetrics]]:
        """Run comparison across multiple arrival rates"""
        self.arrival_rates = list(arrival_rates)

        results = defaultdict(list)

        for rate in arrival_rates:
            print(f"\nTesting at {rate} TPS...")

            # WeaselDB S3 (optimized config)
            weasel_s3 = WeaselDBSimulator(
                batch_timeout_ms=1.0,
                batch_size_threshold=800000,
                max_in_flight=50,
                simulation_duration=self.simulation_duration,
                arrival_rate_per_sec=rate,
            )
            weasel_s3_metrics = weasel_s3.run_simulation()
            results["WeaselDB S3"].append(weasel_s3_metrics)
            print(f"  WeaselDB S3 P95: {weasel_s3_metrics.p95_latency:.1f}ms")

            # WeaselDB EBS (same batching, EBS storage)
            weasel_ebs = WeaselDBDiskSimulator(
                batch_timeout_ms=1.0,
                batch_size_threshold=800000,
                max_in_flight=50,
                simulation_duration=self.simulation_duration,
                arrival_rate_per_sec=rate,
            )
            weasel_ebs_metrics = weasel_ebs.run_simulation()
            results["WeaselDB EBS"].append(weasel_ebs_metrics)
            print(f"  WeaselDB EBS P95: {weasel_ebs_metrics.p95_latency:.1f}ms")

            # Traditional WAL
            wal = TraditionalWALSimulator(
                wal_sync_interval_ms=10.0,
                simulation_duration=self.simulation_duration,
                arrival_rate_per_sec=rate,
            )
            wal_metrics = wal.run_simulation()
            results["Traditional WAL"].append(wal_metrics)
            print(f"  WAL P95: {wal_metrics.p95_latency:.1f}ms")

            # Synchronous
            sync = SynchronousSimulator(
                simulation_duration=self.simulation_duration, arrival_rate_per_sec=rate
            )
            sync_metrics = sync.run_simulation()
            results["Synchronous"].append(sync_metrics)
            print(f"  Synchronous P95: {sync_metrics.p95_latency:.1f}ms")

        return dict(results)
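
    # To include the GroupCommitSimulator sketch above in a run (hypothetical
    # extension, shown for illustration):
    #
    #     gc = GroupCommitSimulator(
    #         simulation_duration=self.simulation_duration, arrival_rate_per_sec=rate
    #     )
    #     results["Group Commit"].append(gc.run_simulation())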

    def print_comparison_report(self, results: Dict[str, List[PersistenceMetrics]]):
        """Print comprehensive comparison report"""
        print("\n" + "=" * 80)
        print("DATABASE PERSISTENCE PATTERN COMPARISON")
        print("=" * 80)

        # Use the rates from the last run_comparison() call for headers
        arrival_rates = self.arrival_rates or [100, 500, 1000, 2000]

        # Latency comparison
        print("\nP95 LATENCY COMPARISON (ms)")
        print(f"{'Pattern':<20}", end="")
        for rate in arrival_rates:
            print(f"{rate:>8} TPS", end="")
        print()
        print("-" * 60)

        for pattern_name, metrics_list in results.items():
            print(f"{pattern_name:<20}", end="")
            for metrics in metrics_list:
                print(f"{metrics.p95_latency:>8.1f}", end="")
            print()

        # Throughput comparison
        print("\nTHROUGHPUT ACHIEVED (TPS)")
        print(f"{'Pattern':<20}", end="")
        for rate in arrival_rates:
            print(f"{rate:>8} TPS", end="")
        print()
        print("-" * 60)

        for pattern_name, metrics_list in results.items():
            print(f"{pattern_name:<20}", end="")
            for metrics in metrics_list:
                print(f"{metrics.avg_throughput_tps:>8.1f}", end="")
            print()

        # Characteristics comparison
        print("\nSYSTEM CHARACTERISTICS")
        print(
            f"{'Pattern':<20} {'Durability':<25} {'Consistency':<20} {'OpComplx':<8} {'Cost':<6}"
        )
        print("-" * 85)

        for pattern_name, metrics_list in results.items():
            metrics = metrics_list[0]  # Characteristics are rate-independent
            print(
                f"{pattern_name:<20} {metrics.durability_guarantee:<25} "
                f"{metrics.consistency_model:<20} {metrics.operational_complexity:<8} "
                f"{metrics.infrastructure_cost:<6.1f}"
            )

        # Performance sweet spots
        print("\nPERFORMANCE SWEET SPOTS")
        print("-" * 40)

        for rate_idx, rate in enumerate(arrival_rates):
            print(f"\nAt {rate} TPS:")
            rate_results = [
                (name, metrics_list[rate_idx])
                for name, metrics_list in results.items()
            ]

            # Sort by P95 latency
            rate_results.sort(key=lambda x: x[1].p95_latency)

            for i, (name, metrics) in enumerate(rate_results):
                rank_symbol = (
                    "🥇" if i == 0 else "🥈" if i == 1 else "🥉" if i == 2 else " "
                )
                print(
                    f"  {rank_symbol} {name}: {metrics.p95_latency:.1f}ms P95, "
                    f"{metrics.avg_throughput_tps:.0f} TPS achieved"
                )

    def plot_comparison_results(
        self,
        results: Dict[str, List[PersistenceMetrics]],
        save_path: Optional[str] = None,
    ):
        """Plot comparison results"""
        try:
            arrival_rates = self.arrival_rates or [100, 500, 1000, 2000]

            fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(15, 12))
            fig.suptitle("Database Persistence Pattern Comparison", fontsize=16)

            # Plot 1: P95 Latency vs Load
            for pattern_name, metrics_list in results.items():
                p95_latencies = [m.p95_latency for m in metrics_list]
                ax1.plot(
                    arrival_rates,
                    p95_latencies,
                    marker="o",
                    linewidth=2,
                    label=pattern_name,
                )

            ax1.set_xlabel("Arrival Rate (TPS)")
            ax1.set_ylabel("P95 Latency (ms)")
            ax1.set_title("P95 Latency vs Load")
            ax1.legend()
            ax1.grid(True, alpha=0.3)
            ax1.set_yscale("log")

            # Plot 2: Throughput Achieved
            for pattern_name, metrics_list in results.items():
                throughputs = [m.avg_throughput_tps for m in metrics_list]
                ax2.plot(
                    arrival_rates,
                    throughputs,
                    marker="s",
                    linewidth=2,
                    label=pattern_name,
                )

            # Perfect throughput line
            ax2.plot(
                arrival_rates,
                arrival_rates,
                "k--",
                alpha=0.5,
                label="Perfect (no loss)",
            )

            ax2.set_xlabel("Target Rate (TPS)")
            ax2.set_ylabel("Achieved Throughput (TPS)")
            ax2.set_title("Throughput: Target vs Achieved")
            ax2.legend()
            ax2.grid(True, alpha=0.3)

            # Plot 3: Latency percentiles at one rate (1000 TPS with defaults)
            rate_idx = min(2, len(arrival_rates) - 1)
            for pattern_name, metrics_list in results.items():
                metrics = metrics_list[rate_idx]
                percentiles = [50, 95, 99]
                values = [
                    metrics.median_latency,
                    metrics.p95_latency,
                    metrics.p99_latency,
                ]
                ax3.bar(
                    [f"{pattern_name}\nP{p}" for p in percentiles],
                    values,
                    alpha=0.7,
                    label=pattern_name,
                )

            ax3.set_ylabel("Latency (ms)")
            ax3.set_title(f"Latency Percentiles at {arrival_rates[rate_idx]} TPS")
            ax3.set_yscale("log")
            ax3.grid(True, alpha=0.3)

            # Plot 4: Cost vs Performance
            # Marker keys match the result keys used in run_comparison()
            markers = {
                "WeaselDB S3": "o",
                "WeaselDB EBS": "D",
                "Traditional WAL": "s",
                "Synchronous": "^",
            }
            for pattern_name, metrics_list in results.items():
                costs = [m.infrastructure_cost for m in metrics_list]
                p95s = [m.p95_latency for m in metrics_list]
                marker = markers.get(pattern_name, "o")

                ax4.scatter(
                    costs, p95s, s=100, marker=marker, alpha=0.7, label=pattern_name
                )

            ax4.set_xlabel("Infrastructure Cost (relative)")
            ax4.set_ylabel("P95 Latency (ms)")
            ax4.set_title("Cost vs Performance Trade-off")
            ax4.legend()
            ax4.grid(True, alpha=0.3)
            ax4.set_yscale("log")

            plt.tight_layout()

            if save_path:
                plt.savefig(save_path, dpi=300, bbox_inches="tight")
                print(f"Comparison plots saved to {save_path}")
            else:
                plt.show()

        except Exception as e:
            print(f"Could not generate plots: {e}")


def main():
    """Run database persistence pattern comparison"""
    print("Database Persistence Pattern Comparison")
    print("Comparing WeaselDB vs Traditional Database Approaches")
    print()

    comparison = DatabaseComparisonFramework(simulation_duration=20.0)
    results = comparison.run_comparison()

    comparison.print_comparison_report(results)

    # plot_comparison_results() handles plotting errors internally
    comparison.plot_comparison_results(results, "database_comparison.png")


if __name__ == "__main__":
    main()