Extract commit pipeline to its own module
This commit is contained in:
@@ -139,6 +139,7 @@ target_link_libraries(nanobench_impl PUBLIC nanobench)
|
|||||||
# Define all source files in one place
|
# Define all source files in one place
|
||||||
set(WEASELDB_SOURCES
|
set(WEASELDB_SOURCES
|
||||||
src/arena.cpp
|
src/arena.cpp
|
||||||
|
src/commit_pipeline.cpp
|
||||||
src/cpu_work.cpp
|
src/cpu_work.cpp
|
||||||
src/format.cpp
|
src/format.cpp
|
||||||
src/metric.cpp
|
src/metric.cpp
|
||||||
|
|||||||
434
src/commit_pipeline.cpp
Normal file
434
src/commit_pipeline.cpp
Normal file
@@ -0,0 +1,434 @@
|
|||||||
|
#include "commit_pipeline.hpp"
|
||||||
|
|
||||||
|
#include <cstring>
|
||||||
|
#include <pthread.h>
|
||||||
|
|
||||||
|
#include "commit_request.hpp"
|
||||||
|
#include "cpu_work.hpp"
|
||||||
|
#include "format.hpp"
|
||||||
|
#include "metric.hpp"
|
||||||
|
|
||||||
|
// Metric for banned request IDs memory usage
|
||||||
|
auto banned_request_ids_memory_gauge =
|
||||||
|
metric::create_gauge("weaseldb_banned_request_ids_memory_bytes",
|
||||||
|
"Memory used by banned request IDs arena")
|
||||||
|
.create({});
|
||||||
|
|
||||||
|
CommitPipeline::CommitPipeline(const weaseldb::Config &config)
|
||||||
|
: config_(config), banned_request_ids_(ArenaStlAllocator<std::string_view>(
|
||||||
|
&banned_request_arena_)),
|
||||||
|
pipeline_(lg_size) {
|
||||||
|
|
||||||
|
// Stage 0: Sequence assignment thread
|
||||||
|
sequence_thread_ = std::thread{[this]() {
|
||||||
|
pthread_setname_np(pthread_self(), "txn-sequence");
|
||||||
|
for (;;) {
|
||||||
|
auto guard = pipeline_.acquire<0, 0>();
|
||||||
|
if (process_sequence_batch(guard.batch)) {
|
||||||
|
return; // Shutdown signal received
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}};
|
||||||
|
|
||||||
|
// Stage 1: Precondition resolution thread
|
||||||
|
resolve_thread_ = std::thread{[this]() {
|
||||||
|
pthread_setname_np(pthread_self(), "txn-resolve");
|
||||||
|
for (;;) {
|
||||||
|
auto guard = pipeline_.acquire<1, 0>(/*maxBatch*/ 1);
|
||||||
|
if (process_resolve_batch(guard.batch)) {
|
||||||
|
return; // Shutdown signal received
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}};
|
||||||
|
|
||||||
|
// Stage 2: Transaction persistence thread
|
||||||
|
persist_thread_ = std::thread{[this]() {
|
||||||
|
pthread_setname_np(pthread_self(), "txn-persist");
|
||||||
|
for (;;) {
|
||||||
|
auto guard = pipeline_.acquire<2, 0>();
|
||||||
|
if (process_persist_batch(guard.batch)) {
|
||||||
|
return; // Shutdown signal received
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}};
|
||||||
|
|
||||||
|
// Stage 3: Connection return to server thread
|
||||||
|
release_thread_ = std::thread{[this]() {
|
||||||
|
pthread_setname_np(pthread_self(), "txn-release");
|
||||||
|
for (;;) {
|
||||||
|
auto guard = pipeline_.acquire<3, 0>();
|
||||||
|
if (process_release_batch(guard.batch)) {
|
||||||
|
return; // Shutdown signal received
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}};
|
||||||
|
}
|
||||||
|
|
||||||
|
CommitPipeline::~CommitPipeline() {
|
||||||
|
// Send single shutdown signal that flows through all pipeline stages
|
||||||
|
{
|
||||||
|
auto guard = pipeline_.push(1, true);
|
||||||
|
guard.batch[0] = ShutdownEntry{};
|
||||||
|
}
|
||||||
|
|
||||||
|
// Join all pipeline threads
|
||||||
|
sequence_thread_.join();
|
||||||
|
resolve_thread_.join();
|
||||||
|
persist_thread_.join();
|
||||||
|
release_thread_.join();
|
||||||
|
}
|
||||||
|
|
||||||
|
void CommitPipeline::submit_batch(std::span<PipelineEntry> entries) {
|
||||||
|
if (entries.empty()) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get pipeline guard for batch size
|
||||||
|
auto guard = pipeline_.push(entries.size(), /*block=*/true);
|
||||||
|
|
||||||
|
// Move entries into pipeline slots
|
||||||
|
std::move(entries.begin(), entries.end(), guard.batch.begin());
|
||||||
|
|
||||||
|
// Guard destructor publishes batch to stage 0
|
||||||
|
}
|
||||||
|
|
||||||
|
bool CommitPipeline::process_sequence_batch(BatchType &batch) {
|
||||||
|
// Stage 0: Sequence assignment
|
||||||
|
// This stage performs ONLY work that requires serial processing:
|
||||||
|
// - Version/sequence number assignment (must be sequential)
|
||||||
|
// - Request ID banned list management
|
||||||
|
|
||||||
|
for (auto &entry : batch) {
|
||||||
|
// Pattern match on pipeline entry variant
|
||||||
|
bool should_shutdown = std::visit(
|
||||||
|
[&](auto &&e) -> bool {
|
||||||
|
using T = std::decay_t<decltype(e)>;
|
||||||
|
|
||||||
|
if constexpr (std::is_same_v<T, ShutdownEntry>) {
|
||||||
|
return true; // Signal shutdown
|
||||||
|
} else if constexpr (std::is_same_v<T, CommitEntry>) {
|
||||||
|
// Process commit entry: check banned list, assign version
|
||||||
|
auto &commit_entry = e;
|
||||||
|
auto conn_ref = commit_entry.connection.lock();
|
||||||
|
if (!conn_ref) {
|
||||||
|
// Connection is gone, drop the entry silently
|
||||||
|
return false; // Skip this entry and continue processing
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!commit_entry.commit_request) {
|
||||||
|
// Should not happen - basic validation was done on I/O thread
|
||||||
|
conn_ref->send_response(commit_entry.protocol_context,
|
||||||
|
R"({"error":"Internal server error"})",
|
||||||
|
Arena{});
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if request_id is banned (for status queries)
|
||||||
|
// Only check CommitRequest request_id, not HTTP header
|
||||||
|
if (commit_entry.commit_request &&
|
||||||
|
commit_entry.commit_request->request_id().has_value()) {
|
||||||
|
auto commit_request_id =
|
||||||
|
commit_entry.commit_request->request_id().value();
|
||||||
|
if (banned_request_ids_.find(commit_request_id) !=
|
||||||
|
banned_request_ids_.end()) {
|
||||||
|
// Request ID is banned, this commit should fail
|
||||||
|
conn_ref->send_response(
|
||||||
|
commit_entry.protocol_context,
|
||||||
|
R"({"status": "not_committed", "error": "request_id_banned"})",
|
||||||
|
Arena{});
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Assign sequential version number
|
||||||
|
commit_entry.assigned_version = next_version_++;
|
||||||
|
|
||||||
|
return false; // Continue processing
|
||||||
|
} else if constexpr (std::is_same_v<T, StatusEntry>) {
|
||||||
|
// Process status entry: add request_id to banned list, get version
|
||||||
|
// upper bound
|
||||||
|
auto &status_entry = e;
|
||||||
|
auto conn_ref = status_entry.connection.lock();
|
||||||
|
if (!conn_ref) {
|
||||||
|
// Connection is gone, drop the entry silently
|
||||||
|
return false; // Skip this entry and continue processing
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!status_entry.status_request_id.empty()) {
|
||||||
|
// Add request_id to banned list - store the string in arena and
|
||||||
|
// use string_view
|
||||||
|
std::string_view request_id_view =
|
||||||
|
banned_request_arena_.copy_string(
|
||||||
|
status_entry.status_request_id);
|
||||||
|
banned_request_ids_.insert(request_id_view);
|
||||||
|
|
||||||
|
// Update memory usage metric
|
||||||
|
banned_request_ids_memory_gauge.set(
|
||||||
|
banned_request_arena_.total_allocated());
|
||||||
|
|
||||||
|
// Set version upper bound to current highest assigned version
|
||||||
|
status_entry.version_upper_bound = next_version_ - 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return false; // Continue processing
|
||||||
|
} else if constexpr (std::is_same_v<T, HealthCheckEntry>) {
|
||||||
|
// Process health check entry: noop in sequence stage
|
||||||
|
auto &health_check_entry = e;
|
||||||
|
auto conn_ref = health_check_entry.connection.lock();
|
||||||
|
if (!conn_ref) {
|
||||||
|
// Connection is gone, drop the entry silently
|
||||||
|
return false; // Skip this entry and continue processing
|
||||||
|
}
|
||||||
|
|
||||||
|
return false; // Continue processing
|
||||||
|
}
|
||||||
|
|
||||||
|
return false; // Unknown type, continue
|
||||||
|
},
|
||||||
|
entry);
|
||||||
|
|
||||||
|
if (should_shutdown) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false; // Continue processing
|
||||||
|
}
|
||||||
|
|
||||||
|
bool CommitPipeline::process_resolve_batch(BatchType &batch) {
|
||||||
|
// Stage 1: Precondition resolution
|
||||||
|
// This stage must be serialized to maintain consistent database state view
|
||||||
|
// - Validate preconditions against current database state
|
||||||
|
// - Check for conflicts with other transactions
|
||||||
|
|
||||||
|
for (auto &entry : batch) {
|
||||||
|
// Pattern match on pipeline entry variant
|
||||||
|
bool should_shutdown = std::visit(
|
||||||
|
[&](auto &&e) -> bool {
|
||||||
|
using T = std::decay_t<decltype(e)>;
|
||||||
|
|
||||||
|
if constexpr (std::is_same_v<T, ShutdownEntry>) {
|
||||||
|
return true; // Signal shutdown
|
||||||
|
} else if constexpr (std::is_same_v<T, CommitEntry>) {
|
||||||
|
// Process commit entry: accept all commits (simplified
|
||||||
|
// implementation)
|
||||||
|
auto &commit_entry = e;
|
||||||
|
auto conn_ref = commit_entry.connection.lock();
|
||||||
|
if (!conn_ref) {
|
||||||
|
// Connection is gone, drop the entry silently
|
||||||
|
return false; // Skip this entry and continue processing
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!commit_entry.commit_request) {
|
||||||
|
// Skip processing for failed sequence stage
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Accept all commits (simplified implementation)
|
||||||
|
commit_entry.resolve_success = true;
|
||||||
|
|
||||||
|
return false; // Continue processing
|
||||||
|
} else if constexpr (std::is_same_v<T, StatusEntry>) {
|
||||||
|
// Status entries are not processed in resolve stage
|
||||||
|
// They were already handled in sequence stage
|
||||||
|
return false;
|
||||||
|
} else if constexpr (std::is_same_v<T, HealthCheckEntry>) {
|
||||||
|
// Process health check entry: perform configurable CPU work
|
||||||
|
auto &health_check_entry = e;
|
||||||
|
auto conn_ref = health_check_entry.connection.lock();
|
||||||
|
if (!conn_ref) {
|
||||||
|
// Connection is gone, drop the entry silently
|
||||||
|
return false; // Skip this entry and continue processing
|
||||||
|
}
|
||||||
|
|
||||||
|
// Perform configurable CPU-intensive work for benchmarking
|
||||||
|
spend_cpu_cycles(config_.benchmark.ok_resolve_iterations);
|
||||||
|
|
||||||
|
return false; // Continue processing
|
||||||
|
}
|
||||||
|
|
||||||
|
return false; // Unknown type, continue
|
||||||
|
},
|
||||||
|
entry);
|
||||||
|
|
||||||
|
if (should_shutdown) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false; // Continue processing
|
||||||
|
}
|
||||||
|
|
||||||
|
bool CommitPipeline::process_persist_batch(BatchType &batch) {
|
||||||
|
// Stage 2: Transaction persistence
|
||||||
|
// Mark everything as durable immediately (simplified implementation)
|
||||||
|
// In real implementation: batch S3 writes, update subscribers, etc.
|
||||||
|
|
||||||
|
for (auto &entry : batch) {
|
||||||
|
// Pattern match on pipeline entry variant
|
||||||
|
bool should_shutdown = std::visit(
|
||||||
|
[&](auto &&e) -> bool {
|
||||||
|
using T = std::decay_t<decltype(e)>;
|
||||||
|
|
||||||
|
if constexpr (std::is_same_v<T, ShutdownEntry>) {
|
||||||
|
return true; // Signal shutdown
|
||||||
|
} else if constexpr (std::is_same_v<T, CommitEntry>) {
|
||||||
|
// Process commit entry: mark as durable, generate response
|
||||||
|
auto &commit_entry = e;
|
||||||
|
// Check if connection is still alive first
|
||||||
|
auto conn_ref = commit_entry.connection.lock();
|
||||||
|
if (!conn_ref) {
|
||||||
|
// Connection is gone, drop the entry silently
|
||||||
|
return false; // Skip this entry and continue processing
|
||||||
|
}
|
||||||
|
|
||||||
|
// Skip if resolve failed or connection is in error state
|
||||||
|
if (!commit_entry.commit_request || !commit_entry.resolve_success) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Mark as persisted and update committed version high water mark
|
||||||
|
commit_entry.persist_success = true;
|
||||||
|
committed_version_.store(commit_entry.assigned_version,
|
||||||
|
std::memory_order_seq_cst);
|
||||||
|
|
||||||
|
const CommitRequest &commit_request = *commit_entry.commit_request;
|
||||||
|
|
||||||
|
// Generate success JSON response with actual assigned version
|
||||||
|
std::string_view response_json;
|
||||||
|
if (commit_request.request_id().has_value()) {
|
||||||
|
response_json = format(
|
||||||
|
commit_entry.request_arena,
|
||||||
|
R"({"request_id":"%.*s","status":"committed","version":%ld,"leader_id":"leader123"})",
|
||||||
|
static_cast<int>(commit_request.request_id().value().size()),
|
||||||
|
commit_request.request_id().value().data(),
|
||||||
|
commit_entry.assigned_version);
|
||||||
|
} else {
|
||||||
|
response_json = format(
|
||||||
|
commit_entry.request_arena,
|
||||||
|
R"({"status":"committed","version":%ld,"leader_id":"leader123"})",
|
||||||
|
commit_entry.assigned_version);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Store JSON response in arena for release stage
|
||||||
|
char *json_buffer =
|
||||||
|
commit_entry.request_arena.template allocate<char>(
|
||||||
|
response_json.size());
|
||||||
|
std::memcpy(json_buffer, response_json.data(),
|
||||||
|
response_json.size());
|
||||||
|
commit_entry.response_json =
|
||||||
|
std::string_view(json_buffer, response_json.size());
|
||||||
|
|
||||||
|
return false; // Continue processing
|
||||||
|
} else if constexpr (std::is_same_v<T, StatusEntry>) {
|
||||||
|
// Process status entry: generate not_committed response
|
||||||
|
auto &status_entry = e;
|
||||||
|
auto conn_ref = status_entry.connection.lock();
|
||||||
|
if (!conn_ref) {
|
||||||
|
// Connection is gone, drop the entry silently
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Store JSON response for release stage
|
||||||
|
status_entry.response_json = R"({"status": "not_committed"})";
|
||||||
|
|
||||||
|
return false;
|
||||||
|
} else if constexpr (std::is_same_v<T, HealthCheckEntry>) {
|
||||||
|
// Process health check entry: generate OK response
|
||||||
|
auto &health_check_entry = e;
|
||||||
|
auto conn_ref = health_check_entry.connection.lock();
|
||||||
|
if (!conn_ref) {
|
||||||
|
// Connection is gone, drop the entry silently
|
||||||
|
return false; // Skip this entry and continue processing
|
||||||
|
}
|
||||||
|
|
||||||
|
// Store plain text "OK" response for release stage
|
||||||
|
health_check_entry.response_json = "OK";
|
||||||
|
|
||||||
|
return false; // Continue processing
|
||||||
|
}
|
||||||
|
|
||||||
|
return false; // Unknown type, continue
|
||||||
|
},
|
||||||
|
entry);
|
||||||
|
|
||||||
|
if (should_shutdown) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return false; // Continue processing
|
||||||
|
}
|
||||||
|
|
||||||
|
bool CommitPipeline::process_release_batch(BatchType &batch) {
|
||||||
|
// Stage 3: Connection release
|
||||||
|
// Return connections to server for response transmission
|
||||||
|
|
||||||
|
for (auto &entry : batch) {
|
||||||
|
// Pattern match on pipeline entry variant
|
||||||
|
bool should_shutdown = std::visit(
|
||||||
|
[&](auto &&e) -> bool {
|
||||||
|
using T = std::decay_t<decltype(e)>;
|
||||||
|
|
||||||
|
if constexpr (std::is_same_v<T, ShutdownEntry>) {
|
||||||
|
return true; // Signal shutdown
|
||||||
|
} else if constexpr (std::is_same_v<T, CommitEntry>) {
|
||||||
|
// Process commit entry: return connection to server
|
||||||
|
auto &commit_entry = e;
|
||||||
|
auto conn_ref = commit_entry.connection.lock();
|
||||||
|
if (!conn_ref) {
|
||||||
|
// Connection is gone, drop the entry silently
|
||||||
|
return false; // Skip this entry and continue processing
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send the JSON response using protocol-agnostic interface
|
||||||
|
// HTTP formatting will happen in on_preprocess_writes()
|
||||||
|
conn_ref->send_response(commit_entry.protocol_context,
|
||||||
|
commit_entry.response_json,
|
||||||
|
std::move(commit_entry.request_arena));
|
||||||
|
|
||||||
|
return false; // Continue processing
|
||||||
|
} else if constexpr (std::is_same_v<T, StatusEntry>) {
|
||||||
|
// Process status entry: return connection to server
|
||||||
|
auto &status_entry = e;
|
||||||
|
auto conn_ref = status_entry.connection.lock();
|
||||||
|
if (!conn_ref) {
|
||||||
|
// Connection is gone, drop the entry silently
|
||||||
|
return false; // Skip this entry and continue processing
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send the JSON response using protocol-agnostic interface
|
||||||
|
// HTTP formatting will happen in on_preprocess_writes()
|
||||||
|
conn_ref->send_response(status_entry.protocol_context,
|
||||||
|
status_entry.response_json,
|
||||||
|
std::move(status_entry.request_arena));
|
||||||
|
|
||||||
|
return false; // Continue processing
|
||||||
|
} else if constexpr (std::is_same_v<T, HealthCheckEntry>) {
|
||||||
|
// Process health check entry: return connection to server
|
||||||
|
auto &health_check_entry = e;
|
||||||
|
auto conn_ref = health_check_entry.connection.lock();
|
||||||
|
if (!conn_ref) {
|
||||||
|
// Connection is gone, drop the entry silently
|
||||||
|
return false; // Skip this entry and continue processing
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send the response using protocol-agnostic interface
|
||||||
|
// HTTP formatting will happen in on_preprocess_writes()
|
||||||
|
conn_ref->send_response(
|
||||||
|
health_check_entry.protocol_context,
|
||||||
|
health_check_entry.response_json,
|
||||||
|
std::move(health_check_entry.request_arena));
|
||||||
|
|
||||||
|
return false; // Continue processing
|
||||||
|
}
|
||||||
|
|
||||||
|
return false; // Unknown type, continue
|
||||||
|
},
|
||||||
|
entry);
|
||||||
|
|
||||||
|
if (should_shutdown) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return false; // Continue processing
|
||||||
|
}
|
||||||
133
src/commit_pipeline.hpp
Normal file
133
src/commit_pipeline.hpp
Normal file
@@ -0,0 +1,133 @@
|
|||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include <atomic>
|
||||||
|
#include <span>
|
||||||
|
#include <thread>
|
||||||
|
#include <unordered_set>
|
||||||
|
|
||||||
|
#include "arena.hpp"
|
||||||
|
#include "config.hpp"
|
||||||
|
#include "pipeline_entry.hpp"
|
||||||
|
#include "thread_pipeline.hpp"
|
||||||
|
|
||||||
|
/**
|
||||||
|
* High-performance 4-stage commit processing pipeline.
|
||||||
|
*
|
||||||
|
* Provides protocol-agnostic transaction processing through a lock-free
|
||||||
|
* multi-stage pipeline optimized for high throughput and low latency.
|
||||||
|
*
|
||||||
|
* Pipeline Stages:
|
||||||
|
* 1. Sequence: Version assignment and request ID deduplication
|
||||||
|
* 2. Resolve: Precondition validation and conflict detection
|
||||||
|
* 3. Persist: Transaction durability and response generation
|
||||||
|
* 4. Release: Connection return and response transmission
|
||||||
|
*
|
||||||
|
* Thread Safety:
|
||||||
|
* - submit_batch() is thread-safe for concurrent producers
|
||||||
|
* - Internal pipeline uses lock-free algorithms
|
||||||
|
* - Each stage runs on dedicated threads for optimal performance
|
||||||
|
*
|
||||||
|
* Usage:
|
||||||
|
* ```cpp
|
||||||
|
* CommitPipeline pipeline(config);
|
||||||
|
*
|
||||||
|
* // Build pipeline entries
|
||||||
|
* std::vector<PipelineEntry> entries;
|
||||||
|
* entries.emplace_back(CommitEntry(connection, context, request, arena));
|
||||||
|
*
|
||||||
|
* // Submit for processing
|
||||||
|
* pipeline.submit_batch(entries);
|
||||||
|
* ```
|
||||||
|
*/
|
||||||
|
struct CommitPipeline {
|
||||||
|
/**
|
||||||
|
* Create pipeline with 4 processing stages.
|
||||||
|
*
|
||||||
|
* @param config Server configuration for pipeline tuning
|
||||||
|
*/
|
||||||
|
explicit CommitPipeline(const weaseldb::Config &config);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Destructor ensures clean shutdown and thread join.
|
||||||
|
* Sends shutdown signal through pipeline and waits for all stages to
|
||||||
|
* complete.
|
||||||
|
*/
|
||||||
|
~CommitPipeline();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Submit batch of pipeline entries for processing.
|
||||||
|
*
|
||||||
|
* Thread-safe method for submitting work to the pipeline. Entries flow
|
||||||
|
* through all 4 stages in order with proper synchronization.
|
||||||
|
*
|
||||||
|
* @param entries Span of pipeline entries to process
|
||||||
|
*
|
||||||
|
* Entry types:
|
||||||
|
* - CommitEntry: Full transaction processing through all stages
|
||||||
|
* - StatusEntry: Request status lookup with sequence stage processing
|
||||||
|
* - HealthCheckEntry: Health check with configurable CPU work
|
||||||
|
* - ShutdownEntry: Coordinated pipeline shutdown signal
|
||||||
|
*
|
||||||
|
* @note Thread Safety: Safe for concurrent calls from multiple threads
|
||||||
|
* @note Performance: Batching reduces pipeline contention - prefer larger
|
||||||
|
* batches
|
||||||
|
* @note Blocking: May block if pipeline is at capacity (backpressure)
|
||||||
|
*/
|
||||||
|
void submit_batch(std::span<PipelineEntry> entries);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the highest committed version number.
|
||||||
|
*
|
||||||
|
* @return Current committed version (persist thread writes, other threads
|
||||||
|
* read)
|
||||||
|
* @note Thread Safety: Safe to read from any thread
|
||||||
|
*/
|
||||||
|
int64_t get_committed_version() const {
|
||||||
|
return committed_version_.load(std::memory_order_seq_cst);
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
// Configuration reference
|
||||||
|
const weaseldb::Config &config_;
|
||||||
|
|
||||||
|
// Pipeline state (sequence stage only)
|
||||||
|
int64_t next_version_ = 1; // Next version to assign (sequence thread only)
|
||||||
|
|
||||||
|
// Pipeline state (persist thread writes, other threads read)
|
||||||
|
std::atomic<int64_t> committed_version_{0}; // Highest committed version
|
||||||
|
|
||||||
|
// Request ID deduplication (sequence stage only)
|
||||||
|
Arena banned_request_arena_;
|
||||||
|
using BannedRequestIdSet =
|
||||||
|
std::unordered_set<std::string_view, std::hash<std::string_view>,
|
||||||
|
std::equal_to<std::string_view>,
|
||||||
|
ArenaStlAllocator<std::string_view>>;
|
||||||
|
BannedRequestIdSet banned_request_ids_;
|
||||||
|
|
||||||
|
// Lock-free pipeline configuration
|
||||||
|
static constexpr int lg_size = 16; // Ring buffer size (2^16 slots)
|
||||||
|
static constexpr auto wait_strategy = WaitStrategy::WaitIfUpstreamIdle;
|
||||||
|
|
||||||
|
// 4-stage pipeline: sequence -> resolve -> persist -> release
|
||||||
|
StaticThreadPipeline<PipelineEntry, wait_strategy, 1, 1, 1, 1> pipeline_;
|
||||||
|
|
||||||
|
// Stage processing threads
|
||||||
|
std::thread sequence_thread_;
|
||||||
|
std::thread resolve_thread_;
|
||||||
|
std::thread persist_thread_;
|
||||||
|
std::thread release_thread_;
|
||||||
|
|
||||||
|
// Pipeline stage processing methods (batch-based)
|
||||||
|
using BatchType =
|
||||||
|
StaticThreadPipeline<PipelineEntry, wait_strategy, 1, 1, 1, 1>::Batch;
|
||||||
|
bool process_sequence_batch(BatchType &batch);
|
||||||
|
bool process_resolve_batch(BatchType &batch);
|
||||||
|
bool process_persist_batch(BatchType &batch);
|
||||||
|
bool process_release_batch(BatchType &batch);
|
||||||
|
|
||||||
|
// Make non-copyable and non-movable
|
||||||
|
CommitPipeline(const CommitPipeline &) = delete;
|
||||||
|
CommitPipeline &operator=(const CommitPipeline &) = delete;
|
||||||
|
CommitPipeline(CommitPipeline &&) = delete;
|
||||||
|
CommitPipeline &operator=(CommitPipeline &&) = delete;
|
||||||
|
};
|
||||||
@@ -8,7 +8,6 @@
|
|||||||
#include "api_url_parser.hpp"
|
#include "api_url_parser.hpp"
|
||||||
#include "arena.hpp"
|
#include "arena.hpp"
|
||||||
#include "connection.hpp"
|
#include "connection.hpp"
|
||||||
#include "cpu_work.hpp"
|
|
||||||
#include "format.hpp"
|
#include "format.hpp"
|
||||||
#include "json_commit_request_parser.hpp"
|
#include "json_commit_request_parser.hpp"
|
||||||
#include "metric.hpp"
|
#include "metric.hpp"
|
||||||
@@ -30,12 +29,6 @@ thread_local auto version_counter =
|
|||||||
thread_local auto ok_counter =
|
thread_local auto ok_counter =
|
||||||
requests_counter_family.create({{"path", "/ok"}});
|
requests_counter_family.create({{"path", "/ok"}});
|
||||||
|
|
||||||
// Metric for banned request IDs memory usage
|
|
||||||
auto banned_request_ids_memory_gauge =
|
|
||||||
metric::create_gauge("weaseldb_banned_request_ids_memory_bytes",
|
|
||||||
"Memory used by banned request IDs arena")
|
|
||||||
.create({});
|
|
||||||
|
|
||||||
HttpConnectionState::HttpConnectionState() {
|
HttpConnectionState::HttpConnectionState() {
|
||||||
llhttp_settings_init(&settings);
|
llhttp_settings_init(&settings);
|
||||||
|
|
||||||
@@ -82,11 +75,6 @@ void HttpHandler::on_preprocess_writes(
|
|||||||
for (auto &pending : pending_responses) {
|
for (auto &pending : pending_responses) {
|
||||||
auto *ctx = static_cast<HttpResponseContext *>(pending.protocol_context);
|
auto *ctx = static_cast<HttpResponseContext *>(pending.protocol_context);
|
||||||
|
|
||||||
printf(
|
|
||||||
"Processing response: sequence_id=%ld, request_id=%ld, json='%.*s'\n",
|
|
||||||
ctx->sequence_id, ctx->http_request_id,
|
|
||||||
(int)pending.response_json.size(), pending.response_json.data());
|
|
||||||
|
|
||||||
// Determine HTTP status code and content type from response content
|
// Determine HTTP status code and content type from response content
|
||||||
int status_code = 200;
|
int status_code = 200;
|
||||||
std::string_view content_type = "application/json";
|
std::string_view content_type = "application/json";
|
||||||
@@ -109,28 +97,21 @@ void HttpHandler::on_preprocess_writes(
|
|||||||
status_code, content_type, pending.response_json, pending.arena,
|
status_code, content_type, pending.response_json, pending.arena,
|
||||||
ctx->http_request_id, ctx->connection_close);
|
ctx->http_request_id, ctx->connection_close);
|
||||||
|
|
||||||
printf("Adding response to queue: sequence_id=%ld\n", ctx->sequence_id);
|
|
||||||
state->ready_responses[ctx->sequence_id] = ResponseData{
|
state->ready_responses[ctx->sequence_id] = ResponseData{
|
||||||
http_response, std::move(pending.arena), ctx->connection_close};
|
http_response, std::move(pending.arena), ctx->connection_close};
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send responses in sequential order
|
// Send responses in sequential order
|
||||||
printf("Checking for sequential responses, next_sequence_to_send=%ld\n",
|
|
||||||
state->next_sequence_to_send);
|
|
||||||
auto iter = state->ready_responses.begin();
|
auto iter = state->ready_responses.begin();
|
||||||
while (iter != state->ready_responses.end() &&
|
while (iter != state->ready_responses.end() &&
|
||||||
iter->first == state->next_sequence_to_send) {
|
iter->first == state->next_sequence_to_send) {
|
||||||
auto &[sequence_id, response_data] = *iter;
|
auto &[sequence_id, response_data] = *iter;
|
||||||
|
|
||||||
printf("Sending response: sequence_id=%ld\n", sequence_id);
|
|
||||||
conn.append_bytes(response_data.data, std::move(response_data.arena),
|
conn.append_bytes(response_data.data, std::move(response_data.arena),
|
||||||
response_data.connection_close);
|
response_data.connection_close);
|
||||||
state->next_sequence_to_send++;
|
state->next_sequence_to_send++;
|
||||||
iter = state->ready_responses.erase(iter);
|
iter = state->ready_responses.erase(iter);
|
||||||
}
|
}
|
||||||
printf("After processing, next_sequence_to_send=%ld, "
|
|
||||||
"ready_responses.size()=%zu\n",
|
|
||||||
state->next_sequence_to_send, state->ready_responses.size());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -245,12 +226,10 @@ void HttpHandler::on_batch_complete(std::span<Connection *const> batch) {
|
|||||||
state->queue.clear();
|
state->queue.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send requests to 4-stage pipeline in batch. Batching here reduces
|
// Send requests to commit pipeline in batch. Batching here reduces
|
||||||
// contention on the way into the pipeline.
|
// contention on the way into the pipeline.
|
||||||
if (g_batch_entries.size() > 0) {
|
if (!g_batch_entries.empty()) {
|
||||||
auto guard = commitPipeline.push(g_batch_entries.size(), true);
|
commit_pipeline_.submit_batch(g_batch_entries);
|
||||||
std::move(g_batch_entries.begin(), g_batch_entries.end(),
|
|
||||||
guard.batch.begin());
|
|
||||||
}
|
}
|
||||||
g_batch_entries.clear();
|
g_batch_entries.clear();
|
||||||
}
|
}
|
||||||
@@ -305,9 +284,8 @@ void HttpHandler::handle_get_version(Connection &conn,
|
|||||||
version_counter.inc();
|
version_counter.inc();
|
||||||
|
|
||||||
// Generate JSON response
|
// Generate JSON response
|
||||||
auto json_response =
|
auto json_response = format(state.arena, R"({"version":%ld,"leader":""})",
|
||||||
format(state.arena, R"({"version":%ld,"leader":""})",
|
commit_pipeline_.get_committed_version());
|
||||||
this->committed_version.load(std::memory_order_seq_cst));
|
|
||||||
|
|
||||||
// Format HTTP response
|
// Format HTTP response
|
||||||
auto http_response =
|
auto http_response =
|
||||||
@@ -809,368 +787,3 @@ int HttpHandler::onMessageComplete(llhttp_t *parser) {
|
|||||||
state->message_complete = true;
|
state->message_complete = true;
|
||||||
return HPE_PAUSED;
|
return HPE_PAUSED;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Pipeline stage implementations (batch-based)
|
|
||||||
bool HttpHandler::process_sequence_batch(BatchType &batch) {
|
|
||||||
// Stage 0: Sequence assignment
|
|
||||||
// This stage performs ONLY work that requires serial processing:
|
|
||||||
// - Version/sequence number assignment (must be sequential)
|
|
||||||
// - Request ID banned list management
|
|
||||||
|
|
||||||
for (auto &entry : batch) {
|
|
||||||
// Pattern match on pipeline entry variant
|
|
||||||
bool should_shutdown = std::visit(
|
|
||||||
[&](auto &&e) -> bool {
|
|
||||||
using T = std::decay_t<decltype(e)>;
|
|
||||||
|
|
||||||
if constexpr (std::is_same_v<T, ShutdownEntry>) {
|
|
||||||
return true; // Signal shutdown
|
|
||||||
} else if constexpr (std::is_same_v<T, CommitEntry>) {
|
|
||||||
// Process commit entry: check banned list, assign version
|
|
||||||
auto &commit_entry = e;
|
|
||||||
auto conn_ref = commit_entry.connection.lock();
|
|
||||||
if (!conn_ref) {
|
|
||||||
// Connection is gone, drop the entry silently and increment
|
|
||||||
// metric
|
|
||||||
// TODO: Add dropped_pipeline_entries metric
|
|
||||||
return false; // Skip this entry and continue processing
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!commit_entry.commit_request) {
|
|
||||||
// Should not happen - basic validation was done on I/O thread
|
|
||||||
conn_ref->send_response(commit_entry.protocol_context,
|
|
||||||
R"({"error":"Internal server error"})",
|
|
||||||
Arena{});
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check if request_id is banned (for status queries)
|
|
||||||
// Only check CommitRequest request_id, not HTTP header
|
|
||||||
if (commit_entry.commit_request &&
|
|
||||||
commit_entry.commit_request->request_id().has_value()) {
|
|
||||||
auto commit_request_id =
|
|
||||||
commit_entry.commit_request->request_id().value();
|
|
||||||
if (banned_request_ids.find(commit_request_id) !=
|
|
||||||
banned_request_ids.end()) {
|
|
||||||
// Request ID is banned, this commit should fail
|
|
||||||
conn_ref->send_response(
|
|
||||||
commit_entry.protocol_context,
|
|
||||||
R"({"status": "not_committed", "error": "request_id_banned"})",
|
|
||||||
Arena{});
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Assign sequential version number
|
|
||||||
commit_entry.assigned_version = next_version++;
|
|
||||||
|
|
||||||
return false; // Continue processing
|
|
||||||
} else if constexpr (std::is_same_v<T, StatusEntry>) {
|
|
||||||
// Process status entry: add request_id to banned list, get version
|
|
||||||
// upper bound
|
|
||||||
auto &status_entry = e;
|
|
||||||
auto conn_ref = status_entry.connection.lock();
|
|
||||||
if (!conn_ref) {
|
|
||||||
// Connection is gone, drop the entry silently and increment
|
|
||||||
// metric
|
|
||||||
// TODO: Add dropped_pipeline_entries metric
|
|
||||||
return false; // Skip this entry and continue processing
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!status_entry.status_request_id.empty()) {
|
|
||||||
// Add request_id to banned list - store the string in arena and
|
|
||||||
// use string_view
|
|
||||||
std::string_view request_id_view =
|
|
||||||
banned_request_arena.copy_string(
|
|
||||||
status_entry.status_request_id);
|
|
||||||
banned_request_ids.insert(request_id_view);
|
|
||||||
|
|
||||||
// Update memory usage metric
|
|
||||||
banned_request_ids_memory_gauge.set(
|
|
||||||
banned_request_arena.total_allocated());
|
|
||||||
|
|
||||||
// Set version upper bound to current highest assigned version
|
|
||||||
status_entry.version_upper_bound = next_version - 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO: Transfer to status threadpool - for now mark as processed
|
|
||||||
// Response will be generated in persist stage
|
|
||||||
|
|
||||||
return false; // Continue processing
|
|
||||||
} else if constexpr (std::is_same_v<T, HealthCheckEntry>) {
|
|
||||||
// Process health check entry: noop in sequence stage
|
|
||||||
auto &health_check_entry = e;
|
|
||||||
auto conn_ref = health_check_entry.connection.lock();
|
|
||||||
if (!conn_ref) {
|
|
||||||
// Connection is gone, drop the entry silently and increment
|
|
||||||
// metric
|
|
||||||
// TODO: Add dropped_pipeline_entries metric
|
|
||||||
return false; // Skip this entry and continue processing
|
|
||||||
}
|
|
||||||
|
|
||||||
return false; // Continue processing
|
|
||||||
}
|
|
||||||
|
|
||||||
return false; // Unknown type, continue
|
|
||||||
},
|
|
||||||
entry);
|
|
||||||
|
|
||||||
if (should_shutdown) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return false; // Continue processing
|
|
||||||
}
|
|
||||||
|
|
||||||
bool HttpHandler::process_resolve_batch(BatchType &batch) {
|
|
||||||
// Stage 1: Precondition resolution
|
|
||||||
// This stage must be serialized to maintain consistent database state view
|
|
||||||
// - Validate preconditions against current database state
|
|
||||||
// - Check for conflicts with other transactions
|
|
||||||
|
|
||||||
for (auto &entry : batch) {
|
|
||||||
// Pattern match on pipeline entry variant
|
|
||||||
bool should_shutdown = std::visit(
|
|
||||||
[&](auto &&e) -> bool {
|
|
||||||
using T = std::decay_t<decltype(e)>;
|
|
||||||
|
|
||||||
if constexpr (std::is_same_v<T, ShutdownEntry>) {
|
|
||||||
return true; // Signal shutdown
|
|
||||||
} else if constexpr (std::is_same_v<T, CommitEntry>) {
|
|
||||||
// Process commit entry: accept all commits (simplified
|
|
||||||
// implementation)
|
|
||||||
auto &commit_entry = e;
|
|
||||||
auto conn_ref = commit_entry.connection.lock();
|
|
||||||
if (!conn_ref) {
|
|
||||||
// Connection is gone, drop the entry silently and increment
|
|
||||||
// metric
|
|
||||||
// TODO: Add dropped_pipeline_entries metric
|
|
||||||
return false; // Skip this entry and continue processing
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!commit_entry.commit_request) {
|
|
||||||
// Skip processing for failed sequence stage
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Accept all commits (simplified implementation)
|
|
||||||
commit_entry.resolve_success = true;
|
|
||||||
|
|
||||||
return false; // Continue processing
|
|
||||||
} else if constexpr (std::is_same_v<T, StatusEntry>) {
|
|
||||||
// Status entries are not processed in resolve stage
|
|
||||||
// They were already handled in sequence stage
|
|
||||||
return false;
|
|
||||||
} else if constexpr (std::is_same_v<T, HealthCheckEntry>) {
|
|
||||||
// Process health check entry: perform configurable CPU work
|
|
||||||
auto &health_check_entry = e;
|
|
||||||
auto conn_ref = health_check_entry.connection.lock();
|
|
||||||
if (!conn_ref) {
|
|
||||||
// Connection is gone, drop the entry silently and increment
|
|
||||||
// metric
|
|
||||||
// TODO: Add dropped_pipeline_entries metric
|
|
||||||
return false; // Skip this entry and continue processing
|
|
||||||
}
|
|
||||||
|
|
||||||
// Perform configurable CPU-intensive work for benchmarking
|
|
||||||
spend_cpu_cycles(config_.benchmark.ok_resolve_iterations);
|
|
||||||
|
|
||||||
return false; // Continue processing
|
|
||||||
}
|
|
||||||
|
|
||||||
return false; // Unknown type, continue
|
|
||||||
},
|
|
||||||
entry);
|
|
||||||
|
|
||||||
if (should_shutdown) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return false; // Continue processing
|
|
||||||
}
|
|
||||||
|
|
||||||
bool HttpHandler::process_persist_batch(BatchType &batch) {
|
|
||||||
// Stage 2: Transaction persistence
|
|
||||||
// Mark everything as durable immediately (simplified implementation)
|
|
||||||
// In real implementation: batch S3 writes, update subscribers, etc.
|
|
||||||
|
|
||||||
for (auto &entry : batch) {
|
|
||||||
// Pattern match on pipeline entry variant
|
|
||||||
bool should_shutdown = std::visit(
|
|
||||||
[&](auto &&e) -> bool {
|
|
||||||
using T = std::decay_t<decltype(e)>;
|
|
||||||
|
|
||||||
if constexpr (std::is_same_v<T, ShutdownEntry>) {
|
|
||||||
return true; // Signal shutdown
|
|
||||||
} else if constexpr (std::is_same_v<T, CommitEntry>) {
|
|
||||||
// Process commit entry: mark as durable, generate response
|
|
||||||
auto &commit_entry = e;
|
|
||||||
// Check if connection is still alive first
|
|
||||||
auto conn_ref = commit_entry.connection.lock();
|
|
||||||
if (!conn_ref) {
|
|
||||||
// Connection is gone, drop the entry silently and increment
|
|
||||||
// metric
|
|
||||||
// TODO: Add dropped_pipeline_entries metric
|
|
||||||
return false; // Skip this entry and continue processing
|
|
||||||
}
|
|
||||||
|
|
||||||
// Skip if resolve failed or connection is in error state
|
|
||||||
if (!commit_entry.commit_request || !commit_entry.resolve_success) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Mark as persisted and update committed version high water mark
|
|
||||||
commit_entry.persist_success = true;
|
|
||||||
committed_version.store(commit_entry.assigned_version,
|
|
||||||
std::memory_order_seq_cst);
|
|
||||||
|
|
||||||
const CommitRequest &commit_request = *commit_entry.commit_request;
|
|
||||||
|
|
||||||
// Generate success JSON response with actual assigned version
|
|
||||||
std::string_view response_json;
|
|
||||||
if (commit_request.request_id().has_value()) {
|
|
||||||
response_json = format(
|
|
||||||
commit_entry.request_arena,
|
|
||||||
R"({"request_id":"%.*s","status":"committed","version":%ld,"leader_id":"leader123"})",
|
|
||||||
static_cast<int>(commit_request.request_id().value().size()),
|
|
||||||
commit_request.request_id().value().data(),
|
|
||||||
commit_entry.assigned_version);
|
|
||||||
} else {
|
|
||||||
response_json = format(
|
|
||||||
commit_entry.request_arena,
|
|
||||||
R"({"status":"committed","version":%ld,"leader_id":"leader123"})",
|
|
||||||
commit_entry.assigned_version);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Store JSON response in arena for release stage
|
|
||||||
char *json_buffer =
|
|
||||||
commit_entry.request_arena.template allocate<char>(
|
|
||||||
response_json.size());
|
|
||||||
std::memcpy(json_buffer, response_json.data(),
|
|
||||||
response_json.size());
|
|
||||||
commit_entry.response_json =
|
|
||||||
std::string_view(json_buffer, response_json.size());
|
|
||||||
|
|
||||||
return false; // Continue processing
|
|
||||||
} else if constexpr (std::is_same_v<T, StatusEntry>) {
|
|
||||||
// Process status entry: generate not_committed response
|
|
||||||
auto &status_entry = e;
|
|
||||||
auto conn_ref = status_entry.connection.lock();
|
|
||||||
if (!conn_ref) {
|
|
||||||
// Connection is gone, drop the entry silently
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Store JSON response for release stage
|
|
||||||
status_entry.response_json = R"({"status": "not_committed"})";
|
|
||||||
|
|
||||||
return false;
|
|
||||||
} else if constexpr (std::is_same_v<T, HealthCheckEntry>) {
|
|
||||||
// Process health check entry: generate OK response
|
|
||||||
auto &health_check_entry = e;
|
|
||||||
auto conn_ref = health_check_entry.connection.lock();
|
|
||||||
if (!conn_ref) {
|
|
||||||
// Connection is gone, drop the entry silently and increment
|
|
||||||
// metric
|
|
||||||
// TODO: Add dropped_pipeline_entries metric
|
|
||||||
return false; // Skip this entry and continue processing
|
|
||||||
}
|
|
||||||
|
|
||||||
// Store plain text "OK" response for release stage
|
|
||||||
health_check_entry.response_json = "OK";
|
|
||||||
|
|
||||||
return false; // Continue processing
|
|
||||||
}
|
|
||||||
|
|
||||||
return false; // Unknown type, continue
|
|
||||||
},
|
|
||||||
entry);
|
|
||||||
|
|
||||||
if (should_shutdown) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return false; // Continue processing
|
|
||||||
}
|
|
||||||
|
|
||||||
bool HttpHandler::process_release_batch(BatchType &batch) {
|
|
||||||
// Stage 3: Connection release
|
|
||||||
// Return connections to server for response transmission
|
|
||||||
|
|
||||||
for (auto &entry : batch) {
|
|
||||||
// Pattern match on pipeline entry variant
|
|
||||||
bool should_shutdown = std::visit(
|
|
||||||
[&](auto &&e) -> bool {
|
|
||||||
using T = std::decay_t<decltype(e)>;
|
|
||||||
|
|
||||||
if constexpr (std::is_same_v<T, ShutdownEntry>) {
|
|
||||||
return true; // Signal shutdown
|
|
||||||
} else if constexpr (std::is_same_v<T, CommitEntry>) {
|
|
||||||
// Process commit entry: return connection to server
|
|
||||||
auto &commit_entry = e;
|
|
||||||
auto conn_ref = commit_entry.connection.lock();
|
|
||||||
if (!conn_ref) {
|
|
||||||
// Connection is gone, drop the entry silently and increment
|
|
||||||
// metric
|
|
||||||
// TODO: Add dropped_pipeline_entries metric
|
|
||||||
return false; // Skip this entry and continue processing
|
|
||||||
}
|
|
||||||
|
|
||||||
// Send the JSON response using protocol-agnostic interface
|
|
||||||
// HTTP formatting will happen in on_preprocess_writes()
|
|
||||||
conn_ref->send_response(commit_entry.protocol_context,
|
|
||||||
commit_entry.response_json,
|
|
||||||
std::move(commit_entry.request_arena));
|
|
||||||
|
|
||||||
return false; // Continue processing
|
|
||||||
} else if constexpr (std::is_same_v<T, StatusEntry>) {
|
|
||||||
// Process status entry: return connection to server
|
|
||||||
auto &status_entry = e;
|
|
||||||
auto conn_ref = status_entry.connection.lock();
|
|
||||||
if (!conn_ref) {
|
|
||||||
// Connection is gone, drop the entry silently and increment
|
|
||||||
// metric
|
|
||||||
// TODO: Add dropped_pipeline_entries metric
|
|
||||||
return false; // Skip this entry and continue processing
|
|
||||||
}
|
|
||||||
|
|
||||||
// Send the JSON response using protocol-agnostic interface
|
|
||||||
// HTTP formatting will happen in on_preprocess_writes()
|
|
||||||
conn_ref->send_response(status_entry.protocol_context,
|
|
||||||
status_entry.response_json,
|
|
||||||
std::move(status_entry.request_arena));
|
|
||||||
|
|
||||||
return false; // Continue processing
|
|
||||||
} else if constexpr (std::is_same_v<T, HealthCheckEntry>) {
|
|
||||||
// Process health check entry: return connection to server
|
|
||||||
auto &health_check_entry = e;
|
|
||||||
auto conn_ref = health_check_entry.connection.lock();
|
|
||||||
if (!conn_ref) {
|
|
||||||
// Connection is gone, drop the entry silently and increment
|
|
||||||
// metric
|
|
||||||
// TODO: Add dropped_pipeline_entries metric
|
|
||||||
return false; // Skip this entry and continue processing
|
|
||||||
}
|
|
||||||
|
|
||||||
// Send the response using protocol-agnostic interface
|
|
||||||
// HTTP formatting will happen in on_preprocess_writes()
|
|
||||||
conn_ref->send_response(
|
|
||||||
health_check_entry.protocol_context,
|
|
||||||
health_check_entry.response_json,
|
|
||||||
std::move(health_check_entry.request_arena));
|
|
||||||
|
|
||||||
return false; // Continue processing
|
|
||||||
}
|
|
||||||
|
|
||||||
return false; // Unknown type, continue
|
|
||||||
},
|
|
||||||
entry);
|
|
||||||
|
|
||||||
if (should_shutdown) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return false; // Continue processing
|
|
||||||
}
|
|
||||||
|
|||||||
@@ -13,11 +13,10 @@
|
|||||||
|
|
||||||
#include "api_url_parser.hpp"
|
#include "api_url_parser.hpp"
|
||||||
#include "arena.hpp"
|
#include "arena.hpp"
|
||||||
|
#include "commit_pipeline.hpp"
|
||||||
#include "config.hpp"
|
#include "config.hpp"
|
||||||
#include "connection.hpp"
|
#include "connection.hpp"
|
||||||
#include "connection_handler.hpp"
|
#include "connection_handler.hpp"
|
||||||
#include "pipeline_entry.hpp"
|
|
||||||
#include "thread_pipeline.hpp"
|
|
||||||
|
|
||||||
// Forward declarations
|
// Forward declarations
|
||||||
struct CommitRequest;
|
struct CommitRequest;
|
||||||
@@ -108,66 +107,7 @@ struct HttpConnectionState {
|
|||||||
*/
|
*/
|
||||||
struct HttpHandler : ConnectionHandler {
|
struct HttpHandler : ConnectionHandler {
|
||||||
explicit HttpHandler(const weaseldb::Config &config)
|
explicit HttpHandler(const weaseldb::Config &config)
|
||||||
: config_(config), banned_request_ids(ArenaStlAllocator<std::string_view>(
|
: config_(config), commit_pipeline_(config) {}
|
||||||
&banned_request_arena)) {
|
|
||||||
// Stage 0: Sequence assignment thread
|
|
||||||
sequenceThread = std::thread{[this]() {
|
|
||||||
pthread_setname_np(pthread_self(), "txn-sequence");
|
|
||||||
for (;;) {
|
|
||||||
auto guard = commitPipeline.acquire<0, 0>();
|
|
||||||
if (process_sequence_batch(guard.batch)) {
|
|
||||||
return; // Shutdown signal received
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}};
|
|
||||||
|
|
||||||
// Stage 1: Precondition resolution thread
|
|
||||||
resolveThread = std::thread{[this]() {
|
|
||||||
pthread_setname_np(pthread_self(), "txn-resolve");
|
|
||||||
for (;;) {
|
|
||||||
auto guard = commitPipeline.acquire<1, 0>(/*maxBatch*/ 1);
|
|
||||||
if (process_resolve_batch(guard.batch)) {
|
|
||||||
return; // Shutdown signal received
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}};
|
|
||||||
|
|
||||||
// Stage 2: Transaction persistence thread
|
|
||||||
persistThread = std::thread{[this]() {
|
|
||||||
pthread_setname_np(pthread_self(), "txn-persist");
|
|
||||||
for (;;) {
|
|
||||||
auto guard = commitPipeline.acquire<2, 0>();
|
|
||||||
if (process_persist_batch(guard.batch)) {
|
|
||||||
return; // Shutdown signal received
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}};
|
|
||||||
|
|
||||||
// Stage 3: Connection return to server thread
|
|
||||||
releaseThread = std::thread{[this]() {
|
|
||||||
pthread_setname_np(pthread_self(), "txn-release");
|
|
||||||
for (;;) {
|
|
||||||
auto guard = commitPipeline.acquire<3, 0>();
|
|
||||||
if (process_release_batch(guard.batch)) {
|
|
||||||
return; // Shutdown signal received
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}};
|
|
||||||
}
|
|
||||||
~HttpHandler() {
|
|
||||||
// Send single shutdown signal that flows through all pipeline stages
|
|
||||||
{
|
|
||||||
auto guard = commitPipeline.push(1, true);
|
|
||||||
guard.batch[0] =
|
|
||||||
ShutdownEntry{}; // Single ShutdownEntry flows through all stages
|
|
||||||
}
|
|
||||||
|
|
||||||
// Join all pipeline threads
|
|
||||||
sequenceThread.join();
|
|
||||||
resolveThread.join();
|
|
||||||
persistThread.join();
|
|
||||||
releaseThread.join();
|
|
||||||
}
|
|
||||||
|
|
||||||
void on_connection_established(Connection &conn) override;
|
void on_connection_established(Connection &conn) override;
|
||||||
void on_connection_closed(Connection &conn) override;
|
void on_connection_closed(Connection &conn) override;
|
||||||
@@ -188,47 +128,11 @@ struct HttpHandler : ConnectionHandler {
|
|||||||
static int onMessageComplete(llhttp_t *parser);
|
static int onMessageComplete(llhttp_t *parser);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
static constexpr int lg_size = 16;
|
|
||||||
|
|
||||||
// Configuration reference
|
// Configuration reference
|
||||||
const weaseldb::Config &config_;
|
const weaseldb::Config &config_;
|
||||||
|
|
||||||
// Pipeline state (sequence thread only)
|
// Commit processing pipeline
|
||||||
int64_t next_version = 1; // Next version to assign (sequence thread only)
|
CommitPipeline commit_pipeline_;
|
||||||
|
|
||||||
// Pipeline state (persist thread writes, I/O threads read)
|
|
||||||
std::atomic<int64_t> committed_version{
|
|
||||||
0}; // Highest committed version (persist thread writes, I/O threads read)
|
|
||||||
|
|
||||||
// Arena for banned request IDs and related data structures (sequence thread
|
|
||||||
// only)
|
|
||||||
Arena banned_request_arena;
|
|
||||||
using BannedRequestIdSet =
|
|
||||||
std::unordered_set<std::string_view, std::hash<std::string_view>,
|
|
||||||
std::equal_to<std::string_view>,
|
|
||||||
ArenaStlAllocator<std::string_view>>;
|
|
||||||
BannedRequestIdSet banned_request_ids; // Request IDs that should not commit
|
|
||||||
// (string_views into arena)
|
|
||||||
|
|
||||||
constexpr static auto wait_strategy = WaitStrategy::WaitIfUpstreamIdle;
|
|
||||||
|
|
||||||
// Main commit processing pipeline: sequence -> resolve -> persist -> release
|
|
||||||
StaticThreadPipeline<PipelineEntry, wait_strategy, 1, 1, 1, 1> commitPipeline{
|
|
||||||
lg_size};
|
|
||||||
|
|
||||||
// Pipeline stage threads
|
|
||||||
std::thread sequenceThread;
|
|
||||||
std::thread resolveThread;
|
|
||||||
std::thread persistThread;
|
|
||||||
std::thread releaseThread;
|
|
||||||
|
|
||||||
// Pipeline stage processing methods (batch-based)
|
|
||||||
using BatchType =
|
|
||||||
StaticThreadPipeline<PipelineEntry, wait_strategy, 1, 1, 1, 1>::Batch;
|
|
||||||
bool process_sequence_batch(BatchType &batch);
|
|
||||||
bool process_resolve_batch(BatchType &batch);
|
|
||||||
bool process_persist_batch(BatchType &batch);
|
|
||||||
bool process_release_batch(BatchType &batch);
|
|
||||||
|
|
||||||
// Route handlers
|
// Route handlers
|
||||||
void handle_get_version(Connection &conn, HttpRequestState &state);
|
void handle_get_version(Connection &conn, HttpRequestState &state);
|
||||||
|
|||||||
Reference in New Issue
Block a user