diff --git a/src/commit_pipeline.cpp b/src/commit_pipeline.cpp index a5dc7d7..f4b3575 100644 --- a/src/commit_pipeline.cpp +++ b/src/commit_pipeline.cpp @@ -2,6 +2,7 @@ #include #include +#include #include "commit_request.hpp" #include "cpu_work.hpp" @@ -16,52 +17,35 @@ auto banned_request_ids_memory_gauge = .create({}); CommitPipeline::CommitPipeline(const weaseldb::Config &config) - : config_(config), banned_request_ids_(ArenaStlAllocator( - &banned_request_arena_)), - pipeline_(lg_size) { + : config_(config), pipeline_(lg_size) { // Stage 0: Sequence assignment thread sequence_thread_ = std::thread{[this]() { pthread_setname_np(pthread_self(), "txn-sequence"); - for (int shutdowns_received = 0; shutdowns_received < 2;) { - auto guard = pipeline_.acquire<0, 0>(); - process_sequence_batch(guard.batch, shutdowns_received); - } + run_sequence_stage(); }}; // Stage 1: Precondition resolution thread resolve_thread_ = std::thread{[this]() { pthread_setname_np(pthread_self(), "txn-resolve"); - for (int shutdowns_received = 0; shutdowns_received < 2;) { - auto guard = pipeline_.acquire<1, 0>(/*maxBatch*/ 1); - process_resolve_batch(guard.batch, shutdowns_received); - } + run_resolve_stage(); }}; // Stage 2: Transaction persistence thread persist_thread_ = std::thread{[this]() { pthread_setname_np(pthread_self(), "txn-persist"); - for (int shutdowns_received = 0; shutdowns_received < 2;) { - auto guard = pipeline_.acquire<2, 0>(); - process_persist_batch(guard.batch, shutdowns_received); - } + run_persist_stage(); }}; // Stage 3: Connection return to server threads (2 threads) release_thread_1_ = std::thread{[this]() { pthread_setname_np(pthread_self(), "txn-release-1"); - for (int shutdowns_received = 0; shutdowns_received < 1;) { - auto guard = pipeline_.acquire<3, 0>(); - process_release_batch(guard.batch, 0, shutdowns_received); - } + run_release_stage<0>(); }}; release_thread_2_ = std::thread{[this]() { pthread_setname_np(pthread_self(), "txn-release-2"); - for (int shutdowns_received = 0; shutdowns_received < 1;) { - auto guard = pipeline_.acquire<3, 1>(); - process_release_batch(guard.batch, 1, shutdowns_received); - } + run_release_stage<1>(); }}; } @@ -95,275 +79,299 @@ void CommitPipeline::submit_batch(std::span entries) { // Guard destructor publishes batch to stage 0 } -void CommitPipeline::process_sequence_batch(BatchType &batch, - int &shutdowns_received) { - // Stage 0: Sequence assignment - // This stage performs ONLY work that requires serial processing: - // - Version/sequence number assignment (must be sequential) - // - Request ID banned list management +void CommitPipeline::run_sequence_stage() { - for (auto &entry : batch) { - // Pattern match on pipeline entry variant - std::visit( - [&](auto &&e) { - using T = std::decay_t; + int64_t next_version = 1; + // Request ID deduplication (sequence stage only) + Arena banned_request_arena; + using BannedRequestIdSet = + std::unordered_set, + std::equal_to, + ArenaStlAllocator>; + BannedRequestIdSet banned_request_ids{ + ArenaStlAllocator(&banned_request_arena)}; - if constexpr (std::is_same_v) { - ++shutdowns_received; - } else if constexpr (std::is_same_v) { - // Process commit entry: check banned list, assign version - auto &commit_entry = e; + for (int shutdowns_received = 0; shutdowns_received < 2;) { + auto guard = pipeline_.acquire<0, 0>(); + auto &batch = guard.batch; - assert(commit_entry.commit_request); + // Stage 0: Sequence assignment + // This stage performs ONLY work that requires serial processing: + // - Version/sequence number assignment (must be sequential) + // - Request ID banned list management - // Check if request_id is banned (for status queries) - // Only check CommitRequest request_id, not HTTP header - if (commit_entry.commit_request && - commit_entry.commit_request->request_id().has_value()) { - auto commit_request_id = - commit_entry.commit_request->request_id().value(); - if (banned_request_ids_.contains(commit_request_id)) { - // Request ID is banned, this commit should fail - auto conn_ref = commit_entry.connection.lock(); - if (!conn_ref) { - // Connection is gone, drop the entry silently - return; // Skip this entry and continue processing + for (auto &entry : batch) { + // Pattern match on pipeline entry variant + std::visit( + [&](auto &&e) { + using T = std::decay_t; + + if constexpr (std::is_same_v) { + ++shutdowns_received; + } else if constexpr (std::is_same_v) { + // Process commit entry: check banned list, assign version + auto &commit_entry = e; + + assert(commit_entry.commit_request); + + // Check if request_id is banned (for status queries) + // Only check CommitRequest request_id, not HTTP header + if (commit_entry.commit_request && + commit_entry.commit_request->request_id().has_value()) { + auto commit_request_id = + commit_entry.commit_request->request_id().value(); + if (banned_request_ids.contains(commit_request_id)) { + // Request ID is banned, this commit should fail + commit_entry.response_json = + R"({"status": "not_committed", "error": "request_id_banned"})"; + return; } - conn_ref->send_response( - commit_entry.protocol_context, - R"({"status": "not_committed", "error": "request_id_banned"})", - Arena{}); + } + + // Assign sequential version number + commit_entry.assigned_version = next_version++; + } else if constexpr (std::is_same_v) { + // Process status entry: add request_id to banned list, get + // version upper bound + auto &status_entry = e; + + // Add request_id to banned list - store the string in arena and + // use string_view + std::string_view request_id_view = + banned_request_arena.copy_string( + status_entry.status_request_id); + banned_request_ids.insert(request_id_view); + + // Update memory usage metric + banned_request_ids_memory_gauge.set( + banned_request_arena.total_allocated()); + + // Set version upper bound to current highest assigned version + status_entry.version_upper_bound = next_version - 1; + } else if constexpr (std::is_same_v) { + // Process health check entry: noop in sequence stage + } + }, + entry); + } + } +} + +void CommitPipeline::run_resolve_stage() { + for (int shutdowns_received = 0; shutdowns_received < 2;) { + auto guard = pipeline_.acquire<1, 0>(/*maxBatch*/ 1); + auto &batch = guard.batch; + + // Stage 1: Precondition resolution + // This stage must be serialized to maintain consistent database state view + // - Validate preconditions against current database state + // - Check for conflicts with other transactions + + for (auto &entry : batch) { + // Pattern match on pipeline entry variant + std::visit( + [&](auto &&e) { + using T = std::decay_t; + + if constexpr (std::is_same_v) { + ++shutdowns_received; + } else if constexpr (std::is_same_v) { + // Process commit entry: accept all commits (simplified + // implementation) + auto &commit_entry = e; + // Accept all commits (simplified implementation) + commit_entry.resolve_success = true; + } else if constexpr (std::is_same_v) { + // Status entries are not processed in resolve stage + // They were already handled in sequence stage + } else if constexpr (std::is_same_v) { + // Perform configurable CPU-intensive work for benchmarking + spend_cpu_cycles(config_.benchmark.ok_resolve_iterations); + } + }, + entry); + } + } +} + +void CommitPipeline::run_persist_stage() { + for (int shutdowns_received = 0; shutdowns_received < 2;) { + auto guard = pipeline_.acquire<2, 0>(); + auto &batch = guard.batch; + + // Stage 2: Transaction persistence + // Mark everything as durable immediately (simplified implementation) + // In real implementation: batch S3 writes, update subscribers, etc. + + for (auto &entry : batch) { + // Pattern match on pipeline entry variant + std::visit( + [&](auto &&e) { + using T = std::decay_t; + + if constexpr (std::is_same_v) { + ++shutdowns_received; + } else if constexpr (std::is_same_v) { + // Process commit entry: mark as durable, generate response + auto &commit_entry = e; + // Check if connection is still alive first + + // Skip if resolve failed or connection is in error state + if (!commit_entry.commit_request || + !commit_entry.resolve_success) { return; } + + // Mark as persisted and update committed version high water mark + commit_entry.persist_success = true; + committed_version_.store(commit_entry.assigned_version, + std::memory_order_seq_cst); + + const CommitRequest &commit_request = + *commit_entry.commit_request; + + // Generate success JSON response with actual assigned version + std::string_view response_json; + if (commit_request.request_id().has_value()) { + response_json = format( + commit_entry.request_arena, + R"({"request_id":"%.*s","status":"committed","version":%ld,"leader_id":"leader123"})", + static_cast( + commit_request.request_id().value().size()), + commit_request.request_id().value().data(), + commit_entry.assigned_version); + } else { + response_json = format( + commit_entry.request_arena, + R"({"status":"committed","version":%ld,"leader_id":"leader123"})", + commit_entry.assigned_version); + } + + // Store JSON response in arena for release stage + char *json_buffer = + commit_entry.request_arena.template allocate( + response_json.size()); + std::memcpy(json_buffer, response_json.data(), + response_json.size()); + commit_entry.response_json = + std::string_view(json_buffer, response_json.size()); + + return; // Continue processing + } else if constexpr (std::is_same_v) { + // Process status entry: generate not_committed response + auto &status_entry = e; + + // Store JSON response for release stage + status_entry.response_json = R"({"status": "not_committed"})"; + + } else if constexpr (std::is_same_v) { + // Process health check entry: generate OK response + auto &health_check_entry = e; + + // Store plain text "OK" response for release stage + health_check_entry.response_json = "OK"; + + } else if constexpr (std::is_same_v) { + auto &get_version_entry = e; + + // TODO validate we're still the leader at some version > the + // proposed version for external consistency. + // TODO include leader in response + get_version_entry.response_json = format( + get_version_entry.request_arena, + R"({"version":%ld,"leader":""})", get_version_entry.version); } - - // Assign sequential version number - commit_entry.assigned_version = next_version_++; - } else if constexpr (std::is_same_v) { - // Process status entry: add request_id to banned list, get version - // upper bound - auto &status_entry = e; - - // Add request_id to banned list - store the string in arena and - // use string_view - std::string_view request_id_view = - banned_request_arena_.copy_string( - status_entry.status_request_id); - banned_request_ids_.insert(request_id_view); - - // Update memory usage metric - banned_request_ids_memory_gauge.set( - banned_request_arena_.total_allocated()); - - // Set version upper bound to current highest assigned version - status_entry.version_upper_bound = next_version_ - 1; - } else if constexpr (std::is_same_v) { - // Process health check entry: noop in sequence stage - } - }, - entry); - } -} - -void CommitPipeline::process_resolve_batch(BatchType &batch, - int &shutdowns_received) { - // Stage 1: Precondition resolution - // This stage must be serialized to maintain consistent database state view - // - Validate preconditions against current database state - // - Check for conflicts with other transactions - - for (auto &entry : batch) { - // Pattern match on pipeline entry variant - std::visit( - [&](auto &&e) { - using T = std::decay_t; - - if constexpr (std::is_same_v) { - ++shutdowns_received; - } else if constexpr (std::is_same_v) { - // Process commit entry: accept all commits (simplified - // implementation) - auto &commit_entry = e; - // Accept all commits (simplified implementation) - commit_entry.resolve_success = true; - } else if constexpr (std::is_same_v) { - // Status entries are not processed in resolve stage - // They were already handled in sequence stage - } else if constexpr (std::is_same_v) { - // Perform configurable CPU-intensive work for benchmarking - spend_cpu_cycles(config_.benchmark.ok_resolve_iterations); - } - }, - entry); - } -} - -void CommitPipeline::process_persist_batch(BatchType &batch, - int &shutdowns_received) { - // Stage 2: Transaction persistence - // Mark everything as durable immediately (simplified implementation) - // In real implementation: batch S3 writes, update subscribers, etc. - - for (auto &entry : batch) { - // Pattern match on pipeline entry variant - std::visit( - [&](auto &&e) { - using T = std::decay_t; - - if constexpr (std::is_same_v) { - ++shutdowns_received; - } else if constexpr (std::is_same_v) { - // Process commit entry: mark as durable, generate response - auto &commit_entry = e; - // Check if connection is still alive first - - // Skip if resolve failed or connection is in error state - if (!commit_entry.commit_request || !commit_entry.resolve_success) { - return; - } - - // Mark as persisted and update committed version high water mark - commit_entry.persist_success = true; - committed_version_.store(commit_entry.assigned_version, - std::memory_order_seq_cst); - - const CommitRequest &commit_request = *commit_entry.commit_request; - - // Generate success JSON response with actual assigned version - std::string_view response_json; - if (commit_request.request_id().has_value()) { - response_json = format( - commit_entry.request_arena, - R"({"request_id":"%.*s","status":"committed","version":%ld,"leader_id":"leader123"})", - static_cast(commit_request.request_id().value().size()), - commit_request.request_id().value().data(), - commit_entry.assigned_version); - } else { - response_json = format( - commit_entry.request_arena, - R"({"status":"committed","version":%ld,"leader_id":"leader123"})", - commit_entry.assigned_version); - } - - // Store JSON response in arena for release stage - char *json_buffer = - commit_entry.request_arena.template allocate( - response_json.size()); - std::memcpy(json_buffer, response_json.data(), - response_json.size()); - commit_entry.response_json = - std::string_view(json_buffer, response_json.size()); - - return; // Continue processing - } else if constexpr (std::is_same_v) { - // Process status entry: generate not_committed response - auto &status_entry = e; - - // Store JSON response for release stage - status_entry.response_json = R"({"status": "not_committed"})"; - - } else if constexpr (std::is_same_v) { - // Process health check entry: generate OK response - auto &health_check_entry = e; - - // Store plain text "OK" response for release stage - health_check_entry.response_json = "OK"; - - } else if constexpr (std::is_same_v) { - auto &get_version_entry = e; - - // TODO validate we're still the leader at some version > the - // proposed version for external consistency. - // TODO include leader in response - get_version_entry.response_json = format( - get_version_entry.request_arena, - R"({"version":%ld,"leader":""})", get_version_entry.version); - } - }, - entry); - } -} - -void CommitPipeline::process_release_batch(BatchType &batch, int thread_index, - int &shutdowns_received) { - // Stage 3: Connection release - // Return connections to server for response transmission - - for (auto it = batch.begin(); it != batch.end(); ++it) { - auto &entry = *it; - - // Partition work: thread 0 handles even indices, thread 1 handles odd - // indices - if (static_cast(it.index() % 2) != thread_index) { - continue; + }, + entry); + } + } +} + +template void CommitPipeline::run_release_stage() { + for (int shutdowns_received = 0; shutdowns_received < 1;) { + auto guard = pipeline_.acquire<3, thread_index>(); + auto &batch = guard.batch; + + // Stage 3: Connection release + // Return connections to server for response transmission + + for (auto it = batch.begin(); it != batch.end(); ++it) { + auto &entry = *it; + + // Partition work: thread 0 handles even indices, thread 1 handles odd + // indices + if (static_cast(it.index() % 2) != thread_index) { + continue; + } + + // Process non-shutdown entries with partitioning + std::visit( + [&](auto &&e) { + using T = std::decay_t; + + if constexpr (std::is_same_v) { + // Already handled above + ++shutdowns_received; + } else if constexpr (std::is_same_v) { + // Process commit entry: return connection to server + auto &commit_entry = e; + auto conn_ref = commit_entry.connection.lock(); + if (!conn_ref) { + // Connection is gone, drop the entry silently + return; // Skip this entry and continue processing + } + + // Send the JSON response using protocol-agnostic interface + // HTTP formatting will happen in on_preprocess_writes() + conn_ref->send_response(commit_entry.protocol_context, + commit_entry.response_json, + std::move(commit_entry.request_arena)); + } else if constexpr (std::is_same_v) { + // Process status entry: return connection to server + auto &status_entry = e; + auto conn_ref = status_entry.connection.lock(); + if (!conn_ref) { + // Connection is gone, drop the entry silently + return; // Skip this entry and continue processing + } + + // Send the JSON response using protocol-agnostic interface + // HTTP formatting will happen in on_preprocess_writes() + conn_ref->send_response(status_entry.protocol_context, + status_entry.response_json, + std::move(status_entry.request_arena)); + } else if constexpr (std::is_same_v) { + // Process health check entry: return connection to server + auto &health_check_entry = e; + auto conn_ref = health_check_entry.connection.lock(); + if (!conn_ref) { + // Connection is gone, drop the entry silently + return; // Skip this entry and continue processing + } + + // Send the response using protocol-agnostic interface + // HTTP formatting will happen in on_preprocess_writes() + conn_ref->send_response( + health_check_entry.protocol_context, + health_check_entry.response_json, + std::move(health_check_entry.request_arena)); + } else if constexpr (std::is_same_v) { + auto &get_version_entry = e; + auto conn_ref = get_version_entry.connection.lock(); + if (!conn_ref) { + // Connection is gone, drop the entry silently + return; // Skip this entry and continue processing + } + + // Send the response using protocol-agnostic interface + // HTTP formatting will happen in on_preprocess_writes() + conn_ref->send_response( + get_version_entry.protocol_context, + get_version_entry.response_json, + std::move(get_version_entry.request_arena)); + } + }, + entry); } - - // Process non-shutdown entries with partitioning - std::visit( - [&](auto &&e) { - using T = std::decay_t; - - if constexpr (std::is_same_v) { - // Already handled above - ++shutdowns_received; - } else if constexpr (std::is_same_v) { - // Process commit entry: return connection to server - auto &commit_entry = e; - auto conn_ref = commit_entry.connection.lock(); - if (!conn_ref) { - // Connection is gone, drop the entry silently - return; // Skip this entry and continue processing - } - - // Send the JSON response using protocol-agnostic interface - // HTTP formatting will happen in on_preprocess_writes() - conn_ref->send_response(commit_entry.protocol_context, - commit_entry.response_json, - std::move(commit_entry.request_arena)); - } else if constexpr (std::is_same_v) { - // Process status entry: return connection to server - auto &status_entry = e; - auto conn_ref = status_entry.connection.lock(); - if (!conn_ref) { - // Connection is gone, drop the entry silently - return; // Skip this entry and continue processing - } - - // Send the JSON response using protocol-agnostic interface - // HTTP formatting will happen in on_preprocess_writes() - conn_ref->send_response(status_entry.protocol_context, - status_entry.response_json, - std::move(status_entry.request_arena)); - } else if constexpr (std::is_same_v) { - // Process health check entry: return connection to server - auto &health_check_entry = e; - auto conn_ref = health_check_entry.connection.lock(); - if (!conn_ref) { - // Connection is gone, drop the entry silently - return; // Skip this entry and continue processing - } - - // Send the response using protocol-agnostic interface - // HTTP formatting will happen in on_preprocess_writes() - conn_ref->send_response( - health_check_entry.protocol_context, - health_check_entry.response_json, - std::move(health_check_entry.request_arena)); - } else if constexpr (std::is_same_v) { - auto &get_version_entry = e; - auto conn_ref = get_version_entry.connection.lock(); - if (!conn_ref) { - // Connection is gone, drop the entry silently - return; // Skip this entry and continue processing - } - - // Send the response using protocol-agnostic interface - // HTTP formatting will happen in on_preprocess_writes() - conn_ref->send_response(get_version_entry.protocol_context, - get_version_entry.response_json, - std::move(get_version_entry.request_arena)); - } - }, - entry); } } diff --git a/src/commit_pipeline.hpp b/src/commit_pipeline.hpp index 675d83d..8c4dce5 100644 --- a/src/commit_pipeline.hpp +++ b/src/commit_pipeline.hpp @@ -3,9 +3,7 @@ #include #include #include -#include -#include "arena.hpp" #include "config.hpp" #include "pipeline_entry.hpp" #include "thread_pipeline.hpp" @@ -90,20 +88,9 @@ private: // Configuration reference const weaseldb::Config &config_; - // Pipeline state (sequence stage only) - int64_t next_version_ = 1; // Next version to assign (sequence thread only) - // Pipeline state (persist thread writes, other threads read) std::atomic committed_version_{0}; // Highest committed version - // Request ID deduplication (sequence stage only) - Arena banned_request_arena_; - using BannedRequestIdSet = - std::unordered_set, - std::equal_to, - ArenaStlAllocator>; - BannedRequestIdSet banned_request_ids_; - // Lock-free pipeline configuration static constexpr int lg_size = 16; // Ring buffer size (2^16 slots) static constexpr auto wait_strategy = WaitStrategy::WaitIfStageEmpty; @@ -118,14 +105,15 @@ private: std::thread release_thread_1_; std::thread release_thread_2_; - // Pipeline stage processing methods (batch-based) + // Pipeline stage main loops + void run_sequence_stage(); + void run_resolve_stage(); + void run_persist_stage(); + template void run_release_stage(); + + // Pipeline batch type alias using BatchType = StaticThreadPipeline::Batch; - void process_sequence_batch(BatchType &batch, int &shutdowns_received); - void process_resolve_batch(BatchType &batch, int &shutdowns_received); - void process_persist_batch(BatchType &batch, int &shutdowns_received); - void process_release_batch(BatchType &batch, int thread_index, - int &shutdowns_received); // Make non-copyable and non-movable CommitPipeline(const CommitPipeline &) = delete; diff --git a/src/pipeline_entry.hpp b/src/pipeline_entry.hpp index 66982b8..74c12cf 100644 --- a/src/pipeline_entry.hpp +++ b/src/pipeline_entry.hpp @@ -13,9 +13,9 @@ struct CommitRequest; */ struct CommitEntry { WeakRef connection; - int64_t assigned_version = 0; // Set by sequence stage - bool resolve_success = false; // Set by resolve stage - bool persist_success = false; // Set by persist stage + int64_t assigned_version = -1; // Set by sequence stage + bool resolve_success = false; // Set by resolve stage + bool persist_success = false; // Set by persist stage // Protocol-agnostic context (arena-allocated, protocol-specific) void *protocol_context = nullptr;