Compare commits

...

12 Commits

44 changed files with 1757 additions and 696 deletions

View File

@@ -1,2 +1,2 @@
CompileFlags:
Add: [-Wno-vla-cxx-extension, -UNDEBUG]
Add: [-UNDEBUG]

View File

@@ -49,6 +49,12 @@ FetchContent_MakeAvailable(nlohmann_json)
set(RAPIDJSON_BUILD_TESTS
OFF
CACHE BOOL "Disable RapidJSON tests" FORCE)
set(RAPIDJSON_BUILD_DOC
OFF
CACHE BOOL "Disable RapidJSON documentation" FORCE)
set(RAPIDJSON_BUILD_EXAMPLES
OFF
CACHE BOOL "Disable RapidJSON examples" FORCE)
FetchContent_Declare(
RapidJSON
GIT_REPOSITORY https://github.com/Tencent/rapidjson.git
@@ -90,6 +96,15 @@ include_directories(src)
find_package(weaseljson REQUIRED)
# Suppress deprecated literal operator warnings globally (from nlohmann_json and
# toml11)
if(CMAKE_CXX_COMPILER_ID MATCHES "Clang")
add_compile_options(-Wno-unknown-warning-option
-Wno-deprecated-literal-operator)
elseif(CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
# GCC doesn't have deprecated-literal-operator warning, so no need to suppress
endif()
# Generate JSON token hash table using gperf
find_program(GPERF_EXECUTABLE gperf REQUIRED)
add_custom_command(
@@ -103,30 +118,8 @@ add_custom_command(
add_custom_target(generate_json_tokens
DEPENDS ${CMAKE_BINARY_DIR}/json_tokens.cpp)
set(SOURCES
src/main.cpp
src/config.cpp
src/connection.cpp
src/connection_registry.cpp
src/server.cpp
src/json_commit_request_parser.cpp
src/http_handler.cpp
src/arena_allocator.cpp
src/format.cpp
src/metric.cpp
src/process_collector.cpp
${CMAKE_BINARY_DIR}/json_tokens.cpp)
add_executable(weaseldb ${SOURCES})
add_dependencies(weaseldb generate_json_tokens)
target_link_libraries(
weaseldb
Threads::Threads
toml11::toml11
weaseljson
simdutf::simdutf
llhttp_static
perfetto)
add_executable(weaseldb src/main.cpp)
target_link_libraries(weaseldb weaseldb_sources)
enable_testing()
@@ -135,155 +128,143 @@ add_library(test_data STATIC benchmarks/test_data.cpp)
target_include_directories(test_data PUBLIC benchmarks)
target_link_libraries(test_data simdutf::simdutf)
add_executable(test_arena_allocator tests/test_arena_allocator.cpp
src/arena_allocator.cpp src/format.cpp)
target_link_libraries(test_arena_allocator doctest::doctest)
target_include_directories(test_arena_allocator PRIVATE src)
target_compile_options(test_arena_allocator PRIVATE -UNDEBUG)
# Create doctest implementation library
add_library(doctest_impl STATIC doctest_impl.cpp)
target_link_libraries(doctest_impl PUBLIC doctest::doctest)
# Create nanobench implementation library
add_library(nanobench_impl STATIC nanobench_impl.cpp)
target_link_libraries(nanobench_impl PUBLIC nanobench)
# Define all source files in one place
set(WEASELDB_SOURCES
src/arena.cpp
src/cpu_work.cpp
src/format.cpp
src/metric.cpp
src/json_commit_request_parser.cpp
src/api_url_parser.cpp
src/server.cpp
src/connection.cpp
src/connection_registry.cpp
src/http_handler.cpp
src/config.cpp
src/process_collector.cpp
${CMAKE_BINARY_DIR}/json_tokens.cpp)
# Create library based on build type
if(CMAKE_BUILD_TYPE STREQUAL "Debug")
# In debug builds, use single library with assertions enabled
add_library(weaseldb_sources STATIC ${WEASELDB_SOURCES})
add_dependencies(weaseldb_sources generate_json_tokens)
target_include_directories(weaseldb_sources PUBLIC src)
target_link_libraries(
weaseldb_sources PUBLIC simdutf::simdutf weaseljson Threads::Threads
llhttp_static toml11::toml11 perfetto)
target_compile_options(weaseldb_sources PRIVATE -UNDEBUG)
# Alias for tests to use same target name
add_library(weaseldb_sources_debug ALIAS weaseldb_sources)
else()
# In release builds, create both variants
add_library(weaseldb_sources STATIC ${WEASELDB_SOURCES})
add_dependencies(weaseldb_sources generate_json_tokens)
target_include_directories(weaseldb_sources PUBLIC src)
target_link_libraries(
weaseldb_sources PUBLIC simdutf::simdutf weaseljson Threads::Threads
llhttp_static toml11::toml11 perfetto)
# Debug version with assertions enabled for tests
add_library(weaseldb_sources_debug STATIC ${WEASELDB_SOURCES})
add_dependencies(weaseldb_sources_debug generate_json_tokens)
target_include_directories(weaseldb_sources_debug PUBLIC src)
target_link_libraries(
weaseldb_sources_debug PUBLIC simdutf::simdutf weaseljson Threads::Threads
llhttp_static toml11::toml11 perfetto)
target_compile_options(weaseldb_sources_debug PRIVATE -UNDEBUG)
endif()
add_executable(test_arena tests/test_arena.cpp)
target_link_libraries(test_arena doctest_impl weaseldb_sources_debug)
target_compile_options(test_arena PRIVATE -UNDEBUG)
add_executable(
test_commit_request
tests/test_commit_request.cpp src/json_commit_request_parser.cpp
tests/nlohmann_reference_parser.cpp tests/parser_comparison.cpp
src/arena_allocator.cpp ${CMAKE_BINARY_DIR}/json_tokens.cpp)
add_dependencies(test_commit_request generate_json_tokens)
target_link_libraries(test_commit_request doctest::doctest weaseljson test_data
nlohmann_json::nlohmann_json simdutf::simdutf)
target_include_directories(test_commit_request PRIVATE src tests)
tests/test_commit_request.cpp tests/nlohmann_reference_parser.cpp
tests/parser_comparison.cpp)
target_link_libraries(test_commit_request doctest_impl weaseldb_sources_debug
test_data nlohmann_json::nlohmann_json)
target_include_directories(test_commit_request PRIVATE tests)
target_compile_options(test_commit_request PRIVATE -UNDEBUG)
add_executable(
test_http_handler
tests/test_http_handler.cpp
src/http_handler.cpp
src/server.cpp
src/config.cpp
src/json_commit_request_parser.cpp
src/arena_allocator.cpp
src/format.cpp
src/connection.cpp
src/connection_registry.cpp
src/metric.cpp
${CMAKE_BINARY_DIR}/json_tokens.cpp)
add_dependencies(test_http_handler generate_json_tokens)
target_link_libraries(
test_http_handler
doctest::doctest
llhttp_static
Threads::Threads
toml11::toml11
perfetto
simdutf::simdutf
weaseljson)
target_include_directories(test_http_handler PRIVATE src)
target_compile_definitions(test_http_handler
PRIVATE DOCTEST_CONFIG_IMPLEMENT_WITH_MAIN)
add_executable(test_http_handler tests/test_http_handler.cpp)
target_link_libraries(test_http_handler doctest_impl weaseldb_sources_debug)
target_compile_options(test_http_handler PRIVATE -UNDEBUG)
add_executable(
test_server_connection_return
tests/test_server_connection_return.cpp
src/server.cpp
src/connection.cpp
src/connection_registry.cpp
src/arena_allocator.cpp
src/config.cpp
src/http_handler.cpp
src/json_commit_request_parser.cpp
src/format.cpp
src/metric.cpp
${CMAKE_BINARY_DIR}/json_tokens.cpp)
add_dependencies(test_server_connection_return generate_json_tokens)
target_link_libraries(
test_server_connection_return
doctest::doctest
llhttp_static
Threads::Threads
toml11::toml11
perfetto
weaseljson
simdutf::simdutf)
target_include_directories(test_server_connection_return PRIVATE src)
target_compile_definitions(test_server_connection_return
PRIVATE DOCTEST_CONFIG_IMPLEMENT_WITH_MAIN)
add_executable(test_server_connection_return
tests/test_server_connection_return.cpp)
target_link_libraries(test_server_connection_return doctest_impl
weaseldb_sources_debug)
target_compile_options(test_server_connection_return PRIVATE -UNDEBUG)
# Metrics system test
add_executable(test_metric tests/test_metric.cpp src/metric.cpp
src/arena_allocator.cpp src/format.cpp)
target_link_libraries(test_metric doctest::doctest Threads::Threads
simdutf::simdutf weaseljson)
target_include_directories(test_metric PRIVATE src)
add_executable(test_metric tests/test_metric.cpp)
target_link_libraries(test_metric doctest_impl weaseldb_sources_debug)
target_compile_options(test_metric PRIVATE -UNDEBUG)
# Register with CTest
add_test(NAME metric_tests COMMAND test_metric)
add_executable(bench_arena_allocator benchmarks/bench_arena_allocator.cpp
src/arena_allocator.cpp)
target_link_libraries(bench_arena_allocator nanobench)
target_include_directories(bench_arena_allocator PRIVATE src)
add_executable(bench_arena benchmarks/bench_arena.cpp)
target_link_libraries(bench_arena nanobench_impl weaseldb_sources)
add_executable(bench_volatile_loop benchmarks/bench_volatile_loop.cpp)
target_link_libraries(bench_volatile_loop nanobench)
add_executable(bench_cpu_work benchmarks/bench_cpu_work.cpp src/cpu_work.cpp)
target_link_libraries(bench_cpu_work nanobench_impl)
add_executable(
bench_commit_request
benchmarks/bench_commit_request.cpp src/json_commit_request_parser.cpp
src/arena_allocator.cpp ${CMAKE_BINARY_DIR}/json_tokens.cpp)
add_dependencies(bench_commit_request generate_json_tokens)
target_link_libraries(bench_commit_request nanobench weaseljson test_data
simdutf::simdutf)
target_include_directories(bench_commit_request PRIVATE src)
add_executable(bench_commit_request benchmarks/bench_commit_request.cpp)
target_link_libraries(bench_commit_request nanobench_impl weaseldb_sources
test_data)
add_executable(
bench_parser_comparison
benchmarks/bench_parser_comparison.cpp src/json_commit_request_parser.cpp
src/arena_allocator.cpp ${CMAKE_BINARY_DIR}/json_tokens.cpp)
add_dependencies(bench_parser_comparison generate_json_tokens)
target_link_libraries(bench_parser_comparison nanobench weaseljson test_data
nlohmann_json::nlohmann_json simdutf::simdutf)
add_executable(bench_parser_comparison benchmarks/bench_parser_comparison.cpp)
target_link_libraries(bench_parser_comparison nanobench_impl weaseldb_sources
test_data nlohmann_json::nlohmann_json)
target_include_directories(bench_parser_comparison
PRIVATE src ${rapidjson_SOURCE_DIR}/include)
PRIVATE ${rapidjson_SOURCE_DIR}/include)
add_executable(bench_thread_pipeline benchmarks/bench_thread_pipeline.cpp)
target_link_libraries(bench_thread_pipeline nanobench Threads::Threads)
target_link_libraries(bench_thread_pipeline nanobench_impl Threads::Threads)
target_include_directories(bench_thread_pipeline PRIVATE src)
add_executable(bench_format_comparison benchmarks/bench_format_comparison.cpp
src/arena_allocator.cpp src/format.cpp)
target_link_libraries(bench_format_comparison nanobench)
target_include_directories(bench_format_comparison PRIVATE src)
add_executable(bench_format_comparison benchmarks/bench_format_comparison.cpp)
target_link_libraries(bench_format_comparison nanobench_impl weaseldb_sources)
# Metrics system benchmark
add_executable(bench_metric benchmarks/bench_metric.cpp src/metric.cpp
src/arena_allocator.cpp src/format.cpp)
target_link_libraries(bench_metric nanobench Threads::Threads simdutf::simdutf
weaseljson)
target_include_directories(bench_metric PRIVATE src)
add_executable(bench_metric benchmarks/bench_metric.cpp)
target_link_libraries(bench_metric nanobench_impl weaseldb_sources)
# Register benchmark with CTest
add_test(NAME metric_benchmarks COMMAND bench_metric)
# Debug tools
add_executable(
debug_arena tools/debug_arena.cpp src/json_commit_request_parser.cpp
src/arena_allocator.cpp ${CMAKE_BINARY_DIR}/json_tokens.cpp)
add_dependencies(debug_arena generate_json_tokens)
target_link_libraries(debug_arena weaseljson simdutf::simdutf)
target_include_directories(debug_arena PRIVATE src)
add_executable(debug_arena tools/debug_arena.cpp)
target_link_libraries(debug_arena weaseldb_sources)
# Load tester
add_executable(load_tester tools/load_tester.cpp)
target_link_libraries(load_tester Threads::Threads llhttp_static perfetto)
add_test(NAME arena_allocator_tests COMMAND test_arena_allocator)
add_test(NAME arena_tests COMMAND test_arena)
add_test(NAME commit_request_tests COMMAND test_commit_request)
add_test(NAME http_handler_tests COMMAND test_http_handler)
add_test(NAME server_connection_return_tests
COMMAND test_server_connection_return)
add_test(NAME arena_allocator_benchmarks COMMAND bench_arena_allocator)
add_test(NAME arena_benchmarks COMMAND bench_arena)
add_test(NAME commit_request_benchmarks COMMAND bench_commit_request)
add_test(NAME parser_comparison_benchmarks COMMAND bench_parser_comparison)
add_test(NAME thread_pipeline_benchmarks COMMAND bench_thread_pipeline)
add_test(NAME format_comparison_benchmarks COMMAND bench_format_comparison)
add_executable(test_api_url_parser tests/test_api_url_parser.cpp)
target_link_libraries(test_api_url_parser doctest_impl weaseldb_sources_debug)
target_compile_options(test_api_url_parser PRIVATE -UNDEBUG)
add_test(NAME api_url_parser_tests COMMAND test_api_url_parser)

10
api.md
View File

@@ -257,6 +257,16 @@ Removes a retention policy, which may allow the log to be truncated.
-----
## `GET /ok`
Simple health check endpoint.
### Response
Returns `200 OK` with minimal content for basic health monitoring.
-----
## `GET /metrics`
Retrieves server metrics for monitoring.

View File

@@ -1,4 +1,4 @@
#include "arena_allocator.hpp"
#include "arena.hpp"
#include <nanobench.h>
#include <vector>
@@ -14,8 +14,8 @@ int main() {
{
// Arena allocator benchmark
ArenaAllocator arena;
bench.run("ArenaAllocator", [&] {
Arena arena;
bench.run("Arena", [&] {
void *ptr = arena.allocate_raw(alloc_size);
ankerl::nanobench::doNotOptimizeAway(ptr);
});

View File

@@ -0,0 +1,34 @@
#include <iostream>
#include <nanobench.h>
#include <string>
#include "../src/cpu_work.hpp"
int main(int argc, char *argv[]) {
int iterations = DEFAULT_HEALTH_CHECK_ITERATIONS;
if (argc > 1) {
try {
iterations = std::stoi(argv[1]);
if (iterations < 0) {
std::cerr << "Error: iterations must be non-negative" << std::endl;
return 1;
}
} catch (const std::exception &e) {
std::cerr << "Error: invalid number '" << argv[1] << "'" << std::endl;
return 1;
}
}
std::cout << "Benchmarking spend_cpu_cycles with " << iterations
<< " iterations" << std::endl;
ankerl::nanobench::Bench bench;
bench.minEpochIterations(10000);
// Benchmark the same CPU work that health checks use
bench.run("spend_cpu_cycles(" + std::to_string(iterations) + ")",
[&] { spend_cpu_cycles(iterations); });
return 0;
}

View File

@@ -1,4 +1,4 @@
#include "arena_allocator.hpp"
#include "arena.hpp"
#include "format.hpp"
#include <cstdio>
#include <iomanip>
@@ -26,7 +26,7 @@ void benchmark_simple_concatenation() {
ankerl::nanobench::Bench bench;
bench.title("Simple Concatenation").unit("op").warmup(100);
ArenaAllocator arena(64);
Arena arena(64);
// Arena-based static_format
bench.run("static_format", [&] {
auto result = static_format(arena, "Hello ", "World", "!");
@@ -65,7 +65,7 @@ void benchmark_mixed_types() {
ankerl::nanobench::Bench bench;
bench.title("Mixed Types").unit("op").warmup(100);
ArenaAllocator arena(128);
Arena arena(128);
// Arena-based static_format
bench.run("static_format", [&] {
auto result =
@@ -106,7 +106,7 @@ void benchmark_complex_formatting() {
ankerl::nanobench::Bench bench;
bench.title("Complex Formatting").unit("op").warmup(100);
ArenaAllocator arena(128);
Arena arena(128);
// Arena-based format (static_format doesn't support printf specifiers)
bench.run("format", [&] {
auto result = format(arena, "%-10s %5d %8.2f", TEST_STRING.c_str(),
@@ -147,7 +147,7 @@ void benchmark_error_messages() {
constexpr int line_number = 123;
const std::string error_msg = "File not found";
ArenaAllocator arena(128);
Arena arena(128);
// Arena-based static_format (using string literals only)
bench.run("static_format", [&] {
auto result = static_format(arena, "Error ", error_code, ": ",
@@ -188,7 +188,7 @@ void benchmark_double_formatting() {
std::cout << "\n=== Simple Double Formatting ===\n";
// Validate that all formatters produce identical output
ArenaAllocator arena(128);
Arena arena(128);
auto static_result = static_format(arena, TEST_DOUBLE);
auto format_result = format(arena, "%.17g", TEST_DOUBLE);

View File

@@ -1,6 +1,6 @@
#include <nanobench.h>
#include "arena_allocator.hpp"
#include "arena.hpp"
#include "metric.hpp"
#include <atomic>
@@ -62,7 +62,7 @@ struct ContentionEnvironment {
render_latch = std::make_unique<std::latch>(2);
background_threads.emplace_back([this]() {
ArenaAllocator arena;
Arena arena;
render_latch->arrive_and_wait(); // Render thread signals it's ready
@@ -201,7 +201,7 @@ int main() {
});
}
ArenaAllocator arena;
Arena arena;
std::string bench_name =
"render() - " + std::to_string(scale) + " metrics each type";

View File

@@ -14,9 +14,9 @@
using namespace weaseldb::test_data;
// Arena-based allocator adapter for RapidJSON
class RapidJsonArenaAllocator {
class RapidJsonArenaAdapter {
public:
explicit RapidJsonArenaAllocator(ArenaAllocator *arena) : arena_(arena) {}
explicit RapidJsonArenaAdapter(Arena *arena) : arena_(arena) {}
static const bool kNeedFree = false;
@@ -37,7 +37,7 @@ public:
}
private:
ArenaAllocator *arena_;
Arena *arena_;
};
// Arena-based RapidJSON SAX handler for commit request parsing
@@ -56,7 +56,7 @@ public:
std::string_view key, value, begin, end;
};
ArenaAllocator arena;
Arena arena;
bool valid = true;
std::string_view request_id, leader_id;
uint64_t read_version = 0;

View File

@@ -1,14 +0,0 @@
#include <nanobench.h>
#include "../src/loop_iterations.hpp"
int main() {
ankerl::nanobench::Bench bench;
bench.minEpochIterations(100000);
bench.run("volatile loop to " + std::to_string(loop_iterations), [&] {
for (volatile int i = 0; i < loop_iterations; i = i + 1)
;
});
return 0;
}

View File

@@ -18,9 +18,7 @@ Controls server networking, threading, and request handling behavior.
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `bind_address` | string | `"127.0.0.1"` | IP address to bind the server to |
| `port` | integer | `8080` | Port number to listen on |
| `unix_socket_path` | string | `""` (empty) | Unix domain socket path. If specified, takes precedence over TCP |
| `interfaces` | array of objects | TCP on 127.0.0.1:8080 | Network interfaces to listen on. Each interface can be TCP or Unix socket |
| `max_request_size_bytes` | integer | `1048576` (1MB) | Maximum size for incoming requests. Requests exceeding this limit receive a `413 Content Too Large` response |
| `io_threads` | integer | `1` | Number of I/O threads for handling connections and network events |
| `epoll_instances` | integer | `io_threads` | Number of epoll instances to reduce kernel contention (max: io_threads). Lower values allow multiple threads per epoll for better load balancing, higher values reduce contention |
@@ -47,16 +45,25 @@ Controls behavior of the `/v1/subscribe` endpoint and SSE streaming.
| `max_buffer_size_bytes` | integer | `10485760` (10MB) | Maximum amount of unconsumed data to buffer for slow subscribers. Connections are closed if this limit is exceeded |
| `keepalive_interval_seconds` | integer | `30` | Interval between keepalive comments in the Server-Sent Events stream to prevent idle timeouts on network proxies |
### Benchmark Configuration (`[benchmark]`)
Controls benchmarking and health check behavior.
| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| `ok_resolve_iterations` | integer | `4000` | CPU-intensive loop iterations for `/ok` requests in resolve stage. 0 = health check only, 4000 = default benchmark load (~650ns, 1M req/s) |
## Example Configuration
```toml
# WeaselDB Configuration File
[server]
# Network configuration
bind_address = "0.0.0.0"
port = 8080
# unix_socket_path = "weaseldb.sock" # Alternative to TCP
# Network interfaces - can specify multiple TCP and/or Unix socket interfaces
interfaces = [
{ type = "tcp", address = "0.0.0.0", port = 8080 },
# { type = "unix", path = "weaseldb.sock" }, # Alternative Unix socket
]
# Performance tuning
max_request_size_bytes = 2097152 # 2MB
@@ -74,6 +81,9 @@ request_id_retention_versions = 50000
[subscription]
max_buffer_size_bytes = 52428800 # 50MB
keepalive_interval_seconds = 15
[benchmark]
ok_resolve_iterations = 10000 # Higher load for performance testing
```
## Configuration Loading

View File

@@ -1,27 +1,39 @@
# WeaselDB Configuration File
# See config.md for complete documentation of all configuration options
[server]
# Network interfaces to listen on - production config with just TCP
# Network interfaces where WeaselDB will accept connections
# Options: TCP (address + port) or Unix domain sockets (path)
# For production, use TCP. For local testing, consider Unix sockets for better performance
interfaces = [
{ type = "tcp", address = "127.0.0.1", port = 8080 }
]
# Maximum request size in bytes (for 413 Content Too Large responses)
# Maximum size allowed for incoming requests (larger requests are rejected)
# Increase if you need to handle very large transaction payloads
max_request_size_bytes = 1048576 # 1MB
# Number of I/O threads for handling connections and network events
# Number of worker threads handling network connections
# Start with 1, increase if CPU usage is high under load
io_threads = 1
# Event batch size for epoll processing
# Internal network processing batch size
# Higher values may improve throughput at cost of latency
event_batch_size = 32
[commit]
# Minimum length for request_id to ensure sufficient entropy
# Required minimum length for transaction request IDs
# Longer IDs reduce chance of accidental duplicates across clients
min_request_id_length = 20
# How long to retain request IDs for /v1/status queries (hours)
# How long to keep transaction status information available (hours)
# Used by status API to look up the outcome of completed transactions
request_id_retention_hours = 24
# Minimum number of versions to retain request IDs
# Alternative retention policy: keep transaction status for at least this many database versions
# Ensures status lookups work even during periods of low database activity
request_id_retention_versions = 100000000
[subscription]
# Maximum buffer size for unconsumed data in /v1/subscribe (bytes)
# Memory limit for buffering change stream data per subscriber (bytes)
# Subscribers that fall behind will be disconnected when this limit is reached
# See api.md for details on the subscription streaming API
max_buffer_size_bytes = 10485760 # 10MB
# Interval for sending keepalive comments to prevent idle timeouts (seconds)
# How often to send keep-alive messages to streaming subscribers (seconds)
# Prevents network timeouts during periods of no database activity
keepalive_interval_seconds = 30

View File

@@ -48,13 +48,20 @@ ninja test # or ctest
```
**Individual targets:**
- `./test_arena_allocator` - Arena allocator unit tests
- `./test_arena` - Arena allocator unit tests
- `./test_commit_request` - JSON parsing and validation tests
- `./test_http_handler` - HTTP protocol handling tests
- `./test_metric` - Metrics system tests
- `./test_api_url_parser` - API URL parsing tests
- `./test_server_connection_return` - Connection lifecycle tests
**Benchmarking:**
- `./bench_arena_allocator` - Memory allocation performance
- `./bench_arena` - Memory allocation performance
- `./bench_commit_request` - JSON parsing performance
- `./bench_parser_comparison` - Compare vs nlohmann::json and RapidJSON
- `./bench_metric` - Metrics system performance
- `./bench_thread_pipeline` - Lock-free pipeline performance
- `./bench_format_comparison` - String formatting performance
**Debug tools:**
- `./debug_arena` - Analyze arena allocator behavior
@@ -73,6 +80,9 @@ ninja test # or ctest
- **toml11** - TOML configuration parsing
- **doctest** - Testing framework
- **nanobench** - Benchmarking library
- **nlohmann/json** - JSON library (used in benchmarks)
- **RapidJSON** - High-performance JSON library (used in benchmarks)
- **llhttp** - Fast HTTP parser
---
@@ -80,7 +90,7 @@ ninja test # or ctest
### Core Components
#### **Arena Allocator** (`src/arena_allocator.hpp`)
#### **Arena Allocator** (`src/arena.hpp`)
Ultra-fast memory allocator optimized for request/response patterns:
@@ -119,7 +129,7 @@ Ultra-fast memory allocator optimized for request/response patterns:
- **Streaming data processing** with partial message handling
- **Connection lifecycle hooks** for initialization and cleanup
#### **Thread Pipeline** (`src/ThreadPipeline.h`)
#### **Thread Pipeline** (`src/thread_pipeline.hpp`)
A high-performance, multi-stage, lock-free pipeline for inter-thread communication.
@@ -280,7 +290,7 @@ See [style.md](style.md) for comprehensive C++ coding standards and conventions.
- **Server Creation**: Always use `Server::create()` factory method - direct construction is impossible
- **Connection Creation**: Only the Server can create connections - no public constructor or factory method
- **Connection Ownership**: Use unique_ptr semantics for safe ownership transfer between components
- **Arena Allocator Pattern**: Always use `ArenaAllocator` for temporary allocations within request processing
- **Arena Allocator Pattern**: Always use `Arena` for temporary allocations within request processing
- **String View Usage**: Prefer `std::string_view` over `std::string` when pointing to arena-allocated memory
- **Ownership Transfer**: Use `Server::release_back_to_server()` for returning connections to server from handlers
- **JSON Token Lookup**: Use the gperf-generated perfect hash table in `json_tokens.hpp` for O(1) key recognition
@@ -365,7 +375,7 @@ class HttpHandler : public ConnectionHandler {
public:
void on_data_arrived(std::string_view data, std::unique_ptr<Connection>& conn_ptr) override {
// Parse HTTP request using connection's arena
ArenaAllocator& arena = conn_ptr->get_arena();
Arena& arena = conn_ptr->get_arena();
// Generate response
conn_ptr->append_message("HTTP/1.1 200 OK\r\n\r\nHello World");
@@ -458,7 +468,7 @@ public:
#### Arena-Based String Handling
```cpp
// Preferred: String view with arena allocation to minimize copying
std::string_view process_json_key(const char* data, ArenaAllocator& arena);
std::string_view process_json_key(const char* data, Arena& arena);
// Avoid: Unnecessary string copies
std::string process_json_key(const char* data);
@@ -503,13 +513,13 @@ ParseResult parse_commit_request(const char* json, CommitRequest& out);
### Build Targets
**Test Executables:**
- `test_arena_allocator` - Arena allocator functionality tests
- `test_arena` - Arena allocator functionality tests
- `test_commit_request` - JSON parsing and validation tests
- `test_metric` - Metrics system functionality tests
- Main server executable (compiled from `src/main.cpp`)
**Benchmark Executables:**
- `bench_arena_allocator` - Arena allocator performance benchmarks
- `bench_arena` - Arena allocator performance benchmarks
- `bench_commit_request` - JSON parsing performance benchmarks
- `bench_parser_comparison` - Comparison benchmarks vs nlohmann::json and RapidJSON
- `bench_metric` - Metrics system performance benchmarks

2
doctest_impl.cpp Normal file
View File

@@ -0,0 +1,2 @@
#define DOCTEST_CONFIG_IMPLEMENT_WITH_MAIN
#include <doctest/doctest.h>

2
nanobench_impl.cpp Normal file
View File

@@ -0,0 +1,2 @@
#define ANKERL_NANOBENCH_IMPLEMENT
#include <nanobench.h>

250
src/api_url_parser.cpp Normal file
View File

@@ -0,0 +1,250 @@
#include "api_url_parser.hpp"
#include <cassert>
#include <string_view>
namespace {
// RFC 3986 hex digit to value conversion
// Returns -1 for invalid hex digits
int hex_digit_to_value(char c) {
if (c >= '0' && c <= '9')
return c - '0';
if (c >= 'A' && c <= 'F')
return c - 'A' + 10;
if (c >= 'a' && c <= 'f')
return c - 'a' + 10;
return -1;
}
// Decode percent-encoded sequence at src
// Returns decoded byte value, or -1 for malformed encoding
int decode_percent_sequence(const char *src) {
if (src[0] != '%')
return -1;
int high = hex_digit_to_value(src[1]);
int low = hex_digit_to_value(src[2]);
if (high == -1 || low == -1)
return -1;
return (high << 4) | low;
}
// Decode RFC 3986 percent-encoding in place (for path segments)
// Returns new length, or -1 for malformed encoding
int decode_path_segment(char *data, int length) {
char *write_pos = data;
const char *read_pos = data;
const char *end = data + length;
while (read_pos < end) {
if (*read_pos == '%') {
if (read_pos + 2 >= end)
return -1; // Incomplete sequence
int decoded = decode_percent_sequence(read_pos);
if (decoded == -1)
return -1; // Malformed sequence
*write_pos++ = static_cast<char>(decoded);
read_pos += 3;
} else {
*write_pos++ = *read_pos++;
}
}
return static_cast<int>(write_pos - data);
}
// Decode application/x-www-form-urlencoded in place (for query parameters)
// Handles + → space conversion, then percent-decoding
// Returns new length, or -1 for malformed encoding
int decode_query_value(char *data, int length) {
char *write_pos = data;
const char *read_pos = data;
const char *end = data + length;
while (read_pos < end) {
if (*read_pos == '+') {
*write_pos++ = ' ';
read_pos++;
} else if (*read_pos == '%') {
if (read_pos + 2 >= end)
return -1; // Incomplete sequence
int decoded = decode_percent_sequence(read_pos);
if (decoded == -1)
return -1; // Malformed sequence
*write_pos++ = static_cast<char>(decoded);
read_pos += 3;
} else {
*write_pos++ = *read_pos++;
}
}
return static_cast<int>(write_pos - data);
}
// A simplified helper to find a delimiter in a buffer
// Returns the position of the delimiter, or -1 if not found
int find_delimiter(const char *data, int length, char delimiter) {
for (int i = 0; i < length; ++i) {
if (data[i] == delimiter) {
return i;
}
}
return -1;
}
// Maps a string parameter key to its corresponding enum value.
// Unrecognized keys are ignored as per the design.
[[nodiscard]] std::optional<ApiParameterKey>
to_api_parameter_key(std::string_view key) {
if (key == "request_id")
return ApiParameterKey::RequestId;
if (key == "min_version")
return ApiParameterKey::MinVersion;
return std::nullopt;
}
// Parses the query string part of a URL (in-place decoding)
// Returns ParseResult::Success or ParseResult::MalformedEncoding
ParseResult parse_query_string(char *query_data, int query_length,
RouteMatch &match) {
int pos = 0;
while (pos < query_length) {
// Find end of current key=value pair
int pair_end = find_delimiter(query_data + pos, query_length - pos, '&');
if (pair_end == -1)
pair_end = query_length - pos;
// Find = separator within the pair
int eq_pos = find_delimiter(query_data + pos, pair_end, '=');
if (eq_pos == -1) {
// No value, skip this parameter
pos += pair_end + 1;
continue;
}
// Decode key and value in place
char *key_start = query_data + pos;
int key_length = eq_pos;
char *value_start = query_data + pos + eq_pos + 1;
int value_length = pair_end - eq_pos - 1;
// Decode value (query parameters use form encoding)
int decoded_value_length = decode_query_value(value_start, value_length);
if (decoded_value_length == -1) {
return ParseResult::MalformedEncoding;
}
// Check if this is a parameter we care about
std::string_view key_view(key_start, key_length);
if (auto key_enum = to_api_parameter_key(key_view)) {
match.params[static_cast<int>(*key_enum)] =
std::string_view(value_start, decoded_value_length);
}
pos += pair_end + 1;
}
return ParseResult::Success;
}
} // namespace
ParseResult ApiUrlParser::parse(std::string_view method, char *url_data,
int url_length, RouteMatch &result) {
assert(url_data != nullptr);
assert(url_length >= 0);
// Find query string separator
int query_start = find_delimiter(url_data, url_length, '?');
char *path_data = url_data;
int path_length = (query_start == -1) ? url_length : query_start;
char *query_data = (query_start == -1) ? nullptr : url_data + query_start + 1;
int query_length = (query_start == -1) ? 0 : url_length - query_start - 1;
// Parse and decode query string first
if (query_data && query_length > 0) {
ParseResult query_result =
parse_query_string(query_data, query_length, result);
if (query_result != ParseResult::Success) {
return query_result;
}
}
// Decode path segment (RFC 3986 percent-decoding)
int decoded_path_length = decode_path_segment(path_data, path_length);
if (decoded_path_length == -1) {
return ParseResult::MalformedEncoding;
}
std::string_view path(path_data, decoded_path_length);
// Route matching with decoded path
if (method == "GET") {
if (path == "/v1/version") {
result.route = HttpRoute::GetVersion;
return ParseResult::Success;
}
if (path == "/v1/subscribe") {
result.route = HttpRoute::GetSubscribe;
return ParseResult::Success;
}
if (path == "/v1/status") {
result.route = HttpRoute::GetStatus;
return ParseResult::Success;
}
if (path.starts_with("/v1/retention")) {
result.route = HttpRoute::GetRetention;
// Note: This matches both /v1/retention and /v1/retention/{id}
if (path.length() > 13) { // length of "/v1/retention"
std::string_view policy_id =
path.substr(14); // length of "/v1/retention/"
if (!policy_id.empty()) {
result.params[static_cast<int>(ApiParameterKey::PolicyId)] =
policy_id;
}
}
return ParseResult::Success;
}
if (path == "/metrics") {
result.route = HttpRoute::GetMetrics;
return ParseResult::Success;
}
if (path == "/ok") {
result.route = HttpRoute::GetOk;
return ParseResult::Success;
}
} else if (method == "POST") {
if (path == "/v1/commit") {
result.route = HttpRoute::PostCommit;
return ParseResult::Success;
}
} else if (method == "PUT") {
if (path.starts_with("/v1/retention/")) {
result.route = HttpRoute::PutRetention;
std::string_view policy_id = path.substr(14);
result.params[static_cast<int>(ApiParameterKey::PolicyId)] = policy_id;
return ParseResult::Success;
}
} else if (method == "DELETE") {
if (path.starts_with("/v1/retention/")) {
result.route = HttpRoute::DeleteRetention;
std::string_view policy_id = path.substr(14);
result.params[static_cast<int>(ApiParameterKey::PolicyId)] = policy_id;
return ParseResult::Success;
}
}
result.route = HttpRoute::NotFound;
return ParseResult::Success;
}

104
src/api_url_parser.hpp Normal file
View File

@@ -0,0 +1,104 @@
#pragma once
#include <array>
#include <optional>
#include <string_view>
/**
* @brief Defines all HTTP routes supported by the WeaselDB server.
*/
enum class HttpRoute {
GetVersion,
PostCommit,
GetSubscribe,
GetStatus,
PutRetention,
GetRetention,
DeleteRetention,
GetMetrics,
GetOk,
NotFound
};
/**
* @brief Defines unique keys for all known URL and query parameters in the API.
* @note This allows for O(1) lookup of parameter values in a fixed-size array.
*/
enum class ApiParameterKey {
// --- Query Parameters ---
RequestId,
MinVersion,
// --- URL Parameters ---
PolicyId,
// --- Sentinel for array size ---
Count
};
/**
* @brief A fixed-size array for storing parsed parameter values from a URL.
*
* It is indexed by the ApiParameterKey enum. The value is a string_view into
* the original URL string, making lookups allocation-free.
*/
using ApiParameters = std::array<std::optional<std::string_view>,
static_cast<size_t>(ApiParameterKey::Count)>;
/**
* @brief Result codes for URL parsing operations.
*/
enum class [[nodiscard]] ParseResult { Success, MalformedEncoding };
/**
* @brief Contains the complete, structured result of a successful URL parse.
*
* This struct is the output of the ApiUrlParser and contains everything
* a handler needs to process a request, with no further parsing required.
*/
struct RouteMatch {
/**
* @brief The specific API endpoint that was matched.
*/
HttpRoute route;
/**
* @brief A fixed-size array containing all parsed URL and query parameters.
*/
ApiParameters params;
};
/**
* @brief A parser that matches a URL against the fixed WeaselDB API.
*
* This class provides a single static method to parse a URL and method
* into a structured RouteMatch object. It is designed to be a high-performance,
* allocation-free parser with a simple interface.
*/
class ApiUrlParser {
public:
/**
* @brief Parses a URL and HTTP method against the known WeaselDB API
* endpoints.
*
* **Mutates in place**: This function performs RFC 3986 percent-decoding
* directly on the provided URL buffer. Path segments are decoded according
* to RFC 3986, while query parameters follow
* application/x-www-form-urlencoded rules (+ → space, then %XX → bytes).
*
* @param method The HTTP method of the request.
* @param url_data Mutable buffer containing the URL (will be modified
* in-place).
* @param url_length Length of the URL data in bytes.
* @param out_match Output parameter for the parsed route and parameters.
* @return ParseResult::Success on successful parsing,
* ParseResult::MalformedEncoding if the URL contains invalid percent-encoding
* sequences.
* @note On success, string_view parameters in out_match point into the
* decoded url_data buffer and remain valid while url_data is unchanged.
* @note On error, url_data contents are undefined and should not be used.
*/
[[nodiscard]] static ParseResult parse(std::string_view method,
char *url_data, int url_length,
RouteMatch &out_match);
};

View File

@@ -1,10 +1,10 @@
#include "arena_allocator.hpp"
#include "arena.hpp"
#include <cassert>
#include <iomanip>
#include <limits>
#include <vector>
ArenaAllocator::~ArenaAllocator() {
Arena::~Arena() {
while (current_block_) {
Block *prev = current_block_->prev;
std::free(current_block_);
@@ -12,13 +12,13 @@ ArenaAllocator::~ArenaAllocator() {
}
}
ArenaAllocator::ArenaAllocator(ArenaAllocator &&other) noexcept
Arena::Arena(Arena &&other) noexcept
: initial_block_size_(other.initial_block_size_),
current_block_(other.current_block_) {
other.current_block_ = nullptr;
}
ArenaAllocator &ArenaAllocator::operator=(ArenaAllocator &&other) noexcept {
Arena &Arena::operator=(Arena &&other) noexcept {
if (this != &other) {
while (current_block_) {
Block *prev = current_block_->prev;
@@ -34,7 +34,7 @@ ArenaAllocator &ArenaAllocator::operator=(ArenaAllocator &&other) noexcept {
return *this;
}
void ArenaAllocator::reset() {
void Arena::reset() {
if (!current_block_) {
return;
}
@@ -63,8 +63,8 @@ void ArenaAllocator::reset() {
current_block_->offset = 0;
}
void *ArenaAllocator::realloc_raw(void *ptr, uint32_t old_size,
uint32_t new_size, uint32_t alignment) {
void *Arena::realloc_raw(void *ptr, uint32_t old_size, uint32_t new_size,
uint32_t alignment) {
if (ptr == nullptr) {
return allocate_raw(new_size, alignment);
}
@@ -125,8 +125,8 @@ void *ArenaAllocator::realloc_raw(void *ptr, uint32_t old_size,
return new_ptr;
}
void ArenaAllocator::debug_dump(std::ostream &out, bool show_memory_map,
bool show_content, size_t content_limit) const {
void Arena::debug_dump(std::ostream &out, bool show_memory_map,
bool show_content, size_t content_limit) const {
out << "=== Arena Debug Dump ===" << std::endl;
if (!current_block_) {
@@ -242,20 +242,20 @@ void ArenaAllocator::debug_dump(std::ostream &out, bool show_memory_map,
}
}
void ArenaAllocator::add_block(size_t size) {
void Arena::add_block(size_t size) {
Block *new_block = Block::create(size, current_block_);
current_block_ = new_block;
}
size_t ArenaAllocator::calculate_next_block_size(size_t required_size) const {
size_t Arena::calculate_next_block_size(size_t required_size) const {
size_t doubled_size = (current_block_ ? current_block_->size : 0) * 2;
doubled_size =
std::min<size_t>(doubled_size, std::numeric_limits<uint32_t>::max());
return std::max(required_size, doubled_size);
}
void ArenaAllocator::dump_memory_contents(std::ostream &out, const char *data,
size_t size) {
void Arena::dump_memory_contents(std::ostream &out, const char *data,
size_t size) {
const int bytes_per_line = 16;
for (int64_t offset = 0; offset < static_cast<int64_t>(size);

View File

@@ -17,7 +17,7 @@
/**
* @brief A high-performance arena allocator for bulk allocations.
*
* ArenaAllocator provides extremely fast memory allocation (~1ns per
* Arena provides extremely fast memory allocation (~1ns per
* allocation) by allocating large blocks and serving allocations from them
* sequentially. It's designed for scenarios where many small objects need to be
* allocated and can all be deallocated together.
@@ -39,7 +39,7 @@
*
* ## Usage Examples:
* ```cpp
* ArenaAllocator arena(1024);
* Arena arena(1024);
* void* ptr = arena.allocate_raw(100);
* int* num = arena.construct<int>(42);
* arena.reset(); // Reuse arena memory
@@ -52,13 +52,13 @@
* - Move semantics transfer ownership of all blocks
*
* ## Thread Safety:
* ArenaAllocator is **not thread-safe** - concurrent access from multiple
* Arena is **not thread-safe** - concurrent access from multiple
* threads requires external synchronization. However, this design is
* intentional for performance reasons and the WeaselDB architecture ensures
* thread safety through ownership patterns:
*
* ### Safe Usage Patterns in WeaselDB:
* - **Per-Connection Instances**: Each Connection owns its own ArenaAllocator
* - **Per-Connection Instances**: Each Connection owns its own Arena
* instance, accessed only by the thread that currently owns the connection
* - **Single Owner Principle**: Connection ownership transfers atomically
* between threads using unique_ptr, ensuring only one thread accesses the arena
@@ -81,10 +81,10 @@
* - **No Shared State**: Each arena is completely isolated - no shared data
* between different arena instances
*
* @warning Do not share ArenaAllocator instances between threads. Use separate
* @warning Do not share Arena instances between threads. Use separate
* instances per thread or per logical unit of work.
*/
struct ArenaAllocator {
struct Arena {
private:
/**
* @brief Internal block structure for the intrusive linked list.
@@ -117,19 +117,17 @@ private:
*/
static Block *create(size_t size, Block *prev) {
if (size > std::numeric_limits<uint32_t>::max()) {
std::fprintf(
stderr,
"ArenaAllocator: Block size %zu exceeds maximum uint32_t value\n",
size);
std::fprintf(stderr,
"Arena: Block size %zu exceeds maximum uint32_t value\n",
size);
std::abort();
}
void *memory = std::aligned_alloc(
alignof(Block), align_up(sizeof(Block) + size, alignof(Block)));
if (!memory) {
std::fprintf(
stderr,
"ArenaAllocator: Failed to allocate memory block of size %zu\n",
size);
std::fprintf(stderr,
"Arena: Failed to allocate memory block of size %zu\n",
size);
std::abort();
}
size_t total_size = size + (prev ? prev->total_size : 0);
@@ -142,7 +140,7 @@ private:
public:
/**
* @brief Construct an ArenaAllocator with the specified initial block size.
* @brief Construct an Arena with the specified initial block size.
*
* No memory is allocated until the first allocation request (lazy
* initialization). The initial block size is used for the first block and as
@@ -150,7 +148,7 @@ public:
*
* @param initial_size Size in bytes for the first block (default: 1024)
*/
explicit ArenaAllocator(size_t initial_size = 1024)
explicit Arena(size_t initial_size = 1024)
: initial_block_size_(initial_size), current_block_(nullptr) {}
/**
@@ -159,18 +157,18 @@ public:
* Traverses the intrusive linked list backwards from current_block_,
* freeing each block. This ensures no memory leaks.
*/
~ArenaAllocator();
~Arena();
/// Copy construction is not allowed (would be expensive and error-prone)
ArenaAllocator(const ArenaAllocator &) = delete;
Arena(const Arena &) = delete;
/// Copy assignment is not allowed (would be expensive and error-prone)
ArenaAllocator &operator=(const ArenaAllocator &) = delete;
Arena &operator=(const Arena &) = delete;
/**
* @brief Move constructor - transfers ownership of all blocks.
* @param other The ArenaAllocator to move from (will be left empty)
* @param other The Arena to move from (will be left empty)
*/
ArenaAllocator(ArenaAllocator &&other) noexcept;
Arena(Arena &&other) noexcept;
/**
* @brief Move assignment operator - transfers ownership of all blocks.
@@ -178,10 +176,10 @@ public:
* Frees any existing blocks in this allocator before taking ownership
* of blocks from the other allocator.
*
* @param other The ArenaAllocator to move from (will be left empty)
* @param other The Arena to move from (will be left empty)
* @return Reference to this allocator
*/
ArenaAllocator &operator=(ArenaAllocator &&other) noexcept;
Arena &operator=(Arena &&other) noexcept;
/**
* @brief Allocate raw memory with the specified size and alignment.
@@ -293,7 +291,7 @@ public:
T *realloc(T *ptr, uint32_t old_size, uint32_t new_size) {
if (size_t(new_size) * sizeof(T) > std::numeric_limits<uint32_t>::max()) {
std::fprintf(stderr,
"ArenaAllocator: Reallocation size overflow for type %s "
"Arena: Reallocation size overflow for type %s "
"(new_size=%u, sizeof(T)=%zu)\n",
typeid(T).name(), new_size, sizeof(T));
std::abort();
@@ -306,7 +304,7 @@ public:
* @brief Smart pointer for arena-allocated objects with non-trivial
* destructors.
*
* ArenaAllocator::Ptr calls the destructor but does not free memory (assumes
* Arena::Ptr calls the destructor but does not free memory (assumes
* arena allocation). This provides RAII semantics for objects that need
* cleanup without the overhead of individual memory deallocation.
*
@@ -363,13 +361,13 @@ public:
* This method returns different types based on whether T is trivially
* destructible:
* - For trivially destructible types: returns T* (raw pointer)
* - For non-trivially destructible types: returns ArenaAllocator::Ptr<T>
* - For non-trivially destructible types: returns Arena::Ptr<T>
* (smart pointer that calls destructor)
*
* @tparam T The type of object to construct
* @tparam Args Types of constructor arguments
* @param args Arguments to forward to T's constructor
* @return T* for trivially destructible types, ArenaAllocator::Ptr<T>
* @return T* for trivially destructible types, Arena::Ptr<T>
* otherwise
* @note Prints error to stderr and calls std::abort() if memory allocation
* fails
@@ -414,7 +412,7 @@ public:
template <typename T> T *allocate(uint32_t size) {
static_assert(
std::is_trivially_destructible_v<T>,
"ArenaAllocator::allocate requires trivially destructible types. "
"Arena::allocate requires trivially destructible types. "
"Objects allocated in the arena will not have their destructors "
"called.");
if (size == 0) {
@@ -422,7 +420,7 @@ public:
}
if (size_t(size) * sizeof(T) > std::numeric_limits<uint32_t>::max()) {
std::fprintf(stderr,
"ArenaAllocator: Allocation size overflow for type %s "
"Arena: Allocation size overflow for type %s "
"(size=%u, sizeof(T)=%zu)\n",
typeid(T).name(), size, sizeof(T));
std::abort();
@@ -615,7 +613,7 @@ private:
};
/**
* @brief STL-compatible allocator that uses ArenaAllocator for memory
* @brief STL-compatible allocator that uses Arena for memory
* management.
* @tparam T The type of objects to allocate
*/
@@ -633,7 +631,7 @@ public:
using other = ArenaStlAllocator<U>;
};
explicit ArenaStlAllocator(ArenaAllocator *arena) noexcept : arena_(arena) {}
explicit ArenaStlAllocator(Arena *arena) noexcept : arena_(arena) {}
template <typename U>
ArenaStlAllocator(const ArenaStlAllocator<U> &other) noexcept
@@ -659,7 +657,7 @@ public:
return arena_ != other.arena_;
}
ArenaAllocator *arena_;
Arena *arena_;
template <typename U> friend class ArenaStlAllocator;
};
@@ -669,7 +667,7 @@ public:
/// arena-allocated Uses arena's realloc() for efficient growth without copying
/// when possible
template <typename T> struct ArenaVector {
explicit ArenaVector(ArenaAllocator *arena)
explicit ArenaVector(Arena *arena)
: arena_(arena), data_(nullptr), size_(0), capacity_(0) {}
void push_back(const T &item) {
@@ -713,7 +711,7 @@ private:
capacity_ = new_capacity;
}
ArenaAllocator *arena_;
Arena *arena_;
T *data_;
size_t size_;
size_t capacity_;

View File

@@ -5,7 +5,7 @@
#include <string_view>
#include <vector>
#include "arena_allocator.hpp"
#include "arena.hpp"
/**
* @brief Represents a precondition for optimistic concurrency control.
@@ -63,7 +63,7 @@ struct Operation {
*/
struct CommitRequest {
private:
ArenaAllocator arena_;
Arena arena_;
std::optional<std::string_view> request_id_;
std::string_view leader_id_;
int64_t read_version_ = 0;
@@ -155,7 +155,7 @@ public:
*
* @return Reference to the arena allocator
*/
const ArenaAllocator &arena() const { return arena_; }
const Arena &arena() const { return arena_; }
/**
* @brief Get access to the underlying arena allocator for allocation.
@@ -166,7 +166,7 @@ public:
*
* @return Reference to the arena allocator
*/
ArenaAllocator &arena() { return arena_; }
Arena &arena() { return arena_; }
/**
* @brief Reset the commit request for reuse.

View File

@@ -14,6 +14,7 @@ ConfigParser::load_from_file(const std::string &file_path) {
parse_server_config(toml_data, config.server);
parse_commit_config(toml_data, config.commit);
parse_subscription_config(toml_data, config.subscription);
parse_benchmark_config(toml_data, config.benchmark);
if (!validate_config(config)) {
return std::nullopt;
@@ -36,6 +37,7 @@ ConfigParser::parse_toml_string(const std::string &toml_content) {
parse_server_config(toml_data, config.server);
parse_commit_config(toml_data, config.commit);
parse_subscription_config(toml_data, config.subscription);
parse_benchmark_config(toml_data, config.benchmark);
if (!validate_config(config)) {
return std::nullopt;
@@ -146,6 +148,13 @@ void ConfigParser::parse_subscription_config(const auto &toml_data,
});
}
void ConfigParser::parse_benchmark_config(const auto &toml_data,
BenchmarkConfig &config) {
parse_section(toml_data, "benchmark", [&](const auto &bench) {
parse_field(bench, "ok_resolve_iterations", config.ok_resolve_iterations);
});
}
bool ConfigParser::validate_config(const Config &config) {
bool valid = true;

View File

@@ -74,6 +74,15 @@ struct SubscriptionConfig {
std::chrono::seconds keepalive_interval{30};
};
/**
* @brief Configuration settings for benchmarking and health check behavior.
*/
struct BenchmarkConfig {
/// CPU-intensive loop iterations for /ok requests in resolve stage
/// 0 = health check only, 4000 = default benchmark load (740ns, 1M req/s)
int ok_resolve_iterations = 0;
};
/**
* @brief Top-level configuration container for all WeaselDB settings.
*/
@@ -81,6 +90,7 @@ struct Config {
ServerConfig server; ///< Server networking and request handling settings
CommitConfig commit; ///< Commit processing and validation settings
SubscriptionConfig subscription; ///< Subscription streaming settings
BenchmarkConfig benchmark; ///< Benchmarking and health check settings
};
/**
@@ -152,6 +162,8 @@ private:
static void parse_commit_config(const auto &toml_data, CommitConfig &config);
static void parse_subscription_config(const auto &toml_data,
SubscriptionConfig &config);
static void parse_benchmark_config(const auto &toml_data,
BenchmarkConfig &config);
};
} // namespace weaseldb

View File

@@ -8,7 +8,7 @@
#include <sys/uio.h>
#include <unistd.h>
#include "arena_allocator.hpp"
#include "arena.hpp"
#include "connection_handler.hpp"
#ifndef __has_feature
@@ -28,7 +28,7 @@
* - RAII cleanup happens if I/O thread doesn't transfer back
*
* Arena allocator thread safety:
* Each Connection contains its own ArenaAllocator instance that is accessed
* Each Connection contains its own Arena instance that is accessed
* exclusively by the thread that currently owns the connection. This ensures
* thread safety without requiring locks:
* - Arena is used by the owning thread for I/O buffers, request parsing, and
@@ -117,7 +117,7 @@ struct Connection {
/**
* @brief Get access to the connection's arena allocator.
*
* Returns a reference to this connection's private ArenaAllocator instance,
* Returns a reference to this connection's private Arena instance,
* which should be used for all temporary allocations during request
* processing. The arena provides extremely fast allocation (~1ns) and
* automatic cleanup when the connection is destroyed or reset.
@@ -135,7 +135,7 @@ struct Connection {
*
* Best practices:
* ```cpp
* ArenaAllocator& arena = conn->get_arena();
* Arena& arena = conn->get_arena();
*
* // Allocate temporary parsing buffers
* char* buffer = arena.allocate<char>(1024);
@@ -147,7 +147,7 @@ struct Connection {
* std::vector<Token, ArenaStlAllocator<Token>> tokens{&arena};
* ```
*/
ArenaAllocator &get_arena() { return arena_; }
Arena &get_arena() { return arena_; }
/**
* @brief Get the unique identifier for this connection.
@@ -345,7 +345,7 @@ private:
const int64_t id_;
const size_t epoll_index_; // Index of the epoll instance this connection uses
struct sockaddr_storage addr_; // sockaddr_storage handles IPv4/IPv6
ArenaAllocator arena_;
Arena arena_;
ConnectionHandler *handler_;
std::weak_ptr<Server> server_; // Weak reference to server for safe cleanup

61
src/cpu_work.cpp Normal file
View File

@@ -0,0 +1,61 @@
#include "cpu_work.hpp"
#if defined(__x86_64__) || defined(__amd64__)
// x86-64 file-scoped assembly implementation
#ifdef __APPLE__
asm(".text\n"
".globl _spend_cpu_cycles\n"
"_spend_cpu_cycles:\n"
" test %edi, %edi\n" // Test if iterations <= 0
" jle .L_end\n" // Jump to end if <= 0
".L_loop:\n" // Loop start
" dec %edi\n" // Decrement iterations
" jnz .L_loop\n" // Jump back if not zero
".L_end:\n" // End
" ret\n" // Return
);
#else
asm(".text\n"
".globl spend_cpu_cycles\n"
".type spend_cpu_cycles, @function\n"
"spend_cpu_cycles:\n"
" test %edi, %edi\n" // Test if iterations <= 0
" jle .L_end\n" // Jump to end if <= 0
".L_loop:\n" // Loop start
" dec %edi\n" // Decrement iterations
" jnz .L_loop\n" // Jump back if not zero
".L_end:\n" // End
" ret\n" // Return
".size spend_cpu_cycles, .-spend_cpu_cycles\n");
#endif
#elif defined(__aarch64__)
// ARM64 file-scoped assembly implementation
#ifdef __APPLE__
asm(".text\n"
".globl _spend_cpu_cycles\n"
"_spend_cpu_cycles:\n"
" cmp w0, wzr\n" // Compare iterations with zero
" b.le .L_end\n" // Branch to end if <= 0
".L_loop:\n" // Loop start
" subs w0, w0, #1\n" // Decrement iterations and set flags
" b.ne .L_loop\n" // Branch back if not zero
".L_end:\n" // End
" ret\n" // Return
);
#else
asm(".text\n"
".globl spend_cpu_cycles\n"
".type spend_cpu_cycles, %function\n"
"spend_cpu_cycles:\n"
" cmp w0, wzr\n" // Compare iterations with zero
" b.le .L_end\n" // Branch to end if <= 0
".L_loop:\n" // Loop start
" subs w0, w0, #1\n" // Decrement iterations and set flags
" b.ne .L_loop\n" // Branch back if not zero
".L_end:\n" // End
" ret\n" // Return
".size spend_cpu_cycles, spend_cpu_cycles\n");
#endif
#endif

24
src/cpu_work.hpp Normal file
View File

@@ -0,0 +1,24 @@
#pragma once
extern "C" {
/**
* @brief Perform CPU-intensive work for benchmarking and health check purposes.
*
* This function performs a deterministic amount of CPU work that cannot be
* optimized away by the compiler. It's used both in the health check resolve
* stage and in benchmarks to measure the actual CPU time consumed.
*
* @param iterations Number of loop iterations to perform
*/
void spend_cpu_cycles(int iterations);
}
/**
* @brief Default CPU work iterations for health check benchmarking.
*
* Represents the number of CPU-intensive loop iterations used in the
* /ok health check resolve stage. This value provides 650ns of CPU work
* and achieves 1M requests/second throughput through the 4-stage pipeline.
*/
constexpr int DEFAULT_HEALTH_CHECK_ITERATIONS = 4000;

View File

@@ -952,7 +952,7 @@ void DoubleTerm::write(char *&buf) const { buf = to_chars(buf, nullptr, s); }
} // namespace detail
std::string_view format(ArenaAllocator &arena, const char *fmt, ...) {
std::string_view format(Arena &arena, const char *fmt, ...) {
va_list args;
// Try to format directly into available arena space (single-pass

View File

@@ -7,7 +7,7 @@
#include <string_view>
#include <type_traits>
#include "arena_allocator.hpp"
#include "arena.hpp"
/**
* @brief Runtime printf-style formatting with arena allocation optimization.
@@ -47,7 +47,7 @@
*
* ## Usage Examples:
* ```cpp
* ArenaAllocator arena(1024);
* Arena arena(1024);
*
* // Basic formatting
* auto msg = format(arena, "Hello %s!", "World");
@@ -83,7 +83,7 @@
* const char*
*
* ## Optimization Details:
* The function uses `ArenaAllocator::allocate_remaining_space()` to claim all
* The function uses `Arena::allocate_remaining_space()` to claim all
* available arena space and attempt formatting. If successful, it shrinks the
* allocation to the actual size used. If formatting fails (doesn't fit), it
* falls back to the traditional two-pass approach: measure size, allocate
@@ -92,7 +92,7 @@
* This strategy optimizes for the common case where available arena space is
* sufficient, while maintaining correctness for all cases.
*/
std::string_view format(ArenaAllocator &arena, const char *fmt, ...)
std::string_view format(Arena &arena, const char *fmt, ...)
__attribute__((format(printf, 2, 3)));
namespace detail {
@@ -232,7 +232,7 @@ inline constexpr DoubleTerm term(double s) { return DoubleTerm(s); }
*
* ## Usage Examples:
* ```cpp
* ArenaAllocator arena(1024);
* Arena arena(1024);
*
* // String concatenation
* auto result1 = static_format(arena, "Hello ", "World", "!");
@@ -275,7 +275,7 @@ inline constexpr DoubleTerm term(double s) { return DoubleTerm(s); }
* builds
*/
template <class... Ts>
std::string_view static_format(ArenaAllocator &arena, Ts &&...ts) {
std::string_view static_format(Arena &arena, Ts &&...ts) {
constexpr int upper_bound = (decltype(detail::term(ts))::kMaxLength + ...);
char *result = arena.allocate<char>(upper_bound);
char *buf = result;

View File

@@ -5,7 +5,9 @@
#include <string>
#include <strings.h>
#include "arena_allocator.hpp"
#include "api_url_parser.hpp"
#include "arena.hpp"
#include "cpu_work.hpp"
#include "format.hpp"
#include "json_commit_request_parser.hpp"
#include "metric.hpp"
@@ -25,6 +27,8 @@ thread_local auto status_counter =
requests_counter_family.create({{"path", "/v1/status"}});
thread_local auto version_counter =
requests_counter_family.create({{"path", "/v1/version"}});
thread_local auto ok_counter =
requests_counter_family.create({{"path", "/ok"}});
// Metric for banned request IDs memory usage
auto banned_request_ids_memory_gauge =
@@ -33,8 +37,8 @@ auto banned_request_ids_memory_gauge =
.create({});
// HttpConnectionState implementation
HttpConnectionState::HttpConnectionState(ArenaAllocator &arena)
: current_header_field_buf(ArenaStlAllocator<char>(&arena)),
HttpConnectionState::HttpConnectionState(Arena &arena)
: arena(arena), current_header_field_buf(ArenaStlAllocator<char>(&arena)),
current_header_value_buf(ArenaStlAllocator<char>(&arena)) {
llhttp_settings_init(&settings);
@@ -55,7 +59,7 @@ HttpConnectionState::HttpConnectionState(ArenaAllocator &arena)
// HttpHandler implementation
void HttpHandler::on_connection_established(Connection &conn) {
// Allocate HTTP state in connection's arena
ArenaAllocator &arena = conn.get_arena();
Arena &arena = conn.get_arena();
void *mem = arena.allocate_raw(sizeof(HttpConnectionState),
alignof(HttpConnectionState));
auto *state = new (mem) HttpConnectionState(arena);
@@ -65,13 +69,21 @@ void HttpHandler::on_connection_established(Connection &conn) {
void HttpHandler::on_connection_closed(Connection &conn) {
// Arena cleanup happens automatically when connection is destroyed
auto *state = static_cast<HttpConnectionState *>(conn.user_data);
state->~HttpConnectionState();
if (state) {
// Arena::Ptr automatically calls destructors
state->~HttpConnectionState();
}
conn.user_data = nullptr;
}
void HttpHandler::on_write_buffer_drained(
std::unique_ptr<Connection> &conn_ptr) {
// Reset arena after all messages have been written for the next request
auto *state = static_cast<HttpConnectionState *>(conn_ptr->user_data);
if (state) {
TRACE_EVENT("http", "reply",
perfetto::Flow::Global(state->http_request_id));
}
on_connection_closed(*conn_ptr);
conn_ptr->reset();
on_connection_established(*conn_ptr);
@@ -79,21 +91,27 @@ void HttpHandler::on_write_buffer_drained(
void HttpHandler::on_batch_complete(
std::span<std::unique_ptr<Connection>> batch) {
// Collect commit requests and status requests for pipeline processing
// Collect commit, status, and health check requests for pipeline processing
int pipeline_count = 0;
// Count both commit and status requests
// Count commit, status, and health check requests
for (auto &conn : batch) {
if (conn && conn->user_data) {
auto *state = static_cast<HttpConnectionState *>(conn->user_data);
// Count commit requests that passed basic validation
if (state->route == HttpRoute::POST_commit && state->commit_request &&
if (state->route == HttpRoute::PostCommit && state->commit_request &&
state->parsing_commit && state->basic_validation_passed) {
pipeline_count++;
}
// Count status requests
else if (state->route == HttpRoute::GET_status &&
else if (state->route == HttpRoute::GetStatus &&
// Error message not already queued
conn->outgoing_bytes_queued() == 0) {
pipeline_count++;
}
// Count health check requests
else if (state->route == HttpRoute::GetOk &&
// Error message not already queued
conn->outgoing_bytes_queued() == 0) {
pipeline_count++;
@@ -111,14 +129,18 @@ void HttpHandler::on_batch_complete(
auto *state = static_cast<HttpConnectionState *>(conn->user_data);
// Create CommitEntry for commit requests
if (state->route == HttpRoute::POST_commit && state->commit_request &&
if (state->route == HttpRoute::PostCommit && state->commit_request &&
state->parsing_commit && state->basic_validation_passed) {
*out_iter++ = CommitEntry{std::move(conn)};
}
// Create StatusEntry for status requests
else if (state->route == HttpRoute::GET_status) {
else if (state->route == HttpRoute::GetStatus) {
*out_iter++ = StatusEntry{std::move(conn)};
}
// Create HealthCheckEntry for health check requests
else if (state->route == HttpRoute::GetOk) {
*out_iter++ = HealthCheckEntry{std::move(conn)};
}
}
}
}
@@ -148,87 +170,64 @@ void HttpHandler::on_data_arrived(std::string_view data,
// If message is complete, route and handle the request
if (state->message_complete) {
// Parse route from method and URL
state->route = parseRoute(state->method, state->url);
// Copy URL to arena for in-place decoding
Arena &arena = conn_ptr->get_arena();
char *url_buffer = arena.allocate<char>(state->url.size());
std::memcpy(url_buffer, state->url.data(), state->url.size());
RouteMatch route_match;
auto parse_result =
ApiUrlParser::parse(state->method, url_buffer,
static_cast<int>(state->url.size()), route_match);
if (parse_result != ParseResult::Success) {
// Handle malformed URL encoding
send_error_response(*conn_ptr, 400, "Malformed URL encoding", true);
return;
}
state->route = route_match.route;
// Route to appropriate handler
switch (state->route) {
case HttpRoute::GET_version:
handleGetVersion(*conn_ptr, *state);
case HttpRoute::GetVersion:
handle_get_version(*conn_ptr, *state);
break;
case HttpRoute::POST_commit:
handlePostCommit(*conn_ptr, *state);
case HttpRoute::PostCommit:
handle_post_commit(*conn_ptr, *state);
break;
case HttpRoute::GET_subscribe:
handleGetSubscribe(*conn_ptr, *state);
case HttpRoute::GetSubscribe:
handle_get_subscribe(*conn_ptr, *state);
break;
case HttpRoute::GET_status:
handleGetStatus(*conn_ptr, *state);
case HttpRoute::GetStatus:
handle_get_status(*conn_ptr, *state, route_match);
break;
case HttpRoute::PUT_retention:
handlePutRetention(*conn_ptr, *state);
case HttpRoute::PutRetention:
handle_put_retention(*conn_ptr, *state, route_match);
break;
case HttpRoute::GET_retention:
handleGetRetention(*conn_ptr, *state);
case HttpRoute::GetRetention:
handle_get_retention(*conn_ptr, *state, route_match);
break;
case HttpRoute::DELETE_retention:
handleDeleteRetention(*conn_ptr, *state);
case HttpRoute::DeleteRetention:
handle_delete_retention(*conn_ptr, *state, route_match);
break;
case HttpRoute::GET_metrics:
handleGetMetrics(*conn_ptr, *state);
case HttpRoute::GetMetrics:
handle_get_metrics(*conn_ptr, *state);
break;
case HttpRoute::GET_ok:
handleGetOk(*conn_ptr, *state);
case HttpRoute::GetOk:
handle_get_ok(*conn_ptr, *state);
break;
case HttpRoute::NotFound:
default:
handleNotFound(*conn_ptr, *state);
handle_not_found(*conn_ptr, *state);
break;
}
}
}
HttpRoute HttpHandler::parseRoute(std::string_view method,
std::string_view url) {
// Strip query parameters if present
size_t query_pos = url.find('?');
if (query_pos != std::string_view::npos) {
url = url.substr(0, query_pos);
}
// Route based on method and path
if (method == "GET") {
if (url == "/v1/version")
return HttpRoute::GET_version;
if (url == "/v1/subscribe")
return HttpRoute::GET_subscribe;
if (url.starts_with("/v1/status"))
return HttpRoute::GET_status;
if (url.starts_with("/v1/retention")) {
// Check if it's a specific retention policy or list all
return HttpRoute::GET_retention;
}
if (url == "/metrics")
return HttpRoute::GET_metrics;
if (url == "/ok")
return HttpRoute::GET_ok;
} else if (method == "POST") {
if (url == "/v1/commit")
return HttpRoute::POST_commit;
} else if (method == "PUT") {
if (url.starts_with("/v1/retention/"))
return HttpRoute::PUT_retention;
} else if (method == "DELETE") {
if (url.starts_with("/v1/retention/"))
return HttpRoute::DELETE_retention;
}
return HttpRoute::NotFound;
}
// Route handlers (basic implementations)
void HttpHandler::handleGetVersion(Connection &conn,
const HttpConnectionState &state) {
void HttpHandler::handle_get_version(Connection &conn,
const HttpConnectionState &state) {
version_counter.inc();
send_json_response(
conn, 200,
@@ -237,15 +236,15 @@ void HttpHandler::handleGetVersion(Connection &conn,
state.connection_close);
}
void HttpHandler::handlePostCommit(Connection &conn,
const HttpConnectionState &state) {
void HttpHandler::handle_post_commit(Connection &conn,
const HttpConnectionState &state) {
commit_counter.inc();
// Check if streaming parse was successful
if (!state.commit_request || !state.parsing_commit) {
const char *error = state.commit_parser
? state.commit_parser->get_parse_error()
: "No parser initialized";
ArenaAllocator &arena = conn.get_arena();
Arena &arena = conn.get_arena();
std::string_view error_msg =
format(arena, "Parse failed: %s", error ? error : "Unknown error");
send_error_response(conn, 400, error_msg, state.connection_close);
@@ -297,8 +296,8 @@ void HttpHandler::handlePostCommit(Connection &conn,
// Response will be sent after 4-stage pipeline processing is complete
}
void HttpHandler::handleGetSubscribe(Connection &conn,
const HttpConnectionState &state) {
void HttpHandler::handle_get_subscribe(Connection &conn,
const HttpConnectionState &state) {
// TODO: Implement subscription streaming
send_json_response(
conn, 200,
@@ -306,90 +305,63 @@ void HttpHandler::handleGetSubscribe(Connection &conn,
state.connection_close);
}
void HttpHandler::handleGetStatus(Connection &conn,
HttpConnectionState &state) {
void HttpHandler::handle_get_status(Connection &conn,
HttpConnectionState &state,
const RouteMatch &route_match) {
status_counter.inc();
// Status requests are processed through the pipeline
// Response will be generated in the sequence stage
// This handler extracts request_id from query parameters and prepares for
// pipeline processing
// Extract request_id from query parameters:
// /v1/status?request_id=<ID>&min_version=<VERSION>
std::string_view url = state.url;
// Find query parameters
size_t query_pos = url.find('?');
if (query_pos == std::string_view::npos) {
// No query parameters
const auto &request_id =
route_match.params[static_cast<int>(ApiParameterKey::RequestId)];
if (!request_id) {
send_error_response(conn, 400,
"Missing required query parameter: request_id",
state.connection_close);
return;
}
std::string_view query_string = url.substr(query_pos + 1);
// Simple query parameter parsing for request_id
// Look for "request_id=" in the query string
size_t request_id_pos = query_string.find("request_id=");
if (request_id_pos == std::string_view::npos) {
send_error_response(conn, 400,
"Missing required query parameter: request_id",
state.connection_close);
return;
}
// Extract the request_id value
size_t value_start = request_id_pos + 11; // length of "request_id="
if (value_start >= query_string.length()) {
if (request_id->empty()) {
send_error_response(conn, 400, "Empty request_id parameter",
state.connection_close);
return;
}
// Find the end of the request_id value (next & or end of string)
size_t value_end = query_string.find('&', value_start);
if (value_end == std::string_view::npos) {
value_end = query_string.length();
}
state.status_request_id =
query_string.substr(value_start, value_end - value_start);
if (state.status_request_id.empty()) {
send_error_response(conn, 400, "Empty request_id parameter",
state.connection_close);
return;
}
// Store the request_id in the state for the pipeline
state.status_request_id = *request_id;
// Ready for pipeline processing
}
void HttpHandler::handlePutRetention(Connection &conn,
const HttpConnectionState &state) {
void HttpHandler::handle_put_retention(Connection &conn,
const HttpConnectionState &state,
const RouteMatch &route_match) {
// TODO: Parse retention policy from body and store
send_json_response(conn, 200, R"({"policy_id":"example","status":"created"})",
state.connection_close);
}
void HttpHandler::handleGetRetention(Connection &conn,
const HttpConnectionState &state) {
void HttpHandler::handle_get_retention(Connection &conn,
const HttpConnectionState &state,
const RouteMatch &route_match) {
// TODO: Extract policy_id from URL or return all policies
send_json_response(conn, 200, R"({"policies":[]})", state.connection_close);
}
void HttpHandler::handleDeleteRetention(Connection &conn,
const HttpConnectionState &state) {
void HttpHandler::handle_delete_retention(Connection &conn,
const HttpConnectionState &state,
const RouteMatch &route_match) {
// TODO: Extract policy_id from URL and delete
send_json_response(conn, 200, R"({"policy_id":"example","status":"deleted"})",
state.connection_close);
}
void HttpHandler::handleGetMetrics(Connection &conn,
const HttpConnectionState &state) {
void HttpHandler::handle_get_metrics(Connection &conn,
const HttpConnectionState &state) {
metrics_counter.inc();
ArenaAllocator &arena = conn.get_arena();
Arena &arena = conn.get_arena();
auto metrics_span = metric::render(arena);
// Calculate total size for the response body
@@ -428,23 +400,25 @@ void HttpHandler::handleGetMetrics(Connection &conn,
}
}
void HttpHandler::handleGetOk(Connection &conn,
const HttpConnectionState &state) {
void HttpHandler::handle_get_ok(Connection &conn,
const HttpConnectionState &state) {
ok_counter.inc();
TRACE_EVENT("http", "GET /ok", perfetto::Flow::Global(state.http_request_id));
sendResponse(conn, 200, "text/plain", "OK", state.connection_close);
// Health check requests are processed through the pipeline
// Response will be generated in the release stage after pipeline processing
}
void HttpHandler::handleNotFound(Connection &conn,
const HttpConnectionState &state) {
void HttpHandler::handle_not_found(Connection &conn,
const HttpConnectionState &state) {
send_error_response(conn, 404, "Not found", state.connection_close);
}
// HTTP utility methods
void HttpHandler::sendResponse(Connection &conn, int status_code,
std::string_view content_type,
std::string_view body, bool close_connection) {
ArenaAllocator &arena = conn.get_arena();
void HttpHandler::send_response(Connection &conn, int status_code,
std::string_view content_type,
std::string_view body, bool close_connection) {
Arena &arena = conn.get_arena();
auto *state = static_cast<HttpConnectionState *>(conn.user_data);
// Status text
@@ -492,13 +466,13 @@ void HttpHandler::sendResponse(Connection &conn, int status_code,
void HttpHandler::send_json_response(Connection &conn, int status_code,
std::string_view json,
bool close_connection) {
sendResponse(conn, status_code, "application/json", json, close_connection);
send_response(conn, status_code, "application/json", json, close_connection);
}
void HttpHandler::send_error_response(Connection &conn, int status_code,
std::string_view message,
bool close_connection) {
ArenaAllocator &arena = conn.get_arena();
Arena &arena = conn.get_arena();
std::string_view json =
format(arena, R"({"error":"%.*s"})", static_cast<int>(message.size()),
@@ -589,8 +563,9 @@ int HttpHandler::onHeadersComplete(llhttp_t *parser) {
// parser
if (state->method == "POST" && state->url.find("/v1/commit") == 0) {
// Initialize streaming commit request parsing
state->commit_parser = std::make_unique<JsonCommitRequestParser>();
state->commit_request = std::make_unique<CommitRequest>();
state->commit_parser = state->arena.construct<JsonCommitRequestParser>();
state->commit_request = state->arena.construct<CommitRequest>();
state->parsing_commit =
state->commit_parser->begin_streaming_parse(*state->commit_request);
@@ -719,6 +694,18 @@ bool HttpHandler::process_sequence_batch(BatchType &batch) {
R"({"status": "not_committed"})",
state->connection_close);
return false; // Continue processing
} else if constexpr (std::is_same_v<T, HealthCheckEntry>) {
// Process health check entry: noop in sequence stage
auto &health_check_entry = e;
auto *state = static_cast<HttpConnectionState *>(
health_check_entry.connection->user_data);
if (state) {
TRACE_EVENT("http", "sequence_health_check",
perfetto::Flow::Global(state->http_request_id));
}
return false; // Continue processing
}
@@ -770,6 +757,21 @@ bool HttpHandler::process_resolve_batch(BatchType &batch) {
// Status entries are not processed in resolve stage
// They were already handled in sequence stage
return false;
} else if constexpr (std::is_same_v<T, HealthCheckEntry>) {
// Process health check entry: perform configurable CPU work
auto &health_check_entry = e;
auto *state = static_cast<HttpConnectionState *>(
health_check_entry.connection->user_data);
if (state) {
TRACE_EVENT("http", "resolve_health_check",
perfetto::Flow::Global(state->http_request_id));
}
// Perform configurable CPU-intensive work for benchmarking
spend_cpu_cycles(config_.benchmark.ok_resolve_iterations);
return false; // Continue processing
}
return false; // Unknown type, continue
@@ -817,7 +819,7 @@ bool HttpHandler::process_persist_batch(BatchType &batch) {
perfetto::Flow::Global(state->http_request_id));
const CommitRequest &commit_request = *state->commit_request;
ArenaAllocator &arena = commit_entry.connection->get_arena();
Arena &arena = commit_entry.connection->get_arena();
std::string_view response;
// Generate success response with actual assigned version
@@ -843,6 +845,22 @@ bool HttpHandler::process_persist_batch(BatchType &batch) {
// Status entries are not processed in persist stage
// They were already handled in sequence stage
return false;
} else if constexpr (std::is_same_v<T, HealthCheckEntry>) {
// Process health check entry: generate OK response
auto &health_check_entry = e;
auto *state = static_cast<HttpConnectionState *>(
health_check_entry.connection->user_data);
if (state) {
TRACE_EVENT("http", "persist_health_check",
perfetto::Flow::Global(state->http_request_id));
// Generate OK response
send_response(*health_check_entry.connection, 200, "text/plain",
"OK", state->connection_close);
}
return false; // Continue processing
}
return false; // Unknown type, continue
@@ -898,6 +916,22 @@ bool HttpHandler::process_release_batch(BatchType &batch) {
// Return connection to server for further processing or cleanup
Server::release_back_to_server(std::move(status_entry.connection));
return false; // Continue processing
} else if constexpr (std::is_same_v<T, HealthCheckEntry>) {
// Process health check entry: return connection to server
auto &health_check_entry = e;
auto *state = static_cast<HttpConnectionState *>(
health_check_entry.connection->user_data);
if (state) {
TRACE_EVENT("http", "release_health_check",
perfetto::Flow::Global(state->http_request_id));
}
// Return connection to server for further processing or cleanup
Server::release_back_to_server(
std::move(health_check_entry.connection));
return false; // Continue processing
}

View File

@@ -8,10 +8,11 @@
#include <llhttp.h>
#include "arena_allocator.hpp"
#include "api_url_parser.hpp"
#include "arena.hpp"
#include "config.hpp"
#include "connection.hpp"
#include "connection_handler.hpp"
#include "loop_iterations.hpp"
#include "perfetto_categories.hpp"
#include "pipeline_entry.hpp"
#include "server.hpp"
@@ -20,29 +21,14 @@
// Forward declarations
struct CommitRequest;
struct JsonCommitRequestParser;
/**
* HTTP routes supported by WeaselDB server.
* Using enum for efficient switch-based routing.
*/
enum class HttpRoute {
GET_version,
POST_commit,
GET_subscribe,
GET_status,
PUT_retention,
GET_retention,
DELETE_retention,
GET_metrics,
GET_ok,
NotFound
};
struct RouteMatch;
/**
* HTTP connection state stored in Connection::user_data.
* Manages llhttp parser state and request data.
*/
struct HttpConnectionState {
Arena &arena;
llhttp_t parser;
llhttp_settings_t settings;
@@ -70,13 +56,13 @@ struct HttpConnectionState {
0; // X-Request-Id header value (for tracing/logging)
// Streaming parser for POST requests
std::unique_ptr<JsonCommitRequestParser> commit_parser;
std::unique_ptr<CommitRequest> commit_request;
Arena::Ptr<JsonCommitRequestParser> commit_parser;
Arena::Ptr<CommitRequest> commit_request;
bool parsing_commit = false;
bool basic_validation_passed =
false; // Set to true if basic validation passes
explicit HttpConnectionState(ArenaAllocator &arena);
explicit HttpConnectionState(Arena &arena);
};
/**
@@ -84,9 +70,9 @@ struct HttpConnectionState {
* Supports the WeaselDB REST API endpoints with enum-based routing.
*/
struct HttpHandler : ConnectionHandler {
HttpHandler()
: banned_request_ids(
ArenaStlAllocator<std::string_view>(&banned_request_arena)) {
explicit HttpHandler(const weaseldb::Config &config)
: config_(config), banned_request_ids(ArenaStlAllocator<std::string_view>(
&banned_request_arena)) {
// Stage 0: Sequence assignment thread
sequenceThread = std::thread{[this]() {
pthread_setname_np(pthread_self(), "txn-sequence");
@@ -102,7 +88,7 @@ struct HttpHandler : ConnectionHandler {
resolveThread = std::thread{[this]() {
pthread_setname_np(pthread_self(), "txn-resolve");
for (;;) {
auto guard = commitPipeline.acquire<1, 0>();
auto guard = commitPipeline.acquire<1, 0>(/*maxBatch*/ 1);
if (process_resolve_batch(guard.batch)) {
return; // Shutdown signal received
}
@@ -154,9 +140,6 @@ struct HttpHandler : ConnectionHandler {
void on_batch_complete(
std::span<std::unique_ptr<Connection>> /*batch*/) override;
// Route parsing (public for testing)
static HttpRoute parseRoute(std::string_view method, std::string_view url);
// llhttp callbacks (public for HttpConnectionState access)
static int onUrl(llhttp_t *parser, const char *at, size_t length);
static int onHeaderField(llhttp_t *parser, const char *at, size_t length);
@@ -170,6 +153,9 @@ struct HttpHandler : ConnectionHandler {
private:
static constexpr int lg_size = 16;
// Configuration reference
const weaseldb::Config &config_;
// Pipeline state (sequence thread only)
int64_t next_version = 1; // Next version to assign (sequence thread only)
@@ -179,7 +165,7 @@ private:
// Arena for banned request IDs and related data structures (sequence thread
// only)
ArenaAllocator banned_request_arena;
Arena banned_request_arena;
using BannedRequestIdSet =
std::unordered_set<std::string_view, std::hash<std::string_view>,
std::equal_to<std::string_view>,
@@ -208,22 +194,27 @@ private:
bool process_release_batch(BatchType &batch);
// Route handlers
void handleGetVersion(Connection &conn, const HttpConnectionState &state);
void handlePostCommit(Connection &conn, const HttpConnectionState &state);
void handleGetSubscribe(Connection &conn, const HttpConnectionState &state);
void handleGetStatus(Connection &conn, HttpConnectionState &state);
void handlePutRetention(Connection &conn, const HttpConnectionState &state);
void handleGetRetention(Connection &conn, const HttpConnectionState &state);
void handleDeleteRetention(Connection &conn,
const HttpConnectionState &state);
void handleGetMetrics(Connection &conn, const HttpConnectionState &state);
void handleGetOk(Connection &conn, const HttpConnectionState &state);
void handleNotFound(Connection &conn, const HttpConnectionState &state);
void handle_get_version(Connection &conn, const HttpConnectionState &state);
void handle_post_commit(Connection &conn, const HttpConnectionState &state);
void handle_get_subscribe(Connection &conn, const HttpConnectionState &state);
void handle_get_status(Connection &conn, HttpConnectionState &state,
const RouteMatch &route_match);
void handle_put_retention(Connection &conn, const HttpConnectionState &state,
const RouteMatch &route_match);
void handle_get_retention(Connection &conn, const HttpConnectionState &state,
const RouteMatch &route_match);
void handle_delete_retention(Connection &conn,
const HttpConnectionState &state,
const RouteMatch &route_match);
void handle_get_metrics(Connection &conn, const HttpConnectionState &state);
void handle_get_ok(Connection &conn, const HttpConnectionState &state);
void handle_not_found(Connection &conn, const HttpConnectionState &state);
// HTTP utilities
static void sendResponse(Connection &conn, int status_code,
std::string_view content_type, std::string_view body,
bool close_connection = false);
static void send_response(Connection &conn, int status_code,
std::string_view content_type,
std::string_view body,
bool close_connection = false);
static void send_json_response(Connection &conn, int status_code,
std::string_view json,
bool close_connection = false);

View File

@@ -70,7 +70,7 @@ private:
ArenaString operation_type;
// Constructor to initialize arena-allocated containers
explicit ParserContext(ArenaAllocator *arena)
explicit ParserContext(Arena *arena)
: current_key(ArenaStlAllocator<char>(arena)),
current_string(ArenaStlAllocator<char>(arena)),
current_number(ArenaStlAllocator<char>(arena)),
@@ -79,7 +79,7 @@ private:
has_read_version_been_set = false;
}
void attach_arena(ArenaAllocator *arena) {
void attach_arena(Arena *arena) {
current_key = ArenaString{ArenaStlAllocator<char>(arena)};
current_string = ArenaString{ArenaStlAllocator<char>(arena)};
current_number = ArenaString{ArenaStlAllocator<char>(arena)};

View File

@@ -1,3 +0,0 @@
#pragma once
constexpr int loop_iterations = 1725;

View File

@@ -252,12 +252,14 @@ int main(int argc, char *argv[]) {
std::cout << "Keepalive interval: "
<< config->subscription.keepalive_interval.count() << " seconds"
<< std::endl;
std::cout << "Health check resolve iterations: "
<< config->benchmark.ok_resolve_iterations << std::endl;
// Create listen sockets
std::vector<int> listen_fds = create_listen_sockets(*config);
// Create handler and server
HttpHandler http_handler;
HttpHandler http_handler(*config);
auto server = Server::create(*config, http_handler, listen_fds);
g_server = server.get();

View File

@@ -25,7 +25,7 @@
#include <immintrin.h>
#include <simdutf.h>
#include "arena_allocator.hpp"
#include "arena.hpp"
#include "format.hpp"
// WeaselDB Metrics System Design:
@@ -79,18 +79,18 @@ namespace metric {
// - Content: Thread-specific metric instance state
//
// 3. TEMPORARY ARENAS:
// a) Caller-Provided Arenas (ArenaAllocator& parameters):
// a) Caller-Provided Arenas (Arena& parameters):
// - Lifetime: Controlled by caller (function parameter)
// - Purpose: Output formatting where caller controls result lifetime
// - Owner: Caller owns arena and controls string lifetime
// - Example: render(ArenaAllocator& arena) - caller manages arena
// - Example: render(Arena& arena) - caller manages arena
// lifecycle
//
// b) Stack-Owned Temporary Arenas:
// - Lifetime: Function/scope lifetime (automatic destruction)
// - Purpose: Internal temporary allocations for lookups and processing
// - Owner: Function owns arena on stack, destroyed at scope exit
// - Example: intern_labels() creates ArenaAllocator lookup_arena(1024)
// - Example: intern_labels() creates Arena lookup_arena(1024)
//
// CRITICAL OWNERSHIP RULES:
//
@@ -124,8 +124,7 @@ static void validate_or_abort(bool condition, const char *message,
}
// Helper to copy a string into arena memory
static std::string_view arena_copy_string(std::string_view str,
ArenaAllocator &arena) {
static std::string_view arena_copy_string(std::string_view str, Arena &arena) {
if (str.empty()) {
return std::string_view{};
}
@@ -142,7 +141,7 @@ struct LabelsKey {
// Arena-owning constructor (copies strings into arena and formats as
// Prometheus text)
LabelsKey(std::span<const std::pair<std::string_view, std::string_view>> l,
ArenaAllocator &arena) {
Arena &arena) {
// Copy and validate all label keys and values, sort by key
ArenaVector<std::pair<std::string_view, std::string_view>> labels(&arena);
for (const auto &[key, value] : l) {
@@ -251,7 +250,7 @@ template <> struct Family<Counter>::State {
ArenaStlAllocator<std::pair<const LabelsKey, Counter::State *>>>
instances;
explicit PerThreadState(ArenaAllocator &arena)
explicit PerThreadState(Arena &arena)
: instances(
ArenaStlAllocator<std::pair<const LabelsKey, Counter::State *>>(
&arena)) {}
@@ -271,7 +270,7 @@ template <> struct Family<Counter>::State {
ArenaStlAllocator<std::pair<const LabelsKey, MetricCallback<Counter>>>>
callbacks;
State(ArenaAllocator &arena)
State(Arena &arena)
: global_accumulated_values(
ArenaStlAllocator<std::pair<const LabelsKey, Counter::State *>>(
&arena)),
@@ -293,7 +292,7 @@ template <> struct Family<Gauge>::State {
ArenaStlAllocator<std::pair<const LabelsKey, MetricCallback<Gauge>>>>
callbacks;
State(ArenaAllocator &arena)
State(Arena &arena)
: instances(ArenaStlAllocator<std::pair<const LabelsKey, Gauge::State *>>(
&arena)),
callbacks(ArenaStlAllocator<
@@ -312,7 +311,7 @@ template <> struct Family<Histogram>::State {
ArenaStlAllocator<std::pair<const LabelsKey, Histogram::State *>>>
instances;
explicit PerThreadState(ArenaAllocator &arena)
explicit PerThreadState(Arena &arena)
: instances(
ArenaStlAllocator<std::pair<const LabelsKey, Histogram::State *>>(
&arena)) {}
@@ -326,7 +325,7 @@ template <> struct Family<Histogram>::State {
ArenaStlAllocator<std::pair<const LabelsKey, Histogram::State *>>>
global_accumulated_values;
State(ArenaAllocator &arena)
State(Arena &arena)
: buckets(&arena),
global_accumulated_values(
ArenaStlAllocator<std::pair<const LabelsKey, Histogram::State *>>(
@@ -371,54 +370,47 @@ struct Metric {
static std::mutex mutex;
// Global arena allocator for metric families and persistent global state
static ArenaAllocator &get_global_arena() {
static auto *global_arena =
new ArenaAllocator(64 * 1024); // 64KB initial size
static Arena &get_global_arena() {
static auto *global_arena = new Arena(64 * 1024); // 64KB initial size
return *global_arena;
}
// Function-local statics to avoid static initialization order fiasco
static auto &get_counter_families() {
using FamilyMap =
std::map<std::string_view, ArenaAllocator::Ptr<Family<Counter>::State>,
std::less<std::string_view>,
ArenaStlAllocator<
std::pair<const std::string_view,
ArenaAllocator::Ptr<Family<Counter>::State>>>>;
static FamilyMap *counterFamilies =
new FamilyMap(ArenaStlAllocator<
std::pair<const std::string_view,
ArenaAllocator::Ptr<Family<Counter>::State>>>(
using FamilyMap = std::map<
std::string_view, Arena::Ptr<Family<Counter>::State>,
std::less<std::string_view>,
ArenaStlAllocator<std::pair<const std::string_view,
Arena::Ptr<Family<Counter>::State>>>>;
static FamilyMap *counterFamilies = new FamilyMap(
ArenaStlAllocator<std::pair<const std::string_view,
Arena::Ptr<Family<Counter>::State>>>(
&get_global_arena()));
return *counterFamilies;
}
static auto &get_gauge_families() {
using FamilyMap =
std::map<std::string_view, ArenaAllocator::Ptr<Family<Gauge>::State>,
std::less<std::string_view>,
ArenaStlAllocator<
std::pair<const std::string_view,
ArenaAllocator::Ptr<Family<Gauge>::State>>>>;
using FamilyMap = std::map<
std::string_view, Arena::Ptr<Family<Gauge>::State>,
std::less<std::string_view>,
ArenaStlAllocator<std::pair<const std::string_view,
Arena::Ptr<Family<Gauge>::State>>>>;
static FamilyMap *gaugeFamilies = new FamilyMap(
ArenaStlAllocator<std::pair<const std::string_view,
ArenaAllocator::Ptr<Family<Gauge>::State>>>(
Arena::Ptr<Family<Gauge>::State>>>(
&get_global_arena()));
return *gaugeFamilies;
}
static auto &get_histogram_families() {
using FamilyMap =
std::map<std::string_view,
ArenaAllocator::Ptr<Family<Histogram>::State>,
std::less<std::string_view>,
ArenaStlAllocator<
std::pair<const std::string_view,
ArenaAllocator::Ptr<Family<Histogram>::State>>>>;
static FamilyMap *histogramFamilies =
new FamilyMap(ArenaStlAllocator<
std::pair<const std::string_view,
ArenaAllocator::Ptr<Family<Histogram>::State>>>(
using FamilyMap = std::map<
std::string_view, Arena::Ptr<Family<Histogram>::State>,
std::less<std::string_view>,
ArenaStlAllocator<std::pair<const std::string_view,
Arena::Ptr<Family<Histogram>::State>>>>;
static FamilyMap *histogramFamilies = new FamilyMap(
ArenaStlAllocator<std::pair<const std::string_view,
Arena::Ptr<Family<Histogram>::State>>>(
&get_global_arena()));
return *histogramFamilies;
}
@@ -446,8 +438,7 @@ struct Metric {
// Registry of all thread arenas for memory tracking
static auto &get_thread_arenas() {
using ThreadArenaMap =
std::unordered_map<std::thread::id, ArenaAllocator *>;
using ThreadArenaMap = std::unordered_map<std::thread::id, Arena *>;
static ThreadArenaMap *threadArenas = new ThreadArenaMap();
return *threadArenas;
}
@@ -460,7 +451,7 @@ struct Metric {
// Thread cleanup for per-family thread-local storage
struct ThreadInit {
ArenaAllocator arena;
Arena arena;
ThreadInit() {
// Register this thread's arena for memory tracking
std::unique_lock<std::mutex> _{mutex};
@@ -536,7 +527,7 @@ struct Metric {
static thread_local ThreadInit thread_init;
// Thread-local arena allocator for metric instances
static ArenaAllocator &get_thread_local_arena() { return thread_init.arena; }
static Arena &get_thread_local_arena() { return thread_init.arena; }
// Thread cleanup now handled by ThreadInit RAII
@@ -561,7 +552,7 @@ struct Metric {
// lifetime)
// Create temporary lookup key using stack-allocated arena
ArenaAllocator lookup_arena(1024); // Small arena for lookups only
Arena lookup_arena(1024); // Small arena for lookups only
LabelsKey lookup_key{labels, lookup_arena};
// Use standard hash set lookup - lookup_key memory used transiently only
@@ -736,7 +727,7 @@ struct Metric {
ArenaVector<Counter::State *> thread_states; // Pre-resolved pointers
Counter::State *global_state; // Pre-resolved global state pointer
CounterLabelData(const LabelsKey &key, ArenaAllocator &arena)
CounterLabelData(const LabelsKey &key, Arena &arena)
: labels_key(key), thread_states(&arena), global_state(nullptr) {}
};
@@ -754,7 +745,7 @@ struct Metric {
Histogram::State *global_state; // Pre-resolved global state pointer
size_t bucket_count; // Cache bucket count from family
HistogramLabelData(const LabelsKey &key, ArenaAllocator &arena)
HistogramLabelData(const LabelsKey &key, Arena &arena)
: labels_key(key), thread_states(&arena), global_state(nullptr),
bucket_count(0) {}
};
@@ -764,7 +755,7 @@ struct Metric {
ArenaVector<ArenaVector<CounterLabelData>> counter_data;
ArenaVector<ArenaVector<GaugeLabelData>> gauge_data;
ArenaVector<ArenaVector<HistogramLabelData>> histogram_data;
explicit LabelSets(ArenaAllocator &arena)
explicit LabelSets(Arena &arena)
: counter_data(&arena), gauge_data(&arena), histogram_data(&arena) {}
};
@@ -846,7 +837,7 @@ struct Metric {
// Three-phase rendering system
struct RenderPlan {
ArenaAllocator arena;
Arena arena;
ArenaVector<std::string_view> static_text{&arena};
ArenaVector<RenderInstruction> instructions{&arena};
uint64_t registration_version;
@@ -865,7 +856,7 @@ struct Metric {
// Use temporary arena for formatting static text (will be interned to
// global arena)
ArenaAllocator temp_arena(8192); // 8KB for temporary formatting
Arena temp_arena(8192); // 8KB for temporary formatting
// Helper function to append an additional label to existing Prometheus
// format
@@ -1091,7 +1082,7 @@ struct Metric {
// Phase 2: Execute phase - run instructions and generate dynamic text
static ArenaVector<std::string_view>
execute_render_plan(ArenaAllocator &arena,
execute_render_plan(Arena &arena,
const ArenaVector<RenderInstruction> &instructions) {
ArenaVector<std::string_view> dynamic_text(&arena);
@@ -1191,7 +1182,7 @@ struct Metric {
// Phase 3: Present phase - interleave static and dynamic text
static ArenaVector<std::string_view>
present_render_output(ArenaAllocator &arena,
present_render_output(Arena &arena,
const ArenaVector<std::string_view> &static_text,
const ArenaVector<std::string_view> &dynamic_text) {
ArenaVector<std::string_view> output(&arena);
@@ -1213,7 +1204,7 @@ struct Metric {
}
// Build label sets once for reuse in both phases
static LabelSets build_label_sets(ArenaAllocator &arena) {
static LabelSets build_label_sets(Arena &arena) {
LabelSets label_sets{arena};
// Build counter data with pre-resolved pointers
@@ -1495,7 +1486,7 @@ Family<Gauge> create_gauge(std::string_view name, std::string_view help) {
auto name_view = arena_copy_string(name, global_arena);
auto &familyPtr = Metric::get_gauge_families()[name_view];
if (!familyPtr) {
// Family<T>::State instances use ArenaAllocator::Ptr for automatic cleanup
// Family<T>::State instances use Arena::Ptr for automatic cleanup
familyPtr = global_arena.construct<Family<Gauge>::State>(global_arena);
familyPtr->name = name_view;
familyPtr->help = arena_copy_string(help, global_arena);
@@ -1519,7 +1510,7 @@ Family<Histogram> create_histogram(std::string_view name, std::string_view help,
auto name_view = arena_copy_string(name, global_arena);
auto &family_ptr = Metric::get_histogram_families()[name_view];
if (!family_ptr) {
// Family<T>::State instances use ArenaAllocator::Ptr for automatic cleanup
// Family<T>::State instances use Arena::Ptr for automatic cleanup
family_ptr = global_arena.construct<Family<Histogram>::State>(global_arena);
family_ptr->name = name_view;
family_ptr->help = arena_copy_string(help, global_arena);
@@ -1688,7 +1679,7 @@ static double calculate_metrics_memory_usage() {
}
// New three-phase render implementation
std::span<std::string_view> render(ArenaAllocator &arena) {
std::span<std::string_view> render(Arena &arena) {
// Initialize self-monitoring metrics (before taking global lock)
static auto memory_gauge = []() {
auto gauge = create_gauge("weaseldb_metrics_memory_bytes",

View File

@@ -50,7 +50,7 @@
#include <type_traits>
#include <vector>
#include "arena_allocator.hpp"
#include "arena.hpp"
namespace metric {
@@ -220,7 +220,7 @@ std::vector<double> exponential_buckets(double start, double factor, int count);
// allocated in provided arena for zero-copy efficiency. The caller is
// responsible for the arena's lifecycle. THREAD SAFETY: Serialized by global
// mutex - callbacks need not be thread-safe
std::span<std::string_view> render(ArenaAllocator &arena);
std::span<std::string_view> render(Arena &arena);
// Validation functions for Prometheus compatibility
bool is_valid_metric_name(std::string_view name);

View File

@@ -32,6 +32,19 @@ struct StatusEntry {
: connection(std::move(conn)) {}
};
/**
* Pipeline entry for /ok health check requests.
* Flows through all pipeline stages as a noop except resolve stage.
* Resolve stage can perform configurable CPU work for benchmarking.
*/
struct HealthCheckEntry {
std::unique_ptr<Connection> connection;
HealthCheckEntry() = default; // Default constructor for variant
explicit HealthCheckEntry(std::unique_ptr<Connection> conn)
: connection(std::move(conn)) {}
};
/**
* Pipeline entry for coordinated shutdown of all stages.
* Flows through all stages to ensure proper cleanup.
@@ -44,4 +57,5 @@ struct ShutdownEntry {
* Pipeline entry variant type used by the commit processing pipeline.
* Each stage pattern-matches on the variant type to handle appropriately.
*/
using PipelineEntry = std::variant<CommitEntry, StatusEntry, ShutdownEntry>;
using PipelineEntry =
std::variant<CommitEntry, StatusEntry, HealthCheckEntry, ShutdownEntry>;

View File

@@ -114,7 +114,7 @@ std::string_view response = static_format(arena,
"\r\n", body);
// Printf-style formatting - runtime flexible
ArenaAllocator& arena = conn.get_arena();
Arena& arena = conn.get_arena();
std::string_view response = format(arena,
"HTTP/1.1 %d OK\r\n"
"Content-Length: %zu\r\n"
@@ -154,9 +154,9 @@ int32_t initial_block_size_;
- **Full encapsulation still applies** - use `private:` sections to hide implementation details and maintain deep, capable structs
- The struct keyword doesn't mean shallow design - it means interface-first organization for human readers
```cpp
struct ArenaAllocator {
struct Arena {
// Public interface first
explicit ArenaAllocator(int64_t initial_size = 1024);
explicit Arena(int64_t initial_size = 1024);
void* allocate_raw(int64_t size);
private:
@@ -228,7 +228,7 @@ template <typename T> struct rebind { using type = T*; };
#include <simdutf.h>
#include <weaseljson/weaseljson.h>
#include "arena_allocator.hpp"
#include "arena.hpp"
#include "commit_request.hpp"
// Never this:
@@ -248,16 +248,16 @@ std::unique_ptr<Parser> parser;
- **Explicit constructors** to prevent implicit conversions
- **Delete copy operations** when inappropriate
```cpp
struct ArenaAllocator {
explicit ArenaAllocator(int64_t initial_size = 1024);
struct Arena {
explicit Arena(int64_t initial_size = 1024);
// Copy construction is not allowed
ArenaAllocator(const ArenaAllocator &source) = delete;
ArenaAllocator &operator=(const ArenaAllocator &source) = delete;
Arena(const Arena &source) = delete;
Arena &operator=(const Arena &source) = delete;
// Move semantics
ArenaAllocator(ArenaAllocator &&source) noexcept;
ArenaAllocator &operator=(ArenaAllocator &&source) noexcept;
Arena(Arena &&source) noexcept;
Arena &operator=(Arena &&source) noexcept;
private:
int32_t initial_block_size_;
@@ -276,7 +276,7 @@ private:
std::span<const Operation> operations() const { return operations_; }
void process_data(std::string_view request_data); // ≤ 16 bytes, pass by value
void process_request(const CommitRequest& commit_request); // > 16 bytes, pass by reference
ArenaAllocator(ArenaAllocator &&source) noexcept;
Arena(Arena &&source) noexcept;
```
### Template Usage
@@ -353,10 +353,10 @@ auto value = counter; // Implicit - memory ordering not explicit
- **STL containers with arena allocators require default construction after arena reset** - `clear()` is not sufficient
```cpp
// STL containers with arena allocators - correct reset pattern
std::vector<Operation, ArenaStlAllocator<Operation>> operations(arena_allocator);
std::vector<Operation, ArenaStlAllocator<Operation>> operations(arena);
// ... use container ...
operations = {}; // Default construct - clear() won't work correctly
arena_allocator.reset(); // Reset arena memory
arena.reset(); // Reset arena memory
```
### Resource Management
@@ -364,7 +364,7 @@ arena_allocator.reset(); // Reset arena memory
- **Move semantics** for efficient resource transfer
- **Explicit cleanup** methods where appropriate
```cpp
~ArenaAllocator() {
~Arena() {
while (current_block_) {
Block *prev = current_block_->prev;
std::free(current_block_);
@@ -395,7 +395,7 @@ enum class [[nodiscard]] ParseResult { Success, InvalidJson, MissingField };
// System failure - abort immediately
void* memory = std::malloc(size);
if (!memory) {
std::fprintf(stderr, "ArenaAllocator: Memory allocation failed\n");
std::fprintf(stderr, "Arena: Memory allocation failed\n");
std::abort();
}
// ... use memory, eventually std::free(memory)
@@ -529,8 +529,8 @@ Connection(struct sockaddr_storage addr, int fd, int64_t id,
- **SUBCASE** for related test variations
- **Fresh instances** for each test to avoid state contamination
```cpp
TEST_CASE("ArenaAllocator basic allocation") {
ArenaAllocator arena;
TEST_CASE("Arena basic allocation") {
Arena arena;
SUBCASE("allocate zero bytes returns nullptr") {
void *ptr = arena.allocate_raw(0);

View File

@@ -0,0 +1,33 @@
# WeaselDB Test Configuration with Benchmark Health Check
[server]
# Network interfaces to listen on - both TCP for external access and Unix socket for high-performance local testing
interfaces = [
{ type = "tcp", address = "127.0.0.1", port = 8080 },
{ type = "unix", path = "weaseldb.sock" }
]
# Maximum request size in bytes (for 413 Content Too Large responses)
max_request_size_bytes = 1048576 # 1MB
# Number of I/O threads for handling connections and network events
io_threads = 8
epoll_instances = 8
# Event batch size for epoll processing
event_batch_size = 64
[commit]
# Minimum length for request_id to ensure sufficient entropy
min_request_id_length = 20
# How long to retain request IDs for /v1/status queries (hours)
request_id_retention_hours = 24
# Minimum number of versions to retain request IDs
request_id_retention_versions = 100000000
[subscription]
# Maximum buffer size for unconsumed data in /v1/subscribe (bytes)
max_buffer_size_bytes = 10485760 # 10MB
# Interval for sending keepalive comments to prevent idle timeouts (seconds)
keepalive_interval_seconds = 30
[benchmark]
# Use original benchmark load for testing
ok_resolve_iterations = 4000

View File

@@ -0,0 +1,554 @@
#include <doctest/doctest.h>
#include <cstring>
#include <string>
#include "api_url_parser.hpp"
// Helper to convert string to mutable buffer for testing
std::string make_mutable_copy(const std::string &url) {
return url; // Return copy that can be modified
}
TEST_CASE("ApiUrlParser routing") {
SUBCASE("Static GET routes") {
auto url = make_mutable_copy("/v1/version");
RouteMatch match;
auto result = ApiUrlParser::parse("GET", url.data(),
static_cast<int>(url.size()), match);
CHECK(result == ParseResult::Success);
CHECK(match.route == HttpRoute::GetVersion);
url = make_mutable_copy("/v1/subscribe");
result = ApiUrlParser::parse("GET", url.data(),
static_cast<int>(url.size()), match);
CHECK(result == ParseResult::Success);
CHECK(match.route == HttpRoute::GetSubscribe);
url = make_mutable_copy("/metrics");
result = ApiUrlParser::parse("GET", url.data(),
static_cast<int>(url.size()), match);
CHECK(result == ParseResult::Success);
CHECK(match.route == HttpRoute::GetMetrics);
url = make_mutable_copy("/ok");
result = ApiUrlParser::parse("GET", url.data(),
static_cast<int>(url.size()), match);
CHECK(result == ParseResult::Success);
CHECK(match.route == HttpRoute::GetOk);
}
SUBCASE("Static POST routes") {
auto url = make_mutable_copy("/v1/commit");
RouteMatch match;
auto result = ApiUrlParser::parse("POST", url.data(),
static_cast<int>(url.size()), match);
CHECK(result == ParseResult::Success);
CHECK(match.route == HttpRoute::PostCommit);
}
SUBCASE("Not found") {
auto url = make_mutable_copy("/unknown/route");
RouteMatch match;
auto result = ApiUrlParser::parse("GET", url.data(),
static_cast<int>(url.size()), match);
CHECK(result == ParseResult::Success);
CHECK(match.route == HttpRoute::NotFound);
url = make_mutable_copy("/v1/version");
result = ApiUrlParser::parse("DELETE", url.data(),
static_cast<int>(url.size()), match);
CHECK(result == ParseResult::Success);
CHECK(match.route == HttpRoute::NotFound);
}
}
TEST_CASE("ApiUrlParser with query strings") {
SUBCASE("Simple query string") {
auto url = make_mutable_copy("/v1/status?request_id=123");
RouteMatch match;
auto result = ApiUrlParser::parse("GET", url.data(),
static_cast<int>(url.size()), match);
CHECK(result == ParseResult::Success);
CHECK(match.route == HttpRoute::GetStatus);
REQUIRE(
match.params[static_cast<int>(ApiParameterKey::RequestId)].has_value());
CHECK(match.params[static_cast<int>(ApiParameterKey::RequestId)].value() ==
"123");
}
SUBCASE("Multiple query parameters") {
auto url = make_mutable_copy("/v1/status?request_id=abc&min_version=42");
RouteMatch match;
auto result = ApiUrlParser::parse("GET", url.data(),
static_cast<int>(url.size()), match);
CHECK(result == ParseResult::Success);
CHECK(match.route == HttpRoute::GetStatus);
REQUIRE(
match.params[static_cast<int>(ApiParameterKey::RequestId)].has_value());
CHECK(match.params[static_cast<int>(ApiParameterKey::RequestId)].value() ==
"abc");
REQUIRE(match.params[static_cast<int>(ApiParameterKey::MinVersion)]
.has_value());
CHECK(match.params[static_cast<int>(ApiParameterKey::MinVersion)].value() ==
"42");
}
SUBCASE("Unknown parameters are ignored") {
auto url = make_mutable_copy("/v1/version?foo=bar&baz=quux");
RouteMatch match;
auto result = ApiUrlParser::parse("GET", url.data(),
static_cast<int>(url.size()), match);
CHECK(result == ParseResult::Success);
CHECK(match.route == HttpRoute::GetVersion);
CHECK_FALSE(
match.params[static_cast<int>(ApiParameterKey::RequestId)].has_value());
}
}
TEST_CASE("ApiUrlParser with URL parameters") {
SUBCASE("PUT retention policy") {
auto url = make_mutable_copy("/v1/retention/my-policy");
RouteMatch match;
auto result = ApiUrlParser::parse("PUT", url.data(),
static_cast<int>(url.size()), match);
CHECK(result == ParseResult::Success);
CHECK(match.route == HttpRoute::PutRetention);
REQUIRE(
match.params[static_cast<int>(ApiParameterKey::PolicyId)].has_value());
CHECK(match.params[static_cast<int>(ApiParameterKey::PolicyId)].value() ==
"my-policy");
}
SUBCASE("DELETE retention policy") {
auto url = make_mutable_copy("/v1/retention/another-policy");
RouteMatch match;
auto result = ApiUrlParser::parse("DELETE", url.data(),
static_cast<int>(url.size()), match);
CHECK(result == ParseResult::Success);
CHECK(match.route == HttpRoute::DeleteRetention);
REQUIRE(
match.params[static_cast<int>(ApiParameterKey::PolicyId)].has_value());
CHECK(match.params[static_cast<int>(ApiParameterKey::PolicyId)].value() ==
"another-policy");
}
SUBCASE("GET retention policy") {
auto url = make_mutable_copy("/v1/retention/get-this");
RouteMatch match;
auto result = ApiUrlParser::parse("GET", url.data(),
static_cast<int>(url.size()), match);
CHECK(result == ParseResult::Success);
CHECK(match.route == HttpRoute::GetRetention);
REQUIRE(
match.params[static_cast<int>(ApiParameterKey::PolicyId)].has_value());
CHECK(match.params[static_cast<int>(ApiParameterKey::PolicyId)].value() ==
"get-this");
}
SUBCASE("GET all retention policies (no ID)") {
auto url = make_mutable_copy("/v1/retention");
RouteMatch match;
auto result = ApiUrlParser::parse("GET", url.data(),
static_cast<int>(url.size()), match);
CHECK(result == ParseResult::Success);
CHECK(match.route == HttpRoute::GetRetention);
CHECK_FALSE(
match.params[static_cast<int>(ApiParameterKey::PolicyId)].has_value());
}
}
TEST_CASE("ApiUrlParser with URL and query parameters") {
auto url = make_mutable_copy("/v1/retention/p1?request_id=abc123");
RouteMatch match;
auto result = ApiUrlParser::parse("DELETE", url.data(),
static_cast<int>(url.size()), match);
CHECK(result == ParseResult::Success);
CHECK(match.route == HttpRoute::DeleteRetention);
REQUIRE(
match.params[static_cast<int>(ApiParameterKey::PolicyId)].has_value());
CHECK(match.params[static_cast<int>(ApiParameterKey::PolicyId)].value() ==
"p1");
REQUIRE(
match.params[static_cast<int>(ApiParameterKey::RequestId)].has_value());
CHECK(match.params[static_cast<int>(ApiParameterKey::RequestId)].value() ==
"abc123");
}
TEST_CASE("ApiUrlParser URL decoding") {
SUBCASE("Path segment percent-decoding") {
auto url = make_mutable_copy("/v1/retention/my%2Dpolicy");
RouteMatch match;
auto result = ApiUrlParser::parse("PUT", url.data(),
static_cast<int>(url.size()), match);
CHECK(result == ParseResult::Success);
CHECK(match.route == HttpRoute::PutRetention);
REQUIRE(
match.params[static_cast<int>(ApiParameterKey::PolicyId)].has_value());
CHECK(match.params[static_cast<int>(ApiParameterKey::PolicyId)].value() ==
"my-policy");
}
SUBCASE("Query parameter form decoding (+ to space)") {
auto url = make_mutable_copy("/v1/status?request_id=hello+world");
RouteMatch match;
auto result = ApiUrlParser::parse("GET", url.data(),
static_cast<int>(url.size()), match);
CHECK(result == ParseResult::Success);
CHECK(match.route == HttpRoute::GetStatus);
REQUIRE(
match.params[static_cast<int>(ApiParameterKey::RequestId)].has_value());
CHECK(match.params[static_cast<int>(ApiParameterKey::RequestId)].value() ==
"hello world");
}
SUBCASE("Query parameter percent-decoding") {
auto url = make_mutable_copy("/v1/status?request_id=hello%20world");
RouteMatch match;
auto result = ApiUrlParser::parse("GET", url.data(),
static_cast<int>(url.size()), match);
CHECK(result == ParseResult::Success);
CHECK(match.route == HttpRoute::GetStatus);
REQUIRE(
match.params[static_cast<int>(ApiParameterKey::RequestId)].has_value());
CHECK(match.params[static_cast<int>(ApiParameterKey::RequestId)].value() ==
"hello world");
}
SUBCASE("Base64-like sequences in query parameters") {
auto url = make_mutable_copy("/v1/status?request_id=YWJj%3D");
RouteMatch match;
auto result = ApiUrlParser::parse("GET", url.data(),
static_cast<int>(url.size()), match);
CHECK(result == ParseResult::Success);
CHECK(match.route == HttpRoute::GetStatus);
REQUIRE(
match.params[static_cast<int>(ApiParameterKey::RequestId)].has_value());
CHECK(match.params[static_cast<int>(ApiParameterKey::RequestId)].value() ==
"YWJj=");
}
SUBCASE("Mixed encoding in path and query") {
auto url = make_mutable_copy(
"/v1/retention/my%2Dpolicy?request_id=hello+world%21");
RouteMatch match;
auto result = ApiUrlParser::parse("DELETE", url.data(),
static_cast<int>(url.size()), match);
CHECK(result == ParseResult::Success);
CHECK(match.route == HttpRoute::DeleteRetention);
REQUIRE(
match.params[static_cast<int>(ApiParameterKey::PolicyId)].has_value());
CHECK(match.params[static_cast<int>(ApiParameterKey::PolicyId)].value() ==
"my-policy");
REQUIRE(
match.params[static_cast<int>(ApiParameterKey::RequestId)].has_value());
CHECK(match.params[static_cast<int>(ApiParameterKey::RequestId)].value() ==
"hello world!");
}
}
TEST_CASE("ApiUrlParser malformed encoding") {
SUBCASE("Incomplete percent sequence in path") {
auto url = make_mutable_copy("/v1/retention/bad%2");
RouteMatch match;
auto result = ApiUrlParser::parse("PUT", url.data(),
static_cast<int>(url.size()), match);
CHECK(result == ParseResult::MalformedEncoding);
}
SUBCASE("Invalid hex digits in path") {
auto url = make_mutable_copy("/v1/retention/bad%ZZ");
RouteMatch match;
auto result = ApiUrlParser::parse("PUT", url.data(),
static_cast<int>(url.size()), match);
CHECK(result == ParseResult::MalformedEncoding);
}
SUBCASE("Incomplete percent sequence in query") {
auto url = make_mutable_copy("/v1/status?request_id=bad%2");
RouteMatch match;
auto result = ApiUrlParser::parse("GET", url.data(),
static_cast<int>(url.size()), match);
CHECK(result == ParseResult::MalformedEncoding);
}
SUBCASE("Invalid hex digits in query") {
auto url = make_mutable_copy("/v1/status?request_id=bad%GG");
RouteMatch match;
auto result = ApiUrlParser::parse("GET", url.data(),
static_cast<int>(url.size()), match);
CHECK(result == ParseResult::MalformedEncoding);
}
SUBCASE("Percent at end of path") {
auto url = make_mutable_copy("/v1/retention/bad%");
RouteMatch match;
auto result = ApiUrlParser::parse("PUT", url.data(),
static_cast<int>(url.size()), match);
CHECK(result == ParseResult::MalformedEncoding);
}
SUBCASE("Percent at end of query") {
auto url = make_mutable_copy("/v1/status?request_id=bad%");
RouteMatch match;
auto result = ApiUrlParser::parse("GET", url.data(),
static_cast<int>(url.size()), match);
CHECK(result == ParseResult::MalformedEncoding);
}
}
TEST_CASE("ApiUrlParser edge cases and bugs") {
SUBCASE("Bug: Path boundary error - /v1/retention/ with trailing slash") {
// BUG: Code checks length > 13 but substrings at 14, causing off-by-one
auto url = make_mutable_copy(
"/v1/retention/"); // length 14, exactly the boundary case
RouteMatch match;
auto result = ApiUrlParser::parse("GET", url.data(),
static_cast<int>(url.size()), match);
CHECK(result == ParseResult::Success);
CHECK(match.route == HttpRoute::GetRetention);
// This should NOT set PolicyId since it's empty, but current code might
// have issues
CHECK_FALSE(
match.params[static_cast<int>(ApiParameterKey::PolicyId)].has_value());
}
SUBCASE("Bug: Empty URL handling") {
auto url = make_mutable_copy("");
RouteMatch match;
auto result = ApiUrlParser::parse("GET", url.data(),
static_cast<int>(url.size()), match);
CHECK(result == ParseResult::Success);
CHECK(match.route == HttpRoute::NotFound);
}
SUBCASE("Bug: Query-only URL") {
auto url = make_mutable_copy("?request_id=123");
RouteMatch match;
auto result = ApiUrlParser::parse("GET", url.data(),
static_cast<int>(url.size()), match);
CHECK(result == ParseResult::Success);
CHECK(match.route == HttpRoute::NotFound);
// Should still parse query parameters
REQUIRE(
match.params[static_cast<int>(ApiParameterKey::RequestId)].has_value());
CHECK(match.params[static_cast<int>(ApiParameterKey::RequestId)].value() ==
"123");
}
SUBCASE("Bug: Consecutive delimiters in query string") {
auto url =
make_mutable_copy("/v1/status?&&request_id=123&&min_version=42&&");
RouteMatch match;
auto result = ApiUrlParser::parse("GET", url.data(),
static_cast<int>(url.size()), match);
CHECK(result == ParseResult::Success);
CHECK(match.route == HttpRoute::GetStatus);
REQUIRE(
match.params[static_cast<int>(ApiParameterKey::RequestId)].has_value());
CHECK(match.params[static_cast<int>(ApiParameterKey::RequestId)].value() ==
"123");
REQUIRE(match.params[static_cast<int>(ApiParameterKey::MinVersion)]
.has_value());
CHECK(match.params[static_cast<int>(ApiParameterKey::MinVersion)].value() ==
"42");
}
SUBCASE("Bug: Parameter without value (should be skipped)") {
auto url = make_mutable_copy("/v1/status?debug&request_id=123");
RouteMatch match;
auto result = ApiUrlParser::parse("GET", url.data(),
static_cast<int>(url.size()), match);
CHECK(result == ParseResult::Success);
CHECK(match.route == HttpRoute::GetStatus);
// debug parameter should be ignored since it has no value
REQUIRE(
match.params[static_cast<int>(ApiParameterKey::RequestId)].has_value());
CHECK(match.params[static_cast<int>(ApiParameterKey::RequestId)].value() ==
"123");
}
SUBCASE("Bug: Empty parameter value") {
auto url = make_mutable_copy("/v1/status?request_id=");
RouteMatch match;
auto result = ApiUrlParser::parse("GET", url.data(),
static_cast<int>(url.size()), match);
CHECK(result == ParseResult::Success);
CHECK(match.route == HttpRoute::GetStatus);
REQUIRE(
match.params[static_cast<int>(ApiParameterKey::RequestId)].has_value());
CHECK(match.params[static_cast<int>(ApiParameterKey::RequestId)].value() ==
"");
}
SUBCASE("Edge: Exact length boundary for retention path") {
// Test the exact boundary condition: "/v1/retention" is 13 chars
auto url = make_mutable_copy("/v1/retention"); // exactly 13 characters
RouteMatch match;
auto result = ApiUrlParser::parse("GET", url.data(),
static_cast<int>(url.size()), match);
CHECK(result == ParseResult::Success);
CHECK(match.route == HttpRoute::GetRetention);
CHECK_FALSE(
match.params[static_cast<int>(ApiParameterKey::PolicyId)].has_value());
}
SUBCASE("Edge: Minimum valid policy ID") {
// Test one character after the boundary
auto url =
make_mutable_copy("/v1/retention/a"); // 15 chars total, policy_id = "a"
RouteMatch match;
auto result = ApiUrlParser::parse("GET", url.data(),
static_cast<int>(url.size()), match);
CHECK(result == ParseResult::Success);
CHECK(match.route == HttpRoute::GetRetention);
REQUIRE(
match.params[static_cast<int>(ApiParameterKey::PolicyId)].has_value());
CHECK(match.params[static_cast<int>(ApiParameterKey::PolicyId)].value() ==
"a");
}
}
TEST_CASE("ApiUrlParser specific bug reproduction") {
SUBCASE("Reproduction: Path boundary math error") {
// The bug: code checks length > 13 but substrings at 14
// "/v1/retention" = 13 chars, "/v1/retention/" = 14 chars
// This test should demonstrate undefined behavior or wrong results
auto url = make_mutable_copy("/v1/retention/");
RouteMatch match;
auto result = ApiUrlParser::parse("GET", url.data(),
static_cast<int>(url.size()), match);
// Expected behavior: should match GetRetention but NOT set PolicyId
CHECK(result == ParseResult::Success);
CHECK(match.route == HttpRoute::GetRetention);
// The bug: may incorrectly extract empty string or cause buffer read error
// Let's see what actually happens
if (match.params[static_cast<int>(ApiParameterKey::PolicyId)].has_value()) {
auto policy_id =
match.params[static_cast<int>(ApiParameterKey::PolicyId)].value();
CHECK(policy_id.empty()); // Should be empty if set at all
}
}
SUBCASE("Reproduction: Query parsing with edge cases") {
// Test parameter parsing with multiple edge conditions
auto url = make_mutable_copy(
"/v1/status?=empty_key&no_value&request_id=&min_version=42&=");
RouteMatch match;
auto result = ApiUrlParser::parse("GET", url.data(),
static_cast<int>(url.size()), match);
CHECK(result == ParseResult::Success);
CHECK(match.route == HttpRoute::GetStatus);
// Should handle empty values correctly
REQUIRE(
match.params[static_cast<int>(ApiParameterKey::RequestId)].has_value());
CHECK(match.params[static_cast<int>(ApiParameterKey::RequestId)].value() ==
"");
REQUIRE(match.params[static_cast<int>(ApiParameterKey::MinVersion)]
.has_value());
CHECK(match.params[static_cast<int>(ApiParameterKey::MinVersion)].value() ==
"42");
}
SUBCASE("Reproduction: Very long input stress test") {
// Test potential integer overflow or performance issues
std::string long_policy_id(1000, 'x'); // 1000 character policy ID
auto url = make_mutable_copy("/v1/retention/" + long_policy_id +
"?request_id=" + std::string(500, 'y'));
RouteMatch match;
auto result = ApiUrlParser::parse("GET", url.data(),
static_cast<int>(url.size()), match);
CHECK(result == ParseResult::Success);
CHECK(match.route == HttpRoute::GetRetention);
REQUIRE(
match.params[static_cast<int>(ApiParameterKey::PolicyId)].has_value());
CHECK(match.params[static_cast<int>(ApiParameterKey::PolicyId)].value() ==
long_policy_id);
REQUIRE(
match.params[static_cast<int>(ApiParameterKey::RequestId)].has_value());
CHECK(match.params[static_cast<int>(ApiParameterKey::RequestId)].value() ==
std::string(500, 'y'));
}
SUBCASE("Reproduction: Zero-length edge case") {
char empty_buffer[1] = {0};
RouteMatch match;
auto result = ApiUrlParser::parse("GET", empty_buffer, 0, match);
CHECK(result == ParseResult::Success);
CHECK(match.route == HttpRoute::NotFound);
}
SUBCASE("Reproduction: Null buffer edge case") {
// This might cause undefined behavior if not handled properly
RouteMatch match;
char single_char = '/';
auto result = ApiUrlParser::parse("GET", &single_char, 1, match);
CHECK(result == ParseResult::Success);
CHECK(match.route == HttpRoute::NotFound);
}
SUBCASE("BUG: Query parser pos increment overflow") {
// BUG: pos += pair_end + 1 can go beyond buffer bounds
// When pair_end == query_length - pos (no & found), pos becomes
// query_length + 1
auto url = make_mutable_copy("/v1/status?no_ampersand_at_end");
RouteMatch match;
auto result = ApiUrlParser::parse("GET", url.data(),
static_cast<int>(url.size()), match);
// This should not crash or have undefined behavior
CHECK(result == ParseResult::Success);
CHECK(match.route == HttpRoute::GetStatus);
// Parameter should be ignored since it's not a known key
}
SUBCASE("BUG: String view with potentially negative length cast") {
// BUG: decoded_value_length is int but gets cast to size_t for string_view
// If decode function returned negative (which it can), this could wrap
// around
// We can't easily trigger the decode function to return -1 through normal
// parsing since that's caught earlier, but this tests the edge case
// handling
auto url = make_mutable_copy("/v1/status?request_id=normal_value");
RouteMatch match;
auto result = ApiUrlParser::parse("GET", url.data(),
static_cast<int>(url.size()), match);
CHECK(result == ParseResult::Success);
REQUIRE(
match.params[static_cast<int>(ApiParameterKey::RequestId)].has_value());
CHECK(match.params[static_cast<int>(ApiParameterKey::RequestId)].value() ==
"normal_value");
}
SUBCASE("BUG: Array bounds - PolicyId path extraction edge case") {
// Test the boundary condition more precisely
// "/v1/retention" = 13 chars, checking length > 13, substr(14)
auto url = make_mutable_copy("/v1/retention/"); // exactly 14 chars
RouteMatch match;
auto result = ApiUrlParser::parse("GET", url.data(),
static_cast<int>(url.size()), match);
CHECK(result == ParseResult::Success);
CHECK(match.route == HttpRoute::GetRetention);
// path.length() = 14, so > 13 is true
// path.substr(14) should return empty string_view
// The bug would be if this crashes or returns invalid data
if (match.params[static_cast<int>(ApiParameterKey::PolicyId)].has_value()) {
CHECK(match.params[static_cast<int>(ApiParameterKey::PolicyId)]
.value()
.empty());
}
}
}

View File

@@ -1,28 +1,27 @@
#define DOCTEST_CONFIG_IMPLEMENT_WITH_MAIN
#include "arena_allocator.hpp"
#include "arena.hpp"
#include "format.hpp"
#include <cstring>
#include <doctest/doctest.h>
#include <string>
#include <vector>
TEST_CASE("ArenaAllocator basic construction") {
ArenaAllocator arena;
TEST_CASE("Arena basic construction") {
Arena arena;
CHECK(arena.num_blocks() == 0);
CHECK(arena.used_bytes() == 0);
CHECK(arena.total_allocated() == 0);
CHECK(arena.available_in_current_block() == 0);
}
TEST_CASE("ArenaAllocator custom initial size") {
ArenaAllocator arena(2048);
TEST_CASE("Arena custom initial size") {
Arena arena(2048);
CHECK(arena.num_blocks() == 0);
CHECK(arena.total_allocated() == 0);
CHECK(arena.available_in_current_block() == 0);
}
TEST_CASE("ArenaAllocator basic allocation") {
ArenaAllocator arena;
TEST_CASE("Arena basic allocation") {
Arena arena;
SUBCASE("allocate zero bytes returns nullptr") {
void *ptr = arena.allocate_raw(0);
@@ -47,8 +46,8 @@ TEST_CASE("ArenaAllocator basic allocation") {
}
}
TEST_CASE("ArenaAllocator alignment") {
ArenaAllocator arena;
TEST_CASE("Arena alignment") {
Arena arena;
SUBCASE("default alignment") {
void *ptr = arena.allocate_raw(1);
@@ -67,14 +66,14 @@ TEST_CASE("ArenaAllocator alignment") {
}
SUBCASE("alignment with larger allocations") {
ArenaAllocator fresh_arena;
Arena fresh_arena;
void *ptr = fresh_arena.allocate_raw(100, 64);
CHECK(reinterpret_cast<uintptr_t>(ptr) % 64 == 0);
}
}
TEST_CASE("ArenaAllocator block management") {
ArenaAllocator arena(128);
TEST_CASE("Arena block management") {
Arena arena(128);
SUBCASE("single block allocation") {
void *ptr = arena.allocate_raw(64);
@@ -99,8 +98,8 @@ TEST_CASE("ArenaAllocator block management") {
}
}
TEST_CASE("ArenaAllocator construct template") {
ArenaAllocator arena;
TEST_CASE("Arena construct template") {
Arena arena;
SUBCASE("construct int") {
int *ptr = arena.construct<int>(42);
@@ -143,8 +142,8 @@ TEST_CASE("ArenaAllocator construct template") {
}
}
TEST_CASE("ArenaAllocator reset functionality") {
ArenaAllocator arena;
TEST_CASE("Arena reset functionality") {
Arena arena;
arena.allocate_raw(100);
arena.allocate_raw(200);
@@ -160,8 +159,8 @@ TEST_CASE("ArenaAllocator reset functionality") {
CHECK(arena.used_bytes() == 50);
}
TEST_CASE("ArenaAllocator reset memory leak test") {
ArenaAllocator arena(32); // Smaller initial size
TEST_CASE("Arena reset memory leak test") {
Arena arena(32); // Smaller initial size
// Force multiple blocks
arena.allocate_raw(30); // First block (32 bytes)
@@ -192,8 +191,8 @@ TEST_CASE("ArenaAllocator reset memory leak test") {
CHECK(arena.used_bytes() == 20);
}
TEST_CASE("ArenaAllocator memory tracking") {
ArenaAllocator arena(512);
TEST_CASE("Arena memory tracking") {
Arena arena(512);
CHECK(arena.total_allocated() == 0);
CHECK(arena.used_bytes() == 0);
@@ -211,8 +210,8 @@ TEST_CASE("ArenaAllocator memory tracking") {
CHECK(arena.total_allocated() >= 1024);
}
TEST_CASE("ArenaAllocator stress test") {
ArenaAllocator arena(1024);
TEST_CASE("Arena stress test") {
Arena arena(1024);
SUBCASE("many small allocations") {
std::vector<void *> ptrs;
@@ -238,13 +237,13 @@ TEST_CASE("ArenaAllocator stress test") {
}
}
TEST_CASE("ArenaAllocator move semantics") {
ArenaAllocator arena1(512);
TEST_CASE("Arena move semantics") {
Arena arena1(512);
arena1.allocate_raw(100);
size_t used_bytes = arena1.used_bytes();
size_t num_blocks = arena1.num_blocks();
ArenaAllocator arena2 = std::move(arena1);
Arena arena2 = std::move(arena1);
CHECK(arena2.used_bytes() == used_bytes);
CHECK(arena2.num_blocks() == num_blocks);
@@ -252,16 +251,16 @@ TEST_CASE("ArenaAllocator move semantics") {
CHECK(ptr != nullptr);
}
TEST_CASE("ArenaAllocator edge cases") {
TEST_CASE("Arena edge cases") {
SUBCASE("very small block size") {
ArenaAllocator arena(16);
Arena arena(16);
void *ptr = arena.allocate_raw(8);
CHECK(ptr != nullptr);
CHECK(arena.num_blocks() == 1);
}
SUBCASE("allocation exactly block size") {
ArenaAllocator arena(64);
Arena arena(64);
void *ptr = arena.allocate_raw(64);
CHECK(ptr != nullptr);
CHECK(arena.num_blocks() == 1);
@@ -272,7 +271,7 @@ TEST_CASE("ArenaAllocator edge cases") {
}
SUBCASE("multiple resets") {
ArenaAllocator arena;
Arena arena;
for (int i = 0; i < 10; ++i) {
arena.allocate_raw(100);
arena.reset();
@@ -291,8 +290,8 @@ struct TestPOD {
}
};
TEST_CASE("ArenaAllocator with custom objects") {
ArenaAllocator arena;
TEST_CASE("Arena with custom objects") {
Arena arena;
TestPOD *obj1 = arena.construct<TestPOD>(42, "first");
TestPOD *obj2 = arena.construct<TestPOD>(84, "second");
@@ -306,8 +305,8 @@ TEST_CASE("ArenaAllocator with custom objects") {
CHECK(std::strcmp(obj2->name, "second") == 0);
}
TEST_CASE("ArenaAllocator geometric growth policy") {
ArenaAllocator arena(64);
TEST_CASE("Arena geometric growth policy") {
Arena arena(64);
SUBCASE("normal geometric growth doubles size") {
arena.allocate_raw(60); // Fill first block
@@ -339,8 +338,8 @@ TEST_CASE("ArenaAllocator geometric growth policy") {
}
}
TEST_CASE("ArenaAllocator alignment edge cases") {
ArenaAllocator arena;
TEST_CASE("Arena alignment edge cases") {
Arena arena;
SUBCASE("unaligned then aligned allocation") {
void *ptr1 = arena.allocate_raw(1, 1);
@@ -352,20 +351,20 @@ TEST_CASE("ArenaAllocator alignment edge cases") {
}
SUBCASE("large alignment requirements") {
ArenaAllocator fresh_arena;
Arena fresh_arena;
void *ptr = fresh_arena.allocate_raw(1, 128);
CHECK(ptr != nullptr);
CHECK(reinterpret_cast<uintptr_t>(ptr) % 128 == 0);
}
}
TEST_CASE("ArenaAllocator realloc functionality") {
ArenaAllocator arena;
TEST_CASE("Arena realloc functionality") {
Arena arena;
SUBCASE("realloc edge cases") {
// realloc with new_size == 0 returns nullptr and reclaims memory if it's
// the last allocation
ArenaAllocator fresh_arena(256);
Arena fresh_arena(256);
void *ptr = fresh_arena.allocate_raw(100);
size_t used_before = fresh_arena.used_bytes();
CHECK(used_before == 100);
@@ -375,7 +374,7 @@ TEST_CASE("ArenaAllocator realloc functionality") {
CHECK(fresh_arena.used_bytes() == 0); // Memory should be reclaimed
// Test case where it's NOT the last allocation - memory cannot be reclaimed
ArenaAllocator arena2(256);
Arena arena2(256);
void *ptr1 = arena2.allocate_raw(50);
(void)arena2.allocate_raw(50);
size_t used_before2 = arena2.used_bytes();
@@ -398,7 +397,7 @@ TEST_CASE("ArenaAllocator realloc functionality") {
}
SUBCASE("in-place extension - growing") {
ArenaAllocator fresh_arena(1024);
Arena fresh_arena(1024);
void *ptr = fresh_arena.allocate_raw(100);
CHECK(ptr != nullptr);
@@ -419,7 +418,7 @@ TEST_CASE("ArenaAllocator realloc functionality") {
}
SUBCASE("in-place shrinking") {
ArenaAllocator fresh_arena(1024);
Arena fresh_arena(1024);
void *ptr = fresh_arena.allocate_raw(200);
std::memset(ptr, 0xCD, 200);
@@ -436,7 +435,7 @@ TEST_CASE("ArenaAllocator realloc functionality") {
}
SUBCASE("copy when can't extend in place") {
ArenaAllocator fresh_arena(256); // Larger block to avoid edge cases
Arena fresh_arena(256); // Larger block to avoid edge cases
// Allocate first chunk
void *ptr1 = fresh_arena.allocate_raw(60);
@@ -471,7 +470,7 @@ TEST_CASE("ArenaAllocator realloc functionality") {
}
SUBCASE("copy when insufficient space for extension") {
ArenaAllocator fresh_arena(100);
Arena fresh_arena(100);
// Allocate almost all space
void *ptr = fresh_arena.allocate_raw(90);
@@ -490,7 +489,7 @@ TEST_CASE("ArenaAllocator realloc functionality") {
}
SUBCASE("realloc with custom alignment") {
ArenaAllocator fresh_arena(1024);
Arena fresh_arena(1024);
// Allocate with specific alignment
void *ptr = fresh_arena.allocate_raw(50, 16);
@@ -510,7 +509,7 @@ TEST_CASE("ArenaAllocator realloc functionality") {
}
SUBCASE("realloc stress test") {
ArenaAllocator fresh_arena(512);
Arena fresh_arena(512);
void *ptr = fresh_arena.allocate_raw(50);
size_t current_size = 50;
@@ -537,7 +536,7 @@ TEST_CASE("ArenaAllocator realloc functionality") {
TEST_CASE("format function fallback codepath") {
SUBCASE("single-pass optimization success") {
ArenaAllocator arena(128);
Arena arena(128);
auto result = format(arena, "Hello %s! Number: %d", "World", 42);
CHECK(result == "Hello World! Number: 42");
CHECK(result.length() == 23);
@@ -545,7 +544,7 @@ TEST_CASE("format function fallback codepath") {
SUBCASE("fallback when speculative formatting fails") {
// Create arena with limited space to force fallback
ArenaAllocator arena(16);
Arena arena(16);
// Consume most space to leave insufficient room for speculative formatting
arena.allocate<char>(10);
@@ -562,7 +561,7 @@ TEST_CASE("format function fallback codepath") {
}
SUBCASE("edge case - exactly available space") {
ArenaAllocator arena(32);
Arena arena(32);
arena.allocate<char>(20); // Leave 12 bytes
CHECK(arena.available_in_current_block() == 12);
@@ -575,13 +574,13 @@ TEST_CASE("format function fallback codepath") {
SUBCASE("allocate_remaining_space postcondition") {
// Test empty arena
ArenaAllocator empty_arena(64);
Arena empty_arena(64);
auto space1 = empty_arena.allocate_remaining_space();
CHECK(space1.allocated_bytes >= 1);
CHECK(space1.allocated_bytes == 64);
// Test full arena (should create new block)
ArenaAllocator full_arena(32);
Arena full_arena(32);
full_arena.allocate<char>(32); // Fill completely
auto space2 = full_arena.allocate_remaining_space();
CHECK(space2.allocated_bytes >= 1);
@@ -589,7 +588,7 @@ TEST_CASE("format function fallback codepath") {
}
SUBCASE("format error handling") {
ArenaAllocator arena(64);
Arena arena(64);
// Test with invalid format (should return empty string_view)
// Note: This is hard to trigger reliably across platforms,
@@ -599,7 +598,7 @@ TEST_CASE("format function fallback codepath") {
}
}
// Test object with non-trivial destructor for ArenaAllocator::Ptr testing
// Test object with non-trivial destructor for Arena::Ptr testing
class TestObject {
public:
static int destructor_count;
@@ -626,11 +625,11 @@ struct TrivialObject {
TrivialObject(int v) : value(v) {}
};
TEST_CASE("ArenaAllocator::Ptr smart pointer functionality") {
TEST_CASE("Arena::Ptr smart pointer functionality") {
TestObject::reset_counters();
SUBCASE("construct returns raw pointer for trivially destructible types") {
ArenaAllocator arena;
Arena arena;
auto ptr = arena.construct<TrivialObject>(42);
static_assert(std::is_same_v<decltype(ptr), TrivialObject *>,
@@ -640,23 +639,22 @@ TEST_CASE("ArenaAllocator::Ptr smart pointer functionality") {
CHECK(ptr->value == 42);
}
SUBCASE("construct returns ArenaAllocator::Ptr for non-trivially "
SUBCASE("construct returns Arena::Ptr for non-trivially "
"destructible types") {
ArenaAllocator arena;
Arena arena;
auto ptr = arena.construct<TestObject>(42);
static_assert(
std::is_same_v<decltype(ptr), ArenaAllocator::Ptr<TestObject>>,
"construct() should return ArenaAllocator::Ptr for non-trivially "
"destructible types");
static_assert(std::is_same_v<decltype(ptr), Arena::Ptr<TestObject>>,
"construct() should return Arena::Ptr for non-trivially "
"destructible types");
CHECK(ptr);
CHECK(ptr->value == 42);
CHECK(TestObject::constructor_count == 1);
CHECK(TestObject::destructor_count == 0);
}
SUBCASE("ArenaAllocator::Ptr calls destructor on destruction") {
ArenaAllocator arena;
SUBCASE("Arena::Ptr calls destructor on destruction") {
Arena arena;
{
auto ptr = arena.construct<TestObject>(42);
@@ -667,8 +665,8 @@ TEST_CASE("ArenaAllocator::Ptr smart pointer functionality") {
CHECK(TestObject::destructor_count == 1);
}
SUBCASE("ArenaAllocator::Ptr move semantics") {
ArenaAllocator arena;
SUBCASE("Arena::Ptr move semantics") {
Arena arena;
auto ptr1 = arena.construct<TestObject>(42);
CHECK(TestObject::constructor_count == 1);
@@ -683,8 +681,8 @@ TEST_CASE("ArenaAllocator::Ptr smart pointer functionality") {
CHECK(TestObject::destructor_count == 1); // Destructor called
}
SUBCASE("ArenaAllocator::Ptr access operators") {
ArenaAllocator arena;
SUBCASE("Arena::Ptr access operators") {
Arena arena;
auto ptr = arena.construct<TestObject>(123);
@@ -704,8 +702,8 @@ TEST_CASE("ArenaAllocator::Ptr smart pointer functionality") {
CHECK(static_cast<bool>(ptr) == true);
}
SUBCASE("ArenaAllocator::Ptr reset functionality") {
ArenaAllocator arena;
SUBCASE("Arena::Ptr reset functionality") {
Arena arena;
auto ptr = arena.construct<TestObject>(42);
CHECK(TestObject::constructor_count == 1);
@@ -724,8 +722,8 @@ TEST_CASE("ArenaAllocator::Ptr smart pointer functionality") {
CHECK(TestObject::destructor_count == 1);
}
SUBCASE("ArenaAllocator::Ptr release functionality") {
ArenaAllocator arena;
SUBCASE("Arena::Ptr release functionality") {
Arena arena;
auto ptr = arena.construct<TestObject>(42);
TestObject *raw_ptr = ptr.release();
@@ -740,8 +738,8 @@ TEST_CASE("ArenaAllocator::Ptr smart pointer functionality") {
CHECK(TestObject::destructor_count == 1);
}
SUBCASE("ArenaAllocator::Ptr move assignment") {
ArenaAllocator arena;
SUBCASE("Arena::Ptr move assignment") {
Arena arena;
auto ptr1 = arena.construct<TestObject>(42);
auto ptr2 = arena.construct<TestObject>(84);

View File

@@ -1,4 +1,3 @@
#define DOCTEST_CONFIG_IMPLEMENT_WITH_MAIN
#include "../benchmarks/test_data.hpp"
#include "parser_comparison.hpp"
#include <doctest/doctest.h>

View File

@@ -1,4 +1,4 @@
#include "arena_allocator.hpp"
#include "arena.hpp"
#include "http_handler.hpp"
#include "perfetto_categories.hpp"
#include <atomic>
@@ -12,13 +12,13 @@ std::atomic<int> activeConnections{0};
// Simple test helper since Connection has complex constructor requirements
struct TestConnectionData {
ArenaAllocator arena;
Arena arena;
std::string message_buffer;
void *user_data = nullptr;
void append_message(std::string_view data) { message_buffer += data; }
ArenaAllocator &get_arena() { return arena; }
Arena &get_arena() { return arena; }
const std::string &getResponse() const { return message_buffer; }
void clearResponse() { message_buffer.clear(); }
void reset() {
@@ -27,68 +27,6 @@ struct TestConnectionData {
}
};
TEST_CASE("HttpHandler route parsing") {
SUBCASE("GET routes") {
CHECK(HttpHandler::parseRoute("GET", "/v1/version") ==
HttpRoute::GET_version);
CHECK(HttpHandler::parseRoute("GET", "/v1/subscribe") ==
HttpRoute::GET_subscribe);
CHECK(HttpHandler::parseRoute("GET", "/v1/status") ==
HttpRoute::GET_status);
CHECK(HttpHandler::parseRoute("GET", "/v1/retention") ==
HttpRoute::GET_retention);
CHECK(HttpHandler::parseRoute("GET", "/metrics") == HttpRoute::GET_metrics);
CHECK(HttpHandler::parseRoute("GET", "/ok") == HttpRoute::GET_ok);
}
SUBCASE("POST routes") {
CHECK(HttpHandler::parseRoute("POST", "/v1/commit") ==
HttpRoute::POST_commit);
}
SUBCASE("PUT routes") {
CHECK(HttpHandler::parseRoute("PUT", "/v1/retention/policy1") ==
HttpRoute::PUT_retention);
}
SUBCASE("DELETE routes") {
CHECK(HttpHandler::parseRoute("DELETE", "/v1/retention/policy1") ==
HttpRoute::DELETE_retention);
}
SUBCASE("Unknown routes") {
CHECK(HttpHandler::parseRoute("GET", "/unknown") == HttpRoute::NotFound);
CHECK(HttpHandler::parseRoute("PATCH", "/v1/version") ==
HttpRoute::NotFound);
}
SUBCASE("Query parameters stripped") {
CHECK(HttpHandler::parseRoute("GET", "/v1/version?foo=bar") ==
HttpRoute::GET_version);
}
}
TEST_CASE("HttpHandler route parsing edge cases") {
// Test just the static route parsing method since full integration testing
// would require complex Connection setup with server dependencies
SUBCASE("Route parsing with query parameters") {
CHECK(HttpHandler::parseRoute("GET", "/v1/version?param=value") ==
HttpRoute::GET_version);
CHECK(HttpHandler::parseRoute("GET", "/v1/subscribe?stream=true") ==
HttpRoute::GET_subscribe);
}
SUBCASE("Retention policy routes") {
CHECK(HttpHandler::parseRoute("PUT", "/v1/retention/policy123") ==
HttpRoute::PUT_retention);
CHECK(HttpHandler::parseRoute("DELETE", "/v1/retention/policy456") ==
HttpRoute::DELETE_retention);
CHECK(HttpHandler::parseRoute("GET", "/v1/retention/policy789") ==
HttpRoute::GET_retention);
}
}
// Test helper to verify the new hook functionality
struct MockConnectionHandler : public ConnectionHandler {
bool write_progress_called = false;

View File

@@ -1,7 +1,6 @@
#define DOCTEST_CONFIG_IMPLEMENT_WITH_MAIN
#include <doctest/doctest.h>
#include "arena_allocator.hpp"
#include "arena.hpp"
#include "metric.hpp"
#include <atomic>
@@ -280,7 +279,7 @@ TEST_CASE("callback-based metrics") {
[]() { return 42.0; });
// Callback should be called during render
ArenaAllocator arena;
Arena arena;
auto output = metric::render(arena);
CHECK(output.size() > 0);
}
@@ -289,7 +288,7 @@ TEST_CASE("callback-based metrics") {
gauge_family.register_callback({{"type", "callback"}},
[]() { return 123.5; });
ArenaAllocator arena;
Arena arena;
auto output = metric::render(arena);
CHECK(output.size() > 0);
}
@@ -305,7 +304,7 @@ TEST_CASE("callback-based metrics") {
}
TEST_CASE("prometheus text format rendering") {
ArenaAllocator arena;
Arena arena;
// Create some metrics
auto counter_family =
@@ -464,7 +463,7 @@ TEST_CASE("thread safety") {
threads.emplace_back([&]() {
start_latch.arrive_and_wait();
ArenaAllocator arena;
Arena arena;
auto output = metric::render(arena);
if (output.size() > 0) {
success_count.fetch_add(1);
@@ -504,7 +503,7 @@ TEST_CASE("thread counter cleanup bug") {
// Measure actual values from within the thread (before ThreadInit
// destructor runs)
ArenaAllocator thread_arena;
Arena thread_arena;
auto thread_output = metric::render(thread_arena);
for (const auto &line : thread_output) {
@@ -539,7 +538,7 @@ TEST_CASE("thread counter cleanup bug") {
worker.join();
// Measure values after thread cleanup
ArenaAllocator arena;
Arena arena;
auto output = metric::render(arena);
double counter_value_after = 0;
@@ -616,7 +615,7 @@ TEST_CASE("error conditions") {
TEST_CASE("memory management") {
SUBCASE("arena allocation in render") {
ArenaAllocator arena;
Arena arena;
auto initial_used = arena.used_bytes();
auto counter_family = metric::create_counter("memory_test", "Memory test");
@@ -637,7 +636,7 @@ TEST_CASE("memory management") {
}
SUBCASE("arena reset behavior") {
ArenaAllocator arena;
Arena arena;
auto counter_family = metric::create_counter("reset_test", "Reset test");
auto counter = counter_family.create({});
@@ -660,7 +659,7 @@ TEST_CASE("render output deterministic order golden test") {
// Clean slate - reset all metrics before this test
metric::reset_metrics_for_testing();
ArenaAllocator arena;
Arena arena;
// Create a comprehensive set of metrics with deliberate ordering
// to test deterministic output

View File

@@ -2,30 +2,30 @@
## Summary
WeaselDB achieved 1.3M requests/second throughput using a two-stage ThreadPipeline with futex wake optimization, delivering 550ns serial CPU time per request while maintaining 0% CPU usage when idle. Higher serial CPU time means more CPU budget available for serial processing.
WeaselDB's /ok health check endpoint achieves 1M requests/second with 740ns of configurable CPU work per request through the 4-stage commit pipeline, while maintaining 0% CPU usage when idle. The configurable CPU work serves both as a health check (validating the full pipeline) and as a benchmarking tool for measuring per-request processing capacity.
## Performance Metrics
### Throughput
- **1.3M requests/second** over unix socket
- **1.0M requests/second** /ok health check endpoint (4-stage commit pipeline)
- 8 I/O threads with 8 epoll instances
- Load tester used 12 network threads
- Max latency: 4ms out of 90M requests
- **0% CPU usage when idle** (optimized futex wake implementation)
### Threading Architecture
- Two-stage pipeline: Stage-0 (noop) → Stage-1 (connection return)
- **Four-stage commit pipeline**: Sequence → Resolve → Persist → Release
- Lock-free coordination using atomic ring buffer
- **Optimized futex wake**: Only wake on final pipeline stage
- Each request "processed" serially on single thread
- Configurable CPU work performed serially in resolve stage
### Performance Characteristics
**Optimized Pipeline Mode**:
- **Throughput**: 1.3M requests/second
- **Serial CPU time per request**: 550ns (validated with nanobench)
- **Theoretical maximum serial CPU time**: 769ns (1,000,000,000ns ÷ 1,300,000 req/s)
- **Serial efficiency**: 71.5% (550ns ÷ 769ns)
**Health Check Pipeline (/ok endpoint)**:
- **Throughput**: 1.0M requests/second
- **Configurable CPU work**: 740ns (4000 iterations, validated with nanobench)
- **Theoretical maximum CPU time**: 1000ns (1,000,000,000ns ÷ 1,000,000 req/s)
- **CPU work efficiency**: 74% (740ns ÷ 1000ns)
- **Pipeline stages**: Sequence (noop) → Resolve (CPU work) → Persist (response) → Release (cleanup)
- **CPU usage when idle**: 0%
### Key Optimizations
@@ -41,22 +41,25 @@ WeaselDB achieved 1.3M requests/second throughput using a two-stage ThreadPipeli
- **Maintained**: 100,000 spin iterations necessary to prevent thread descheduling
- **Result**: Same throughput with more efficient spinning
**Stage-0 Batch Size Optimization**:
- **Changed**: Stage-0 max batch size from unlimited to 1
**Resolve Batch Size Optimization**:
- **Changed**: Resolve max batch size from unlimited to 1
- **Mechanism**: Single-item processing checks for work more frequently, keeping the thread in fast coordination paths instead of expensive spin/wait cycles
- **Profile evidence**: Coordination overhead reduced from ~11% to ~5.6% CPU time
- **Result**: Additional 12.7% increase in serial CPU budget (488ns → 550ns)
- **Overall improvement**: 38.9% increase from baseline (396ns → 550ns)
### Request Flow
**Health Check Pipeline** (/ok endpoint):
```
I/O Threads (8) → HttpHandler::on_batch_complete() → ThreadPipeline
I/O Threads (8) → HttpHandler::on_batch_complete() → Commit Pipeline
↑ ↓
| Stage 0: Noop thread
| (550ns serial CPU per request)
| (batch size: 1)
| Stage 0: Sequence (noop)
| ↓
| Stage 1: Connection return
| Stage 1: Resolve (740ns CPU work)
| (spend_cpu_cycles(4000))
| ↓
| Stage 2: Persist (generate response)
| (send "OK" response)
| ↓
| Stage 3: Release (connection return)
| (optimized futex wake)
| ↓
└─────────────────────── Server::release_back_to_server()
@@ -64,7 +67,9 @@ I/O Threads (8) → HttpHandler::on_batch_complete() → ThreadPipeline
## Test Configuration
- Server: test_config.toml with 8 io_threads, 8 epoll_instances
- Load tester: ./load_tester --network-threads 12
- Server: test_benchmark_config.toml with 8 io_threads, 8 epoll_instances
- Configuration: `ok_resolve_iterations = 4000` (740ns CPU work)
- Load tester: targeting /ok endpoint
- Benchmark validation: ./bench_cpu_work 4000
- Build: ninja
- Command: ./weaseldb --config test_config.toml
- Command: ./weaseldb --config test_benchmark_config.toml

View File

@@ -25,6 +25,7 @@
### Infrastructure & Tooling
- [x] Implement thread-safe Prometheus metrics library and serve `GET /metrics` endpoint
- [ ] Implement gperf-based HTTP routing for efficient request dispatching
- [ ] Replace nlohmann/json with simdjson DOM API in parser comparison benchmarks
- [ ] Implement HTTP client for S3 interactions
- [ ] Design `HttpClient` class following WeaselDB patterns (factory creation, arena allocation, RAII)
- [ ] Implement connection pool with configurable limits (max connections, idle timeout)

View File

@@ -10,7 +10,7 @@
struct ArenaDebugger {
const CommitRequest &commit_request;
const ArenaAllocator &arena;
const Arena &arena;
std::unordered_set<const void *> referenced_addresses;
explicit ArenaDebugger(const CommitRequest &cr)