Compare commits

...

7 Commits

Author SHA1 Message Date
55069c0c79 Add counters for /v1/{commit,status,version} 2025-09-04 15:49:54 -04:00
96aae52853 Basic implementation of /commit, /version, and /status (no precondition checking, persistence, or log scanning yet) 2025-09-04 15:40:17 -04:00
8b6736127a Add commit pipeline design 2025-09-04 13:40:03 -04:00
9272048108 Outline commit pipeline 2025-09-03 23:43:03 -04:00
b2ffe3bfab Refactor to use format for http responses 2025-09-03 22:45:59 -04:00
978861c430 Parse commit request 2025-09-03 21:53:04 -04:00
46edb7cd26 Allow listening on multiple interfaces 2025-09-03 16:09:16 -04:00
14 changed files with 1435 additions and 277 deletions


@@ -156,13 +156,25 @@ add_executable(
test_http_handler
tests/test_http_handler.cpp
src/http_handler.cpp
src/server.cpp
src/config.cpp
src/json_commit_request_parser.cpp
src/arena_allocator.cpp
src/format.cpp
src/connection.cpp
src/connection_registry.cpp
src/metric.cpp)
target_link_libraries(test_http_handler doctest::doctest llhttp_static
Threads::Threads perfetto simdutf::simdutf)
src/metric.cpp
${CMAKE_BINARY_DIR}/json_tokens.cpp)
add_dependencies(test_http_handler generate_json_tokens)
target_link_libraries(
test_http_handler
doctest::doctest
llhttp_static
Threads::Threads
toml11::toml11
perfetto
simdutf::simdutf
weaseljson)
target_include_directories(test_http_handler PRIVATE src)
target_compile_definitions(test_http_handler
PRIVATE DOCTEST_CONFIG_IMPLEMENT_WITH_MAIN)
@@ -177,6 +189,7 @@ add_executable(
src/arena_allocator.cpp
src/config.cpp
src/http_handler.cpp
src/json_commit_request_parser.cpp
src/format.cpp
src/metric.cpp
${CMAKE_BINARY_DIR}/json_tokens.cpp)

commit_pipeline.md Normal file

@@ -0,0 +1,499 @@
# Commit Processing Pipeline
## Overview
WeaselDB implements a high-performance 4-stage commit processing pipeline that transforms HTTP commit requests into durable transactions. The pipeline provides strict serialization where needed while maximizing throughput through batching and asynchronous processing.
## Architecture
The commit processing pipeline consists of four sequential stages, each running on a dedicated thread:
```
HTTP I/O Threads → [Sequence] → [Resolve] → [Persist] → [Release] → HTTP I/O Threads
```
### Pipeline Flow
1. **HTTP I/O Threads**: Parse and validate incoming commit requests
2. **Sequence Stage**: Assign sequential version numbers to commits
3. **Resolve Stage**: Validate preconditions and check for conflicts
4. **Persist Stage**: Write commits to durable storage and notify subscribers
5. **Release Stage**: Return connections to HTTP I/O threads for response handling
## Stage Details
### Stage 0: Sequence Assignment
**Thread**: `txn-sequence`
**Purpose**: Version assignment and request ID management
**Serialization**: **Required** - Must be single-threaded
**Responsibilities**:
- **For CommitEntry**: Check request_id against banned list, assign sequential version number if not banned, forward to resolve stage
- **For StatusEntry**: Add request_id to banned list, note current highest assigned version as upper bound, transfer connection to status threadpool
- Record version assignments for transaction tracking
**Why Serialization is Required**:
- Version numbers must be strictly sequential without gaps
- Banned list updates must be atomic with version assignment
- Status requests must get accurate upper bound on potential commit versions
**Request ID Banned List**:
- Purpose: Ensure a request is no longer in flight (it can never be assigned a later version) and establish a version upper bound for status queries
- Lifecycle: Grows indefinitely until process restart (leader change)
- Removal: Only on process restart/leader change, which invalidates all old request IDs
**Current Implementation**:
```cpp
bool HttpHandler::process_sequence_batch(BatchType &batch) {
  for (auto &entry : batch) {
    if (std::holds_alternative<ShutdownEntry>(entry)) {
      return true; // Shutdown signal
    }
    // TODO: Pattern match on CommitEntry vs StatusEntry
    // TODO: Implement sequence assignment logic for each type
  }
  return false; // Continue processing
}
```
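The TODOs above could be filled in along the following lines. This is a minimal, self-contained sketch: the member names (`banned`, `next_version`, `version_upper_bound`, `rejected`) and the simplified entry structs are assumptions for illustration, not the real WeaselDB definitions.

```cpp
#include <cstdint>
#include <optional>
#include <string>
#include <unordered_set>
#include <variant>
#include <vector>

// Simplified stand-ins for the real pipeline entry types (assumed shapes).
struct CommitEntry {
  std::string request_id;
  std::optional<uint64_t> version; // assigned here if not banned
  bool rejected = false;
};
struct StatusEntry {
  std::string request_id;
  uint64_t version_upper_bound = 0; // highest version assigned so far
};
struct ShutdownEntry {};
using PipelineEntry = std::variant<CommitEntry, StatusEntry, ShutdownEntry>;

struct SequenceStage {
  uint64_t next_version = 1;              // strictly sequential, no gaps
  std::unordered_set<std::string> banned; // grows until leader change

  // Returns true when a ShutdownEntry is seen, mirroring the real stubs.
  bool process(std::vector<PipelineEntry> &batch) {
    for (auto &entry : batch) {
      if (std::holds_alternative<ShutdownEntry>(entry)) return true;
      if (auto *c = std::get_if<CommitEntry>(&entry)) {
        if (banned.count(c->request_id)) {
          c->rejected = true;          // a status query already banned it
        } else {
          c->version = next_version++; // atomic with the banned-list check
        }
      } else if (auto *s = std::get_if<StatusEntry>(&entry)) {
        banned.insert(s->request_id);  // request can no longer commit later
        s->version_upper_bound = next_version - 1;
      }
    }
    return false;
  }
};
```

Because both the banned-list check and the version increment happen on the single `txn-sequence` thread, a status query for a request_id can never race with that request being assigned a later version.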
### Stage 1: Precondition Resolution
**Thread**: `txn-resolve`
**Purpose**: Validate preconditions and detect conflicts
**Serialization**: **Required** - Must be single-threaded
**Responsibilities**:
- **For CommitEntry**: Check preconditions against in-memory recent writes set, add writes to recent writes set if accepted
- **For StatusEntry**: N/A (transferred to status threadpool after sequence stage)
- Mark failed commits with failure information (including which preconditions failed)
**Why Serialization is Required**:
- Must maintain consistent view of in-memory recent writes set
- Conflict detection requires atomic evaluation of all preconditions against recent writes
- Recent writes set updates must be synchronized
**Transaction State Transitions**:
- **Assigned Version** (from sequence) → **Semi-committed** (resolve accepts) → **Committed** (persist completes)
- Failed transactions continue through pipeline with failure information for client response
**Current Implementation**:
```cpp
bool HttpHandler::process_resolve_batch(BatchType &batch) {
  // TODO: Implement precondition resolution logic:
  // 1. For CommitEntry: Check preconditions against in-memory recent writes set
  // 2. If accepted: Add writes to in-memory recent writes set, mark as semi-committed
  // 3. If failed: Mark with failure info (which preconditions failed)
  // 4. For StatusEntry: N/A (already transferred to status threadpool)
  return false; // Continue processing
}
```
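A self-contained sketch of what that resolution logic could look like. The `Precondition` shape, the `recent_writes` map, and the version-comparison rule are illustrative assumptions; the real precondition model may differ.

```cpp
#include <cstdint>
#include <map>
#include <string>
#include <vector>

// Assumed precondition model: "key was not written after read_version".
struct Precondition {
  std::string key;
  uint64_t read_version;
};
struct ResolveEntry {
  uint64_t version;                   // assigned by the sequence stage
  std::vector<Precondition> preconds;
  std::vector<std::string> writes;
  bool semi_committed = false;
  std::vector<std::string> conflicts; // failure info carried downstream
};

struct ResolveStage {
  // key -> highest version that wrote it (the "recent writes set")
  std::map<std::string, uint64_t> recent_writes;

  void process(ResolveEntry &e) {
    for (const auto &p : e.preconds) {
      auto it = recent_writes.find(p.key);
      if (it != recent_writes.end() && it->second > p.read_version)
        e.conflicts.push_back(p.key); // written after the client's read
    }
    if (e.conflicts.empty()) {
      e.semi_committed = true;
      for (const auto &k : e.writes)  // only accepted commits add writes
        recent_writes[k] = e.version;
    }
  }
};
```

Note that all preconditions are evaluated before any writes are added, so acceptance is atomic with respect to the recent-writes set, which is why this stage must stay single-threaded.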
### Stage 2: Transaction Persistence
**Thread**: `txn-persist`
**Purpose**: Write semi-committed transactions to durable storage
**Serialization**: **Required** - Must mark batches durable in order
**Responsibilities**:
- **For CommitEntry**: Apply operations to persistent storage, update committed version high water mark
- **For StatusEntry**: N/A (transferred to status threadpool after sequence stage)
- Generate durability events for `/v1/subscribe` when committed version advances
- Batch multiple commits for efficient persistence operations
**Why Serialization is Required**:
- Batches must be marked durable in sequential version order
- High water mark updates must reflect strict ordering of committed versions
- Ensures consistency guarantees across all endpoints
**Committed Version High Water Mark**:
- Global atomic value tracking highest durably committed version
- Updated after each batch commits: set to highest version in the batch
- Read by `/v1/version` endpoint using atomic seq_cst reads
- Enables `/v1/subscribe` durability events when high water mark advances
**Batching Strategy**:
- Multiple semi-committed transactions can be persisted in a single batch
- High water mark updated once per batch to highest version in that batch
- See `persistence.md` for detailed persistence design
**Current Implementation**:
```cpp
bool HttpHandler::process_persist_batch(BatchType &batch) {
  // TODO: Implement actual persistence logic:
  // 1. For CommitEntry: Apply operations to persistent storage
  // 2. Update committed version high water mark to highest version in batch
  // 3. Generate durability events for /v1/subscribe
  // 4. For StatusEntry: N/A (already transferred to status threadpool)
  return false; // Continue processing
}
```
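The high-water-mark contract described above can be sketched as follows, with the storage write itself elided. `committed_version` and `persist_batch` are assumed names for illustration.

```cpp
#include <atomic>
#include <cstdint>
#include <vector>

// Global atomic read by /v1/version and by subscriber notification logic.
std::atomic<uint64_t> committed_version{0};

// Persist one batch of semi-committed versions. Batches arrive in sequential
// version order, so the new high water mark is simply the batch's last
// (highest) version, stored once per batch.
void persist_batch(const std::vector<uint64_t> &versions) {
  if (versions.empty()) return;
  // ... write the batch durably (e.g. S3) BEFORE advancing the mark, so a
  // reader of committed_version never observes a non-durable version ...
  committed_version.store(versions.back(), std::memory_order_seq_cst);
}
```

Ordering the durable write before the store is what makes the seq_cst read in `/v1/version` safe: any version at or below the mark is guaranteed durable.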
### Stage 3: Connection Release
**Thread**: `txn-release`
**Purpose**: Return connections to HTTP server for client response
**Serialization**: Not required - Independent connection handling
**Responsibilities**:
- Return processed connections to HTTP server for all request types
- Connection carries response data (success/failure) and status information
- Trigger response transmission to clients
**Response Handling**:
- **CommitRequests**: Response generated by persist stage (success with version, or failure with conflicting preconditions)
- **StatusRequests**: Response generated by separate status lookup logic (not part of pipeline)
- Failed transactions carry failure information through entire pipeline for proper client response
**Implementation**:
```cpp
bool HttpHandler::process_release_batch(BatchType &batch) {
  // Stage 3: Connection release
  for (auto &conn : batch) {
    if (!conn) {
      return true; // Shutdown signal
    }
    // Return connection to server for further processing or cleanup
    Server::release_back_to_server(std::move(conn));
  }
  return false; // Continue processing
}
```
## Threading Model
### Thread Pipeline Configuration
```cpp
// Pipeline entry type (to be implemented)
using PipelineEntry = std::variant<CommitEntry, StatusEntry, ShutdownEntry>;

// 4-stage pipeline: sequence -> resolve -> persist -> release
// TODO: Update pipeline type from std::unique_ptr<Connection> to PipelineEntry variant
StaticThreadPipeline<PipelineEntry, // Was: std::unique_ptr<Connection>
                     WaitStrategy::WaitIfUpstreamIdle, 1, 1, 1, 1>
    commitPipeline{lg_size};
```
### Thread Creation and Management
```cpp
HttpHandler() {
  // Stage 0: Sequence assignment thread
  sequenceThread = std::thread{[this]() {
    pthread_setname_np(pthread_self(), "txn-sequence");
    for (;;) {
      auto guard = commitPipeline.acquire<0, 0>();
      if (process_sequence_batch(guard.batch)) {
        return; // Shutdown signal received
      }
    }
  }};
  // Similar pattern for resolve, persist, and release threads...
}
```
### Batch Processing
Each stage processes connections in batches using RAII guards:
```cpp
auto guard = commitPipeline.acquire<STAGE_NUM, 0>();
// Process batch
for (auto &conn : guard.batch) {
  // Stage-specific processing
}
// Guard destructor automatically publishes batch to next stage
```
## Flow Control
### Pipeline Entry
Commit requests enter the pipeline via `HttpHandler::on_batch_complete()`:
```cpp
void HttpHandler::on_batch_complete(std::span<std::unique_ptr<Connection>> batch) {
  // Collect commit requests that passed basic validation for 4-stage pipeline processing
  int commit_count = 0;
  for (auto &conn : batch) {
    if (conn && conn->user_data) {
      auto *state = static_cast<HttpConnectionState *>(conn->user_data);
      if (state->route == HttpRoute::POST_commit &&
          state->commit_request &&
          state->parsing_commit) {
        commit_count++;
      }
    }
  }
  // Send commit requests to 4-stage pipeline in batch
  if (commit_count > 0) {
    auto guard = commitPipeline.push(commit_count, true);
    // Move qualifying connections into pipeline
  }
}
```
### Backpressure Handling
The pipeline implements natural backpressure:
- Each stage blocks if downstream stages are full
- `WaitIfUpstreamIdle` strategy balances latency vs throughput
- Ring buffer capacity (`2^lg_size` entries, with `lg_size = 16`) bounds the number of queued entries
### Shutdown Coordination
Pipeline shutdown is coordinated by sending a single ShutdownEntry that flows through all stages:
```cpp
~HttpHandler() {
  // Send single shutdown signal that flows through all pipeline stages
  {
    auto guard = commitPipeline.push(1, true);
    guard.batch[0] = ShutdownEntry{}; // Single ShutdownEntry flows through all stages
  }
  // Join all pipeline threads
  sequenceThread.join();
  resolveThread.join();
  persistThread.join();
  releaseThread.join();
}
```
**Note**: Multiple entries would only be needed if stages had multiple threads, with each thread needing its own shutdown signal.
## Error Handling
### Stage-Level Error Handling
Each stage handles different entry types:
```cpp
// Pattern matching on pipeline entry variant. Every branch must return the
// same type (bool here) for std::visit to compile.
bool is_shutdown = std::visit([&](auto &&entry) -> bool {
  using T = std::decay_t<decltype(entry)>;
  if constexpr (std::is_same_v<T, ShutdownEntry>) {
    return true; // Signal shutdown
  } else if constexpr (std::is_same_v<T, CommitEntry>) {
    // Process commit entry
    return false;
  } else { // StatusEntry
    // Handle status entry (or skip if transferred)
    return false;
  }
}, pipeline_entry);
```
### Connection Error States
- Failed CommitEntries are passed through the pipeline with error information
- Downstream stages skip processing for error connections but forward them
- Error responses are sent when connection reaches release stage
- Connection ownership is always transferred to ensure cleanup
### Pipeline Integrity
- ShutdownEntry signals shutdown to all stages
- Each stage checks for ShutdownEntry and returns true to signal shutdown
- RAII guards ensure entries are always published downstream
- No entries are lost even during error conditions
## Performance Characteristics
### Throughput Optimization
- **Batching**: Multiple connections processed per stage activation
- **Lock-Free Communication**: Ring buffer between stages
- **Minimal Context Switching**: Dedicated threads per stage
- **Arena Allocation**: Efficient memory management throughout pipeline
### Latency Optimization
- **Single-Pass Processing**: Each connection flows through all stages once
- **Streaming Design**: Stages process concurrently
- **Minimal Copying**: Connection ownership transfer, not data copying
- **Direct Response**: Release stage triggers immediate response transmission
### Scalability Characteristics
- **Batch Size Tuning**: Ring buffer size controls memory vs latency tradeoff
- **Thread Affinity**: Dedicated threads reduce scheduling overhead
- **NUMA Awareness**: Can pin threads to specific CPU cores
## Configuration
### Pipeline Parameters
```cpp
private:
  static constexpr int lg_size = 16; // Ring buffer size = 2^16 entries

  // 4-stage pipeline configuration
  StaticThreadPipeline<std::unique_ptr<Connection>,
                       WaitStrategy::WaitIfUpstreamIdle, 1, 1, 1, 1>
      commitPipeline{lg_size};
```
### Tuning Considerations
- **Ring Buffer Size**: Larger buffers increase memory usage but improve batching
- **Wait Strategy**: `WaitIfUpstreamIdle` balances CPU usage vs latency
- **Thread Affinity**: OS scheduling vs explicit CPU pinning tradeoffs
## Pipeline Entry Types
The pipeline processes different types of entries using a `std::variant`-based entry type instead of a bare `std::unique_ptr<Connection>`:
### Pipeline Entry Variants
- **CommitEntry**: Contains `std::unique_ptr<Connection>` with CommitRequest and connection state
- **StatusEntry**: Contains `std::unique_ptr<Connection>` with StatusRequest (transferred to status threadpool after sequence)
- **ShutdownEntry**: Signals pipeline shutdown to all stages
- **Future types**: Pipeline design supports additional entry types
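The variants listed above might be sketched as follows. These are hypothetical shapes; the real structs likely carry additional per-request state (assigned version, failure info, and so on), and `Connection` here is a trivial stand-in.

```cpp
#include <memory>
#include <variant>

// Stand-in for the HTTP layer's connection object (details elided).
struct Connection { /* HTTP connection state elided */ };

struct CommitEntry  { std::unique_ptr<Connection> conn; }; // full pipeline traversal
struct StatusEntry  { std::unique_ptr<Connection> conn; }; // sequence stage only
struct ShutdownEntry {};                                   // flows through all stages

using PipelineEntry = std::variant<CommitEntry, StatusEntry, ShutdownEntry>;
```

Because the entries own their connections via `std::unique_ptr`, moving an entry between stages transfers ownership without copying any request data, matching the "minimal copying" goal below.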
### Stage Processing by Type
| Stage | CommitEntry | StatusEntry | ShutdownEntry | Serialization |
|-------|-------------|-------------|---------------|---------------|
| **Sequence** | Check banned list, assign version | Add to banned list, transfer to status threadpool | Return true (shutdown) | **Required** |
| **Resolve** | Check preconditions, update recent writes | N/A (transferred) | Return true (shutdown) | **Required** |
| **Persist** | Apply operations, update high water mark | N/A (transferred) | Return true (shutdown) | **Required** |
| **Release** | Return connection to HTTP threads | N/A (transferred) | Return true (shutdown) | Not required |
## API Endpoint Integration
### `/v1/commit` - Transaction Submission
**Pipeline Interaction**: Full pipeline traversal as CommitEntry
#### Request Processing Flow
1. **HTTP I/O Thread Processing** (`src/http_handler.cpp:210-273`):
```cpp
void HttpHandler::handlePostCommit(Connection &conn, HttpConnectionState &state) {
  // Parse and validate anything that doesn't need serialization:
  // - JSON parsing and CommitRequest construction
  // - Basic validation: leader_id check, operation format validation
  // - Check that we have at least one operation
  // If validation fails, send error response immediately and return
  // If validation succeeds, connection will enter pipeline in on_batch_complete()
}
```
2. **Pipeline Entry**: Successfully parsed connections enter pipeline as CommitEntry (containing the connection with CommitRequest)
3. **Pipeline Processing**:
- **Sequence**: Check banned list → assign version (or reject)
- **Resolve**: Check preconditions against in-memory recent writes → mark semi-committed (or failed with conflict details)
- **Persist**: Apply operations → mark committed, update high water mark
- **Release**: Return connection with response data
4. **Response Generation**: Based on pipeline results
- **Success**: `{"status": "committed", "version": N, "leader_id": "...", "request_id": "..."}`
- **Failure**: `{"status": "not_committed", "conflicts": [...], "version": N, "leader_id": "..."}`
### `/v1/status` - Commit Status Lookup
**Pipeline Interaction**: StatusEntry through sequence stage, then transfer to status threadpool
#### Request Processing Flow
1. **HTTP I/O Thread Processing**:
```cpp
void HttpHandler::handleGetStatus(Connection &conn, const HttpConnectionState &state) {
  // TODO: Extract request_id from URL and min_version from query params
  // Current: Returns placeholder static response
}
```
2. **Two-Phase Processing**:
- **Phase 1 - Sequence Stage**: StatusEntry enters pipeline to add request_id to banned list and get version upper bound
- **Phase 2 - Status Threadpool**: Connection transferred from sequence stage to dedicated status threadpool for actual status lookup logic
3. **Status Lookup Logic**: Performed in status threadpool - scan transaction log to determine actual commit status of the now-banned request_id
### `/v1/subscribe` - Real-time Transaction Stream
**Pipeline Integration**: Consumes events from resolve and persist stages
#### Event Sources
- **Resolve Stage**: Semi-committed transactions (accepted preconditions) for low-latency streaming
- **Persist Stage**: Durability events when committed version high water mark advances
#### Current Implementation
```cpp
void HttpHandler::handleGetSubscribe(Connection &conn, const HttpConnectionState &state) {
  // TODO: Parse query parameters (after, durable)
  // TODO: Establish Server-Sent Events stream
  // TODO: Subscribe to resolve stage (semi-committed) and persist stage (durability) events
}
```
### `/v1/version` - Version Information
**Pipeline Integration**: Direct atomic read, no pipeline interaction
```cpp
// TODO: Implement direct atomic read of committed version high water mark
// No pipeline interaction needed - seq_cst atomic read
// Leader ID is process-lifetime constant
```
**Response**: `{"version": <high_water_mark>, "leader_id": "<process_leader_id>"}`
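A hedged sketch of that response construction follows. The real handler formats into the connection's arena via `format()`; the free function `version_response` and the `leader_id` constant here are illustrative stand-ins.

```cpp
#include <atomic>
#include <cstdint>
#include <cstdio>
#include <string>

// High water mark written by the persist stage, read here with seq_cst.
std::atomic<uint64_t> committed_version{0};
// Leader ID is a process-lifetime constant (assumed value).
const std::string leader_id = "node-1";

std::string version_response() {
  // Single seq_cst atomic read; no pipeline interaction required.
  uint64_t v = committed_version.load(std::memory_order_seq_cst);
  char buf[128];
  std::snprintf(buf, sizeof(buf), "{\"version\":%llu,\"leader_id\":\"%s\"}",
                static_cast<unsigned long long>(v), leader_id.c_str());
  return buf;
}
```

Since the persist stage only advances the mark after a batch is durable, any version this endpoint reports is guaranteed to survive a crash.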
## Integration Points
### HTTP Handler Integration
The pipeline integrates with the HTTP handler at two points:
1. **Entry**: `on_batch_complete()` feeds connections into sequence stage
2. **Exit**: Release stage calls `Server::release_back_to_server()`
### Persistence Layer Integration
The persist stage interfaces with:
- **S3 Backend**: Batch writes for durability (see `persistence.md`)
- **Subscriber System**: Real-time change stream notifications
- **Metrics System**: Transaction throughput and latency tracking
### Database State Integration
- **Sequence Stage**: Updates version number generator
- **Resolve Stage**: Queries current database state for precondition validation
- **Persist Stage**: Applies mutations to authoritative database state
## Future Optimizations
### Potential Enhancements
1. **Dynamic Thread Counts**: Make resolve and release thread counts configurable
2. **NUMA Optimization**: Pin pipeline threads to specific CPU cores
3. **Batch Size Tuning**: Dynamic batch size based on load
4. **Stage Bypassing**: Skip resolve stage for transactions without preconditions
5. **Persistence Batching**: Aggregate multiple commits into larger S3 writes
### Monitoring and Observability
1. **Stage Metrics**: Throughput, latency, and queue depth per stage
2. **Error Tracking**: Error rates and types by stage
3. **Resource Utilization**: CPU and memory usage per pipeline thread
4. **Flow Control Events**: Backpressure and stall detection
## Implementation Status
### Current State
- ✅ Pipeline structure implemented with 4 stages
- ✅ Thread creation and management
- ✅ RAII batch processing
- ✅ Error handling framework
- ✅ Shutdown coordination
### TODO Items
- ⏳ Sequence assignment logic implementation
- ⏳ Precondition resolution implementation
- ⏳ S3 persistence batching implementation
- ⏳ Subscriber notification system
- ⏳ Performance monitoring and metrics
- ⏳ Configuration tuning and optimization


@@ -1,8 +1,10 @@
# WeaselDB Configuration File
[server]
bind_address = "127.0.0.1"
port = 8080
# Network interfaces to listen on - production config with just TCP
interfaces = [
{ type = "tcp", address = "127.0.0.1", port = 8080 }
]
# Maximum request size in bytes (for 413 Content Too Large responses)
max_request_size_bytes = 1048576 # 1MB
# Number of I/O threads for handling connections and network events


@@ -79,9 +79,31 @@ void ConfigParser::parse_section(const auto &toml_data,
void ConfigParser::parse_server_config(const auto &toml_data,
ServerConfig &config) {
parse_section(toml_data, "server", [&](const auto &srv) {
parse_field(srv, "bind_address", config.bind_address);
parse_field(srv, "port", config.port);
parse_field(srv, "unix_socket_path", config.unix_socket_path);
// Parse interfaces array
if (srv.contains("interfaces")) {
auto interfaces = srv.at("interfaces");
if (interfaces.is_array()) {
for (const auto &iface : interfaces.as_array()) {
if (iface.contains("type")) {
std::string type = iface.at("type").as_string();
if (type == "tcp") {
std::string address = iface.at("address").as_string();
int port = iface.at("port").as_integer();
config.interfaces.push_back(ListenInterface::tcp(address, port));
} else if (type == "unix") {
std::string path = iface.at("path").as_string();
config.interfaces.push_back(ListenInterface::unix_socket(path));
}
}
}
}
}
// If no interfaces configured, use default TCP interface
if (config.interfaces.empty()) {
config.interfaces.push_back(ListenInterface::tcp("127.0.0.1", 8080));
}
parse_field(srv, "max_request_size_bytes", config.max_request_size_bytes);
parse_field(srv, "io_threads", config.io_threads);
@@ -127,25 +149,38 @@ void ConfigParser::parse_subscription_config(const auto &toml_data,
bool ConfigParser::validate_config(const Config &config) {
bool valid = true;
// Validate server configuration
if (config.server.unix_socket_path.empty()) {
// TCP mode validation
if (config.server.port <= 0 || config.server.port > 65535) {
std::cerr << "Configuration error: server.port must be between 1 and "
"65535, got "
<< config.server.port << std::endl;
// Validate server interfaces
if (config.server.interfaces.empty()) {
std::cerr << "Configuration error: no interfaces configured" << std::endl;
valid = false;
}
} else {
// Unix socket mode validation
if (config.server.unix_socket_path.length() >
107) { // UNIX_PATH_MAX is typically 108
std::cerr << "Configuration error: unix_socket_path too long (max 107 "
"chars), got "
<< config.server.unix_socket_path.length() << " chars"
for (const auto &iface : config.server.interfaces) {
if (iface.type == ListenInterface::Type::TCP) {
if (iface.port <= 0 || iface.port > 65535) {
std::cerr << "Configuration error: TCP port must be between 1 and "
"65535, got "
<< iface.port << std::endl;
valid = false;
}
if (iface.address.empty()) {
std::cerr << "Configuration error: TCP address cannot be empty"
<< std::endl;
valid = false;
}
} else { // Unix socket
if (iface.path.empty()) {
std::cerr << "Configuration error: Unix socket path cannot be empty"
<< std::endl;
valid = false;
}
if (iface.path.length() > 107) { // UNIX_PATH_MAX is typically 108
std::cerr << "Configuration error: Unix socket path too long (max 107 "
"chars), got "
<< iface.path.length() << " chars" << std::endl;
valid = false;
}
}
}
if (config.server.max_request_size_bytes == 0) {


@@ -3,19 +3,40 @@
#include <chrono>
#include <optional>
#include <string>
#include <vector>
namespace weaseldb {
/**
* @brief Configuration for a single network interface to listen on.
*/
struct ListenInterface {
enum class Type { TCP, Unix };
Type type;
/// For TCP: IP address to bind to (e.g., "127.0.0.1", "0.0.0.0")
std::string address;
/// For TCP: port number
int port = 0;
/// For Unix: socket file path
std::string path;
// Factory methods for cleaner config creation
static ListenInterface tcp(const std::string &addr, int port_num) {
return {Type::TCP, addr, port_num, ""};
}
static ListenInterface unix_socket(const std::string &socket_path) {
return {Type::Unix, "", 0, socket_path};
}
};
/**
* @brief Configuration settings for the WeaselDB server component.
*/
struct ServerConfig {
/// IP address to bind the server to (default: localhost)
std::string bind_address = "127.0.0.1";
/// TCP port number for the server to listen on
int port = 8080;
/// Unix socket path (if specified, takes precedence over TCP)
std::string unix_socket_path;
/// Network interfaces to listen on (TCP and/or Unix sockets)
std::vector<ListenInterface> interfaces;
/// Maximum size in bytes for incoming HTTP requests (default: 1MB)
int64_t max_request_size_bytes = 1024 * 1024;
/// Number of I/O threads for handling connections and network events


@@ -209,7 +209,7 @@ struct Connection {
* metrics.recordQueueDepth(conn->get_id(), conn->outgoingBytesQueued());
* ```
*/
int64_t outgoingBytesQueued() const {
int64_t outgoing_bytes_queued() const {
#ifndef NDEBUG
// Debug build: validate counter accuracy
int64_t computed_total = 0;


@@ -79,6 +79,8 @@
* - **Simple concatenation**: Basic string + number + string combinations
* - **Compile-time optimization**: When all types/values known at compile time
* - **Template contexts**: Where compile-time buffer sizing is beneficial
* - **IMPORTANT**: Only works with compile-time string literals, NOT runtime
* const char*
*
* ## Optimization Details:
* The function uses `ArenaAllocator::allocate_remaining_space()` to claim all
@@ -206,10 +208,12 @@ inline constexpr DoubleTerm term(double s) { return DoubleTerm(s); }
* optimized term writers for maximum speed.
*
* ## Supported Types:
* - **String literals**: C-style string literals and arrays
* - **String literals**: C-style string literals and arrays ("Hello", "World")
* - **Integers**: All integral types (int, int64_t, uint32_t, etc.)
* - **Floating point**: double (uses high-precision Grisu2 algorithm)
* - **Custom types**: Via specialization of `detail::term()`
* - **NOT supported**: const char* variables, std::string, std::string_view
* variables
*
* ## Performance Characteristics:
* - **Compile-time buffer sizing**: Buffer size calculated at compile time (no
@@ -245,16 +249,23 @@ inline constexpr DoubleTerm term(double s) { return DoubleTerm(s); }
*
* ## When to Use:
* - **Hot paths**: Performance-critical code where formatting speed matters
* - **Known types**: When argument types are known at compile time
* - **Compile-time string literals**: All string arguments must be string
* literals (e.g., "Hello")
* - **Simple formatting**: Concatenation and basic type conversion
* - **Template code**: Where compile-time optimization is beneficial
* - **CANNOT use runtime strings**: No const char*, std::string, or string_view
* variables
*
* ## When to Use format() Instead:
* - **Printf-style formatting**: When you need format specifiers like "%d",
* "%.2f"
* - **Runtime flexibility**: When format strings come from variables/config
* - **Complex formatting**: When you need padding, precision, etc.
* - **Convenience**: For quick debugging or non-critical paths
* - **Runtime strings**: When you have const char*, std::string, or string_view
* variables
* - **Dynamic content**: When format strings come from variables/config/user
* input
* - **Complex formatting**: When you need padding, precision, width specifiers
* - **Mixed literal/runtime**: When combining string literals with runtime
* string data
*
* @note All arguments are passed by forwarding reference for optimal
* performance


@@ -1,19 +1,37 @@
#include "http_handler.hpp"
#include <atomic>
#include <cstring>
#include <string>
#include <strings.h>
#include "arena_allocator.hpp"
#include "format.hpp"
#include "json_commit_request_parser.hpp"
#include "metric.hpp"
#include "perfetto_categories.hpp"
#include "pipeline_entry.hpp"
#include "server.hpp"
auto requests_counter_family = metric::create_counter(
"weaseldb_http_requests_total", "Total http requests");
thread_local auto metrics_counter =
requests_counter_family.create({{"path", "/metrics"}});
// API endpoint request counters
thread_local auto commit_counter =
requests_counter_family.create({{"path", "/v1/commit"}});
thread_local auto status_counter =
requests_counter_family.create({{"path", "/v1/status"}});
thread_local auto version_counter =
requests_counter_family.create({{"path", "/v1/version"}});
// Metric for banned request IDs memory usage
auto banned_request_ids_memory_gauge =
metric::create_gauge("weaseldb_banned_request_ids_memory_bytes",
"Memory used by banned request IDs arena")
.create({});
// HttpConnectionState implementation
HttpConnectionState::HttpConnectionState(ArenaAllocator &arena)
: current_header_field_buf(ArenaStlAllocator<char>(&arena)),
@@ -61,16 +79,46 @@ void HttpHandler::on_write_buffer_drained(
void HttpHandler::on_batch_complete(
std::span<std::unique_ptr<Connection>> batch) {
int readyCount = 0;
for (int i = 0; i < int(batch.size()); ++i) {
readyCount += batch[i] && batch[i]->outgoingBytesQueued() > 0;
// Collect commit requests and status requests for pipeline processing
int pipeline_count = 0;
// Count both commit and status requests
for (auto &conn : batch) {
if (conn && conn->user_data) {
auto *state = static_cast<HttpConnectionState *>(conn->user_data);
// Count commit requests that passed basic validation
if (state->route == HttpRoute::POST_commit && state->commit_request &&
state->parsing_commit && state->basic_validation_passed) {
pipeline_count++;
}
// Count status requests
else if (state->route == HttpRoute::GET_status &&
// Error message not already queued
conn->outgoing_bytes_queued() == 0) {
pipeline_count++;
}
}
}
// Send requests to 4-stage pipeline in batch
if (pipeline_count > 0) {
auto guard = commitPipeline.push(pipeline_count, true);
auto out_iter = guard.batch.begin();
for (auto &conn : batch) {
if (conn && conn->user_data) {
auto *state = static_cast<HttpConnectionState *>(conn->user_data);
// Create CommitEntry for commit requests
if (state->route == HttpRoute::POST_commit && state->commit_request &&
state->parsing_commit && state->basic_validation_passed) {
*out_iter++ = CommitEntry{std::move(conn)};
}
// Create StatusEntry for status requests
else if (state->route == HttpRoute::GET_status) {
*out_iter++ = StatusEntry{std::move(conn)};
}
if (readyCount > 0) {
auto guard = pipeline.push(readyCount, /*block=*/true);
auto outIter = guard.batch.begin();
for (int i = 0; i < int(batch.size()); ++i) {
if (batch[i] && batch[i]->outgoingBytesQueued() > 0) {
*outIter++ = std::move(batch[i]);
}
}
}
@@ -80,7 +128,7 @@ void HttpHandler::on_data_arrived(std::string_view data,
std::unique_ptr<Connection> &conn_ptr) {
auto *state = static_cast<HttpConnectionState *>(conn_ptr->user_data);
if (!state) {
sendErrorResponse(*conn_ptr, 500, "Internal server error", true);
send_error_response(*conn_ptr, 500, "Internal server error", true);
return;
}
@@ -94,7 +142,7 @@ void HttpHandler::on_data_arrived(std::string_view data,
llhttp_execute(&state->parser, data.data(), data.size());
if (err != HPE_OK) {
sendErrorResponse(*conn_ptr, 400, "Bad request", true);
send_error_response(*conn_ptr, 400, "Bad request", true);
return;
}
@@ -181,56 +229,160 @@ HttpRoute HttpHandler::parseRoute(std::string_view method,
// Route handlers (basic implementations)
void HttpHandler::handleGetVersion(Connection &conn,
const HttpConnectionState &state) {
sendJsonResponse(
version_counter.inc();
send_json_response(
conn, 200,
R"({"version":"0.0.1","leader":"node-1","committed_version":42})",
format(conn.get_arena(), R"({"version":%ld,"leader":""})",
this->committed_version.load(std::memory_order_seq_cst)),
state.connection_close);
}
void HttpHandler::handlePostCommit(Connection &conn,
const HttpConnectionState &state) {
// TODO: Parse commit request from state.body and process
sendJsonResponse(
conn, 200,
R"({"request_id":"example","status":"committed","version":43})",
state.connection_close);
commit_counter.inc();
// Check if streaming parse was successful
if (!state.commit_request || !state.parsing_commit) {
const char *error = state.commit_parser
? state.commit_parser->get_parse_error()
: "No parser initialized";
ArenaAllocator &arena = conn.get_arena();
std::string_view error_msg =
format(arena, "Parse failed: %s", error ? error : "Unknown error");
send_error_response(conn, 400, error_msg, state.connection_close);
return;
}
const CommitRequest &commit_request = *state.commit_request;
// Perform basic validation that doesn't require serialization (done on I/O
// threads)
bool valid = true;
std::string_view error_msg;
// Check that we have at least one operation
if (commit_request.operations().empty()) {
valid = false;
error_msg = "Commit request must contain at least one operation";
}
// Check leader_id is not empty
if (valid && commit_request.leader_id().empty()) {
valid = false;
error_msg = "Commit request must specify a leader_id";
}
// Check operations are well-formed
if (valid) {
for (const auto &op : commit_request.operations()) {
if (op.param1.empty()) {
valid = false;
error_msg = "Operation key cannot be empty";
break;
}
if (op.type == Operation::Type::Write && op.param2.empty()) {
valid = false;
error_msg = "Write operation value cannot be empty";
break;
}
}
}
if (!valid) {
send_error_response(conn, 400, error_msg, state.connection_close);
return;
}
// Basic validation passed - mark for 4-stage pipeline processing
const_cast<HttpConnectionState &>(state).basic_validation_passed = true;
// Response will be sent after 4-stage pipeline processing is complete
}
void HttpHandler::handleGetSubscribe(Connection &conn,
const HttpConnectionState &state) {
// TODO: Implement subscription streaming
sendJsonResponse(
send_json_response(
conn, 200,
R"({"message":"Subscription endpoint - streaming not yet implemented"})",
state.connection_close);
}
void HttpHandler::handleGetStatus(Connection &conn,
const HttpConnectionState &state) {
// TODO: Extract request_id from URL and check status
sendJsonResponse(
conn, 200,
R"({"request_id":"example","status":"committed","version":43})",
HttpConnectionState &state) {
status_counter.inc();
// Status requests are processed through the pipeline
// Response will be generated in the sequence stage
// This handler extracts request_id from query parameters and prepares for
// pipeline processing
// Extract request_id from query parameters:
// /v1/status?request_id=<ID>&min_version=<VERSION>
std::string_view url = state.url;
// Find query parameters
size_t query_pos = url.find('?');
if (query_pos == std::string_view::npos) {
// No query parameters
send_error_response(conn, 400,
"Missing required query parameter: request_id",
state.connection_close);
return;
}
std::string_view query_string = url.substr(query_pos + 1);
// Simple query parameter parsing for request_id
// Look for "request_id=" in the query string
size_t request_id_pos = query_string.find("request_id=");
if (request_id_pos == std::string_view::npos) {
send_error_response(conn, 400,
"Missing required query parameter: request_id",
state.connection_close);
return;
}
// Extract the request_id value
size_t value_start = request_id_pos + 11; // length of "request_id="
if (value_start >= query_string.length()) {
send_error_response(conn, 400, "Empty request_id parameter",
state.connection_close);
return;
}
// Find the end of the request_id value (next & or end of string)
size_t value_end = query_string.find('&', value_start);
if (value_end == std::string_view::npos) {
value_end = query_string.length();
}
state.status_request_id =
query_string.substr(value_start, value_end - value_start);
if (state.status_request_id.empty()) {
send_error_response(conn, 400, "Empty request_id parameter",
state.connection_close);
return;
}
// Ready for pipeline processing
}
void HttpHandler::handlePutRetention(Connection &conn,
const HttpConnectionState &state) {
// TODO: Parse retention policy from body and store
sendJsonResponse(conn, 200, R"({"policy_id":"example","status":"created"})",
send_json_response(conn, 200, R"({"policy_id":"example","status":"created"})",
state.connection_close);
}
void HttpHandler::handleGetRetention(Connection &conn,
const HttpConnectionState &state) {
// TODO: Extract policy_id from URL or return all policies
sendJsonResponse(conn, 200, R"({"policies":[]})", state.connection_close);
send_json_response(conn, 200, R"({"policies":[]})", state.connection_close);
}
void HttpHandler::handleDeleteRetention(Connection &conn,
const HttpConnectionState &state) {
// TODO: Extract policy_id from URL and delete
sendJsonResponse(conn, 200, R"({"policy_id":"example","status":"deleted"})",
send_json_response(conn, 200, R"({"policy_id":"example","status":"deleted"})",
state.connection_close);
}
@@ -255,16 +407,16 @@ void HttpHandler::handleGetMetrics(Connection &conn,
arena, "HTTP/1.1 200 OK\r\n",
"Content-Type: text/plain; version=0.0.4\r\n",
"Content-Length: ", static_cast<uint64_t>(total_size), "\r\n",
"X-Response-ID: ", static_cast<int64_t>(http_state->request_id), "\r\n",
"Connection: close\r\n", "\r\n");
"X-Response-ID: ", static_cast<int64_t>(http_state->http_request_id),
"\r\n", "Connection: close\r\n", "\r\n");
conn.close_after_send();
} else {
headers = static_format(
arena, "HTTP/1.1 200 OK\r\n",
"Content-Type: text/plain; version=0.0.4\r\n",
"Content-Length: ", static_cast<uint64_t>(total_size), "\r\n",
"X-Response-ID: ", static_cast<int64_t>(http_state->request_id), "\r\n",
"Connection: keep-alive\r\n", "\r\n");
"X-Response-ID: ", static_cast<int64_t>(http_state->http_request_id),
"\r\n", "Connection: keep-alive\r\n", "\r\n");
}
// Send headers
@@ -278,91 +430,81 @@ void HttpHandler::handleGetMetrics(Connection &conn,
void HttpHandler::handleGetOk(Connection &conn,
const HttpConnectionState &state) {
TRACE_EVENT("http", "GET /ok", perfetto::Flow::Global(state.request_id));
TRACE_EVENT("http", "GET /ok", perfetto::Flow::Global(state.http_request_id));
sendResponse(conn, 200, "text/plain", "OK", state.connection_close);
}
void HttpHandler::handleNotFound(Connection &conn,
const HttpConnectionState &state) {
sendErrorResponse(conn, 404, "Not found", state.connection_close);
send_error_response(conn, 404, "Not found", state.connection_close);
}
// HTTP utility methods
void HttpHandler::sendResponse(Connection &conn, int status_code,
std::string_view content_type,
std::string_view body, bool close_connection) {
[[maybe_unused]] ArenaAllocator &arena = conn.get_arena();
// Build HTTP response using arena
std::string response;
response.reserve(256 + body.size());
response += "HTTP/1.1 ";
response += std::to_string(status_code);
response += " ";
// Status text
switch (status_code) {
case 200:
response += "OK";
break;
case 400:
response += "Bad Request";
break;
case 404:
response += "Not Found";
break;
case 500:
response += "Internal Server Error";
break;
default:
response += "Unknown";
break;
}
ArenaAllocator &arena = conn.get_arena();
auto *state = static_cast<HttpConnectionState *>(conn.user_data);
response += "\r\n";
response += "Content-Type: ";
response += content_type;
response += "\r\n";
response += "Content-Length: ";
response += std::to_string(body.size());
response += "\r\n";
response += "X-Response-ID: ";
response += std::to_string(state->request_id);
response += "\r\n";
if (close_connection) {
response += "Connection: close\r\n";
conn.close_after_send(); // Signal connection should be closed after sending
} else {
response += "Connection: keep-alive\r\n";
// Status text
std::string_view status_text;
switch (status_code) {
case 200:
status_text = "OK";
break;
case 400:
status_text = "Bad Request";
break;
case 404:
status_text = "Not Found";
break;
case 500:
status_text = "Internal Server Error";
break;
default:
status_text = "Unknown";
break;
}
response += "\r\n";
response += body;
const char *connection_header = close_connection ? "close" : "keep-alive";
std::string_view response =
format(arena,
"HTTP/1.1 %d %.*s\r\n"
"Content-Type: %.*s\r\n"
"Content-Length: %zu\r\n"
"X-Response-ID: %ld\r\n"
"Connection: %s\r\n"
"\r\n%.*s",
status_code, static_cast<int>(status_text.size()),
status_text.data(), static_cast<int>(content_type.size()),
content_type.data(), body.size(), state->http_request_id,
connection_header, static_cast<int>(body.size()), body.data());
if (close_connection) {
conn.close_after_send();
}
conn.append_message(response);
}
void HttpHandler::sendJsonResponse(Connection &conn, int status_code,
void HttpHandler::send_json_response(Connection &conn, int status_code,
std::string_view json,
bool close_connection) {
sendResponse(conn, status_code, "application/json", json, close_connection);
}
void HttpHandler::sendErrorResponse(Connection &conn, int status_code,
void HttpHandler::send_error_response(Connection &conn, int status_code,
std::string_view message,
bool close_connection) {
[[maybe_unused]] ArenaAllocator &arena = conn.get_arena();
ArenaAllocator &arena = conn.get_arena();
std::string json = R"({"error":")";
json += message;
json += R"("})";
std::string_view json =
format(arena, R"({"error":"%.*s"})", static_cast<int>(message.size()),
message.data());
sendJsonResponse(conn, status_code, json, close_connection);
send_json_response(conn, status_code, json, close_connection);
}
// llhttp callbacks
@@ -423,7 +565,7 @@ int HttpHandler::onHeaderValueComplete(llhttp_t *parser) {
id = id * 10 + (c - '0');
}
}
state->request_id = id;
state->http_request_id = id;
}
// Clear buffers for next header
@@ -443,19 +585,330 @@ int HttpHandler::onHeadersComplete(llhttp_t *parser) {
llhttp_method_name(static_cast<llhttp_method_t>(parser->method));
state->method = std::string_view(method_str);
// Check if this looks like a POST to /v1/commit to initialize streaming
// parser
if (state->method == "POST" && state->url.find("/v1/commit") == 0) {
// Initialize streaming commit request parsing
state->commit_parser = std::make_unique<JsonCommitRequestParser>();
state->commit_request = std::make_unique<CommitRequest>();
state->parsing_commit =
state->commit_parser->begin_streaming_parse(*state->commit_request);
if (!state->parsing_commit) {
return -1; // Signal parsing error to llhttp
}
}
return 0;
}
int HttpHandler::onBody(llhttp_t *parser, const char *at, size_t length) {
[[maybe_unused]] auto *state =
static_cast<HttpConnectionState *>(parser->data);
(void)at;
(void)length;
auto *state = static_cast<HttpConnectionState *>(parser->data);
if (state->parsing_commit && state->commit_parser) {
// Stream data to commit request parser
auto status =
state->commit_parser->parse_chunk(const_cast<char *>(at), length);
if (status == CommitRequestParser::ParseStatus::Error) {
return -1; // Signal parsing error to llhttp
}
}
return 0;
}
int HttpHandler::onMessageComplete(llhttp_t *parser) {
auto *state = static_cast<HttpConnectionState *>(parser->data);
state->message_complete = true;
if (state->parsing_commit && state->commit_parser) {
// Finish streaming parse
auto status = state->commit_parser->finish_streaming_parse();
if (status == CommitRequestParser::ParseStatus::Error) {
return -1; // Signal parsing error to llhttp
}
}
return 0;
}
// Pipeline stage implementations (batch-based)
bool HttpHandler::process_sequence_batch(BatchType &batch) {
// Stage 0: Sequence assignment
// This stage performs ONLY work that requires serial processing:
// - Version/sequence number assignment (must be sequential)
// - Request ID banned list management
for (auto &entry : batch) {
// Pattern match on pipeline entry variant
bool should_shutdown = std::visit(
[&](auto &&e) -> bool {
using T = std::decay_t<decltype(e)>;
if constexpr (std::is_same_v<T, ShutdownEntry>) {
return true; // Signal shutdown
} else if constexpr (std::is_same_v<T, CommitEntry>) {
// Process commit entry: check banned list, assign version
auto &commit_entry = e;
auto *state = static_cast<HttpConnectionState *>(
commit_entry.connection->user_data);
if (!state || !state->commit_request) {
// Should not happen - basic validation was done on I/O thread
send_error_response(*commit_entry.connection, 500,
"Internal server error", true);
return false;
}
// Check if request_id is banned (for status queries)
// Only check CommitRequest request_id, not HTTP header
if (state->commit_request &&
state->commit_request->request_id().has_value()) {
auto commit_request_id =
state->commit_request->request_id().value();
if (banned_request_ids.find(commit_request_id) !=
banned_request_ids.end()) {
// Request ID is banned, this commit should fail
send_json_response(
*commit_entry.connection, 409,
R"({"status": "not_committed", "error": "request_id_banned"})",
state->connection_close);
return false;
}
}
// Assign sequential version number
commit_entry.assigned_version = next_version++;
TRACE_EVENT("http", "sequence_commit",
perfetto::Flow::Global(state->http_request_id));
return false; // Continue processing
} else if constexpr (std::is_same_v<T, StatusEntry>) {
// Process status entry: add request_id to banned list, get version
// upper bound
auto &status_entry = e;
auto *state = static_cast<HttpConnectionState *>(
status_entry.connection->user_data);
if (state && !state->status_request_id.empty()) {
// Add request_id to banned list - store the string in arena and
// use string_view
char *arena_chars = banned_request_arena.allocate<char>(
state->status_request_id.size());
std::memcpy(arena_chars, state->status_request_id.data(),
state->status_request_id.size());
std::string_view request_id_view(arena_chars,
state->status_request_id.size());
banned_request_ids.insert(request_id_view);
// Update memory usage metric
banned_request_ids_memory_gauge.set(
banned_request_arena.total_allocated());
// Set version upper bound to current highest assigned version
status_entry.version_upper_bound = next_version - 1;
}
if (state) {
TRACE_EVENT("http", "sequence_status",
perfetto::Flow::Global(state->http_request_id));
}
// TODO: Transfer to status threadpool - for now just respond
// not_committed
send_json_response(*status_entry.connection, 200,
R"({"status": "not_committed"})",
state ? state->connection_close : true);
return false; // Continue processing
}
return false; // Unknown type, continue
},
entry);
if (should_shutdown) {
return true;
}
}
return false; // Continue processing
}
bool HttpHandler::process_resolve_batch(BatchType &batch) {
// Stage 1: Precondition resolution
// This stage must be serialized to maintain consistent database state view
// - Validate preconditions against current database state
// - Check for conflicts with other transactions
for (auto &entry : batch) {
// Pattern match on pipeline entry variant
bool should_shutdown = std::visit(
[&](auto &&e) -> bool {
using T = std::decay_t<decltype(e)>;
if constexpr (std::is_same_v<T, ShutdownEntry>) {
return true; // Signal shutdown
} else if constexpr (std::is_same_v<T, CommitEntry>) {
// Process commit entry: accept all commits (simplified
// implementation)
auto &commit_entry = e;
auto *state = static_cast<HttpConnectionState *>(
commit_entry.connection->user_data);
if (!state || !state->commit_request) {
// Skip processing for failed sequence stage
return false;
}
// Accept all commits (simplified implementation)
commit_entry.resolve_success = true;
TRACE_EVENT("http", "resolve_commit",
perfetto::Flow::Global(state->http_request_id));
return false; // Continue processing
} else if constexpr (std::is_same_v<T, StatusEntry>) {
// Status entries are not processed in resolve stage
// They were already handled in sequence stage
return false;
}
return false; // Unknown type, continue
},
entry);
if (should_shutdown) {
return true;
}
}
return false; // Continue processing
}
bool HttpHandler::process_persist_batch(BatchType &batch) {
// Stage 2: Transaction persistence
// Mark everything as durable immediately (simplified implementation)
// In real implementation: batch S3 writes, update subscribers, etc.
for (auto &entry : batch) {
// Pattern match on pipeline entry variant
bool should_shutdown = std::visit(
[&](auto &&e) -> bool {
using T = std::decay_t<decltype(e)>;
if constexpr (std::is_same_v<T, ShutdownEntry>) {
return true; // Signal shutdown
} else if constexpr (std::is_same_v<T, CommitEntry>) {
// Process commit entry: mark as durable, generate response
auto &commit_entry = e;
auto *state = static_cast<HttpConnectionState *>(
commit_entry.connection->user_data);
// Skip if resolve failed or connection is in error state
if (!state || !state->commit_request ||
!commit_entry.resolve_success) {
return false;
}
// Mark as persisted and update committed version high water mark
commit_entry.persist_success = true;
committed_version.store(commit_entry.assigned_version,
std::memory_order_seq_cst);
TRACE_EVENT("http", "persist_commit",
perfetto::Flow::Global(state->http_request_id));
const CommitRequest &commit_request = *state->commit_request;
ArenaAllocator &arena = commit_entry.connection->get_arena();
std::string_view response;
// Generate success response with actual assigned version
if (commit_request.request_id().has_value()) {
response = format(
arena,
R"({"request_id":"%.*s","status":"committed","version":%ld,"leader_id":"leader123"})",
static_cast<int>(commit_request.request_id().value().size()),
commit_request.request_id().value().data(),
commit_entry.assigned_version);
} else {
response = format(
arena,
R"({"status":"committed","version":%ld,"leader_id":"leader123"})",
commit_entry.assigned_version);
}
send_json_response(*commit_entry.connection, 200, response,
state->connection_close);
return false; // Continue processing
} else if constexpr (std::is_same_v<T, StatusEntry>) {
// Status entries are not processed in persist stage
// They were already handled in sequence stage
return false;
}
return false; // Unknown type, continue
},
entry);
if (should_shutdown) {
return true;
}
}
return false; // Continue processing
}
bool HttpHandler::process_release_batch(BatchType &batch) {
// Stage 3: Connection release
// Return connections to server for response transmission
for (auto &entry : batch) {
// Pattern match on pipeline entry variant
bool should_shutdown = std::visit(
[&](auto &&e) -> bool {
using T = std::decay_t<decltype(e)>;
if constexpr (std::is_same_v<T, ShutdownEntry>) {
return true; // Signal shutdown
} else if constexpr (std::is_same_v<T, CommitEntry>) {
// Process commit entry: return connection to server
auto &commit_entry = e;
auto *state = static_cast<HttpConnectionState *>(
commit_entry.connection->user_data);
if (state) {
TRACE_EVENT("http", "release_commit",
perfetto::Flow::Global(state->http_request_id));
}
// Return connection to server for further processing or cleanup
Server::release_back_to_server(std::move(commit_entry.connection));
return false; // Continue processing
} else if constexpr (std::is_same_v<T, StatusEntry>) {
// Process status entry: return connection to server
auto &status_entry = e;
auto *state = static_cast<HttpConnectionState *>(
status_entry.connection->user_data);
if (state) {
TRACE_EVENT("http", "release_status",
perfetto::Flow::Global(state->http_request_id));
}
// Return connection to server for further processing or cleanup
Server::release_back_to_server(std::move(status_entry.connection));
return false; // Continue processing
}
return false; // Unknown type, continue
},
entry);
if (should_shutdown) {
return true;
}
}
return false; // Continue processing
}


@@ -1,18 +1,26 @@
#pragma once
#include <atomic>
#include <memory>
#include <string_view>
#include <thread>
#include <unordered_set>
#include <llhttp.h>
#include "arena_allocator.hpp"
#include "connection.hpp"
#include "connection_handler.hpp"
#include "loop_iterations.hpp"
#include "perfetto_categories.hpp"
#include "pipeline_entry.hpp"
#include "server.hpp"
#include "thread_pipeline.hpp"
// Forward declarations
struct CommitRequest;
struct JsonCommitRequestParser;
/**
* HTTP routes supported by WeaselDB server.
* Using enum for efficient switch-based routing.
@@ -48,13 +56,25 @@ struct HttpConnectionState {
bool connection_close = false; // Client requested connection close
HttpRoute route = HttpRoute::NotFound;
// Status request data
std::string_view status_request_id; // Request ID extracted from the
// /v1/status?request_id=<ID> query string
// Header accumulation buffers (arena-allocated)
using ArenaString =
std::basic_string<char, std::char_traits<char>, ArenaStlAllocator<char>>;
ArenaString current_header_field_buf;
ArenaString current_header_value_buf;
bool header_field_complete = false;
int64_t request_id = 0; // X-Request-Id header value
int64_t http_request_id =
0; // X-Request-Id header value (for tracing/logging)
// Streaming parser for POST requests
std::unique_ptr<JsonCommitRequestParser> commit_parser;
std::unique_ptr<CommitRequest> commit_request;
bool parsing_commit = false;
bool basic_validation_passed =
false; // Set to true if basic validation passes
explicit HttpConnectionState(ArenaAllocator &arena);
};
@@ -64,70 +84,66 @@ struct HttpConnectionState {
* Supports the WeaselDB REST API endpoints with enum-based routing.
*/
struct HttpHandler : ConnectionHandler {
HttpHandler() {
finalStageThreads.emplace_back([this]() {
pthread_setname_np(pthread_self(), "stage-1-0");
HttpHandler()
: banned_request_ids(
ArenaStlAllocator<std::string_view>(&banned_request_arena)) {
// Stage 0: Sequence assignment thread
sequenceThread = std::thread{[this]() {
pthread_setname_np(pthread_self(), "txn-sequence");
for (;;) {
auto guard = pipeline.acquire<1, 0>();
for (auto it = guard.batch.begin(); it != guard.batch.end(); ++it) {
if ((it.index() % 2) == 0) { // Thread 0 handles even indices
auto &c = *it;
if (!c) {
return;
}
auto *state = static_cast<HttpConnectionState *>(c->user_data);
TRACE_EVENT("http", "release",
perfetto::Flow::Global(state->request_id));
Server::release_back_to_server(std::move(c));
auto guard = commitPipeline.acquire<0, 0>();
if (process_sequence_batch(guard.batch)) {
return; // Shutdown signal received
}
}
}
});
finalStageThreads.emplace_back([this]() {
pthread_setname_np(pthread_self(), "stage-1-1");
}};
// Stage 1: Precondition resolution thread
resolveThread = std::thread{[this]() {
pthread_setname_np(pthread_self(), "txn-resolve");
for (;;) {
auto guard = pipeline.acquire<1, 1>();
for (auto it = guard.batch.begin(); it != guard.batch.end(); ++it) {
if ((it.index() % 2) == 1) { // Thread 1 handles odd indices
auto &c = *it;
if (!c) {
return;
}
auto *state = static_cast<HttpConnectionState *>(c->user_data);
TRACE_EVENT("http", "release",
perfetto::Flow::Global(state->request_id));
Server::release_back_to_server(std::move(c));
auto guard = commitPipeline.acquire<1, 0>();
if (process_resolve_batch(guard.batch)) {
return; // Shutdown signal received
}
}
}
});
stage0Thread = std::thread{[this]() {
pthread_setname_np(pthread_self(), "stage-0");
int nulls = 0;
}};
// Stage 2: Transaction persistence thread
persistThread = std::thread{[this]() {
pthread_setname_np(pthread_self(), "txn-persist");
for (;;) {
auto guard = pipeline.acquire<0, 0>(1);
for (auto &c : guard.batch) {
nulls += !c;
if (nulls == 2) {
return;
auto guard = commitPipeline.acquire<2, 0>();
if (process_persist_batch(guard.batch)) {
return; // Shutdown signal received
}
for (volatile int i = 0; i < loop_iterations; i = i + 1)
;
}
}};
// Stage 3: Connection return to server thread
releaseThread = std::thread{[this]() {
pthread_setname_np(pthread_self(), "txn-release");
for (;;) {
auto guard = commitPipeline.acquire<3, 0>();
if (process_release_batch(guard.batch)) {
return; // Shutdown signal received
}
}
}};
}
~HttpHandler() {
// Send single shutdown signal that flows through all pipeline stages
{
auto guard = pipeline.push(2, true);
for (auto &c : guard.batch) {
c = {};
}
}
stage0Thread.join();
for (auto &thread : finalStageThreads) {
thread.join();
auto guard = commitPipeline.push(1, true);
guard.batch[0] =
ShutdownEntry{}; // Single ShutdownEntry flows through all stages
}
// Join all pipeline threads
sequenceThread.join();
resolveThread.join();
persistThread.join();
releaseThread.join();
}
void on_connection_established(Connection &conn) override;
@@ -153,17 +169,49 @@ struct HttpHandler : ConnectionHandler {
private:
static constexpr int lg_size = 16;
StaticThreadPipeline<std::unique_ptr<Connection>,
WaitStrategy::WaitIfUpstreamIdle, 1, 2>
pipeline{lg_size};
std::thread stage0Thread;
std::vector<std::thread> finalStageThreads;
// Pipeline state (sequence thread only)
int64_t next_version = 1; // Next version to assign (sequence thread only)
// Pipeline state (persist thread writes, I/O threads read)
std::atomic<int64_t> committed_version{
0}; // Highest committed version (persist thread writes, I/O threads read)
// Arena for banned request IDs and related data structures (sequence thread
// only)
ArenaAllocator banned_request_arena;
using BannedRequestIdSet =
std::unordered_set<std::string_view, std::hash<std::string_view>,
std::equal_to<std::string_view>,
ArenaStlAllocator<std::string_view>>;
BannedRequestIdSet banned_request_ids; // Request IDs that should not commit
// (string_views into arena)
// Main commit processing pipeline: sequence -> resolve -> persist -> release
StaticThreadPipeline<PipelineEntry, WaitStrategy::WaitIfUpstreamIdle, 1, 1, 1,
1>
commitPipeline{lg_size};
// Pipeline stage threads
std::thread sequenceThread;
std::thread resolveThread;
std::thread persistThread;
std::thread releaseThread;
// Pipeline stage processing methods (batch-based)
using BatchType =
StaticThreadPipeline<PipelineEntry, WaitStrategy::WaitIfUpstreamIdle, 1,
1, 1, 1>::Batch;
bool process_sequence_batch(BatchType &batch);
bool process_resolve_batch(BatchType &batch);
bool process_persist_batch(BatchType &batch);
bool process_release_batch(BatchType &batch);
// Route handlers
void handleGetVersion(Connection &conn, const HttpConnectionState &state);
void handlePostCommit(Connection &conn, const HttpConnectionState &state);
void handleGetSubscribe(Connection &conn, const HttpConnectionState &state);
void handleGetStatus(Connection &conn, const HttpConnectionState &state);
void handleGetStatus(Connection &conn, HttpConnectionState &state);
void handlePutRetention(Connection &conn, const HttpConnectionState &state);
void handleGetRetention(Connection &conn, const HttpConnectionState &state);
void handleDeleteRetention(Connection &conn,
@@ -176,10 +224,10 @@ private:
static void sendResponse(Connection &conn, int status_code,
std::string_view content_type, std::string_view body,
bool close_connection = false);
static void sendJsonResponse(Connection &conn, int status_code,
static void send_json_response(Connection &conn, int status_code,
std::string_view json,
bool close_connection = false);
static void sendErrorResponse(Connection &conn, int status_code,
static void send_error_response(Connection &conn, int status_code,
std::string_view message,
bool close_connection = false);
};


@@ -29,12 +29,7 @@ void signal_handler(int sig) {
}
}
std::vector<int> create_listen_sockets(const weaseldb::Config &config) {
std::vector<int> listen_fds;
// Check if unix socket path is specified
if (!config.server.unix_socket_path.empty()) {
// Create unix socket
int create_unix_socket(const std::string &path) {
int sfd = socket(AF_UNIX, SOCK_STREAM, 0);
if (sfd == -1) {
perror("socket");
@@ -42,19 +37,18 @@ std::vector<int> create_listen_sockets(const weaseldb::Config &config) {
}
// Remove existing socket file if it exists
unlink(config.server.unix_socket_path.c_str());
unlink(path.c_str());
struct sockaddr_un addr;
std::memset(&addr, 0, sizeof(addr));
addr.sun_family = AF_UNIX;
if (config.server.unix_socket_path.length() >= sizeof(addr.sun_path)) {
std::fprintf(stderr, "Unix socket path too long\n");
if (path.length() >= sizeof(addr.sun_path)) {
std::fprintf(stderr, "Unix socket path too long: %s\n", path.c_str());
std::abort();
}
std::strncpy(addr.sun_path, config.server.unix_socket_path.c_str(),
sizeof(addr.sun_path) - 1);
std::strncpy(addr.sun_path, path.c_str(), sizeof(addr.sun_path) - 1);
if (bind(sfd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
perror("bind");
@@ -66,11 +60,10 @@ std::vector<int> create_listen_sockets(const weaseldb::Config &config) {
std::abort();
}
listen_fds.push_back(sfd);
return listen_fds;
}
return sfd;
}
// TCP socket creation
int create_tcp_socket(const std::string &address, int port) {
struct addrinfo hints;
struct addrinfo *result, *rp;
int s;
@@ -84,8 +77,8 @@ std::vector<int> create_listen_sockets(const weaseldb::Config &config) {
hints.ai_addr = nullptr;
hints.ai_next = nullptr;
s = getaddrinfo(config.server.bind_address.c_str(),
std::to_string(config.server.port).c_str(), &hints, &result);
s = getaddrinfo(address.c_str(), std::to_string(port).c_str(), &hints,
&result);
if (s != 0) {
std::fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(s));
std::abort();
@@ -94,18 +87,13 @@ std::vector<int> create_listen_sockets(const weaseldb::Config &config) {
int sfd = -1;
for (rp = result; rp != nullptr; rp = rp->ai_next) {
sfd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
if (sfd == -1) {
if (sfd == -1)
continue;
}
int val = 1;
if (setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)) == -1) {
perror("setsockopt SO_REUSEADDR");
int e = close(sfd);
if (e == -1 && errno != EINTR) {
perror("close sfd (SO_REUSEADDR failed)");
std::abort();
}
close(sfd);
continue;
}
@@ -113,40 +101,56 @@ std::vector<int> create_listen_sockets(const weaseldb::Config &config) {
if (rp->ai_family == AF_INET || rp->ai_family == AF_INET6) {
if (setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val)) == -1) {
perror("setsockopt TCP_NODELAY");
int e = close(sfd);
if (e == -1 && errno != EINTR) {
perror("close sfd (TCP_NODELAY failed)");
std::abort();
}
close(sfd);
continue;
}
}
if (bind(sfd, rp->ai_addr, rp->ai_addrlen) == 0) {
if (listen(sfd, SOMAXCONN) == -1) {
perror("listen");
close(sfd);
freeaddrinfo(result);
std::abort();
}
break; /* Success */
}
int e = close(sfd);
if (e == -1 && errno != EINTR) {
perror("close sfd (bind failed)");
std::abort();
}
close(sfd);
sfd = -1;
}
freeaddrinfo(result);
if (rp == nullptr || sfd == -1) {
std::fprintf(stderr, "Could not bind to any address\n");
if (sfd == -1) {
std::fprintf(stderr, "Could not bind to %s:%d\n", address.c_str(), port);
std::abort();
}
if (listen(sfd, SOMAXCONN) == -1) {
perror("listen");
return sfd;
}
std::vector<int> create_listen_sockets(const weaseldb::Config &config) {
std::vector<int> listen_fds;
for (const auto &iface : config.server.interfaces) {
int fd;
if (iface.type == weaseldb::ListenInterface::Type::TCP) {
fd = create_tcp_socket(iface.address, iface.port);
std::cout << "Listening on TCP " << iface.address << ":" << iface.port
<< std::endl;
} else {
fd = create_unix_socket(iface.path);
std::cout << "Listening on Unix socket " << iface.path << std::endl;
}
listen_fds.push_back(fd);
}
if (listen_fds.empty()) {
std::fprintf(stderr, "No interfaces configured\n");
std::abort();
}
listen_fds.push_back(sfd);
return listen_fds;
}
@@ -218,13 +222,13 @@ int main(int argc, char *argv[]) {
}
std::cout << "Configuration loaded successfully:" << std::endl;
if (!config->server.unix_socket_path.empty()) {
std::cout << "Unix socket path: " << config->server.unix_socket_path
<< std::endl;
std::cout << "Interfaces: " << config->server.interfaces.size() << std::endl;
for (const auto &iface : config->server.interfaces) {
if (iface.type == weaseldb::ListenInterface::Type::TCP) {
std::cout << " TCP: " << iface.address << ":" << iface.port << std::endl;
} else {
std::cout << "Server bind address: " << config->server.bind_address
<< std::endl;
std::cout << "Server port: " << config->server.port << std::endl;
std::cout << " Unix socket: " << iface.path << std::endl;
}
}
std::cout << "Max request size: " << config->server.max_request_size_bytes
<< " bytes" << std::endl;

src/pipeline_entry.hpp (new file, 47 lines)

@@ -0,0 +1,47 @@
#pragma once

#include "connection.hpp"

#include <memory>
#include <variant>

/**
 * Pipeline entry for commit requests that need full 4-stage processing.
 * Contains connection with parsed CommitRequest.
 */
struct CommitEntry {
  std::unique_ptr<Connection> connection;
  int64_t assigned_version = 0; // Set by sequence stage
  bool resolve_success = false; // Set by resolve stage
  bool persist_success = false; // Set by persist stage

  CommitEntry() = default; // Default constructor for variant
  explicit CommitEntry(std::unique_ptr<Connection> conn)
      : connection(std::move(conn)) {}
};

/**
 * Pipeline entry for status requests that need sequence stage processing
 * then transfer to status threadpool.
 */
struct StatusEntry {
  std::unique_ptr<Connection> connection;
  int64_t version_upper_bound = 0; // Set by sequence stage

  StatusEntry() = default; // Default constructor for variant
  explicit StatusEntry(std::unique_ptr<Connection> conn)
      : connection(std::move(conn)) {}
};

/**
 * Pipeline entry for coordinated shutdown of all stages.
 * Flows through all stages to ensure proper cleanup.
 */
struct ShutdownEntry {
  // Empty struct - presence indicates shutdown
};

/**
 * Pipeline entry variant type used by the commit processing pipeline.
 * Each stage pattern-matches on the variant type to handle appropriately.
 */
using PipelineEntry = std::variant<CommitEntry, StatusEntry, ShutdownEntry>;

View File

@@ -98,9 +98,11 @@ Server::~Server() {
}
}
  // Clean up unix socket files if they exist
  for (const auto &iface : config_.server.interfaces) {
    if (iface.type == weaseldb::ListenInterface::Type::Unix) {
      unlink(iface.path.c_str());
    }
  }
}

View File

@@ -102,6 +102,27 @@ auto addr = reinterpret_cast<uintptr_t>(ptr); // Pointer to integer conv
- **String views** with `std::string_view` to minimize unnecessary copying
- **Arena allocation** for efficient memory management (~1ns vs ~20-270ns for malloc)

### String Formatting

- **Always use `format.hpp` functions** - formats directly into arena-allocated memory
- **Use `static_format()` for performance-sensitive code** - faster but less flexible than `format()`
- **Use `format()` function with arena allocator** for printf-style formatting
```cpp
ArenaAllocator &arena = conn.get_arena();

// Most performance-sensitive - compile-time optimized concatenation
std::string_view response = static_format(arena,
    "HTTP/1.1 ", status_code, " OK\r\n",
    "Content-Length: ", body.size(), "\r\n",
    "\r\n", body);

// Printf-style formatting - runtime flexible
std::string_view reply = format(arena,
    "HTTP/1.1 %d OK\r\n"
    "Content-Length: %zu\r\n"
    "\r\n%.*s",
    status_code, body.size(),
    static_cast<int>(body.size()), body.data());
```
### Complexity Control

- **Encapsulation is the main tool for controlling complexity**
- **Header files define the interface** - they are the contract with users of your code

View File

@@ -1,9 +1,11 @@
# WeaselDB Configuration File

[server]
# Network interfaces to listen on - both TCP for external access and Unix socket for high-performance local testing
interfaces = [
    { type = "tcp", address = "127.0.0.1", port = 8080 },
    { type = "unix", path = "weaseldb.sock" }
]

# Maximum request size in bytes (for 413 Content Too Large responses)
max_request_size_bytes = 1048576 # 1MB

# Number of I/O threads for handling connections and network events