Update /ok to serve dual health check/benchmarking role

This commit is contained in:
2025-09-05 12:39:10 -04:00
parent 761eaa552b
commit e67e4aee17
15 changed files with 265 additions and 53 deletions

View File

@@ -14,6 +14,7 @@ ConfigParser::load_from_file(const std::string &file_path) {
parse_server_config(toml_data, config.server);
parse_commit_config(toml_data, config.commit);
parse_subscription_config(toml_data, config.subscription);
parse_benchmark_config(toml_data, config.benchmark);
if (!validate_config(config)) {
return std::nullopt;
@@ -36,6 +37,7 @@ ConfigParser::parse_toml_string(const std::string &toml_content) {
parse_server_config(toml_data, config.server);
parse_commit_config(toml_data, config.commit);
parse_subscription_config(toml_data, config.subscription);
parse_benchmark_config(toml_data, config.benchmark);
if (!validate_config(config)) {
return std::nullopt;
@@ -146,6 +148,13 @@ void ConfigParser::parse_subscription_config(const auto &toml_data,
});
}
void ConfigParser::parse_benchmark_config(const auto &toml_data,
BenchmarkConfig &config) {
parse_section(toml_data, "benchmark", [&](const auto &bench) {
parse_field(bench, "ok_resolve_iterations", config.ok_resolve_iterations);
});
}
bool ConfigParser::validate_config(const Config &config) {
bool valid = true;

View File

@@ -74,6 +74,15 @@ struct SubscriptionConfig {
std::chrono::seconds keepalive_interval{30};
};
/**
* @brief Configuration settings for benchmarking and health check behavior.
*/
struct BenchmarkConfig {
/// CPU-intensive loop iterations for /ok requests in resolve stage
/// 0 = health check only, 7000 = default benchmark load (650ns, 1M req/s)
int ok_resolve_iterations = 7000;
};
/**
* @brief Top-level configuration container for all WeaselDB settings.
*/
@@ -81,6 +90,7 @@ struct Config {
ServerConfig server; ///< Server networking and request handling settings
CommitConfig commit; ///< Commit processing and validation settings
SubscriptionConfig subscription; ///< Subscription streaming settings
BenchmarkConfig benchmark; ///< Benchmarking and health check settings
};
/**
@@ -152,6 +162,8 @@ private:
static void parse_commit_config(const auto &toml_data, CommitConfig &config);
static void parse_subscription_config(const auto &toml_data,
SubscriptionConfig &config);
static void parse_benchmark_config(const auto &toml_data,
BenchmarkConfig &config);
};
} // namespace weaseldb

10
src/cpu_work.cpp Normal file
View File

@@ -0,0 +1,10 @@
#include "cpu_work.hpp"
void spend_cpu_cycles(int iterations) {
// Perform CPU-intensive work that cannot be optimized away
// Use inline assembly to prevent compiler optimization
for (int i = 0; i < iterations; ++i) {
// Simple loop with inline assembly barrier to prevent optimization
asm volatile("");
}
}

21
src/cpu_work.hpp Normal file
View File

@@ -0,0 +1,21 @@
#pragma once
/**
* @brief Perform CPU-intensive work for benchmarking and health check purposes.
*
* This function performs a deterministic amount of CPU work that cannot be
* optimized away by the compiler. It's used both in the health check resolve
* stage and in benchmarks to measure the actual CPU time consumed.
*
* @param iterations Number of loop iterations to perform
*/
void spend_cpu_cycles(int iterations);
/**
* @brief Default CPU work iterations for health check benchmarking.
*
* Represents the number of CPU-intensive loop iterations used in the
* /ok health check resolve stage. This value provides 650ns of CPU work
* and achieves 1M requests/second throughput through the 4-stage pipeline.
*/
constexpr int DEFAULT_HEALTH_CHECK_ITERATIONS = 7000;

View File

@@ -7,6 +7,7 @@
#include "api_url_parser.hpp"
#include "arena_allocator.hpp"
#include "cpu_work.hpp"
#include "format.hpp"
#include "json_commit_request_parser.hpp"
#include "metric.hpp"
@@ -26,6 +27,8 @@ thread_local auto status_counter =
requests_counter_family.create({{"path", "/v1/status"}});
thread_local auto version_counter =
requests_counter_family.create({{"path", "/v1/version"}});
thread_local auto ok_counter =
requests_counter_family.create({{"path", "/ok"}});
// Metric for banned request IDs memory usage
auto banned_request_ids_memory_gauge =
@@ -83,10 +86,10 @@ void HttpHandler::on_write_buffer_drained(
void HttpHandler::on_batch_complete(
std::span<std::unique_ptr<Connection>> batch) {
// Collect commit requests and status requests for pipeline processing
// Collect commit, status, and health check requests for pipeline processing
int pipeline_count = 0;
// Count both commit and status requests
// Count commit, status, and health check requests
for (auto &conn : batch) {
if (conn && conn->user_data) {
auto *state = static_cast<HttpConnectionState *>(conn->user_data);
@@ -102,6 +105,12 @@ void HttpHandler::on_batch_complete(
conn->outgoing_bytes_queued() == 0) {
pipeline_count++;
}
// Count health check requests
else if (state->route == HttpRoute::GetOk &&
// Error message not already queued
conn->outgoing_bytes_queued() == 0) {
pipeline_count++;
}
}
}
@@ -123,6 +132,10 @@ void HttpHandler::on_batch_complete(
else if (state->route == HttpRoute::GetStatus) {
*out_iter++ = StatusEntry{std::move(conn)};
}
// Create HealthCheckEntry for health check requests
else if (state->route == HttpRoute::GetOk) {
*out_iter++ = HealthCheckEntry{std::move(conn)};
}
}
}
}
@@ -384,9 +397,11 @@ void HttpHandler::handle_get_metrics(Connection &conn,
void HttpHandler::handle_get_ok(Connection &conn,
const HttpConnectionState &state) {
ok_counter.inc();
TRACE_EVENT("http", "GET /ok", perfetto::Flow::Global(state.http_request_id));
sendResponse(conn, 200, "text/plain", "OK", state.connection_close);
// Health check requests are processed through the pipeline
// Response will be generated in the release stage after pipeline processing
}
void HttpHandler::handle_not_found(Connection &conn,
@@ -395,9 +410,9 @@ void HttpHandler::handle_not_found(Connection &conn,
}
// HTTP utility methods
void HttpHandler::sendResponse(Connection &conn, int status_code,
std::string_view content_type,
std::string_view body, bool close_connection) {
void HttpHandler::send_response(Connection &conn, int status_code,
std::string_view content_type,
std::string_view body, bool close_connection) {
ArenaAllocator &arena = conn.get_arena();
auto *state = static_cast<HttpConnectionState *>(conn.user_data);
@@ -446,7 +461,7 @@ void HttpHandler::sendResponse(Connection &conn, int status_code,
void HttpHandler::send_json_response(Connection &conn, int status_code,
std::string_view json,
bool close_connection) {
sendResponse(conn, status_code, "application/json", json, close_connection);
send_response(conn, status_code, "application/json", json, close_connection);
}
void HttpHandler::send_error_response(Connection &conn, int status_code,
@@ -674,6 +689,18 @@ bool HttpHandler::process_sequence_batch(BatchType &batch) {
R"({"status": "not_committed"})",
state->connection_close);
return false; // Continue processing
} else if constexpr (std::is_same_v<T, HealthCheckEntry>) {
// Process health check entry: noop in sequence stage
auto &health_check_entry = e;
auto *state = static_cast<HttpConnectionState *>(
health_check_entry.connection->user_data);
if (state) {
TRACE_EVENT("http", "sequence_health_check",
perfetto::Flow::Global(state->http_request_id));
}
return false; // Continue processing
}
@@ -725,6 +752,21 @@ bool HttpHandler::process_resolve_batch(BatchType &batch) {
// Status entries are not processed in resolve stage
// They were already handled in sequence stage
return false;
} else if constexpr (std::is_same_v<T, HealthCheckEntry>) {
// Process health check entry: perform configurable CPU work
auto &health_check_entry = e;
auto *state = static_cast<HttpConnectionState *>(
health_check_entry.connection->user_data);
if (state) {
TRACE_EVENT("http", "resolve_health_check",
perfetto::Flow::Global(state->http_request_id));
}
// Perform configurable CPU-intensive work for benchmarking
spend_cpu_cycles(config_.benchmark.ok_resolve_iterations);
return false; // Continue processing
}
return false; // Unknown type, continue
@@ -798,6 +840,22 @@ bool HttpHandler::process_persist_batch(BatchType &batch) {
// Status entries are not processed in persist stage
// They were already handled in sequence stage
return false;
} else if constexpr (std::is_same_v<T, HealthCheckEntry>) {
// Process health check entry: generate OK response
auto &health_check_entry = e;
auto *state = static_cast<HttpConnectionState *>(
health_check_entry.connection->user_data);
if (state) {
TRACE_EVENT("http", "persist_health_check",
perfetto::Flow::Global(state->http_request_id));
// Generate OK response
send_response(*health_check_entry.connection, 200, "text/plain",
"OK", state->connection_close);
}
return false; // Continue processing
}
return false; // Unknown type, continue
@@ -853,6 +911,22 @@ bool HttpHandler::process_release_batch(BatchType &batch) {
// Return connection to server for further processing or cleanup
Server::release_back_to_server(std::move(status_entry.connection));
return false; // Continue processing
} else if constexpr (std::is_same_v<T, HealthCheckEntry>) {
// Process health check entry: return connection to server
auto &health_check_entry = e;
auto *state = static_cast<HttpConnectionState *>(
health_check_entry.connection->user_data);
if (state) {
TRACE_EVENT("http", "release_health_check",
perfetto::Flow::Global(state->http_request_id));
}
// Return connection to server for further processing or cleanup
Server::release_back_to_server(
std::move(health_check_entry.connection));
return false; // Continue processing
}

View File

@@ -10,9 +10,9 @@
#include "api_url_parser.hpp"
#include "arena_allocator.hpp"
#include "config.hpp"
#include "connection.hpp"
#include "connection_handler.hpp"
#include "loop_iterations.hpp"
#include "perfetto_categories.hpp"
#include "pipeline_entry.hpp"
#include "server.hpp"
@@ -70,9 +70,9 @@ struct HttpConnectionState {
* Supports the WeaselDB REST API endpoints with enum-based routing.
*/
struct HttpHandler : ConnectionHandler {
HttpHandler()
: banned_request_ids(
ArenaStlAllocator<std::string_view>(&banned_request_arena)) {
explicit HttpHandler(const weaseldb::Config &config)
: config_(config), banned_request_ids(ArenaStlAllocator<std::string_view>(
&banned_request_arena)) {
// Stage 0: Sequence assignment thread
sequenceThread = std::thread{[this]() {
pthread_setname_np(pthread_self(), "txn-sequence");
@@ -153,6 +153,9 @@ struct HttpHandler : ConnectionHandler {
private:
static constexpr int lg_size = 16;
// Configuration reference
const weaseldb::Config &config_;
// Pipeline state (sequence thread only)
int64_t next_version = 1; // Next version to assign (sequence thread only)
@@ -208,9 +211,10 @@ private:
void handle_not_found(Connection &conn, const HttpConnectionState &state);
// HTTP utilities
static void sendResponse(Connection &conn, int status_code,
std::string_view content_type, std::string_view body,
bool close_connection = false);
static void send_response(Connection &conn, int status_code,
std::string_view content_type,
std::string_view body,
bool close_connection = false);
static void send_json_response(Connection &conn, int status_code,
std::string_view json,
bool close_connection = false);

View File

@@ -1,3 +0,0 @@
#pragma once
constexpr int loop_iterations = 1725;

View File

@@ -252,12 +252,14 @@ int main(int argc, char *argv[]) {
std::cout << "Keepalive interval: "
<< config->subscription.keepalive_interval.count() << " seconds"
<< std::endl;
std::cout << "Health check resolve iterations: "
<< config->benchmark.ok_resolve_iterations << std::endl;
// Create listen sockets
std::vector<int> listen_fds = create_listen_sockets(*config);
// Create handler and server
HttpHandler http_handler;
HttpHandler http_handler(*config);
auto server = Server::create(*config, http_handler, listen_fds);
g_server = server.get();

View File

@@ -32,6 +32,19 @@ struct StatusEntry {
: connection(std::move(conn)) {}
};
/**
* Pipeline entry for /ok health check requests.
* Flows through all pipeline stages as a noop except resolve stage.
* Resolve stage can perform configurable CPU work for benchmarking.
*/
struct HealthCheckEntry {
std::unique_ptr<Connection> connection;
HealthCheckEntry() = default; // Default constructor for variant
explicit HealthCheckEntry(std::unique_ptr<Connection> conn)
: connection(std::move(conn)) {}
};
/**
* Pipeline entry for coordinated shutdown of all stages.
* Flows through all stages to ensure proper cleanup.
@@ -44,4 +57,5 @@ struct ShutdownEntry {
* Pipeline entry variant type used by the commit processing pipeline.
* Each stage pattern-matches on the variant type to handle appropriately.
*/
using PipelineEntry = std::variant<CommitEntry, StatusEntry, ShutdownEntry>;
using PipelineEntry =
std::variant<CommitEntry, StatusEntry, HealthCheckEntry, ShutdownEntry>;