
Commit Processing Pipeline

Overview

WeaselDB implements a high-performance 4-stage commit processing pipeline that transforms HTTP commit requests into durable transactions. The pipeline provides strict serialization where needed while maximizing throughput through batching and asynchronous processing.

Architecture

The commit processing pipeline consists of four sequential stages, each running on a dedicated thread:

HTTP I/O Threads → [Sequence] → [Resolve] → [Persist] → [Release] → HTTP I/O Threads

Pipeline Flow

  1. HTTP I/O Threads: Parse and validate incoming commit requests
  2. Sequence Stage: Assign sequential version numbers to commits
  3. Resolve Stage: Validate preconditions and check for conflicts
  4. Persist Stage: Write commits to durable storage and notify subscribers
  5. Release Stage: Return connections to HTTP I/O threads for response handling

Stage Details

Stage 0: Sequence Assignment

Thread: txn-sequence
Purpose: Version assignment and request ID management
Serialization: Required; must be single-threaded

Responsibilities:

  • For CommitEntry: Check request_id against banned list, assign sequential version number if not banned, forward to resolve stage
  • For StatusEntry: Add request_id to banned list, note current highest assigned version as upper bound, transfer connection to status threadpool
  • Record version assignments for transaction tracking

Why Serialization is Required:

  • Version numbers must be strictly sequential without gaps
  • Banned list updates must be atomic with version assignment
  • Status requests must get accurate upper bound on potential commit versions

Request ID Banned List:

  • Purpose: Guarantee that a banned request ID can never again be assigned a version (so its transaction is no longer in flight), and establish version upper bounds for status queries
  • Lifecycle: Grows indefinitely until process restart (leader change)
  • Removal: Only on process restart/leader change, which invalidates all old request IDs
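A minimal single-threaded sketch makes the serialization argument concrete. The Sequencer class and its method names are hypothetical and do not come from WeaselDB. The sketch assumes versions start at 1 and that a rejected commit does not consume a version.

```cpp
#include <cstdint>
#include <optional>
#include <string>
#include <unordered_set>

// Hypothetical sketch of the sequence stage's state (illustrative names only).
class Sequencer {
 public:
  // CommitEntry path: reject banned request IDs, otherwise hand out the next
  // version. Rejected commits do not consume a version, so there are no gaps.
  std::optional<int64_t> assign_commit(const std::string &request_id) {
    if (banned_.count(request_id) != 0) {
      return std::nullopt;
    }
    return ++last_assigned_;
  }

  // StatusEntry path: ban the request ID and return the highest version
  // assigned so far. Any commit carrying this ID must have a version <= bound.
  int64_t ban_for_status(const std::string &request_id) {
    banned_.insert(request_id);
    return last_assigned_;
  }

 private:
  int64_t last_assigned_ = 0;               // 0 means "nothing assigned yet"
  std::unordered_set<std::string> banned_;  // grows until process restart
};
```

Only the txn-sequence thread touches this state, so no locking is needed. The upper bound returned on the status path is also exact: once an ID is banned, no later version can belong to it.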

Current Implementation:

bool HttpHandler::process_sequence_batch(BatchType &batch) {
  for (auto &entry : batch) {
    if (std::holds_alternative<ShutdownEntry>(entry)) {
      return true; // Shutdown signal
    }
    // TODO: Pattern match on CommitEntry vs StatusEntry
    // TODO: Implement sequence assignment logic for each type
  }
  return false; // Continue processing
}

Stage 1: Precondition Resolution

Thread: txn-resolve
Purpose: Validate preconditions and detect conflicts
Serialization: Required; must be single-threaded

Responsibilities:

  • For CommitEntry: Check preconditions against in-memory recent writes set, add writes to recent writes set if accepted
  • For StatusEntry: N/A (transferred to status threadpool after sequence stage)
  • Mark failed commits with failure information (including which preconditions failed)

Why Serialization is Required:

  • Must maintain consistent view of in-memory recent writes set
  • Conflict detection requires atomic evaluation of all preconditions against recent writes
  • Recent writes set updates must be synchronized

Transaction State Transitions:

  • Assigned Version (from sequence) → Semi-committed (resolve accepts) → Committed (persist completes)
  • Failed transactions continue through pipeline with failure information for client response
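The sketch below shows the resolve stage's check. It assumes a hypothetical precondition of the form "key has not been written after read_version". It also models the recent writes set as a map from each key to the latest version that wrote it. Neither the real precondition format nor the eviction policy is specified here.

```cpp
#include <cstdint>
#include <string>
#include <unordered_map>
#include <vector>

// Hypothetical precondition: "key has not been written after read_version".
struct Precondition {
  std::string key;
  int64_t read_version;
};

struct Commit {
  int64_t version;                          // assigned by the sequence stage
  std::vector<Precondition> preconditions;
  std::vector<std::string> write_keys;      // keys this commit writes
};

enum class State { SemiCommitted, Failed };

struct ResolveResult {
  State state;
  std::vector<std::string> failed_keys;     // which preconditions failed
};

class Resolver {
 public:
  ResolveResult resolve(const Commit &c) {
    ResolveResult r{State::SemiCommitted, {}};
    // Evaluate every precondition before touching the recent writes set,
    // so a failed commit leaves no trace in it.
    for (const auto &p : c.preconditions) {
      auto it = last_write_.find(p.key);
      if (it != last_write_.end() && it->second > p.read_version) {
        r.failed_keys.push_back(p.key);
      }
    }
    if (!r.failed_keys.empty()) {
      r.state = State::Failed;
      return r;
    }
    // Accepted: record this commit's writes so later commits see them.
    for (const auto &k : c.write_keys) {
      last_write_[k] = c.version;
    }
    return r;
  }

 private:
  std::unordered_map<std::string, int64_t> last_write_;  // key -> latest writer
};
```

A production recent writes set has to evict old entries. With eviction in place, a precondition whose read_version is older than the oldest tracked version would have to fail conservatively, and this sketch does not model that case.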

Current Implementation:

bool HttpHandler::process_resolve_batch(BatchType &batch) {
  for (auto &entry : batch) {
    if (std::holds_alternative<ShutdownEntry>(entry)) {
      return true; // Shutdown signal
    }
    // TODO: Implement precondition resolution logic:
    // 1. For CommitEntry: Check preconditions against in-memory recent writes set
    // 2. If accepted: Add writes to in-memory recent writes set, mark as semi-committed
    // 3. If failed: Mark with failure info (which preconditions failed)
    // 4. For StatusEntry: N/A (already transferred to status threadpool)
  }
  return false; // Continue processing
}

Stage 2: Transaction Persistence

Thread: txn-persist
Purpose: Write semi-committed transactions to durable storage
Serialization: Required; batches must be marked durable in order

Responsibilities:

  • For CommitEntry: Apply operations to persistent storage, update committed version high water mark
  • For StatusEntry: N/A (transferred to status threadpool after sequence stage)
  • Generate durability events for /v1/subscribe when committed version advances
  • Batch multiple commits for efficient persistence operations

Why Serialization is Required:

  • Batches must be marked durable in sequential version order
  • High water mark updates must reflect strict ordering of committed versions
  • Ensures consistency guarantees across all endpoints

Committed Version High Water Mark:

  • Global atomic value tracking highest durably committed version
  • Updated after each batch commits: set to highest version in the batch
  • Read by /v1/version endpoint using atomic seq_cst reads
  • Enables /v1/subscribe durability events when high water mark advances

Batching Strategy:

  • Multiple semi-committed transactions can be persisted in a single batch
  • High water mark updated once per batch to highest version in that batch
  • See persistence.md for detailed persistence design
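The per-batch high water mark update can be sketched as follows. The sketch assumes each batch arrives as an ascending list of versions. A hypothetical listener vector stands in for the /v1/subscribe machinery, and the durable write itself is elided.

```cpp
#include <algorithm>
#include <atomic>
#include <cassert>
#include <cstdint>
#include <functional>
#include <vector>

// Global committed-version high water mark, read by /v1/version.
std::atomic<int64_t> committed_version{0};

// Hypothetical durability listeners standing in for /v1/subscribe events.
std::vector<std::function<void(int64_t)>> durability_listeners;

// Persist one batch of versions (ascending, because the pipeline preserves
// sequence order), then publish the new high water mark once for the batch.
void persist_batch(const std::vector<int64_t> &versions) {
  if (versions.empty()) return;
  assert(std::is_sorted(versions.begin(), versions.end()));
  assert(versions.front() > committed_version.load());  // batches stay in order
  // ... write the batch to durable storage here (see persistence.md) ...
  int64_t high = versions.back();
  committed_version.store(high, std::memory_order_seq_cst);
  for (auto &notify : durability_listeners) notify(high);
}
```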

Current Implementation:

bool HttpHandler::process_persist_batch(BatchType &batch) {
  // TODO: Implement actual persistence logic:
  // 1. For CommitEntry: Apply operations to persistent storage
  // 2. Update committed version high water mark to highest version in batch
  // 3. Generate durability events for /v1/subscribe
  // 4. For StatusEntry: N/A (already transferred to status threadpool)
}

Stage 3: Connection Release

Thread: txn-release
Purpose: Return connections to the HTTP server for client response
Serialization: Not required; connections are handled independently

Responsibilities:

  • Return processed connections to HTTP server for all request types
  • Connection carries response data (success/failure) and status information
  • Trigger response transmission to clients

Response Handling:

  • CommitRequests: Response generated by persist stage (success with version, or failure with conflicting preconditions)
  • StatusRequests: Response generated by separate status lookup logic (not part of pipeline)
  • Failed transactions carry failure information through entire pipeline for proper client response

Implementation:

bool HttpHandler::process_release_batch(BatchType &batch) {
  // Stage 3: Connection release
  for (auto &conn : batch) {
    if (!conn) {
      return true; // Shutdown signal
    }
    // Return connection to server for further processing or cleanup
    Server::release_back_to_server(std::move(conn));
  }
  return false; // Continue processing
}

Threading Model

Thread Pipeline Configuration

// Pipeline entry type (to be implemented); must be declared before the pipeline that uses it
using PipelineEntry = std::variant<CommitEntry, StatusEntry, ShutdownEntry>;

// 4-stage pipeline: sequence -> resolve -> persist -> release
// TODO: Update pipeline type from std::unique_ptr<Connection> to PipelineEntry variant
StaticThreadPipeline<PipelineEntry,  // Was: std::unique_ptr<Connection>
                     WaitStrategy::WaitIfUpstreamIdle, 1, 1, 1, 1>
    commitPipeline{lg_size};

Thread Creation and Management

HttpHandler() {
  // Stage 0: Sequence assignment thread
  sequenceThread = std::thread{[this]() {
    pthread_setname_np(pthread_self(), "txn-sequence");
    for (;;) {
      auto guard = commitPipeline.acquire<0, 0>();
      if (process_sequence_batch(guard.batch)) {
        return; // Shutdown signal received
      }
    }
  }};

  // Similar pattern for resolve, persist, and release threads...
}

Batch Processing

Each stage processes connections in batches using RAII guards:

auto guard = commitPipeline.acquire<STAGE_NUM, 0>();
// Process batch
for (auto &conn : guard.batch) {
  // Stage-specific processing
}
// Guard destructor automatically publishes batch to next stage
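A single-threaded toy illustrates the guard pattern, using std::deque for the stage boundaries. BatchGuard is hypothetical and is not the StaticThreadPipeline guard. It only shows why publishing from the destructor hands a batch downstream on every exit path.

```cpp
#include <cstddef>
#include <deque>
#include <utility>
#include <vector>

// Toy stage boundary: the guard takes up to `max` items from upstream and
// hands them downstream when it goes out of scope.
template <typename T>
class BatchGuard {
 public:
  BatchGuard(std::deque<T> &upstream, std::deque<T> &downstream, std::size_t max)
      : downstream_(downstream) {
    while (!upstream.empty() && batch.size() < max) {
      batch.push_back(std::move(upstream.front()));
      upstream.pop_front();
    }
  }
  BatchGuard(const BatchGuard &) = delete;
  BatchGuard &operator=(const BatchGuard &) = delete;

  // Publishing happens on every exit path: normal scope end, early return,
  // or exception unwinding.
  ~BatchGuard() {
    for (auto &item : batch) downstream_.push_back(std::move(item));
  }

  std::vector<T> batch;  // the stage processes these in place

 private:
  std::deque<T> &downstream_;
};
```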

Flow Control

Pipeline Entry

Commit requests enter the pipeline via HttpHandler::on_batch_complete():

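void HttpHandler::on_batch_complete(std::span<std::unique_ptr<Connection>> batch) {
  // A connection qualifies for the 4-stage pipeline if it is a commit request
  // that passed basic validation on the I/O thread
  auto is_pipeline_commit = [](const std::unique_ptr<Connection> &conn) {
    if (!conn || !conn->user_data) {
      return false;
    }
    auto *state = static_cast<HttpConnectionState *>(conn->user_data);
    return state->route == HttpRoute::POST_commit &&
           state->commit_request && state->parsing_commit;
  };

  int commit_count = 0;
  for (auto &conn : batch) {
    if (is_pipeline_commit(conn)) {
      commit_count++;
    }
  }

  // Send commit requests to 4-stage pipeline in batch
  if (commit_count > 0) {
    auto guard = commitPipeline.push(commit_count, true);
    // Move qualifying connections into the reserved slots; the pipeline now owns them
    int slot = 0;
    for (auto &conn : batch) {
      if (is_pipeline_commit(conn)) {
        guard.batch[slot++] = std::move(conn);
      }
    }
  }
}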

Backpressure Handling

The pipeline implements natural backpressure:

  • Each stage blocks if downstream stages are full
  • WaitIfUpstreamIdle strategy balances latency vs throughput
  • Ring buffer size (lg_size = 16, i.e. 2^16 = 65,536 entries) bounds the number of entries queued between stages
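The same backpressure behavior can be sketched with a mutex and condition variables. This is a simplification, since the actual pipeline uses a lock-free ring buffer, and BoundedQueue is a hypothetical name. The constants at the end spell out the ring size implied by lg_size = 16.

```cpp
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <utility>

// Minimal bounded queue: push() blocks while the queue is full, which is the
// backpressure an upstream stage feels when downstream falls behind.
template <typename T>
class BoundedQueue {
 public:
  explicit BoundedQueue(std::size_t capacity) : capacity_(capacity) {}

  void push(T value) {
    std::unique_lock<std::mutex> lock(mu_);
    not_full_.wait(lock, [&] { return items_.size() < capacity_; });
    items_.push_back(std::move(value));
    not_empty_.notify_one();
  }

  T pop() {
    std::unique_lock<std::mutex> lock(mu_);
    not_empty_.wait(lock, [&] { return !items_.empty(); });
    T value = std::move(items_.front());
    items_.pop_front();
    not_full_.notify_one();  // wake a producer blocked on a full queue
    return value;
  }

 private:
  const std::size_t capacity_;
  std::mutex mu_;
  std::condition_variable not_full_, not_empty_;
  std::deque<T> items_;
};

// Capacity implied by the pipeline configuration: 2^lg_size slots.
constexpr int lg_size = 16;
constexpr std::size_t ring_capacity = std::size_t{1} << lg_size;  // 65536
```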

Shutdown Coordination

Pipeline shutdown is coordinated by sending a single ShutdownEntry that flows through all stages:

~HttpHandler() {
  // Send single shutdown signal that flows through all pipeline stages
  {
    auto guard = commitPipeline.push(1, true);
    guard.batch[0] = ShutdownEntry{}; // Single ShutdownEntry flows through all stages
  }

  // Join all pipeline threads
  sequenceThread.join();
  resolveThread.join();
  persistThread.join();
  releaseThread.join();
}

Note: Multiple entries would only be needed if stages had multiple threads, with each thread needing its own shutdown signal.
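A toy chain of single-threaded stages can be used to check the single-sentinel argument. Work, Shutdown, Channel and run_chain are hypothetical names, and unbounded mutex-protected queues stand in for the ring buffer. Each stage forwards the sentinel before exiting, so one entry is enough to stop every thread.

```cpp
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <variant>
#include <vector>

struct Work { int value; };
struct Shutdown {};
using Entry = std::variant<Work, Shutdown>;

// Unbounded hand-off queue between two stages (toy stand-in for the ring buffer).
class Channel {
 public:
  void send(Entry e) {
    { std::lock_guard<std::mutex> lock(mu_); q_.push(std::move(e)); }
    cv_.notify_one();
  }
  Entry recv() {
    std::unique_lock<std::mutex> lock(mu_);
    cv_.wait(lock, [&] { return !q_.empty(); });
    Entry e = std::move(q_.front());
    q_.pop();
    return e;
  }

 private:
  std::mutex mu_;
  std::condition_variable cv_;
  std::queue<Entry> q_;
};

// Run `n` single-threaded stages in a chain. Each stage adds 1 to every Work
// item and forwards everything, exiting right after forwarding the sentinel.
std::vector<int> run_chain(int n, const std::vector<int> &inputs) {
  std::vector<Channel> ch(n + 1);
  std::vector<std::thread> threads;
  for (int s = 0; s < n; ++s) {
    threads.emplace_back([&ch, s] {
      for (;;) {
        Entry e = ch[s].recv();
        bool done = std::holds_alternative<Shutdown>(e);
        if (!done) std::get<Work>(e).value += 1;  // stage-specific work
        ch[s + 1].send(std::move(e));
        if (done) return;
      }
    });
  }
  for (int v : inputs) ch[0].send(Work{v});
  ch[0].send(Shutdown{});  // a single sentinel stops all n threads
  for (auto &t : threads) t.join();

  // Drain the final channel up to the forwarded sentinel.
  std::vector<int> out;
  for (;;) {
    Entry e = ch[n].recv();
    if (std::holds_alternative<Shutdown>(e)) break;
    out.push_back(std::get<Work>(e).value);
  }
  return out;
}
```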

Error Handling

Stage-Level Error Handling

Each stage handles different entry types:

// Pattern matching on pipeline entry variant. Every branch returns bool so that
// std::visit sees a single result type (true = shutdown, false = continue)
bool shutdown = std::visit([&](auto &&entry) -> bool {
  using T = std::decay_t<decltype(entry)>;
  if constexpr (std::is_same_v<T, ShutdownEntry>) {
    return true; // Signal shutdown
  } else if constexpr (std::is_same_v<T, CommitEntry>) {
    // Process commit entry
    return false;
  } else {
    static_assert(std::is_same_v<T, StatusEntry>);
    // Handle status entry (or skip if transferred)
    return false;
  }
}, pipeline_entry);
if (shutdown) {
  return true;
}

Connection Error States

  • Failed CommitEntries are passed through the pipeline with error information
  • Downstream stages skip processing for error connections but forward them
  • Error responses are sent when connection reaches release stage
  • Connection ownership is always transferred to ensure cleanup

Pipeline Integrity

  • ShutdownEntry signals shutdown to all stages
  • Each stage checks for ShutdownEntry and returns true to signal shutdown
  • RAII guards ensure entries are always published downstream
  • No entries are lost even during error conditions

Performance Characteristics

Throughput Optimization

  • Batching: Multiple connections processed per stage activation
  • Lock-Free Communication: Ring buffer between stages
  • Minimal Context Switching: Dedicated threads per stage
  • Arena Allocation: Efficient memory management throughout pipeline

Latency Optimization

  • Single-Pass Processing: Each connection flows through all stages once
  • Streaming Design: Stages process concurrently
  • Minimal Copying: Connection ownership transfer, not data copying
  • Direct Response: Release stage triggers immediate response transmission

Scalability Characteristics

  • Batch Size Tuning: Ring buffer size controls memory vs latency tradeoff
  • Dedicated Threads: A long-lived thread per stage reduces scheduling overhead
  • NUMA Awareness: Stage threads could be pinned to specific CPU cores (planned; see Future Optimizations)
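Here is a Linux-specific sketch of how a stage thread could pin itself. It uses the glibc extension pthread_setaffinity_np, which is available by default under g++ because g++ defines _GNU_SOURCE. The helper names are hypothetical. Choosing cores on a particular NUMA node is left out.

```cpp
#include <pthread.h>
#include <sched.h>

// Pin the calling thread to one CPU. Returns 0 on success, else an errno value.
int pin_current_thread(int cpu) {
  cpu_set_t set;
  CPU_ZERO(&set);
  CPU_SET(cpu, &set);
  return pthread_setaffinity_np(pthread_self(), sizeof(set), &set);
}

// First CPU this process is allowed to run on (containers and cgroups may
// exclude CPU 0), or -1 if the affinity mask cannot be read.
int first_allowed_cpu() {
  cpu_set_t set;
  CPU_ZERO(&set);
  if (sched_getaffinity(0, sizeof(set), &set) != 0) return -1;
  for (int cpu = 0; cpu < CPU_SETSIZE; ++cpu) {
    if (CPU_ISSET(cpu, &set)) return cpu;
  }
  return -1;
}
```

Each stage thread would call pin_current_thread with a distinct CPU right after its pthread_setname_np call.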

Configuration

Pipeline Parameters

private:
  static constexpr int lg_size = 16;  // Ring buffer size = 2^16 entries

  // 4-stage pipeline configuration
  StaticThreadPipeline<std::unique_ptr<Connection>,
                       WaitStrategy::WaitIfUpstreamIdle, 1, 1, 1, 1>
      commitPipeline{lg_size};

Tuning Considerations

  • Ring Buffer Size: Larger buffers increase memory usage but improve batching
  • Wait Strategy: WaitIfUpstreamIdle balances CPU usage vs latency
  • Thread Affinity: OS scheduling vs explicit CPU pinning tradeoffs

Pipeline Entry Types

The pipeline is designed to process different types of entries through a std::variant (PipelineEntry), replacing the current std::unique_ptr<Connection> element type:

Pipeline Entry Variants

  • CommitEntry: Contains std::unique_ptr<Connection> with CommitRequest and connection state
  • StatusEntry: Contains std::unique_ptr<Connection> with StatusRequest (transferred to status threadpool after sequence)
  • ShutdownEntry: Signals pipeline shutdown to all stages
  • Future types: Pipeline design supports additional entry types
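The variant and a per-type dispatch for the sequence stage are sketched below using the common C++17 `overloaded` visitor idiom. The Connection struct and the Action enum are placeholders for illustration only.

```cpp
#include <memory>
#include <string>
#include <variant>

// Placeholder for the server's connection type (hypothetical field).
struct Connection { std::string request_id; };

struct CommitEntry   { std::unique_ptr<Connection> conn; };
struct StatusEntry   { std::unique_ptr<Connection> conn; };
struct ShutdownEntry {};
using PipelineEntry = std::variant<CommitEntry, StatusEntry, ShutdownEntry>;

// C++17 "overloaded" helper: builds one visitor out of several lambdas.
template <class... Ts> struct overloaded : Ts... { using Ts::operator()...; };
template <class... Ts> overloaded(Ts...) -> overloaded<Ts...>;

enum class Action { Forward, TransferToStatusPool, Shutdown };

// What the sequence stage does with each entry type.
Action sequence_action(const PipelineEntry &e) {
  return std::visit(overloaded{
      [](const CommitEntry &) { return Action::Forward; },
      [](const StatusEntry &) { return Action::TransferToStatusPool; },
      [](const ShutdownEntry &) { return Action::Shutdown; },
  }, e);
}
```

Because every lambda returns Action, the visit has a single result type. A missing alternative would also be a compile error rather than a silent fallthrough.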

Stage Processing by Type

| Stage | CommitEntry | StatusEntry | ShutdownEntry | Serialization |
|---|---|---|---|---|
| Sequence | Check banned list, assign version | Add to banned list, transfer to status threadpool | Return true (shutdown) | Required |
| Resolve | Check preconditions, update recent writes | N/A (transferred) | Return true (shutdown) | Required |
| Persist | Apply operations, update high water mark | N/A (transferred) | Return true (shutdown) | Required |
| Release | Return connection to HTTP threads | N/A (transferred) | Return true (shutdown) | Not required |

API Endpoint Integration

/v1/commit - Transaction Submission

Pipeline Interaction: Full pipeline traversal as CommitEntry

Request Processing Flow

  1. HTTP I/O Thread Processing (src/http_handler.cpp:210-273):

    void HttpHandler::handlePostCommit(Connection &conn, HttpConnectionState &state) {
      // Parse and validate anything that doesn't need serialization:
      // - JSON parsing and CommitRequest construction
      // - Basic validation: leader_id check, operation format validation
      // - Check that we have at least one operation
    
      // If validation fails, send error response immediately and return
      // If validation succeeds, connection will enter pipeline in on_batch_complete()
    }
    
  2. Pipeline Entry: Successfully parsed connections enter pipeline as CommitEntry (containing the connection with CommitRequest)

  3. Pipeline Processing:

    • Sequence: Check banned list → assign version (or reject)
    • Resolve: Check preconditions against in-memory recent writes → mark semi-committed (or failed with conflict details)
    • Persist: Apply operations → mark committed, update high water mark
    • Release: Return connection with response data
  4. Response Generation: Based on pipeline results

    • Success: {"status": "committed", "version": N, "leader_id": "...", "request_id": "..."}
    • Failure: {"status": "not_committed", "conflicts": [...], "version": N, "leader_id": "..."}
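The two response bodies shown above can be built as follows. The function names and the representation of conflicts as a list of strings are assumptions. The escaping covers only quotes and backslashes.

```cpp
#include <cstddef>
#include <cstdint>
#include <string>
#include <vector>

// Minimal JSON string quoting: escapes quotes and backslashes only; a real
// encoder must also handle control characters.
std::string quote(const std::string &s) {
  std::string out = "\"";
  for (char ch : s) {
    if (ch == '"' || ch == '\\') out += '\\';
    out += ch;
  }
  return out + "\"";
}

std::string commit_response(bool committed, int64_t version,
                            const std::string &leader_id,
                            const std::string &request_id,
                            const std::vector<std::string> &conflicts) {
  std::string body = "{\"status\": ";
  if (committed) {
    body += "\"committed\", \"version\": " + std::to_string(version) +
            ", \"leader_id\": " + quote(leader_id) +
            ", \"request_id\": " + quote(request_id);
  } else {
    body += "\"not_committed\", \"conflicts\": [";
    for (std::size_t i = 0; i < conflicts.size(); ++i) {
      if (i > 0) body += ", ";
      body += quote(conflicts[i]);
    }
    body += "], \"version\": " + std::to_string(version) +
            ", \"leader_id\": " + quote(leader_id);
  }
  return body + "}";
}
```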

/v1/status - Commit Status Lookup

Pipeline Interaction: StatusEntry through sequence stage, then transfer to status threadpool

Request Processing Flow

  1. HTTP I/O Thread Processing:

    void HttpHandler::handleGetStatus(Connection &conn, const HttpConnectionState &state) {
      // TODO: Extract request_id from URL and min_version from query params
      // Current: Returns placeholder static response
    }
    
  2. Two-Phase Processing:

    • Phase 1 - Sequence Stage: StatusEntry enters pipeline to add request_id to banned list and get version upper bound
    • Phase 2 - Status Threadpool: Connection transferred from sequence stage to dedicated status threadpool for actual status lookup logic
  3. Status Lookup Logic: Performed in status threadpool - scan transaction log to determine actual commit status of the now-banned request_id
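The status threadpool's scan is sketched below under several assumptions. The log is a hypothetical in-memory list of committed (version, request_id) records, and min_version is a lower bound supplied by the client. The scan also runs only after the committed version has reached the upper bound reported by the sequence stage, because a commit still in the persist stage could otherwise be missed.

```cpp
#include <cstdint>
#include <optional>
#include <string>
#include <vector>

// Hypothetical log record for a committed transaction.
struct LogRecord {
  int64_t version;
  std::string request_id;
};

// Because `request_id` is banned, no version above `upper_bound` can belong to
// it, so scanning [min_version, upper_bound] yields a final answer.
std::optional<int64_t> lookup_status(const std::vector<LogRecord> &log,
                                     const std::string &request_id,
                                     int64_t min_version, int64_t upper_bound) {
  for (const auto &rec : log) {
    if (rec.version < min_version || rec.version > upper_bound) continue;
    if (rec.request_id == request_id) return rec.version;  // committed here
  }
  return std::nullopt;  // not committed, and now never will be
}
```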

/v1/subscribe - Real-time Transaction Stream

Pipeline Integration: Consumes events from resolve and persist stages

Event Sources

  • Resolve Stage: Semi-committed transactions (accepted preconditions) for low-latency streaming
  • Persist Stage: Durability events when committed version high water mark advances

Current Implementation

void HttpHandler::handleGetSubscribe(Connection &conn, const HttpConnectionState &state) {
  // TODO: Parse query parameters (after, durable)
  // TODO: Establish Server-Sent Events stream
  // TODO: Subscribe to resolve stage (semi-committed) and persist stage (durability) events
}
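The next sketch frames a single Server-Sent Events message. The event names and JSON payload are hypothetical. Using the version as the SSE id field is one possible way to let a reconnecting client resume through the `after` parameter, not a confirmed design.

```cpp
#include <cstdint>
#include <sstream>
#include <string>

// Format one SSE frame: optional event name, an id, one "data:" line per
// payload line, and a blank line that terminates the event.
std::string sse_frame(const std::string &event, int64_t version,
                      const std::string &json) {
  std::ostringstream out;
  out << "event: " << event << "\n"
      << "id: " << version << "\n";
  std::istringstream lines(json);
  std::string line;
  while (std::getline(lines, line)) out << "data: " << line << "\n";
  out << "\n";
  return out.str();
}
```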

/v1/version - Version Information

Pipeline Integration: Direct atomic read, no pipeline interaction

// TODO: Implement direct atomic read of committed version high water mark
// No pipeline interaction needed - seq_cst atomic read
// Leader ID is process-lifetime constant

Response: {"version": <high_water_mark>, "leader_id": "<process_leader_id>"}
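The endpoint body is sketched below, assuming a global std::atomic<int64_t> for the high water mark and a hypothetical leader_id string fixed for the process lifetime.

```cpp
#include <atomic>
#include <cstdint>
#include <string>

std::atomic<int64_t> committed_version{0};   // advanced by the persist stage
const std::string leader_id = "leader-123";  // hypothetical process-lifetime constant

// No pipeline interaction: one seq_cst load, then format the response.
std::string version_response() {
  int64_t v = committed_version.load(std::memory_order_seq_cst);
  return "{\"version\": " + std::to_string(v) + ", \"leader_id\": \"" +
         leader_id + "\"}";
}
```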

Integration Points

HTTP Handler Integration

The pipeline integrates with the HTTP handler at two points:

  1. Entry: on_batch_complete() feeds connections into sequence stage
  2. Exit: Release stage calls Server::release_back_to_server()

Persistence Layer Integration

The persist stage interfaces with:

  • S3 Backend: Batch writes for durability (see persistence.md)
  • Subscriber System: Real-time change stream notifications
  • Metrics System: Transaction throughput and latency tracking

Database State Integration

  • Sequence Stage: Updates version number generator
  • Resolve Stage: Checks preconditions against the in-memory recent writes set
  • Persist Stage: Applies mutations to authoritative database state

Future Optimizations

Potential Enhancements

  1. Dynamic Thread Counts: Make resolve and release thread counts configurable
  2. NUMA Optimization: Pin pipeline threads to specific CPU cores
  3. Batch Size Tuning: Dynamic batch size based on load
  4. Stage Bypassing: Skip resolve stage for transactions without preconditions
  5. Persistence Batching: Aggregate multiple commits into larger S3 writes

Monitoring and Observability

  1. Stage Metrics: Throughput, latency, and queue depth per stage
  2. Error Tracking: Error rates and types by stage
  3. Resource Utilization: CPU and memory usage per pipeline thread
  4. Flow Control Events: Backpressure and stall detection

Implementation Status

Current State

  • Pipeline structure implemented with 4 stages
  • Thread creation and management
  • RAII batch processing
  • Error handling framework
  • Shutdown coordination

TODO Items

  • Sequence assignment logic implementation
  • Precondition resolution implementation
  • S3 persistence batching implementation
  • Subscriber notification system
  • Performance monitoring and metrics
  • Configuration tuning and optimization