Two release threads

This commit is contained in:
2025-09-15 20:09:15 -04:00
parent 1acdc1e753
commit 5a88047b9f
3 changed files with 130 additions and 180 deletions

View File

@@ -23,60 +23,62 @@ CommitPipeline::CommitPipeline(const weaseldb::Config &config)
// Stage 0: Sequence assignment thread
sequence_thread_ = std::thread{[this]() {
pthread_setname_np(pthread_self(), "txn-sequence");
for (;;) {
for (int shutdowns_received = 0; shutdowns_received < 2;) {
auto guard = pipeline_.acquire<0, 0>();
if (process_sequence_batch(guard.batch)) {
return; // Shutdown signal received
}
process_sequence_batch(guard.batch, shutdowns_received);
}
}};
// Stage 1: Precondition resolution thread
resolve_thread_ = std::thread{[this]() {
pthread_setname_np(pthread_self(), "txn-resolve");
for (;;) {
for (int shutdowns_received = 0; shutdowns_received < 2;) {
auto guard = pipeline_.acquire<1, 0>(/*maxBatch*/ 1);
if (process_resolve_batch(guard.batch)) {
return; // Shutdown signal received
}
process_resolve_batch(guard.batch, shutdowns_received);
}
}};
// Stage 2: Transaction persistence thread
persist_thread_ = std::thread{[this]() {
pthread_setname_np(pthread_self(), "txn-persist");
for (;;) {
for (int shutdowns_received = 0; shutdowns_received < 2;) {
auto guard = pipeline_.acquire<2, 0>();
if (process_persist_batch(guard.batch)) {
return; // Shutdown signal received
}
process_persist_batch(guard.batch, shutdowns_received);
}
}};
// Stage 3: Connection return to server thread
release_thread_ = std::thread{[this]() {
pthread_setname_np(pthread_self(), "txn-release");
for (;;) {
// Stage 3: Connection return to server threads (2 threads)
release_thread_1_ = std::thread{[this]() {
pthread_setname_np(pthread_self(), "txn-release-1");
for (int shutdowns_received = 0; shutdowns_received < 1;) {
auto guard = pipeline_.acquire<3, 0>();
if (process_release_batch(guard.batch)) {
return; // Shutdown signal received
process_release_batch(guard.batch, 0, shutdowns_received);
}
}};
release_thread_2_ = std::thread{[this]() {
pthread_setname_np(pthread_self(), "txn-release-2");
for (int shutdowns_received = 0; shutdowns_received < 1;) {
auto guard = pipeline_.acquire<3, 1>();
process_release_batch(guard.batch, 1, shutdowns_received);
}
}};
}
CommitPipeline::~CommitPipeline() {
// Send single shutdown signal that flows through all pipeline stages
// Send two shutdown signals for both release threads (adjacent in same batch)
{
auto guard = pipeline_.push(1, true);
auto guard = pipeline_.push(2, true);
guard.batch[0] = ShutdownEntry{};
guard.batch[1] = ShutdownEntry{};
}
// Join all pipeline threads
sequence_thread_.join();
resolve_thread_.join();
persist_thread_.join();
release_thread_.join();
release_thread_1_.join();
release_thread_2_.join();
}
void CommitPipeline::submit_batch(std::span<PipelineEntry> entries) {
@@ -93,7 +95,8 @@ void CommitPipeline::submit_batch(std::span<PipelineEntry> entries) {
// Guard destructor publishes batch to stage 0
}
bool CommitPipeline::process_sequence_batch(BatchType &batch) {
void CommitPipeline::process_sequence_batch(BatchType &batch,
int &shutdowns_received) {
// Stage 0: Sequence assignment
// This stage performs ONLY work that requires serial processing:
// - Version/sequence number assignment (must be sequential)
@@ -101,28 +104,17 @@ bool CommitPipeline::process_sequence_batch(BatchType &batch) {
for (auto &entry : batch) {
// Pattern match on pipeline entry variant
bool should_shutdown = std::visit(
[&](auto &&e) -> bool {
std::visit(
[&](auto &&e) {
using T = std::decay_t<decltype(e)>;
if constexpr (std::is_same_v<T, ShutdownEntry>) {
return true; // Signal shutdown
++shutdowns_received;
} else if constexpr (std::is_same_v<T, CommitEntry>) {
// Process commit entry: check banned list, assign version
auto &commit_entry = e;
auto conn_ref = commit_entry.connection.lock();
if (!conn_ref) {
// Connection is gone, drop the entry silently
return false; // Skip this entry and continue processing
}
if (!commit_entry.commit_request) {
// Should not happen - basic validation was done on I/O thread
conn_ref->send_response(commit_entry.protocol_context,
R"({"error":"Internal server error"})",
Arena{});
return false;
}
assert(commit_entry.commit_request);
// Check if request_id is banned (for status queries)
// Only check CommitRequest request_id, not HTTP header
@@ -130,32 +122,28 @@ bool CommitPipeline::process_sequence_batch(BatchType &batch) {
commit_entry.commit_request->request_id().has_value()) {
auto commit_request_id =
commit_entry.commit_request->request_id().value();
if (banned_request_ids_.find(commit_request_id) !=
banned_request_ids_.end()) {
if (banned_request_ids_.contains(commit_request_id)) {
// Request ID is banned, this commit should fail
auto conn_ref = commit_entry.connection.lock();
if (!conn_ref) {
// Connection is gone, drop the entry silently
return; // Skip this entry and continue processing
}
conn_ref->send_response(
commit_entry.protocol_context,
R"({"status": "not_committed", "error": "request_id_banned"})",
Arena{});
return false;
return;
}
}
// Assign sequential version number
commit_entry.assigned_version = next_version_++;
return false; // Continue processing
} else if constexpr (std::is_same_v<T, StatusEntry>) {
// Process status entry: add request_id to banned list, get version
// upper bound
auto &status_entry = e;
auto conn_ref = status_entry.connection.lock();
if (!conn_ref) {
// Connection is gone, drop the entry silently
return false; // Skip this entry and continue processing
}
if (!status_entry.status_request_id.empty()) {
// Add request_id to banned list - store the string in arena and
// use string_view
std::string_view request_id_view =
@@ -169,33 +157,16 @@ bool CommitPipeline::process_sequence_batch(BatchType &batch) {
// Set version upper bound to current highest assigned version
status_entry.version_upper_bound = next_version_ - 1;
}
return false; // Continue processing
} else if constexpr (std::is_same_v<T, HealthCheckEntry>) {
// Process health check entry: noop in sequence stage
auto &health_check_entry = e;
auto conn_ref = health_check_entry.connection.lock();
if (!conn_ref) {
// Connection is gone, drop the entry silently
return false; // Skip this entry and continue processing
}
return false; // Continue processing
}
return false; // Unknown type, continue
},
entry);
if (should_shutdown) {
return true;
}
}
return false; // Continue processing
}
bool CommitPipeline::process_resolve_batch(BatchType &batch) {
void CommitPipeline::process_resolve_batch(BatchType &batch,
int &shutdowns_received) {
// Stage 1: Precondition resolution
// This stage must be serialized to maintain consistent database state view
// - Validate preconditions against current database state
@@ -203,87 +174,52 @@ bool CommitPipeline::process_resolve_batch(BatchType &batch) {
for (auto &entry : batch) {
// Pattern match on pipeline entry variant
bool should_shutdown = std::visit(
[&](auto &&e) -> bool {
std::visit(
[&](auto &&e) {
using T = std::decay_t<decltype(e)>;
if constexpr (std::is_same_v<T, ShutdownEntry>) {
return true; // Signal shutdown
++shutdowns_received;
} else if constexpr (std::is_same_v<T, CommitEntry>) {
// Process commit entry: accept all commits (simplified
// implementation)
auto &commit_entry = e;
auto conn_ref = commit_entry.connection.lock();
if (!conn_ref) {
// Connection is gone, drop the entry silently
return false; // Skip this entry and continue processing
}
if (!commit_entry.commit_request) {
// Skip processing for failed sequence stage
return false;
}
// Accept all commits (simplified implementation)
commit_entry.resolve_success = true;
return false; // Continue processing
} else if constexpr (std::is_same_v<T, StatusEntry>) {
// Status entries are not processed in resolve stage
// They were already handled in sequence stage
return false;
} else if constexpr (std::is_same_v<T, HealthCheckEntry>) {
// Process health check entry: perform configurable CPU work
auto &health_check_entry = e;
auto conn_ref = health_check_entry.connection.lock();
if (!conn_ref) {
// Connection is gone, drop the entry silently
return false; // Skip this entry and continue processing
}
// Perform configurable CPU-intensive work for benchmarking
spend_cpu_cycles(config_.benchmark.ok_resolve_iterations);
return false; // Continue processing
}
return false; // Unknown type, continue
},
entry);
if (should_shutdown) {
return true;
}
}
return false; // Continue processing
}
bool CommitPipeline::process_persist_batch(BatchType &batch) {
void CommitPipeline::process_persist_batch(BatchType &batch,
int &shutdowns_received) {
// Stage 2: Transaction persistence
// Mark everything as durable immediately (simplified implementation)
// In real implementation: batch S3 writes, update subscribers, etc.
for (auto &entry : batch) {
// Pattern match on pipeline entry variant
bool should_shutdown = std::visit(
[&](auto &&e) -> bool {
std::visit(
[&](auto &&e) {
using T = std::decay_t<decltype(e)>;
if constexpr (std::is_same_v<T, ShutdownEntry>) {
return true; // Signal shutdown
++shutdowns_received;
} else if constexpr (std::is_same_v<T, CommitEntry>) {
// Process commit entry: mark as durable, generate response
auto &commit_entry = e;
// Check if connection is still alive first
auto conn_ref = commit_entry.connection.lock();
if (!conn_ref) {
// Connection is gone, drop the entry silently
return false; // Skip this entry and continue processing
}
// Skip if resolve failed or connection is in error state
if (!commit_entry.commit_request || !commit_entry.resolve_success) {
return false;
return;
}
// Mark as persisted and update committed version high water mark
@@ -318,40 +254,23 @@ bool CommitPipeline::process_persist_batch(BatchType &batch) {
commit_entry.response_json =
std::string_view(json_buffer, response_json.size());
return false; // Continue processing
return; // Continue processing
} else if constexpr (std::is_same_v<T, StatusEntry>) {
// Process status entry: generate not_committed response
auto &status_entry = e;
auto conn_ref = status_entry.connection.lock();
if (!conn_ref) {
// Connection is gone, drop the entry silently
return false;
}
// Store JSON response for release stage
status_entry.response_json = R"({"status": "not_committed"})";
return false;
} else if constexpr (std::is_same_v<T, HealthCheckEntry>) {
// Process health check entry: generate OK response
auto &health_check_entry = e;
auto conn_ref = health_check_entry.connection.lock();
if (!conn_ref) {
// Connection is gone, drop the entry silently
return false; // Skip this entry and continue processing
}
// Store plain text "OK" response for release stage
health_check_entry.response_json = "OK";
return false; // Continue processing
} else if constexpr (std::is_same_v<T, GetVersionEntry>) {
auto &get_version_entry = e;
auto conn_ref = get_version_entry.connection.lock();
if (!conn_ref) {
// Connection is gone, drop the entry silently
return false; // Skip this entry and continue processing
}
// TODO validate we're still the leader at some version > the
// proposed version for external consistency.
@@ -360,38 +279,40 @@ bool CommitPipeline::process_persist_batch(BatchType &batch) {
get_version_entry.request_arena,
R"({"version":%ld,"leader":""})", get_version_entry.version);
}
return false; // Unknown type, continue
},
entry);
if (should_shutdown) {
return true;
}
}
return false; // Continue processing
}
bool CommitPipeline::process_release_batch(BatchType &batch) {
void CommitPipeline::process_release_batch(BatchType &batch, int thread_index,
int &shutdowns_received) {
// Stage 3: Connection release
// Return connections to server for response transmission
for (auto &entry : batch) {
// Pattern match on pipeline entry variant
bool should_shutdown = std::visit(
[&](auto &&e) -> bool {
for (auto it = batch.begin(); it != batch.end(); ++it) {
auto &entry = *it;
// Partition work: thread 0 handles even indices, thread 1 handles odd
// indices
if (static_cast<int>(it.index() % 2) != thread_index) {
continue;
}
// Process non-shutdown entries with partitioning
std::visit(
[&](auto &&e) {
using T = std::decay_t<decltype(e)>;
if constexpr (std::is_same_v<T, ShutdownEntry>) {
return true; // Signal shutdown
// Already handled above
++shutdowns_received;
} else if constexpr (std::is_same_v<T, CommitEntry>) {
// Process commit entry: return connection to server
auto &commit_entry = e;
auto conn_ref = commit_entry.connection.lock();
if (!conn_ref) {
// Connection is gone, drop the entry silently
return false; // Skip this entry and continue processing
return; // Skip this entry and continue processing
}
// Send the JSON response using protocol-agnostic interface
@@ -399,15 +320,13 @@ bool CommitPipeline::process_release_batch(BatchType &batch) {
conn_ref->send_response(commit_entry.protocol_context,
commit_entry.response_json,
std::move(commit_entry.request_arena));
return false; // Continue processing
} else if constexpr (std::is_same_v<T, StatusEntry>) {
// Process status entry: return connection to server
auto &status_entry = e;
auto conn_ref = status_entry.connection.lock();
if (!conn_ref) {
// Connection is gone, drop the entry silently
return false; // Skip this entry and continue processing
return; // Skip this entry and continue processing
}
// Send the JSON response using protocol-agnostic interface
@@ -415,15 +334,13 @@ bool CommitPipeline::process_release_batch(BatchType &batch) {
conn_ref->send_response(status_entry.protocol_context,
status_entry.response_json,
std::move(status_entry.request_arena));
return false; // Continue processing
} else if constexpr (std::is_same_v<T, HealthCheckEntry>) {
// Process health check entry: return connection to server
auto &health_check_entry = e;
auto conn_ref = health_check_entry.connection.lock();
if (!conn_ref) {
// Connection is gone, drop the entry silently
return false; // Skip this entry and continue processing
return; // Skip this entry and continue processing
}
// Send the response using protocol-agnostic interface
@@ -432,14 +349,12 @@ bool CommitPipeline::process_release_batch(BatchType &batch) {
health_check_entry.protocol_context,
health_check_entry.response_json,
std::move(health_check_entry.request_arena));
return false; // Continue processing
} else if constexpr (std::is_same_v<T, GetVersionEntry>) {
auto &get_version_entry = e;
auto conn_ref = get_version_entry.connection.lock();
if (!conn_ref) {
// Connection is gone, drop the entry silently
return false; // Skip this entry and continue processing
return; // Skip this entry and continue processing
}
// Send the response using protocol-agnostic interface
@@ -448,15 +363,7 @@ bool CommitPipeline::process_release_batch(BatchType &batch) {
get_version_entry.response_json,
std::move(get_version_entry.request_arena));
}
return false; // Unknown type, continue
},
entry);
if (should_shutdown) {
return true;
}
}
return false; // Continue processing
}

View File

@@ -109,21 +109,23 @@ private:
static constexpr auto wait_strategy = WaitStrategy::WaitIfStageEmpty;
// 4-stage pipeline: sequence -> resolve -> persist -> release
StaticThreadPipeline<PipelineEntry, wait_strategy, 1, 1, 1, 1> pipeline_;
StaticThreadPipeline<PipelineEntry, wait_strategy, 1, 1, 1, 2> pipeline_;
// Stage processing threads
std::thread sequence_thread_;
std::thread resolve_thread_;
std::thread persist_thread_;
std::thread release_thread_;
std::thread release_thread_1_;
std::thread release_thread_2_;
// Pipeline stage processing methods (batch-based)
using BatchType =
StaticThreadPipeline<PipelineEntry, wait_strategy, 1, 1, 1, 1>::Batch;
bool process_sequence_batch(BatchType &batch);
bool process_resolve_batch(BatchType &batch);
bool process_persist_batch(BatchType &batch);
bool process_release_batch(BatchType &batch);
StaticThreadPipeline<PipelineEntry, wait_strategy, 1, 1, 1, 2>::Batch;
void process_sequence_batch(BatchType &batch, int &shutdowns_received);
void process_resolve_batch(BatchType &batch, int &shutdowns_received);
void process_persist_batch(BatchType &batch, int &shutdowns_received);
void process_release_batch(BatchType &batch, int thread_index,
int &shutdowns_received);
// Make non-copyable and non-movable
CommitPipeline(const CommitPipeline &) = delete;

View File

@@ -268,6 +268,47 @@ int check_producer_capacity(
// }
// // Guard destructor marks items as consumed and available to next stage
//
// Multi-Thread Stage Processing:
// When a stage has multiple threads (e.g., 1, 1, 1, 2 = 2 threads in stage 3):
//
// OVERLAPPING BATCHES - EACH THREAD SEES EVERY ENTRY:
// - Multiple threads in the same stage get OVERLAPPING batches from the ring
// buffer
// - Thread 0: calls acquire<3, 0>() - gets batch from ring positions 100-110
// - Thread 1: calls acquire<3, 1>() - gets batch from ring positions 100-110
// (SAME)
// - Both threads see the same entries and must coordinate processing
//
// PARTITIONING STRATEGIES:
// Choose your partitioning approach based on your use case:
//
// 1. Ring buffer position-based partitioning:
// for (auto it = batch.begin(); it != batch.end(); ++it) {
// if (it.index() % 2 != thread_index) continue; // Skip entries for other
// threads process(*it); // Process only entries assigned to this thread
// }
//
// 2. Entry content-based partitioning:
// for (auto& item : guard.batch) {
// if (hash(item.connection_id) % 2 != thread_index) continue;
// process(item); // Process based on entry properties
// }
//
// 3. Process all entries (when each thread does different work):
// for (auto& item : guard.batch) {
// process(item); // Both threads process all items, but differently
// }
//
// Common Partitioning Patterns:
// - Position-based: it.index() % num_threads == thread_index
// - Hash-based: hash(item.key) % num_threads == thread_index
// - Type-based: item.type == MY_THREAD_TYPE
// - Load balancing: assign work based on thread load
// - All entries: each thread processes all items but performs different
// operations
//
// Note: it.index() returns the position in the ring buffer (0 to buffer_size-1)
//
// Memory Model:
// - Ring buffer size must be power of 2 for efficient masking
// - Actual ring slots accessed via: index & (slotCount - 1)