Move thread local state into stack

This commit is contained in:
2025-09-15 20:33:45 -04:00
parent 5a88047b9f
commit 917066d8c0
3 changed files with 302 additions and 306 deletions

View File

@@ -2,6 +2,7 @@
#include <cstring> #include <cstring>
#include <pthread.h> #include <pthread.h>
#include <unordered_set>
#include "commit_request.hpp" #include "commit_request.hpp"
#include "cpu_work.hpp" #include "cpu_work.hpp"
@@ -16,52 +17,35 @@ auto banned_request_ids_memory_gauge =
.create({}); .create({});
CommitPipeline::CommitPipeline(const weaseldb::Config &config) CommitPipeline::CommitPipeline(const weaseldb::Config &config)
: config_(config), banned_request_ids_(ArenaStlAllocator<std::string_view>( : config_(config), pipeline_(lg_size) {
&banned_request_arena_)),
pipeline_(lg_size) {
// Stage 0: Sequence assignment thread // Stage 0: Sequence assignment thread
sequence_thread_ = std::thread{[this]() { sequence_thread_ = std::thread{[this]() {
pthread_setname_np(pthread_self(), "txn-sequence"); pthread_setname_np(pthread_self(), "txn-sequence");
for (int shutdowns_received = 0; shutdowns_received < 2;) { run_sequence_stage();
auto guard = pipeline_.acquire<0, 0>();
process_sequence_batch(guard.batch, shutdowns_received);
}
}}; }};
// Stage 1: Precondition resolution thread // Stage 1: Precondition resolution thread
resolve_thread_ = std::thread{[this]() { resolve_thread_ = std::thread{[this]() {
pthread_setname_np(pthread_self(), "txn-resolve"); pthread_setname_np(pthread_self(), "txn-resolve");
for (int shutdowns_received = 0; shutdowns_received < 2;) { run_resolve_stage();
auto guard = pipeline_.acquire<1, 0>(/*maxBatch*/ 1);
process_resolve_batch(guard.batch, shutdowns_received);
}
}}; }};
// Stage 2: Transaction persistence thread // Stage 2: Transaction persistence thread
persist_thread_ = std::thread{[this]() { persist_thread_ = std::thread{[this]() {
pthread_setname_np(pthread_self(), "txn-persist"); pthread_setname_np(pthread_self(), "txn-persist");
for (int shutdowns_received = 0; shutdowns_received < 2;) { run_persist_stage();
auto guard = pipeline_.acquire<2, 0>();
process_persist_batch(guard.batch, shutdowns_received);
}
}}; }};
// Stage 3: Connection return to server threads (2 threads) // Stage 3: Connection return to server threads (2 threads)
release_thread_1_ = std::thread{[this]() { release_thread_1_ = std::thread{[this]() {
pthread_setname_np(pthread_self(), "txn-release-1"); pthread_setname_np(pthread_self(), "txn-release-1");
for (int shutdowns_received = 0; shutdowns_received < 1;) { run_release_stage<0>();
auto guard = pipeline_.acquire<3, 0>();
process_release_batch(guard.batch, 0, shutdowns_received);
}
}}; }};
release_thread_2_ = std::thread{[this]() { release_thread_2_ = std::thread{[this]() {
pthread_setname_np(pthread_self(), "txn-release-2"); pthread_setname_np(pthread_self(), "txn-release-2");
for (int shutdowns_received = 0; shutdowns_received < 1;) { run_release_stage<1>();
auto guard = pipeline_.acquire<3, 1>();
process_release_batch(guard.batch, 1, shutdowns_received);
}
}}; }};
} }
@@ -95,275 +79,299 @@ void CommitPipeline::submit_batch(std::span<PipelineEntry> entries) {
// Guard destructor publishes batch to stage 0 // Guard destructor publishes batch to stage 0
} }
void CommitPipeline::process_sequence_batch(BatchType &batch, void CommitPipeline::run_sequence_stage() {
int &shutdowns_received) {
// Stage 0: Sequence assignment
// This stage performs ONLY work that requires serial processing:
// - Version/sequence number assignment (must be sequential)
// - Request ID banned list management
for (auto &entry : batch) { int64_t next_version = 1;
// Pattern match on pipeline entry variant // Request ID deduplication (sequence stage only)
std::visit( Arena banned_request_arena;
[&](auto &&e) { using BannedRequestIdSet =
using T = std::decay_t<decltype(e)>; std::unordered_set<std::string_view, std::hash<std::string_view>,
std::equal_to<std::string_view>,
ArenaStlAllocator<std::string_view>>;
BannedRequestIdSet banned_request_ids{
ArenaStlAllocator<std::string_view>(&banned_request_arena)};
if constexpr (std::is_same_v<T, ShutdownEntry>) { for (int shutdowns_received = 0; shutdowns_received < 2;) {
++shutdowns_received; auto guard = pipeline_.acquire<0, 0>();
} else if constexpr (std::is_same_v<T, CommitEntry>) { auto &batch = guard.batch;
// Process commit entry: check banned list, assign version
auto &commit_entry = e;
assert(commit_entry.commit_request); // Stage 0: Sequence assignment
// This stage performs ONLY work that requires serial processing:
// - Version/sequence number assignment (must be sequential)
// - Request ID banned list management
// Check if request_id is banned (for status queries) for (auto &entry : batch) {
// Only check CommitRequest request_id, not HTTP header // Pattern match on pipeline entry variant
if (commit_entry.commit_request && std::visit(
commit_entry.commit_request->request_id().has_value()) { [&](auto &&e) {
auto commit_request_id = using T = std::decay_t<decltype(e)>;
commit_entry.commit_request->request_id().value();
if (banned_request_ids_.contains(commit_request_id)) { if constexpr (std::is_same_v<T, ShutdownEntry>) {
// Request ID is banned, this commit should fail ++shutdowns_received;
auto conn_ref = commit_entry.connection.lock(); } else if constexpr (std::is_same_v<T, CommitEntry>) {
if (!conn_ref) { // Process commit entry: check banned list, assign version
// Connection is gone, drop the entry silently auto &commit_entry = e;
return; // Skip this entry and continue processing
assert(commit_entry.commit_request);
// Check if request_id is banned (for status queries)
// Only check CommitRequest request_id, not HTTP header
if (commit_entry.commit_request &&
commit_entry.commit_request->request_id().has_value()) {
auto commit_request_id =
commit_entry.commit_request->request_id().value();
if (banned_request_ids.contains(commit_request_id)) {
// Request ID is banned, this commit should fail
commit_entry.response_json =
R"({"status": "not_committed", "error": "request_id_banned"})";
return;
} }
conn_ref->send_response( }
commit_entry.protocol_context,
R"({"status": "not_committed", "error": "request_id_banned"})", // Assign sequential version number
Arena{}); commit_entry.assigned_version = next_version++;
} else if constexpr (std::is_same_v<T, StatusEntry>) {
// Process status entry: add request_id to banned list, get
// version upper bound
auto &status_entry = e;
// Add request_id to banned list - store the string in arena and
// use string_view
std::string_view request_id_view =
banned_request_arena.copy_string(
status_entry.status_request_id);
banned_request_ids.insert(request_id_view);
// Update memory usage metric
banned_request_ids_memory_gauge.set(
banned_request_arena.total_allocated());
// Set version upper bound to current highest assigned version
status_entry.version_upper_bound = next_version - 1;
} else if constexpr (std::is_same_v<T, HealthCheckEntry>) {
// Process health check entry: noop in sequence stage
}
},
entry);
}
}
}
void CommitPipeline::run_resolve_stage() {
for (int shutdowns_received = 0; shutdowns_received < 2;) {
auto guard = pipeline_.acquire<1, 0>(/*maxBatch*/ 1);
auto &batch = guard.batch;
// Stage 1: Precondition resolution
// This stage must be serialized to maintain consistent database state view
// - Validate preconditions against current database state
// - Check for conflicts with other transactions
for (auto &entry : batch) {
// Pattern match on pipeline entry variant
std::visit(
[&](auto &&e) {
using T = std::decay_t<decltype(e)>;
if constexpr (std::is_same_v<T, ShutdownEntry>) {
++shutdowns_received;
} else if constexpr (std::is_same_v<T, CommitEntry>) {
// Process commit entry: accept all commits (simplified
// implementation)
auto &commit_entry = e;
// Accept all commits (simplified implementation)
commit_entry.resolve_success = true;
} else if constexpr (std::is_same_v<T, StatusEntry>) {
// Status entries are not processed in resolve stage
// They were already handled in sequence stage
} else if constexpr (std::is_same_v<T, HealthCheckEntry>) {
// Perform configurable CPU-intensive work for benchmarking
spend_cpu_cycles(config_.benchmark.ok_resolve_iterations);
}
},
entry);
}
}
}
void CommitPipeline::run_persist_stage() {
for (int shutdowns_received = 0; shutdowns_received < 2;) {
auto guard = pipeline_.acquire<2, 0>();
auto &batch = guard.batch;
// Stage 2: Transaction persistence
// Mark everything as durable immediately (simplified implementation)
// In real implementation: batch S3 writes, update subscribers, etc.
for (auto &entry : batch) {
// Pattern match on pipeline entry variant
std::visit(
[&](auto &&e) {
using T = std::decay_t<decltype(e)>;
if constexpr (std::is_same_v<T, ShutdownEntry>) {
++shutdowns_received;
} else if constexpr (std::is_same_v<T, CommitEntry>) {
// Process commit entry: mark as durable, generate response
auto &commit_entry = e;
// Check if connection is still alive first
// Skip if resolve failed or connection is in error state
if (!commit_entry.commit_request ||
!commit_entry.resolve_success) {
return; return;
} }
// Mark as persisted and update committed version high water mark
commit_entry.persist_success = true;
committed_version_.store(commit_entry.assigned_version,
std::memory_order_seq_cst);
const CommitRequest &commit_request =
*commit_entry.commit_request;
// Generate success JSON response with actual assigned version
std::string_view response_json;
if (commit_request.request_id().has_value()) {
response_json = format(
commit_entry.request_arena,
R"({"request_id":"%.*s","status":"committed","version":%ld,"leader_id":"leader123"})",
static_cast<int>(
commit_request.request_id().value().size()),
commit_request.request_id().value().data(),
commit_entry.assigned_version);
} else {
response_json = format(
commit_entry.request_arena,
R"({"status":"committed","version":%ld,"leader_id":"leader123"})",
commit_entry.assigned_version);
}
// Store JSON response in arena for release stage
char *json_buffer =
commit_entry.request_arena.template allocate<char>(
response_json.size());
std::memcpy(json_buffer, response_json.data(),
response_json.size());
commit_entry.response_json =
std::string_view(json_buffer, response_json.size());
return; // Continue processing
} else if constexpr (std::is_same_v<T, StatusEntry>) {
// Process status entry: generate not_committed response
auto &status_entry = e;
// Store JSON response for release stage
status_entry.response_json = R"({"status": "not_committed"})";
} else if constexpr (std::is_same_v<T, HealthCheckEntry>) {
// Process health check entry: generate OK response
auto &health_check_entry = e;
// Store plain text "OK" response for release stage
health_check_entry.response_json = "OK";
} else if constexpr (std::is_same_v<T, GetVersionEntry>) {
auto &get_version_entry = e;
// TODO validate we're still the leader at some version > the
// proposed version for external consistency.
// TODO include leader in response
get_version_entry.response_json = format(
get_version_entry.request_arena,
R"({"version":%ld,"leader":""})", get_version_entry.version);
} }
},
// Assign sequential version number entry);
commit_entry.assigned_version = next_version_++; }
} else if constexpr (std::is_same_v<T, StatusEntry>) { }
// Process status entry: add request_id to banned list, get version }
// upper bound
auto &status_entry = e; template <int thread_index> void CommitPipeline::run_release_stage() {
for (int shutdowns_received = 0; shutdowns_received < 1;) {
// Add request_id to banned list - store the string in arena and auto guard = pipeline_.acquire<3, thread_index>();
// use string_view auto &batch = guard.batch;
std::string_view request_id_view =
banned_request_arena_.copy_string( // Stage 3: Connection release
status_entry.status_request_id); // Return connections to server for response transmission
banned_request_ids_.insert(request_id_view);
for (auto it = batch.begin(); it != batch.end(); ++it) {
// Update memory usage metric auto &entry = *it;
banned_request_ids_memory_gauge.set(
banned_request_arena_.total_allocated()); // Partition work: thread 0 handles even indices, thread 1 handles odd
// indices
// Set version upper bound to current highest assigned version if (static_cast<int>(it.index() % 2) != thread_index) {
status_entry.version_upper_bound = next_version_ - 1; continue;
} else if constexpr (std::is_same_v<T, HealthCheckEntry>) { }
// Process health check entry: noop in sequence stage
} // Process non-shutdown entries with partitioning
}, std::visit(
entry); [&](auto &&e) {
} using T = std::decay_t<decltype(e)>;
}
if constexpr (std::is_same_v<T, ShutdownEntry>) {
void CommitPipeline::process_resolve_batch(BatchType &batch, // Already handled above
int &shutdowns_received) { ++shutdowns_received;
// Stage 1: Precondition resolution } else if constexpr (std::is_same_v<T, CommitEntry>) {
// This stage must be serialized to maintain consistent database state view // Process commit entry: return connection to server
// - Validate preconditions against current database state auto &commit_entry = e;
// - Check for conflicts with other transactions auto conn_ref = commit_entry.connection.lock();
if (!conn_ref) {
for (auto &entry : batch) { // Connection is gone, drop the entry silently
// Pattern match on pipeline entry variant return; // Skip this entry and continue processing
std::visit( }
[&](auto &&e) {
using T = std::decay_t<decltype(e)>; // Send the JSON response using protocol-agnostic interface
// HTTP formatting will happen in on_preprocess_writes()
if constexpr (std::is_same_v<T, ShutdownEntry>) { conn_ref->send_response(commit_entry.protocol_context,
++shutdowns_received; commit_entry.response_json,
} else if constexpr (std::is_same_v<T, CommitEntry>) { std::move(commit_entry.request_arena));
// Process commit entry: accept all commits (simplified } else if constexpr (std::is_same_v<T, StatusEntry>) {
// implementation) // Process status entry: return connection to server
auto &commit_entry = e; auto &status_entry = e;
// Accept all commits (simplified implementation) auto conn_ref = status_entry.connection.lock();
commit_entry.resolve_success = true; if (!conn_ref) {
} else if constexpr (std::is_same_v<T, StatusEntry>) { // Connection is gone, drop the entry silently
// Status entries are not processed in resolve stage return; // Skip this entry and continue processing
// They were already handled in sequence stage }
} else if constexpr (std::is_same_v<T, HealthCheckEntry>) {
// Perform configurable CPU-intensive work for benchmarking // Send the JSON response using protocol-agnostic interface
spend_cpu_cycles(config_.benchmark.ok_resolve_iterations); // HTTP formatting will happen in on_preprocess_writes()
} conn_ref->send_response(status_entry.protocol_context,
}, status_entry.response_json,
entry); std::move(status_entry.request_arena));
} } else if constexpr (std::is_same_v<T, HealthCheckEntry>) {
} // Process health check entry: return connection to server
auto &health_check_entry = e;
void CommitPipeline::process_persist_batch(BatchType &batch, auto conn_ref = health_check_entry.connection.lock();
int &shutdowns_received) { if (!conn_ref) {
// Stage 2: Transaction persistence // Connection is gone, drop the entry silently
// Mark everything as durable immediately (simplified implementation) return; // Skip this entry and continue processing
// In real implementation: batch S3 writes, update subscribers, etc. }
for (auto &entry : batch) { // Send the response using protocol-agnostic interface
// Pattern match on pipeline entry variant // HTTP formatting will happen in on_preprocess_writes()
std::visit( conn_ref->send_response(
[&](auto &&e) { health_check_entry.protocol_context,
using T = std::decay_t<decltype(e)>; health_check_entry.response_json,
std::move(health_check_entry.request_arena));
if constexpr (std::is_same_v<T, ShutdownEntry>) { } else if constexpr (std::is_same_v<T, GetVersionEntry>) {
++shutdowns_received; auto &get_version_entry = e;
} else if constexpr (std::is_same_v<T, CommitEntry>) { auto conn_ref = get_version_entry.connection.lock();
// Process commit entry: mark as durable, generate response if (!conn_ref) {
auto &commit_entry = e; // Connection is gone, drop the entry silently
// Check if connection is still alive first return; // Skip this entry and continue processing
}
// Skip if resolve failed or connection is in error state
if (!commit_entry.commit_request || !commit_entry.resolve_success) { // Send the response using protocol-agnostic interface
return; // HTTP formatting will happen in on_preprocess_writes()
} conn_ref->send_response(
get_version_entry.protocol_context,
// Mark as persisted and update committed version high water mark get_version_entry.response_json,
commit_entry.persist_success = true; std::move(get_version_entry.request_arena));
committed_version_.store(commit_entry.assigned_version, }
std::memory_order_seq_cst); },
entry);
const CommitRequest &commit_request = *commit_entry.commit_request;
// Generate success JSON response with actual assigned version
std::string_view response_json;
if (commit_request.request_id().has_value()) {
response_json = format(
commit_entry.request_arena,
R"({"request_id":"%.*s","status":"committed","version":%ld,"leader_id":"leader123"})",
static_cast<int>(commit_request.request_id().value().size()),
commit_request.request_id().value().data(),
commit_entry.assigned_version);
} else {
response_json = format(
commit_entry.request_arena,
R"({"status":"committed","version":%ld,"leader_id":"leader123"})",
commit_entry.assigned_version);
}
// Store JSON response in arena for release stage
char *json_buffer =
commit_entry.request_arena.template allocate<char>(
response_json.size());
std::memcpy(json_buffer, response_json.data(),
response_json.size());
commit_entry.response_json =
std::string_view(json_buffer, response_json.size());
return; // Continue processing
} else if constexpr (std::is_same_v<T, StatusEntry>) {
// Process status entry: generate not_committed response
auto &status_entry = e;
// Store JSON response for release stage
status_entry.response_json = R"({"status": "not_committed"})";
} else if constexpr (std::is_same_v<T, HealthCheckEntry>) {
// Process health check entry: generate OK response
auto &health_check_entry = e;
// Store plain text "OK" response for release stage
health_check_entry.response_json = "OK";
} else if constexpr (std::is_same_v<T, GetVersionEntry>) {
auto &get_version_entry = e;
// TODO validate we're still the leader at some version > the
// proposed version for external consistency.
// TODO include leader in response
get_version_entry.response_json = format(
get_version_entry.request_arena,
R"({"version":%ld,"leader":""})", get_version_entry.version);
}
},
entry);
}
}
void CommitPipeline::process_release_batch(BatchType &batch, int thread_index,
int &shutdowns_received) {
// Stage 3: Connection release
// Return connections to server for response transmission
for (auto it = batch.begin(); it != batch.end(); ++it) {
auto &entry = *it;
// Partition work: thread 0 handles even indices, thread 1 handles odd
// indices
if (static_cast<int>(it.index() % 2) != thread_index) {
continue;
} }
// Process non-shutdown entries with partitioning
std::visit(
[&](auto &&e) {
using T = std::decay_t<decltype(e)>;
if constexpr (std::is_same_v<T, ShutdownEntry>) {
// Already handled above
++shutdowns_received;
} else if constexpr (std::is_same_v<T, CommitEntry>) {
// Process commit entry: return connection to server
auto &commit_entry = e;
auto conn_ref = commit_entry.connection.lock();
if (!conn_ref) {
// Connection is gone, drop the entry silently
return; // Skip this entry and continue processing
}
// Send the JSON response using protocol-agnostic interface
// HTTP formatting will happen in on_preprocess_writes()
conn_ref->send_response(commit_entry.protocol_context,
commit_entry.response_json,
std::move(commit_entry.request_arena));
} else if constexpr (std::is_same_v<T, StatusEntry>) {
// Process status entry: return connection to server
auto &status_entry = e;
auto conn_ref = status_entry.connection.lock();
if (!conn_ref) {
// Connection is gone, drop the entry silently
return; // Skip this entry and continue processing
}
// Send the JSON response using protocol-agnostic interface
// HTTP formatting will happen in on_preprocess_writes()
conn_ref->send_response(status_entry.protocol_context,
status_entry.response_json,
std::move(status_entry.request_arena));
} else if constexpr (std::is_same_v<T, HealthCheckEntry>) {
// Process health check entry: return connection to server
auto &health_check_entry = e;
auto conn_ref = health_check_entry.connection.lock();
if (!conn_ref) {
// Connection is gone, drop the entry silently
return; // Skip this entry and continue processing
}
// Send the response using protocol-agnostic interface
// HTTP formatting will happen in on_preprocess_writes()
conn_ref->send_response(
health_check_entry.protocol_context,
health_check_entry.response_json,
std::move(health_check_entry.request_arena));
} else if constexpr (std::is_same_v<T, GetVersionEntry>) {
auto &get_version_entry = e;
auto conn_ref = get_version_entry.connection.lock();
if (!conn_ref) {
// Connection is gone, drop the entry silently
return; // Skip this entry and continue processing
}
// Send the response using protocol-agnostic interface
// HTTP formatting will happen in on_preprocess_writes()
conn_ref->send_response(get_version_entry.protocol_context,
get_version_entry.response_json,
std::move(get_version_entry.request_arena));
}
},
entry);
} }
} }

View File

@@ -3,9 +3,7 @@
#include <atomic> #include <atomic>
#include <span> #include <span>
#include <thread> #include <thread>
#include <unordered_set>
#include "arena.hpp"
#include "config.hpp" #include "config.hpp"
#include "pipeline_entry.hpp" #include "pipeline_entry.hpp"
#include "thread_pipeline.hpp" #include "thread_pipeline.hpp"
@@ -90,20 +88,9 @@ private:
// Configuration reference // Configuration reference
const weaseldb::Config &config_; const weaseldb::Config &config_;
// Pipeline state (sequence stage only)
int64_t next_version_ = 1; // Next version to assign (sequence thread only)
// Pipeline state (persist thread writes, other threads read) // Pipeline state (persist thread writes, other threads read)
std::atomic<int64_t> committed_version_{0}; // Highest committed version std::atomic<int64_t> committed_version_{0}; // Highest committed version
// Request ID deduplication (sequence stage only)
Arena banned_request_arena_;
using BannedRequestIdSet =
std::unordered_set<std::string_view, std::hash<std::string_view>,
std::equal_to<std::string_view>,
ArenaStlAllocator<std::string_view>>;
BannedRequestIdSet banned_request_ids_;
// Lock-free pipeline configuration // Lock-free pipeline configuration
static constexpr int lg_size = 16; // Ring buffer size (2^16 slots) static constexpr int lg_size = 16; // Ring buffer size (2^16 slots)
static constexpr auto wait_strategy = WaitStrategy::WaitIfStageEmpty; static constexpr auto wait_strategy = WaitStrategy::WaitIfStageEmpty;
@@ -118,14 +105,15 @@ private:
std::thread release_thread_1_; std::thread release_thread_1_;
std::thread release_thread_2_; std::thread release_thread_2_;
// Pipeline stage processing methods (batch-based) // Pipeline stage main loops
void run_sequence_stage();
void run_resolve_stage();
void run_persist_stage();
template <int thread_index> void run_release_stage();
// Pipeline batch type alias
using BatchType = using BatchType =
StaticThreadPipeline<PipelineEntry, wait_strategy, 1, 1, 1, 2>::Batch; StaticThreadPipeline<PipelineEntry, wait_strategy, 1, 1, 1, 2>::Batch;
void process_sequence_batch(BatchType &batch, int &shutdowns_received);
void process_resolve_batch(BatchType &batch, int &shutdowns_received);
void process_persist_batch(BatchType &batch, int &shutdowns_received);
void process_release_batch(BatchType &batch, int thread_index,
int &shutdowns_received);
// Make non-copyable and non-movable // Make non-copyable and non-movable
CommitPipeline(const CommitPipeline &) = delete; CommitPipeline(const CommitPipeline &) = delete;

View File

@@ -13,9 +13,9 @@ struct CommitRequest;
*/ */
struct CommitEntry { struct CommitEntry {
WeakRef<MessageSender> connection; WeakRef<MessageSender> connection;
int64_t assigned_version = 0; // Set by sequence stage int64_t assigned_version = -1; // Set by sequence stage
bool resolve_success = false; // Set by resolve stage bool resolve_success = false; // Set by resolve stage
bool persist_success = false; // Set by persist stage bool persist_success = false; // Set by persist stage
// Protocol-agnostic context (arena-allocated, protocol-specific) // Protocol-agnostic context (arena-allocated, protocol-specific)
void *protocol_context = nullptr; void *protocol_context = nullptr;