From 5a88047b9fb28482d38dd7f2830b08636fb707a5 Mon Sep 17 00:00:00 2001 From: Andrew Noyes Date: Mon, 15 Sep 2025 20:09:15 -0400 Subject: [PATCH] Two release threads --- src/commit_pipeline.cpp | 253 +++++++++++++--------------------------- src/commit_pipeline.hpp | 16 +-- src/thread_pipeline.hpp | 41 +++++++ 3 files changed, 130 insertions(+), 180 deletions(-) diff --git a/src/commit_pipeline.cpp b/src/commit_pipeline.cpp index a911593..a5dc7d7 100644 --- a/src/commit_pipeline.cpp +++ b/src/commit_pipeline.cpp @@ -23,60 +23,62 @@ CommitPipeline::CommitPipeline(const weaseldb::Config &config) // Stage 0: Sequence assignment thread sequence_thread_ = std::thread{[this]() { pthread_setname_np(pthread_self(), "txn-sequence"); - for (;;) { + for (int shutdowns_received = 0; shutdowns_received < 2;) { auto guard = pipeline_.acquire<0, 0>(); - if (process_sequence_batch(guard.batch)) { - return; // Shutdown signal received - } + process_sequence_batch(guard.batch, shutdowns_received); } }}; // Stage 1: Precondition resolution thread resolve_thread_ = std::thread{[this]() { pthread_setname_np(pthread_self(), "txn-resolve"); - for (;;) { + for (int shutdowns_received = 0; shutdowns_received < 2;) { auto guard = pipeline_.acquire<1, 0>(/*maxBatch*/ 1); - if (process_resolve_batch(guard.batch)) { - return; // Shutdown signal received - } + process_resolve_batch(guard.batch, shutdowns_received); } }}; // Stage 2: Transaction persistence thread persist_thread_ = std::thread{[this]() { pthread_setname_np(pthread_self(), "txn-persist"); - for (;;) { + for (int shutdowns_received = 0; shutdowns_received < 2;) { auto guard = pipeline_.acquire<2, 0>(); - if (process_persist_batch(guard.batch)) { - return; // Shutdown signal received - } + process_persist_batch(guard.batch, shutdowns_received); } }}; - // Stage 3: Connection return to server thread - release_thread_ = std::thread{[this]() { - pthread_setname_np(pthread_self(), "txn-release"); - for (;;) { + // Stage 3: Connection return to server threads (2 threads) + release_thread_1_ = std::thread{[this]() { + pthread_setname_np(pthread_self(), "txn-release-1"); + for (int shutdowns_received = 0; shutdowns_received < 1;) { auto guard = pipeline_.acquire<3, 0>(); - if (process_release_batch(guard.batch)) { - return; // Shutdown signal received - } + process_release_batch(guard.batch, 0, shutdowns_received); + } + }}; + + release_thread_2_ = std::thread{[this]() { + pthread_setname_np(pthread_self(), "txn-release-2"); + for (int shutdowns_received = 0; shutdowns_received < 1;) { + auto guard = pipeline_.acquire<3, 1>(); + process_release_batch(guard.batch, 1, shutdowns_received); } }}; } CommitPipeline::~CommitPipeline() { - // Send single shutdown signal that flows through all pipeline stages + // Send two shutdown signals for both release threads (adjacent in same batch) { - auto guard = pipeline_.push(1, true); + auto guard = pipeline_.push(2, true); guard.batch[0] = ShutdownEntry{}; + guard.batch[1] = ShutdownEntry{}; } // Join all pipeline threads sequence_thread_.join(); resolve_thread_.join(); persist_thread_.join(); - release_thread_.join(); + release_thread_1_.join(); + release_thread_2_.join(); } void CommitPipeline::submit_batch(std::span entries) { @@ -93,7 +95,8 @@ void CommitPipeline::submit_batch(std::span entries) { // Guard destructor publishes batch to stage 0 } -bool CommitPipeline::process_sequence_batch(BatchType &batch) { +void CommitPipeline::process_sequence_batch(BatchType &batch, + int &shutdowns_received) { // Stage 0: Sequence 
assignment // This stage performs ONLY work that requires serial processing: // - Version/sequence number assignment (must be sequential) @@ -101,28 +104,17 @@ bool CommitPipeline::process_sequence_batch(BatchType &batch) { for (auto &entry : batch) { // Pattern match on pipeline entry variant - bool should_shutdown = std::visit( - [&](auto &&e) -> bool { + std::visit( + [&](auto &&e) { using T = std::decay_t; if constexpr (std::is_same_v) { - return true; // Signal shutdown + ++shutdowns_received; } else if constexpr (std::is_same_v) { // Process commit entry: check banned list, assign version auto &commit_entry = e; - auto conn_ref = commit_entry.connection.lock(); - if (!conn_ref) { - // Connection is gone, drop the entry silently - return false; // Skip this entry and continue processing - } - if (!commit_entry.commit_request) { - // Should not happen - basic validation was done on I/O thread - conn_ref->send_response(commit_entry.protocol_context, - R"({"error":"Internal server error"})", - Arena{}); - return false; - } + assert(commit_entry.commit_request); // Check if request_id is banned (for status queries) // Only check CommitRequest request_id, not HTTP header @@ -130,72 +122,51 @@ bool CommitPipeline::process_sequence_batch(BatchType &batch) { commit_entry.commit_request->request_id().has_value()) { auto commit_request_id = commit_entry.commit_request->request_id().value(); - if (banned_request_ids_.find(commit_request_id) != - banned_request_ids_.end()) { + if (banned_request_ids_.contains(commit_request_id)) { // Request ID is banned, this commit should fail + auto conn_ref = commit_entry.connection.lock(); + if (!conn_ref) { + // Connection is gone, drop the entry silently + return; // Skip this entry and continue processing + } conn_ref->send_response( commit_entry.protocol_context, R"({"status": "not_committed", "error": "request_id_banned"})", Arena{}); - return false; + return; } } // Assign sequential version number commit_entry.assigned_version = next_version_++; - - return false; // Continue processing } else if constexpr (std::is_same_v) { // Process status entry: add request_id to banned list, get version // upper bound auto &status_entry = e; - auto conn_ref = status_entry.connection.lock(); - if (!conn_ref) { - // Connection is gone, drop the entry silently - return false; // Skip this entry and continue processing - } - if (!status_entry.status_request_id.empty()) { - // Add request_id to banned list - store the string in arena and - // use string_view - std::string_view request_id_view = - banned_request_arena_.copy_string( - status_entry.status_request_id); - banned_request_ids_.insert(request_id_view); + // Add request_id to banned list - store the string in arena and + // use string_view + std::string_view request_id_view = + banned_request_arena_.copy_string( + status_entry.status_request_id); + banned_request_ids_.insert(request_id_view); - // Update memory usage metric - banned_request_ids_memory_gauge.set( - banned_request_arena_.total_allocated()); + // Update memory usage metric + banned_request_ids_memory_gauge.set( + banned_request_arena_.total_allocated()); - // Set version upper bound to current highest assigned version - status_entry.version_upper_bound = next_version_ - 1; - } - - return false; // Continue processing + // Set version upper bound to current highest assigned version + status_entry.version_upper_bound = next_version_ - 1; } else if constexpr (std::is_same_v) { // Process health check entry: noop in sequence stage - auto 
&health_check_entry = e; - auto conn_ref = health_check_entry.connection.lock(); - if (!conn_ref) { - // Connection is gone, drop the entry silently - return false; // Skip this entry and continue processing - } - - return false; // Continue processing } - - return false; // Unknown type, continue }, entry); - - if (should_shutdown) { - return true; - } } - return false; // Continue processing } -bool CommitPipeline::process_resolve_batch(BatchType &batch) { +void CommitPipeline::process_resolve_batch(BatchType &batch, + int &shutdowns_received) { // Stage 1: Precondition resolution // This stage must be serialized to maintain consistent database state view // - Validate preconditions against current database state @@ -203,87 +174,52 @@ bool CommitPipeline::process_resolve_batch(BatchType &batch) { for (auto &entry : batch) { // Pattern match on pipeline entry variant - bool should_shutdown = std::visit( - [&](auto &&e) -> bool { + std::visit( + [&](auto &&e) { using T = std::decay_t; if constexpr (std::is_same_v) { - return true; // Signal shutdown + ++shutdowns_received; } else if constexpr (std::is_same_v) { // Process commit entry: accept all commits (simplified // implementation) auto &commit_entry = e; - auto conn_ref = commit_entry.connection.lock(); - if (!conn_ref) { - // Connection is gone, drop the entry silently - return false; // Skip this entry and continue processing - } - - if (!commit_entry.commit_request) { - // Skip processing for failed sequence stage - return false; - } - // Accept all commits (simplified implementation) commit_entry.resolve_success = true; - - return false; // Continue processing } else if constexpr (std::is_same_v) { // Status entries are not processed in resolve stage // They were already handled in sequence stage - return false; } else if constexpr (std::is_same_v) { - // Process health check entry: perform configurable CPU work - auto &health_check_entry = e; - auto conn_ref = health_check_entry.connection.lock(); - if (!conn_ref) { - // Connection is gone, drop the entry silently - return false; // Skip this entry and continue processing - } - // Perform configurable CPU-intensive work for benchmarking spend_cpu_cycles(config_.benchmark.ok_resolve_iterations); - - return false; // Continue processing } - - return false; // Unknown type, continue }, entry); - - if (should_shutdown) { - return true; - } } - return false; // Continue processing } -bool CommitPipeline::process_persist_batch(BatchType &batch) { +void CommitPipeline::process_persist_batch(BatchType &batch, + int &shutdowns_received) { // Stage 2: Transaction persistence // Mark everything as durable immediately (simplified implementation) // In real implementation: batch S3 writes, update subscribers, etc. 
for (auto &entry : batch) { // Pattern match on pipeline entry variant - bool should_shutdown = std::visit( - [&](auto &&e) -> bool { + std::visit( + [&](auto &&e) { using T = std::decay_t; if constexpr (std::is_same_v) { - return true; // Signal shutdown + ++shutdowns_received; } else if constexpr (std::is_same_v) { // Process commit entry: mark as durable, generate response auto &commit_entry = e; // Check if connection is still alive first - auto conn_ref = commit_entry.connection.lock(); - if (!conn_ref) { - // Connection is gone, drop the entry silently - return false; // Skip this entry and continue processing - } // Skip if resolve failed or connection is in error state if (!commit_entry.commit_request || !commit_entry.resolve_success) { - return false; + return; } // Mark as persisted and update committed version high water mark @@ -318,40 +254,23 @@ bool CommitPipeline::process_persist_batch(BatchType &batch) { commit_entry.response_json = std::string_view(json_buffer, response_json.size()); - return false; // Continue processing + return; // Continue processing } else if constexpr (std::is_same_v) { // Process status entry: generate not_committed response auto &status_entry = e; - auto conn_ref = status_entry.connection.lock(); - if (!conn_ref) { - // Connection is gone, drop the entry silently - return false; - } // Store JSON response for release stage status_entry.response_json = R"({"status": "not_committed"})"; - return false; } else if constexpr (std::is_same_v) { // Process health check entry: generate OK response auto &health_check_entry = e; - auto conn_ref = health_check_entry.connection.lock(); - if (!conn_ref) { - // Connection is gone, drop the entry silently - return false; // Skip this entry and continue processing - } // Store plain text "OK" response for release stage health_check_entry.response_json = "OK"; - return false; // Continue processing } else if constexpr (std::is_same_v) { auto &get_version_entry = e; - auto conn_ref = get_version_entry.connection.lock(); - if (!conn_ref) { - // Connection is gone, drop the entry silently - return false; // Skip this entry and continue processing - } // TODO validate we're still the leader at some version > the // proposed version for external consistency. 
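
(Illustrative aside, not part of the diff: the shutdown handshake relies on the destructor pushing two adjacent ShutdownEntry items, so each single-consumer stage loops until it has counted both, while each release thread only ever sees the one entry that lands in its parity partition. A minimal standalone sketch of that counting loop follows; ShutdownEntry, CommitEntry, and drain() are simplified stand-ins for the pipeline's entry variant and acquire() batches, not code from this patch.)

#include <cstddef>
#include <variant>
#include <vector>

struct ShutdownEntry {};
struct CommitEntry { long assigned_version = 0; };
using Entry = std::variant<ShutdownEntry, CommitEntry>;

// Drain one batch, counting any ShutdownEntry items it contains.
void drain(const std::vector<Entry> &batch, int &shutdowns_received) {
  for (const Entry &entry : batch) {
    if (std::holds_alternative<ShutdownEntry>(entry)) {
      ++shutdowns_received; // count it, but keep draining the rest of the batch
    }
    // ... per-entry stage work would go here ...
  }
}

int main() {
  // Two adjacent shutdown entries flow through, here split across two batches;
  // a single-threaded stage keeps acquiring batches until it has seen both.
  std::vector<std::vector<Entry>> batches = {
      {CommitEntry{}, ShutdownEntry{}}, {ShutdownEntry{}}};
  std::size_t next = 0;
  int shutdowns_received = 0;
  while (shutdowns_received < 2) {
    drain(batches[next++], shutdowns_received);
  }
}
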
@@ -360,38 +279,40 @@ bool CommitPipeline::process_persist_batch(BatchType &batch) { get_version_entry.request_arena, R"({"version":%ld,"leader":""})", get_version_entry.version); } - - return false; // Unknown type, continue }, entry); - - if (should_shutdown) { - return true; - } } - - return false; // Continue processing } -bool CommitPipeline::process_release_batch(BatchType &batch) { +void CommitPipeline::process_release_batch(BatchType &batch, int thread_index, + int &shutdowns_received) { // Stage 3: Connection release // Return connections to server for response transmission - for (auto &entry : batch) { - // Pattern match on pipeline entry variant - bool should_shutdown = std::visit( - [&](auto &&e) -> bool { + for (auto it = batch.begin(); it != batch.end(); ++it) { + auto &entry = *it; + + // Partition work: thread 0 handles even indices, thread 1 handles odd + // indices + if (static_cast(it.index() % 2) != thread_index) { + continue; + } + + // Process non-shutdown entries with partitioning + std::visit( + [&](auto &&e) { using T = std::decay_t; if constexpr (std::is_same_v) { - return true; // Signal shutdown + // Already handled above + ++shutdowns_received; } else if constexpr (std::is_same_v) { // Process commit entry: return connection to server auto &commit_entry = e; auto conn_ref = commit_entry.connection.lock(); if (!conn_ref) { // Connection is gone, drop the entry silently - return false; // Skip this entry and continue processing + return; // Skip this entry and continue processing } // Send the JSON response using protocol-agnostic interface @@ -399,15 +320,13 @@ bool CommitPipeline::process_release_batch(BatchType &batch) { conn_ref->send_response(commit_entry.protocol_context, commit_entry.response_json, std::move(commit_entry.request_arena)); - - return false; // Continue processing } else if constexpr (std::is_same_v) { // Process status entry: return connection to server auto &status_entry = e; auto conn_ref = status_entry.connection.lock(); if (!conn_ref) { // Connection is gone, drop the entry silently - return false; // Skip this entry and continue processing + return; // Skip this entry and continue processing } // Send the JSON response using protocol-agnostic interface @@ -415,15 +334,13 @@ bool CommitPipeline::process_release_batch(BatchType &batch) { conn_ref->send_response(status_entry.protocol_context, status_entry.response_json, std::move(status_entry.request_arena)); - - return false; // Continue processing } else if constexpr (std::is_same_v) { // Process health check entry: return connection to server auto &health_check_entry = e; auto conn_ref = health_check_entry.connection.lock(); if (!conn_ref) { // Connection is gone, drop the entry silently - return false; // Skip this entry and continue processing + return; // Skip this entry and continue processing } // Send the response using protocol-agnostic interface @@ -432,14 +349,12 @@ bool CommitPipeline::process_release_batch(BatchType &batch) { health_check_entry.protocol_context, health_check_entry.response_json, std::move(health_check_entry.request_arena)); - - return false; // Continue processing } else if constexpr (std::is_same_v) { auto &get_version_entry = e; auto conn_ref = get_version_entry.connection.lock(); if (!conn_ref) { // Connection is gone, drop the entry silently - return false; // Skip this entry and continue processing + return; // Skip this entry and continue processing } // Send the response using protocol-agnostic interface @@ -448,15 +363,7 @@ bool 
CommitPipeline::process_release_batch(BatchType &batch) { get_version_entry.response_json, std::move(get_version_entry.request_arena)); } - - return false; // Unknown type, continue }, entry); - - if (should_shutdown) { - return true; - } } - - return false; // Continue processing } diff --git a/src/commit_pipeline.hpp b/src/commit_pipeline.hpp index 95eb715..675d83d 100644 --- a/src/commit_pipeline.hpp +++ b/src/commit_pipeline.hpp @@ -109,21 +109,23 @@ private: static constexpr auto wait_strategy = WaitStrategy::WaitIfStageEmpty; // 4-stage pipeline: sequence -> resolve -> persist -> release - StaticThreadPipeline pipeline_; + StaticThreadPipeline pipeline_; // Stage processing threads std::thread sequence_thread_; std::thread resolve_thread_; std::thread persist_thread_; - std::thread release_thread_; + std::thread release_thread_1_; + std::thread release_thread_2_; // Pipeline stage processing methods (batch-based) using BatchType = - StaticThreadPipeline::Batch; - bool process_sequence_batch(BatchType &batch); - bool process_resolve_batch(BatchType &batch); - bool process_persist_batch(BatchType &batch); - bool process_release_batch(BatchType &batch); + StaticThreadPipeline::Batch; + void process_sequence_batch(BatchType &batch, int &shutdowns_received); + void process_resolve_batch(BatchType &batch, int &shutdowns_received); + void process_persist_batch(BatchType &batch, int &shutdowns_received); + void process_release_batch(BatchType &batch, int thread_index, + int &shutdowns_received); // Make non-copyable and non-movable CommitPipeline(const CommitPipeline &) = delete; diff --git a/src/thread_pipeline.hpp b/src/thread_pipeline.hpp index c153e18..3018468 100644 --- a/src/thread_pipeline.hpp +++ b/src/thread_pipeline.hpp @@ -268,6 +268,47 @@ int check_producer_capacity( // } // // Guard destructor marks items as consumed and available to next stage // +// Multi-Thread Stage Processing: +// When a stage has multiple threads (e.g., 1, 1, 1, 2 = 2 threads in stage 3): +// +// OVERLAPPING BATCHES - EACH THREAD SEES EVERY ENTRY: +// - Multiple threads in the same stage get OVERLAPPING batches from the ring +// buffer +// - Thread 0: calls acquire<3, 0>() - gets batch from ring positions 100-110 +// - Thread 1: calls acquire<3, 1>() - gets batch from ring positions 100-110 +// (SAME) +// - Both threads see the same entries and must coordinate processing +// +// PARTITIONING STRATEGIES: +// Choose your partitioning approach based on your use case: +// +// 1. Ring buffer position-based partitioning: +// for (auto it = batch.begin(); it != batch.end(); ++it) { +// if (it.index() % 2 != thread_index) continue; // Skip entries for other +// threads process(*it); // Process only entries assigned to this thread +// } +// +// 2. Entry content-based partitioning: +// for (auto& item : guard.batch) { +// if (hash(item.connection_id) % 2 != thread_index) continue; +// process(item); // Process based on entry properties +// } +// +// 3. 
Process all entries (when each thread does different work):
//      for (auto& item : guard.batch) {
//        process(item); // Both threads process all items, but differently
//      }
//
// Common Partitioning Patterns:
// - Position-based: it.index() % num_threads == thread_index
// - Hash-based: hash(item.key) % num_threads == thread_index
// - Type-based: item.type == MY_THREAD_TYPE
// - Load balancing: assign work based on thread load
// - All entries: each thread processes all items but performs different
//   operations
//
// Note: it.index() returns the position in the ring buffer (0 to buffer_size-1)
//
// Memory Model:
// - Ring buffer size must be power of 2 for efficient masking
// - Actual ring slots accessed via: index & (slotCount - 1)
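
(Illustrative aside, not part of the diff: the ring-position partitioning documented above, and used by process_release_batch(), can be shown standalone. Entry, release_worker, and the hard-coded ring indices below are hypothetical stand-ins for the pipeline batch, purely to demonstrate the even/odd split between the two stage threads.)

#include <cstdio>
#include <thread>
#include <vector>

// Stand-in for one published batch entry; ring_index plays the role of
// it.index() in the real batch iterator.
struct Entry {
  int ring_index;
  const char *payload;
};

// Both stage threads walk the SAME batch; each keeps only the entries whose
// ring position matches its parity, mirroring it.index() % 2 != thread_index.
void release_worker(const std::vector<Entry> &batch, int thread_index) {
  for (const Entry &e : batch) {
    if (e.ring_index % 2 != thread_index) {
      continue; // the other thread owns this slot
    }
    std::printf("thread %d handles %s\n", thread_index, e.payload);
  }
}

int main() {
  std::vector<Entry> batch = {
      {100, "commit A"}, {101, "commit B"}, {102, "status"}, {103, "health"}};
  std::thread t0([&] { release_worker(batch, 0); });
  std::thread t1([&] { release_worker(batch, 1); });
  t0.join();
  t1.join();
}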