Compare commits

...

7 Commits

SHA1 Message Date
55069c0c79 Add counters for /v1/{commit,status,version} 2025-09-04 15:49:54 -04:00
96aae52853 Basic implementation of /commit, /version, and /status (no precondition checking, persistence, or log scanning yet) 2025-09-04 15:40:17 -04:00
8b6736127a Add commit pipeline design 2025-09-04 13:40:03 -04:00
9272048108 Outline commit pipeline 2025-09-03 23:43:03 -04:00
b2ffe3bfab Refactor to use format for http responses 2025-09-03 22:45:59 -04:00
978861c430 Parse commit request 2025-09-03 21:53:04 -04:00
46edb7cd26 Allow listening on multiple interfaces 2025-09-03 16:09:16 -04:00
14 changed files with 1435 additions and 277 deletions

View File

@@ -156,13 +156,25 @@ add_executable(
   test_http_handler
   tests/test_http_handler.cpp
   src/http_handler.cpp
+  src/server.cpp
+  src/config.cpp
+  src/json_commit_request_parser.cpp
   src/arena_allocator.cpp
   src/format.cpp
   src/connection.cpp
   src/connection_registry.cpp
-  src/metric.cpp)
-target_link_libraries(test_http_handler doctest::doctest llhttp_static
-                      Threads::Threads perfetto simdutf::simdutf)
+  src/metric.cpp
+  ${CMAKE_BINARY_DIR}/json_tokens.cpp)
+add_dependencies(test_http_handler generate_json_tokens)
+target_link_libraries(
+  test_http_handler
+  doctest::doctest
+  llhttp_static
+  Threads::Threads
+  toml11::toml11
+  perfetto
+  simdutf::simdutf
+  weaseljson)
 target_include_directories(test_http_handler PRIVATE src)
 target_compile_definitions(test_http_handler
                            PRIVATE DOCTEST_CONFIG_IMPLEMENT_WITH_MAIN)
@@ -177,6 +189,7 @@ add_executable(
   src/arena_allocator.cpp
   src/config.cpp
   src/http_handler.cpp
+  src/json_commit_request_parser.cpp
   src/format.cpp
   src/metric.cpp
   ${CMAKE_BINARY_DIR}/json_tokens.cpp)

commit_pipeline.md (new file, 499 lines)
View File

@@ -0,0 +1,499 @@
# Commit Processing Pipeline
## Overview
WeaselDB implements a high-performance 4-stage commit processing pipeline that transforms HTTP commit requests into durable transactions. The pipeline provides strict serialization where needed while maximizing throughput through batching and asynchronous processing.
## Architecture
The commit processing pipeline consists of four sequential stages, each running on a dedicated thread:
```
HTTP I/O Threads → [Sequence] → [Resolve] → [Persist] → [Release] → HTTP I/O Threads
```
### Pipeline Flow
1. **HTTP I/O Threads**: Parse and validate incoming commit requests
2. **Sequence Stage (Stage 0)**: Assign sequential version numbers to commits
3. **Resolve Stage (Stage 1)**: Validate preconditions and check for conflicts
4. **Persist Stage (Stage 2)**: Write commits to durable storage and notify subscribers
5. **Release Stage (Stage 3)**: Return connections to HTTP I/O threads for response handling
## Stage Details
### Stage 0: Sequence Assignment
**Thread**: `txn-sequence`
**Purpose**: Version assignment and request ID management
**Serialization**: **Required** - Must be single-threaded
**Responsibilities**:
- **For CommitEntry**: Check request_id against banned list, assign sequential version number if not banned, forward to resolve stage
- **For StatusEntry**: Add request_id to banned list, note current highest assigned version as upper bound, transfer connection to status threadpool
- Record version assignments for transaction tracking
**Why Serialization is Required**:
- Version numbers must be strictly sequential without gaps
- Banned list updates must be atomic with version assignment
- Status requests must get accurate upper bound on potential commit versions
**Request ID Banned List**:
- Purpose: Take request IDs permanently out of flight and establish version upper bounds for status queries
- Lifecycle: Grows indefinitely until process restart (leader change)
- Removal: Only on process restart/leader change, which invalidates all old request IDs
**Current Implementation**:
```cpp
bool HttpHandler::process_sequence_batch(BatchType &batch) {
  for (auto &entry : batch) {
    if (std::holds_alternative<ShutdownEntry>(entry)) {
      return true; // Shutdown signal
    }
    // TODO: Pattern match on CommitEntry vs StatusEntry
    // TODO: Implement sequence assignment logic for each type
  }
  return false; // Continue processing
}
```
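The sequence-stage responsibilities above can be made concrete with a small sketch. This is illustrative only: `SequenceState`, `sequence_commit`, and `sequence_status` are invented names, and the real stage operates on `CommitEntry`/`StatusEntry` connections rather than bare request-ID strings.

```cpp
#include <cstdint>
#include <optional>
#include <string>
#include <unordered_set>

// Illustrative stand-in for the sequence stage's single-threaded state.
struct SequenceState {
  std::unordered_set<std::string> banned_request_ids;
  uint64_t next_version = 1;

  // CommitEntry path: reject banned request IDs, otherwise assign the
  // next sequential version (strictly increasing, no gaps).
  std::optional<uint64_t> sequence_commit(const std::string &request_id) {
    if (banned_request_ids.count(request_id)) {
      return std::nullopt; // Banned: no version assigned
    }
    return next_version++;
  }

  // StatusEntry path: ban the request ID, then report the highest version
  // assigned so far as the upper bound for the status lookup.
  uint64_t sequence_status(const std::string &request_id) {
    banned_request_ids.insert(request_id);
    return next_version - 1;
  }
};
```

Because banning and version assignment happen in one serialized thread, a status query's upper bound is exact: any later commit with that request ID is guaranteed to be rejected.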
### Stage 1: Precondition Resolution
**Thread**: `txn-resolve`
**Purpose**: Validate preconditions and detect conflicts
**Serialization**: **Required** - Must be single-threaded
**Responsibilities**:
- **For CommitEntry**: Check preconditions against in-memory recent writes set, add writes to recent writes set if accepted
- **For StatusEntry**: N/A (transferred to status threadpool after sequence stage)
- Mark failed commits with failure information (including which preconditions failed)
**Why Serialization is Required**:
- Must maintain consistent view of in-memory recent writes set
- Conflict detection requires atomic evaluation of all preconditions against recent writes
- Recent writes set updates must be synchronized
**Transaction State Transitions**:
- **Assigned Version** (from sequence) → **Semi-committed** (resolve accepts) → **Committed** (persist completes)
- Failed transactions continue through pipeline with failure information for client response
**Current Implementation**:
```cpp
bool HttpHandler::process_resolve_batch(BatchType &batch) {
  // TODO: Implement precondition resolution logic:
  // 1. For CommitEntry: Check preconditions against in-memory recent writes set
  // 2. If accepted: Add writes to in-memory recent writes set, mark as semi-committed
  // 3. If failed: Mark with failure info (which preconditions failed)
  // 4. For StatusEntry: N/A (already transferred to status threadpool)
  return false; // Continue processing (placeholder until implemented)
}
```
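One way the recent-writes check could work, sketched with invented names (`ResolveState`, `resolve`). The actual WeaselDB precondition format is not specified here, so this assumes a simple model: each precondition names a key read at some version, and it conflicts if an accepted commit wrote that key after the read.

```cpp
#include <cstdint>
#include <string>
#include <unordered_map>
#include <vector>

// Illustrative resolve-stage state: maps key -> version of its most
// recent accepted write.
struct ResolveState {
  std::unordered_map<std::string, uint64_t> recent_writes;

  // Returns true (semi-committed) if no precondition key was written
  // after read_version; on acceptance, records this commit's writes so
  // later commits in version order see them atomically.
  bool resolve(uint64_t commit_version, uint64_t read_version,
               const std::vector<std::string> &precondition_keys,
               const std::vector<std::string> &write_keys) {
    for (const auto &key : precondition_keys) {
      auto it = recent_writes.find(key);
      if (it != recent_writes.end() && it->second > read_version) {
        return false; // Conflict: marked failed, still flows downstream
      }
    }
    for (const auto &key : write_keys) {
      recent_writes[key] = commit_version;
    }
    return true;
  }
};
```

Single-threading this stage is what makes "check all preconditions, then publish all writes" atomic without per-key locking.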
### Stage 2: Transaction Persistence
**Thread**: `txn-persist`
**Purpose**: Write semi-committed transactions to durable storage
**Serialization**: **Required** - Must mark batches durable in order
**Responsibilities**:
- **For CommitEntry**: Apply operations to persistent storage, update committed version high water mark
- **For StatusEntry**: N/A (transferred to status threadpool after sequence stage)
- Generate durability events for `/v1/subscribe` when committed version advances
- Batch multiple commits for efficient persistence operations
**Why Serialization is Required**:
- Batches must be marked durable in sequential version order
- High water mark updates must reflect strict ordering of committed versions
- Ensures consistency guarantees across all endpoints
**Committed Version High Water Mark**:
- Global atomic value tracking highest durably committed version
- Updated after each batch commits: set to highest version in the batch
- Read by `/v1/version` endpoint using atomic seq_cst reads
- Enables `/v1/subscribe` durability events when high water mark advances
**Batching Strategy**:
- Multiple semi-committed transactions can be persisted in a single batch
- High water mark updated once per batch to highest version in that batch
- See `persistence.md` for detailed persistence design
**Current Implementation**:
```cpp
bool HttpHandler::process_persist_batch(BatchType &batch) {
  // TODO: Implement actual persistence logic:
  // 1. For CommitEntry: Apply operations to persistent storage
  // 2. Update committed version high water mark to highest version in batch
  // 3. Generate durability events for /v1/subscribe
  // 4. For StatusEntry: N/A (already transferred to status threadpool)
  return false; // Continue processing (placeholder until implemented)
}
```
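The high-water-mark bookkeeping described above can be sketched as follows. This is a hedged illustration: `persist_batch` and `committed_high_water_mark` are placeholder names, and the durable write itself is elided.

```cpp
#include <atomic>
#include <cstdint>
#include <vector>

// Illustrative persist-stage bookkeeping. The real stage writes the batch
// to durable storage before publishing the new high water mark.
std::atomic<uint64_t> committed_high_water_mark{0};

void persist_batch(const std::vector<uint64_t> &batch_versions) {
  if (batch_versions.empty()) return;
  // ... write all operations in the batch to durable storage here ...
  // Versions arrive in sequential order, so the last one is the highest;
  // publish it once per batch with seq_cst so /v1/version readers see it
  // only after the whole batch is durable.
  committed_high_water_mark.store(batch_versions.back(),
                                  std::memory_order_seq_cst);
  // ... then generate durability events for /v1/subscribe ...
}
```

Publishing once per batch (rather than per commit) is what makes the mark cheap to maintain at high throughput while still advancing strictly in version order.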
### Stage 3: Connection Release
**Thread**: `txn-release`
**Purpose**: Return connections to HTTP server for client response
**Serialization**: Not required - Independent connection handling
**Responsibilities**:
- Return processed connections to HTTP server for all request types
- Connection carries response data (success/failure) and status information
- Trigger response transmission to clients
**Response Handling**:
- **CommitRequests**: Response generated by persist stage (success with version, or failure with conflicting preconditions)
- **StatusRequests**: Response generated by separate status lookup logic (not part of pipeline)
- Failed transactions carry failure information through entire pipeline for proper client response
**Implementation**:
```cpp
bool HttpHandler::process_release_batch(BatchType &batch) {
  // Stage 3: Connection release
  for (auto &conn : batch) {
    if (!conn) {
      return true; // Shutdown signal
    }
    // Return connection to server for further processing or cleanup
    Server::release_back_to_server(std::move(conn));
  }
  return false; // Continue processing
}
```
## Threading Model
### Thread Pipeline Configuration
```cpp
// Pipeline entry type (to be implemented)
using PipelineEntry = std::variant<CommitEntry, StatusEntry, ShutdownEntry>;

// 4-stage pipeline: sequence -> resolve -> persist -> release
// TODO: Update pipeline type from std::unique_ptr<Connection> to PipelineEntry variant
StaticThreadPipeline<PipelineEntry, // Was: std::unique_ptr<Connection>
                     WaitStrategy::WaitIfUpstreamIdle, 1, 1, 1, 1>
    commitPipeline{lg_size};
```
### Thread Creation and Management
```cpp
HttpHandler() {
  // Stage 0: Sequence assignment thread
  sequenceThread = std::thread{[this]() {
    pthread_setname_np(pthread_self(), "txn-sequence");
    for (;;) {
      auto guard = commitPipeline.acquire<0, 0>();
      if (process_sequence_batch(guard.batch)) {
        return; // Shutdown signal received
      }
    }
  }};
  // Similar pattern for resolve, persist, and release threads...
}
```
### Batch Processing
Each stage processes connections in batches using RAII guards:
```cpp
auto guard = commitPipeline.acquire<STAGE_NUM, 0>();
// Process batch
for (auto &conn : guard.batch) {
  // Stage-specific processing
}
// Guard destructor automatically publishes batch to next stage
```
## Flow Control
### Pipeline Entry
Commit requests enter the pipeline via `HttpHandler::on_batch_complete()`:
```cpp
void HttpHandler::on_batch_complete(std::span<std::unique_ptr<Connection>> batch) {
  // Collect commit requests that passed basic validation for 4-stage pipeline processing
  int commit_count = 0;
  for (auto &conn : batch) {
    if (conn && conn->user_data) {
      auto *state = static_cast<HttpConnectionState *>(conn->user_data);
      if (state->route == HttpRoute::POST_commit &&
          state->commit_request &&
          state->parsing_commit) {
        commit_count++;
      }
    }
  }
  // Send commit requests to 4-stage pipeline in batch
  if (commit_count > 0) {
    auto guard = commitPipeline.push(commit_count, true);
    // Move qualifying connections into pipeline
  }
}
```
### Backpressure Handling
The pipeline implements natural backpressure:
- Each stage blocks if downstream stages are full
- `WaitIfUpstreamIdle` strategy balances latency vs throughput
- Ring buffer size (`lg_size = 16`, i.e. 2^16 entries) bounds how much work can queue between stages
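For intuition on the `lg_size` convention: a power-of-two ring of `1 << 16` entries can wrap indices with a bitmask instead of a modulo. This sketch assumes the usual convention for such rings; the real `StaticThreadPipeline` internals may differ.

```cpp
#include <cstdint>

// lg_size is the log2 of the ring capacity, so the buffer holds
// 1 << 16 = 65536 entries. Masking with (capacity - 1) maps a
// monotonically increasing sequence number onto a slot without a modulo.
constexpr int lg_size = 16;
constexpr uint64_t capacity = uint64_t{1} << lg_size;
constexpr uint64_t index_mask = capacity - 1;

constexpr uint64_t slot_for(uint64_t sequence) {
  return sequence & index_mask; // Wraps at 65536
}
```

The capacity also bounds backpressure: a producer that gets more than `capacity` entries ahead of the slowest consumer must block.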
### Shutdown Coordination
Pipeline shutdown is coordinated by sending a single ShutdownEntry that flows through all stages:
```cpp
~HttpHandler() {
  // Send single shutdown signal that flows through all pipeline stages
  {
    auto guard = commitPipeline.push(1, true);
    guard.batch[0] = ShutdownEntry{}; // Single ShutdownEntry flows through all stages
  }
  // Join all pipeline threads
  sequenceThread.join();
  resolveThread.join();
  persistThread.join();
  releaseThread.join();
}
```
**Note**: Multiple entries would only be needed if stages had multiple threads, with each thread needing its own shutdown signal.
## Error Handling
### Stage-Level Error Handling
Each stage handles different entry types:
```cpp
// Pattern matching on pipeline entry variant; every branch yields a bool
// so the visitor has a consistent return type
bool is_shutdown = std::visit(
    [&](auto &&entry) {
      using T = std::decay_t<decltype(entry)>;
      if constexpr (std::is_same_v<T, ShutdownEntry>) {
        return true; // Signal shutdown
      } else if constexpr (std::is_same_v<T, CommitEntry>) {
        // Process commit entry
        return false;
      } else if constexpr (std::is_same_v<T, StatusEntry>) {
        // Handle status entry (or skip if transferred)
        return false;
      }
    },
    pipeline_entry);
```
### Connection Error States
- Failed CommitEntries are passed through the pipeline with error information
- Downstream stages skip processing for error connections but forward them
- Error responses are sent when connection reaches release stage
- Connection ownership is always transferred to ensure cleanup
### Pipeline Integrity
- ShutdownEntry signals shutdown to all stages
- Each stage checks for ShutdownEntry and returns true to signal shutdown
- RAII guards ensure entries are always published downstream
- No entries are lost even during error conditions
## Performance Characteristics
### Throughput Optimization
- **Batching**: Multiple connections processed per stage activation
- **Lock-Free Communication**: Ring buffer between stages
- **Minimal Context Switching**: Dedicated threads per stage
- **Arena Allocation**: Efficient memory management throughout pipeline
### Latency Optimization
- **Single-Pass Processing**: Each connection flows through all stages once
- **Streaming Design**: Stages process concurrently
- **Minimal Copying**: Connection ownership transfer, not data copying
- **Direct Response**: Release stage triggers immediate response transmission
### Scalability Characteristics
- **Batch Size Tuning**: Ring buffer size controls memory vs latency tradeoff
- **Thread Affinity**: Dedicated threads reduce scheduling overhead
- **NUMA Awareness**: Can pin threads to specific CPU cores
## Configuration
### Pipeline Parameters
```cpp
private:
  static constexpr int lg_size = 16; // Ring buffer size = 2^16 entries

  // 4-stage pipeline configuration
  StaticThreadPipeline<std::unique_ptr<Connection>,
                       WaitStrategy::WaitIfUpstreamIdle, 1, 1, 1, 1>
      commitPipeline{lg_size};
```
### Tuning Considerations
- **Ring Buffer Size**: Larger buffers increase memory usage but improve batching
- **Wait Strategy**: `WaitIfUpstreamIdle` balances CPU usage vs latency
- **Thread Affinity**: OS scheduling vs explicit CPU pinning tradeoffs
## Pipeline Entry Types
The pipeline processes different types of entries using a variant/union type system instead of `std::unique_ptr<Connection>`:
### Pipeline Entry Variants
- **CommitEntry**: Contains `std::unique_ptr<Connection>` with CommitRequest and connection state
- **StatusEntry**: Contains `std::unique_ptr<Connection>` with StatusRequest (transferred to status threadpool after sequence)
- **ShutdownEntry**: Signals pipeline shutdown to all stages
- **Future types**: Pipeline design supports additional entry types
### Stage Processing by Type
| Stage | CommitEntry | StatusEntry | ShutdownEntry | Serialization |
|-------|-------------|-------------|---------------|---------------|
| **Sequence** | Check banned list, assign version | Add to banned list, transfer to status threadpool | Return true (shutdown) | **Required** |
| **Resolve** | Check preconditions, update recent writes | N/A (transferred) | Return true (shutdown) | **Required** |
| **Persist** | Apply operations, update high water mark | N/A (transferred) | Return true (shutdown) | **Required** |
| **Release** | Return connection to HTTP threads | N/A (transferred) | Return true (shutdown) | Not required |
## API Endpoint Integration
### `/v1/commit` - Transaction Submission
**Pipeline Interaction**: Full pipeline traversal as CommitEntry
#### Request Processing Flow
1. **HTTP I/O Thread Processing** (`src/http_handler.cpp:210-273`):
```cpp
void HttpHandler::handlePostCommit(Connection &conn, HttpConnectionState &state) {
  // Parse and validate anything that doesn't need serialization:
  // - JSON parsing and CommitRequest construction
  // - Basic validation: leader_id check, operation format validation
  // - Check that we have at least one operation
  // If validation fails, send error response immediately and return
  // If validation succeeds, connection will enter pipeline in on_batch_complete()
}
```
2. **Pipeline Entry**: Successfully parsed connections enter pipeline as CommitEntry (containing the connection with CommitRequest)
3. **Pipeline Processing**:
- **Sequence**: Check banned list → assign version (or reject)
- **Resolve**: Check preconditions against in-memory recent writes → mark semi-committed (or failed with conflict details)
- **Persist**: Apply operations → mark committed, update high water mark
- **Release**: Return connection with response data
4. **Response Generation**: Based on pipeline results
- **Success**: `{"status": "committed", "version": N, "leader_id": "...", "request_id": "..."}`
- **Failure**: `{"status": "not_committed", "conflicts": [...], "version": N, "leader_id": "..."}`
### `/v1/status` - Commit Status Lookup
**Pipeline Interaction**: StatusEntry through sequence stage, then transfer to status threadpool
#### Request Processing Flow
1. **HTTP I/O Thread Processing**:
```cpp
void HttpHandler::handleGetStatus(Connection &conn, const HttpConnectionState &state) {
  // TODO: Extract request_id from URL and min_version from query params
  // Current: Returns placeholder static response
}
```
2. **Two-Phase Processing**:
- **Phase 1 - Sequence Stage**: StatusEntry enters pipeline to add request_id to banned list and get version upper bound
- **Phase 2 - Status Threadpool**: Connection transferred from sequence stage to dedicated status threadpool for actual status lookup logic
3. **Status Lookup Logic**: Performed in status threadpool - scan transaction log to determine actual commit status of the now-banned request_id
### `/v1/subscribe` - Real-time Transaction Stream
**Pipeline Integration**: Consumes events from resolve and persist stages
#### Event Sources
- **Resolve Stage**: Semi-committed transactions (accepted preconditions) for low-latency streaming
- **Persist Stage**: Durability events when committed version high water mark advances
#### Current Implementation
```cpp
void HttpHandler::handleGetSubscribe(Connection &conn, const HttpConnectionState &state) {
  // TODO: Parse query parameters (after, durable)
  // TODO: Establish Server-Sent Events stream
  // TODO: Subscribe to resolve stage (semi-committed) and persist stage (durability) events
}
```
### `/v1/version` - Version Information
**Pipeline Integration**: Direct atomic read, no pipeline interaction
```cpp
// TODO: Implement direct atomic read of committed version high water mark
// No pipeline interaction needed - seq_cst atomic read
// Leader ID is process-lifetime constant
```
**Response**: `{"version": <high_water_mark>, "leader_id": "<process_leader_id>"}`
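A minimal sketch of what this endpoint amounts to, assuming a process-global atomic published by the persist stage; the names and the hard-coded `"leader-1"` are placeholders, not the actual WeaselDB symbols.

```cpp
#include <atomic>
#include <cstdint>
#include <string>

// The endpoint is a single seq_cst atomic load plus string assembly;
// no pipeline round trip is needed.
std::atomic<uint64_t> committed_high_water_mark{0};
const std::string leader_id = "leader-1"; // Process-lifetime constant

std::string version_response() {
  uint64_t v = committed_high_water_mark.load(std::memory_order_seq_cst);
  return "{\"version\": " + std::to_string(v) +
         ", \"leader_id\": \"" + leader_id + "\"}";
}
```

Because the mark only advances after a batch is durable, a client that reads version N here is guaranteed every commit up to N survives a crash.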
## Integration Points
### HTTP Handler Integration
The pipeline integrates with the HTTP handler at two points:
1. **Entry**: `on_batch_complete()` feeds connections into sequence stage
2. **Exit**: Release stage calls `Server::release_back_to_server()`
### Persistence Layer Integration
The persist stage interfaces with:
- **S3 Backend**: Batch writes for durability (see `persistence.md`)
- **Subscriber System**: Real-time change stream notifications
- **Metrics System**: Transaction throughput and latency tracking
### Database State Integration
- **Sequence Stage**: Updates version number generator
- **Resolve Stage**: Queries current database state for precondition validation
- **Persist Stage**: Applies mutations to authoritative database state
## Future Optimizations
### Potential Enhancements
1. **Dynamic Thread Counts**: Make the release stage's thread count configurable (sequence, resolve, and persist must stay single-threaded)
2. **NUMA Optimization**: Pin pipeline threads to specific CPU cores
3. **Batch Size Tuning**: Dynamic batch size based on load
4. **Stage Bypassing**: Skip resolve stage for transactions without preconditions
5. **Persistence Batching**: Aggregate multiple commits into larger S3 writes
### Monitoring and Observability
1. **Stage Metrics**: Throughput, latency, and queue depth per stage
2. **Error Tracking**: Error rates and types by stage
3. **Resource Utilization**: CPU and memory usage per pipeline thread
4. **Flow Control Events**: Backpressure and stall detection
## Implementation Status
### Current State
- ✅ Pipeline structure implemented with 4 stages
- ✅ Thread creation and management
- ✅ RAII batch processing
- ✅ Error handling framework
- ✅ Shutdown coordination
### TODO Items
- ⏳ Sequence assignment logic implementation
- ⏳ Precondition resolution implementation
- ⏳ S3 persistence batching implementation
- ⏳ Subscriber notification system
- ⏳ Performance monitoring and metrics
- ⏳ Configuration tuning and optimization

View File

@@ -1,8 +1,10 @@
 # WeaselDB Configuration File
 
 [server]
-bind_address = "127.0.0.1"
-port = 8080
+# Network interfaces to listen on - production config with just TCP
+interfaces = [
+  { type = "tcp", address = "127.0.0.1", port = 8080 }
+]
 
 # Maximum request size in bytes (for 413 Content Too Large responses)
 max_request_size_bytes = 1048576 # 1MB
 # Number of I/O threads for handling connections and network events

View File

@@ -79,9 +79,31 @@ void ConfigParser::parse_section(const auto &toml_data,
 void ConfigParser::parse_server_config(const auto &toml_data,
                                        ServerConfig &config) {
   parse_section(toml_data, "server", [&](const auto &srv) {
-    parse_field(srv, "bind_address", config.bind_address);
-    parse_field(srv, "port", config.port);
-    parse_field(srv, "unix_socket_path", config.unix_socket_path);
+    // Parse interfaces array
+    if (srv.contains("interfaces")) {
+      auto interfaces = srv.at("interfaces");
+      if (interfaces.is_array()) {
+        for (const auto &iface : interfaces.as_array()) {
+          if (iface.contains("type")) {
+            std::string type = iface.at("type").as_string();
+            if (type == "tcp") {
+              std::string address = iface.at("address").as_string();
+              int port = iface.at("port").as_integer();
+              config.interfaces.push_back(ListenInterface::tcp(address, port));
+            } else if (type == "unix") {
+              std::string path = iface.at("path").as_string();
+              config.interfaces.push_back(ListenInterface::unix_socket(path));
+            }
+          }
+        }
+      }
+    }
+    // If no interfaces configured, use default TCP interface
+    if (config.interfaces.empty()) {
+      config.interfaces.push_back(ListenInterface::tcp("127.0.0.1", 8080));
+    }
     parse_field(srv, "max_request_size_bytes", config.max_request_size_bytes);
     parse_field(srv, "io_threads", config.io_threads);
@@ -127,25 +149,38 @@ void ConfigParser::parse_subscription_config(const auto &toml_data,
 bool ConfigParser::validate_config(const Config &config) {
   bool valid = true;
-  // Validate server configuration
-  if (config.server.unix_socket_path.empty()) {
-    // TCP mode validation
-    if (config.server.port <= 0 || config.server.port > 65535) {
-      std::cerr << "Configuration error: server.port must be between 1 and "
-                   "65535, got "
-                << config.server.port << std::endl;
-      valid = false;
-    }
-  } else {
-    // Unix socket mode validation
-    if (config.server.unix_socket_path.length() >
-        107) { // UNIX_PATH_MAX is typically 108
-      std::cerr << "Configuration error: unix_socket_path too long (max 107 "
-                   "chars), got "
-                << config.server.unix_socket_path.length() << " chars"
-                << std::endl;
-      valid = false;
-    }
-  }
+  // Validate server interfaces
+  if (config.server.interfaces.empty()) {
+    std::cerr << "Configuration error: no interfaces configured" << std::endl;
+    valid = false;
+  }
+
+  for (const auto &iface : config.server.interfaces) {
+    if (iface.type == ListenInterface::Type::TCP) {
+      if (iface.port <= 0 || iface.port > 65535) {
+        std::cerr << "Configuration error: TCP port must be between 1 and "
+                     "65535, got "
+                  << iface.port << std::endl;
+        valid = false;
+      }
+      if (iface.address.empty()) {
+        std::cerr << "Configuration error: TCP address cannot be empty"
+                  << std::endl;
+        valid = false;
+      }
+    } else { // Unix socket
+      if (iface.path.empty()) {
+        std::cerr << "Configuration error: Unix socket path cannot be empty"
+                  << std::endl;
+        valid = false;
+      }
+      if (iface.path.length() > 107) { // UNIX_PATH_MAX is typically 108
+        std::cerr << "Configuration error: Unix socket path too long (max 107 "
+                     "chars), got "
+                  << iface.path.length() << " chars" << std::endl;
+        valid = false;
+      }
+    }
+  }
 
 if (config.server.max_request_size_bytes == 0) {

View File

@@ -3,19 +3,40 @@
 #include <chrono>
 #include <optional>
 #include <string>
+#include <vector>
 
 namespace weaseldb {
 
+/**
+ * @brief Configuration for a single network interface to listen on.
+ */
+struct ListenInterface {
+  enum class Type { TCP, Unix };
+  Type type;
+  /// For TCP: IP address to bind to (e.g., "127.0.0.1", "0.0.0.0")
+  std::string address;
+  /// For TCP: port number
+  int port = 0;
+  /// For Unix: socket file path
+  std::string path;
+
+  // Factory methods for cleaner config creation
+  static ListenInterface tcp(const std::string &addr, int port_num) {
+    return {Type::TCP, addr, port_num, ""};
+  }
+  static ListenInterface unix_socket(const std::string &socket_path) {
+    return {Type::Unix, "", 0, socket_path};
+  }
+};
+
 /**
  * @brief Configuration settings for the WeaselDB server component.
  */
 struct ServerConfig {
-  /// IP address to bind the server to (default: localhost)
-  std::string bind_address = "127.0.0.1";
-  /// TCP port number for the server to listen on
-  int port = 8080;
-  /// Unix socket path (if specified, takes precedence over TCP)
-  std::string unix_socket_path;
+  /// Network interfaces to listen on (TCP and/or Unix sockets)
+  std::vector<ListenInterface> interfaces;
   /// Maximum size in bytes for incoming HTTP requests (default: 1MB)
   int64_t max_request_size_bytes = 1024 * 1024;
   /// Number of I/O threads for handling connections and network events

View File

@@ -209,7 +209,7 @@ struct Connection {
  *   metrics.recordQueueDepth(conn->get_id(), conn->outgoingBytesQueued());
  * ```
  */
-  int64_t outgoingBytesQueued() const {
+  int64_t outgoing_bytes_queued() const {
 #ifndef NDEBUG
     // Debug build: validate counter accuracy
     int64_t computed_total = 0;

View File

@@ -79,6 +79,8 @@
  * - **Simple concatenation**: Basic string + number + string combinations
  * - **Compile-time optimization**: When all types/values known at compile time
  * - **Template contexts**: Where compile-time buffer sizing is beneficial
+ * - **IMPORTANT**: Only works with compile-time string literals, NOT runtime
+ *   const char*
  *
  * ## Optimization Details:
  * The function uses `ArenaAllocator::allocate_remaining_space()` to claim all
@@ -206,10 +208,12 @@ inline constexpr DoubleTerm term(double s) { return DoubleTerm(s); }
  * optimized term writers for maximum speed.
  *
  * ## Supported Types:
- * - **String literals**: C-style string literals and arrays
+ * - **String literals**: C-style string literals and arrays ("Hello", "World")
  * - **Integers**: All integral types (int, int64_t, uint32_t, etc.)
  * - **Floating point**: double (uses high-precision Grisu2 algorithm)
  * - **Custom types**: Via specialization of `detail::term()`
+ * - **NOT supported**: const char* variables, std::string, std::string_view
+ *   variables
  *
  * ## Performance Characteristics:
  * - **Compile-time buffer sizing**: Buffer size calculated at compile time (no
@@ -245,16 +249,23 @@ inline constexpr DoubleTerm term(double s) { return DoubleTerm(s); }
  *
  * ## When to Use:
  * - **Hot paths**: Performance-critical code where formatting speed matters
- * - **Known types**: When argument types are known at compile time
+ * - **Compile-time string literals**: All string arguments must be string
+ *   literals (e.g., "Hello")
  * - **Simple formatting**: Concatenation and basic type conversion
  * - **Template code**: Where compile-time optimization is beneficial
+ * - **CANNOT use runtime strings**: No const char*, std::string, or string_view
+ *   variables
  *
  * ## When to Use format() Instead:
  * - **Printf-style formatting**: When you need format specifiers like "%d",
  *   "%.2f"
- * - **Runtime flexibility**: When format strings come from variables/config
- * - **Complex formatting**: When you need padding, precision, etc.
- * - **Convenience**: For quick debugging or non-critical paths
+ * - **Runtime strings**: When you have const char*, std::string, or string_view
+ *   variables
+ * - **Dynamic content**: When format strings come from variables/config/user
+ *   input
+ * - **Complex formatting**: When you need padding, precision, width specifiers
+ * - **Mixed literal/runtime**: When combining string literals with runtime
+ *   string data
  *
  * @note All arguments are passed by forwarding reference for optimal
  * performance

View File

@@ -1,19 +1,37 @@
#include "http_handler.hpp" #include "http_handler.hpp"
#include <atomic>
#include <cstring> #include <cstring>
#include <string> #include <string>
#include <strings.h> #include <strings.h>
#include "arena_allocator.hpp" #include "arena_allocator.hpp"
#include "format.hpp" #include "format.hpp"
#include "json_commit_request_parser.hpp"
#include "metric.hpp" #include "metric.hpp"
#include "perfetto_categories.hpp" #include "perfetto_categories.hpp"
#include "pipeline_entry.hpp"
#include "server.hpp"
auto requests_counter_family = metric::create_counter( auto requests_counter_family = metric::create_counter(
"weaseldb_http_requests_total", "Total http requests"); "weaseldb_http_requests_total", "Total http requests");
thread_local auto metrics_counter = thread_local auto metrics_counter =
requests_counter_family.create({{"path", "/metrics"}}); requests_counter_family.create({{"path", "/metrics"}});
// API endpoint request counters
thread_local auto commit_counter =
requests_counter_family.create({{"path", "/v1/commit"}});
thread_local auto status_counter =
requests_counter_family.create({{"path", "/v1/status"}});
thread_local auto version_counter =
requests_counter_family.create({{"path", "/v1/version"}});
// Metric for banned request IDs memory usage
auto banned_request_ids_memory_gauge =
metric::create_gauge("weaseldb_banned_request_ids_memory_bytes",
"Memory used by banned request IDs arena")
.create({});
// HttpConnectionState implementation // HttpConnectionState implementation
HttpConnectionState::HttpConnectionState(ArenaAllocator &arena) HttpConnectionState::HttpConnectionState(ArenaAllocator &arena)
: current_header_field_buf(ArenaStlAllocator<char>(&arena)), : current_header_field_buf(ArenaStlAllocator<char>(&arena)),
@@ -61,16 +79,46 @@ void HttpHandler::on_batch_complete(
void HttpHandler::on_batch_complete(
    std::span<std::unique_ptr<Connection>> batch) {
  // Collect commit requests and status requests for pipeline processing
  int pipeline_count = 0;
  // Count both commit and status requests
  for (auto &conn : batch) {
    if (conn && conn->user_data) {
      auto *state = static_cast<HttpConnectionState *>(conn->user_data);
      // Count commit requests that passed basic validation
      if (state->route == HttpRoute::POST_commit && state->commit_request &&
          state->parsing_commit && state->basic_validation_passed) {
        pipeline_count++;
      }
      // Count status requests
      else if (state->route == HttpRoute::GET_status &&
               // Error message not already queued
               conn->outgoing_bytes_queued() == 0) {
        pipeline_count++;
      }
    }
  }
  // Send requests to 4-stage pipeline in batch
  if (pipeline_count > 0) {
    auto guard = commitPipeline.push(pipeline_count, true);
    auto out_iter = guard.batch.begin();
    for (auto &conn : batch) {
      if (conn && conn->user_data) {
        auto *state = static_cast<HttpConnectionState *>(conn->user_data);
        // Create CommitEntry for commit requests
        if (state->route == HttpRoute::POST_commit && state->commit_request &&
            state->parsing_commit && state->basic_validation_passed) {
          *out_iter++ = CommitEntry{std::move(conn)};
        }
        // Create StatusEntry for status requests
        else if (state->route == HttpRoute::GET_status) {
          *out_iter++ = StatusEntry{std::move(conn)};
        }
      }
    }
  }
}
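The count-then-push shape above exists because the pipeline's push guard reserves a fixed number of slots up front, so the handler must know the batch size before acquiring it. A minimal sketch of the same two-pass pattern, using a plain `std::vector` as a hypothetical stand-in for `guard.batch` (names and the `int` payload are illustrative, not the project's API):

```cpp
#include <algorithm>
#include <cassert>
#include <vector>

// Two-pass pattern: pass 1 counts qualifying entries so the reservation is
// exactly the right size; pass 2 fills the reserved slots in order.
std::vector<int> push_qualifying(const std::vector<int> &batch) {
  auto qualifies = [](int v) { return v > 0; };
  auto count = std::count_if(batch.begin(), batch.end(), qualifies);
  std::vector<int> reserved(static_cast<size_t>(count)); // models guard.batch
  auto out = reserved.begin();
  for (int v : batch) {
    if (qualifies(v)) {
      *out++ = v; // every reserved slot is written exactly once
    }
  }
  return reserved;
}
```

The invariant that matters is the same one the handler relies on: the qualifying predicate must be identical in both passes, otherwise some reserved slots go unfilled or the fill pass overruns the reservation.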
@@ -80,7 +128,7 @@ void HttpHandler::on_data_arrived(std::string_view data,
    std::unique_ptr<Connection> &conn_ptr) {
  auto *state = static_cast<HttpConnectionState *>(conn_ptr->user_data);
  if (!state) {
    send_error_response(*conn_ptr, 500, "Internal server error", true);
    return;
  }
@@ -94,7 +142,7 @@ void HttpHandler::on_data_arrived(std::string_view data,
      llhttp_execute(&state->parser, data.data(), data.size());
  if (err != HPE_OK) {
    send_error_response(*conn_ptr, 400, "Bad request", true);
    return;
  }
@@ -181,56 +229,160 @@ HttpRoute HttpHandler::parseRoute(std::string_view method,
// Route handlers (basic implementations)
void HttpHandler::handleGetVersion(Connection &conn,
                                   const HttpConnectionState &state) {
  version_counter.inc();
  send_json_response(
      conn, 200,
      format(conn.get_arena(), R"({"version":%ld,"leader":""})",
             this->committed_version.load(std::memory_order_seq_cst)),
      state.connection_close);
}
void HttpHandler::handlePostCommit(Connection &conn,
                                   const HttpConnectionState &state) {
  commit_counter.inc();
  // Check if streaming parse was successful
  if (!state.commit_request || !state.parsing_commit) {
    const char *error = state.commit_parser
                            ? state.commit_parser->get_parse_error()
                            : "No parser initialized";
    ArenaAllocator &arena = conn.get_arena();
    std::string_view error_msg =
        format(arena, "Parse failed: %s", error ? error : "Unknown error");
    send_error_response(conn, 400, error_msg, state.connection_close);
    return;
  }
  const CommitRequest &commit_request = *state.commit_request;
  // Perform basic validation that doesn't require serialization (done on I/O
  // threads)
  bool valid = true;
  std::string_view error_msg;
  // Check that we have at least one operation
  if (commit_request.operations().empty()) {
    valid = false;
    error_msg = "Commit request must contain at least one operation";
  }
  // Check leader_id is not empty
  if (valid && commit_request.leader_id().empty()) {
    valid = false;
    error_msg = "Commit request must specify a leader_id";
  }
  // Check operations are well-formed
  if (valid) {
    for (const auto &op : commit_request.operations()) {
      if (op.param1.empty()) {
        valid = false;
        error_msg = "Operation key cannot be empty";
        break;
      }
      if (op.type == Operation::Type::Write && op.param2.empty()) {
        valid = false;
        error_msg = "Write operation value cannot be empty";
        break;
      }
    }
  }
  if (!valid) {
    send_error_response(conn, 400, error_msg, state.connection_close);
    return;
  }
  // Basic validation passed - mark for 4-stage pipeline processing
  const_cast<HttpConnectionState &>(state).basic_validation_passed = true;
  // Response will be sent after 4-stage pipeline processing is complete
}
void HttpHandler::handleGetSubscribe(Connection &conn,
                                     const HttpConnectionState &state) {
  // TODO: Implement subscription streaming
  send_json_response(
      conn, 200,
      R"({"message":"Subscription endpoint - streaming not yet implemented"})",
      state.connection_close);
}
void HttpHandler::handleGetStatus(Connection &conn,
                                  HttpConnectionState &state) {
  status_counter.inc();
  // Status requests are processed through the pipeline
  // Response will be generated in the sequence stage
  // This handler extracts request_id from query parameters and prepares for
  // pipeline processing
  // Extract request_id from query parameters:
  // /v1/status?request_id=<ID>&min_version=<VERSION>
  std::string_view url = state.url;
  // Find query parameters
  size_t query_pos = url.find('?');
  if (query_pos == std::string_view::npos) {
    // No query parameters
    send_error_response(conn, 400,
                        "Missing required query parameter: request_id",
                        state.connection_close);
    return;
  }
  std::string_view query_string = url.substr(query_pos + 1);
  // Simple query parameter parsing for request_id
  // Look for "request_id=" in the query string
  size_t request_id_pos = query_string.find("request_id=");
  if (request_id_pos == std::string_view::npos) {
    send_error_response(conn, 400,
                        "Missing required query parameter: request_id",
                        state.connection_close);
    return;
  }
  // Extract the request_id value
  size_t value_start = request_id_pos + 11; // length of "request_id="
  if (value_start >= query_string.length()) {
    send_error_response(conn, 400, "Empty request_id parameter",
                        state.connection_close);
    return;
  }
  // Find the end of the request_id value (next & or end of string)
  size_t value_end = query_string.find('&', value_start);
  if (value_end == std::string_view::npos) {
    value_end = query_string.length();
  }
  state.status_request_id =
      query_string.substr(value_start, value_end - value_start);
  if (state.status_request_id.empty()) {
    send_error_response(conn, 400, "Empty request_id parameter",
                        state.connection_close);
    return;
  }
  // Ready for pipeline processing
}
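The extraction logic above reduces to a small pure function over the URL, which makes the error cases easy to enumerate. A standalone sketch (the `extract_request_id` name is hypothetical; the handler itself writes into `state.status_request_id` and sends 400 responses instead of returning `std::nullopt`):

```cpp
#include <cassert>
#include <optional>
#include <string_view>

// Mirrors handleGetStatus's parsing: find '?', locate "request_id=", take
// the value up to the next '&' or end of string. Any failure maps to one of
// the handler's 400 responses.
std::optional<std::string_view> extract_request_id(std::string_view url) {
  size_t query_pos = url.find('?');
  if (query_pos == std::string_view::npos) return std::nullopt;
  std::string_view query = url.substr(query_pos + 1);
  size_t pos = query.find("request_id=");
  if (pos == std::string_view::npos) return std::nullopt;
  size_t value_start = pos + 11; // length of "request_id="
  if (value_start >= query.size()) return std::nullopt;
  size_t value_end = query.find('&', value_start);
  if (value_end == std::string_view::npos) value_end = query.size();
  std::string_view id = query.substr(value_start, value_end - value_start);
  if (id.empty()) return std::nullopt;
  return id;
}
```

One caveat of the substring search: a parameter whose name merely ends in `request_id` (say `xrequest_id=`) would also match, since the scan does not anchor on `?`/`&` boundaries; that is inherent to the simple `find("request_id=")` approach.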
void HttpHandler::handlePutRetention(Connection &conn,
                                     const HttpConnectionState &state) {
  // TODO: Parse retention policy from body and store
  send_json_response(conn, 200, R"({"policy_id":"example","status":"created"})",
                     state.connection_close);
}
void HttpHandler::handleGetRetention(Connection &conn,
                                     const HttpConnectionState &state) {
  // TODO: Extract policy_id from URL or return all policies
  send_json_response(conn, 200, R"({"policies":[]})", state.connection_close);
}
void HttpHandler::handleDeleteRetention(Connection &conn,
                                        const HttpConnectionState &state) {
  // TODO: Extract policy_id from URL and delete
  send_json_response(conn, 200, R"({"policy_id":"example","status":"deleted"})",
                     state.connection_close);
}
@@ -255,16 +407,16 @@ void HttpHandler::handleGetMetrics(Connection &conn,
        arena, "HTTP/1.1 200 OK\r\n",
        "Content-Type: text/plain; version=0.0.4\r\n",
        "Content-Length: ", static_cast<uint64_t>(total_size), "\r\n",
        "X-Response-ID: ", static_cast<int64_t>(http_state->http_request_id),
        "\r\n", "Connection: close\r\n", "\r\n");
    conn.close_after_send();
  } else {
    headers = static_format(
        arena, "HTTP/1.1 200 OK\r\n",
        "Content-Type: text/plain; version=0.0.4\r\n",
        "Content-Length: ", static_cast<uint64_t>(total_size), "\r\n",
        "X-Response-ID: ", static_cast<int64_t>(http_state->http_request_id),
        "\r\n", "Connection: keep-alive\r\n", "\r\n");
  }
  // Send headers
@@ -278,91 +430,81 @@ void HttpHandler::handleGetMetrics(Connection &conn,
void HttpHandler::handleGetOk(Connection &conn,
                              const HttpConnectionState &state) {
  TRACE_EVENT("http", "GET /ok", perfetto::Flow::Global(state.http_request_id));
  sendResponse(conn, 200, "text/plain", "OK", state.connection_close);
}
void HttpHandler::handleNotFound(Connection &conn,
                                 const HttpConnectionState &state) {
  send_error_response(conn, 404, "Not found", state.connection_close);
}
// HTTP utility methods
void HttpHandler::sendResponse(Connection &conn, int status_code,
                               std::string_view content_type,
                               std::string_view body, bool close_connection) {
  ArenaAllocator &arena = conn.get_arena();
  auto *state = static_cast<HttpConnectionState *>(conn.user_data);
  // Status text
  std::string_view status_text;
  switch (status_code) {
  case 200:
    status_text = "OK";
    break;
  case 400:
    status_text = "Bad Request";
    break;
  case 404:
    status_text = "Not Found";
    break;
  case 500:
    status_text = "Internal Server Error";
    break;
  default:
    status_text = "Unknown";
    break;
  }
  const char *connection_header = close_connection ? "close" : "keep-alive";
  std::string_view response =
      format(arena,
             "HTTP/1.1 %d %.*s\r\n"
             "Content-Type: %.*s\r\n"
             "Content-Length: %zu\r\n"
             "X-Response-ID: %ld\r\n"
             "Connection: %s\r\n"
             "\r\n%.*s",
             status_code, static_cast<int>(status_text.size()),
             status_text.data(), static_cast<int>(content_type.size()),
             content_type.data(), body.size(), state->http_request_id,
             connection_header, static_cast<int>(body.size()), body.data());
  if (close_connection) {
    conn.close_after_send();
  }
  conn.append_message(response);
}
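The refactor replaces incremental `std::string` concatenation with one printf-style call into the connection's arena. The same single-format construction can be sketched with `snprintf` standing in for the project's arena-backed `format()` (an assumption: `format()` has printf semantics and returns a view into arena memory; `build_response` and the fixed buffer here are illustrative only):

```cpp
#include <cassert>
#include <cstdio>
#include <string>
#include <string_view>

// Builds the full response in one format call. %.*s takes an explicit
// (int)length, so non-null-terminated string_views are safe to pass.
std::string build_response(int status_code, std::string_view status_text,
                           std::string_view content_type, std::string_view body,
                           long request_id, bool close_connection) {
  const char *connection_header = close_connection ? "close" : "keep-alive";
  char buf[512];
  int n = std::snprintf(buf, sizeof(buf),
                        "HTTP/1.1 %d %.*s\r\n"
                        "Content-Type: %.*s\r\n"
                        "Content-Length: %zu\r\n"
                        "X-Response-ID: %ld\r\n"
                        "Connection: %s\r\n"
                        "\r\n%.*s",
                        status_code, static_cast<int>(status_text.size()),
                        status_text.data(), static_cast<int>(content_type.size()),
                        content_type.data(), body.size(), request_id,
                        connection_header, static_cast<int>(body.size()),
                        body.data());
  if (n < 0) return {};
  return std::string(buf, static_cast<size_t>(n));
}
```

Besides avoiding repeated heap reallocations, a single format call keeps the header layout in one readable template, which is the main maintainability win of the change.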
void HttpHandler::send_json_response(Connection &conn, int status_code,
                                     std::string_view json,
                                     bool close_connection) {
  sendResponse(conn, status_code, "application/json", json, close_connection);
}
void HttpHandler::send_error_response(Connection &conn, int status_code,
                                      std::string_view message,
                                      bool close_connection) {
  ArenaAllocator &arena = conn.get_arena();
  std::string_view json =
      format(arena, R"({"error":"%.*s"})", static_cast<int>(message.size()),
             message.data());
  send_json_response(conn, status_code, json, close_connection);
}
// llhttp callbacks
@@ -423,7 +565,7 @@ int HttpHandler::onHeaderValueComplete(llhttp_t *parser) {
        id = id * 10 + (c - '0');
      }
    }
    state->http_request_id = id;
  }
  // Clear buffers for next header
@@ -443,19 +585,330 @@ int HttpHandler::onHeadersComplete(llhttp_t *parser) {
      llhttp_method_name(static_cast<llhttp_method_t>(parser->method));
  state->method = std::string_view(method_str);
  // Check if this looks like a POST to /v1/commit to initialize streaming
  // parser
  if (state->method == "POST" && state->url.find("/v1/commit") == 0) {
    // Initialize streaming commit request parsing
    state->commit_parser = std::make_unique<JsonCommitRequestParser>();
    state->commit_request = std::make_unique<CommitRequest>();
    state->parsing_commit =
        state->commit_parser->begin_streaming_parse(*state->commit_request);
    if (!state->parsing_commit) {
      return -1; // Signal parsing error to llhttp
    }
  }
  return 0;
}
int HttpHandler::onBody(llhttp_t *parser, const char *at, size_t length) {
  auto *state = static_cast<HttpConnectionState *>(parser->data);
  if (state->parsing_commit && state->commit_parser) {
    // Stream data to commit request parser
    auto status =
        state->commit_parser->parse_chunk(const_cast<char *>(at), length);
    if (status == CommitRequestParser::ParseStatus::Error) {
      return -1; // Signal parsing error to llhttp
    }
  }
  return 0;
}
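The reason `onBody` forwards each `(at, length)` pair instead of buffering the whole body is that llhttp may deliver a body in arbitrary slices across reads. The consumer contract can be sketched with a hypothetical accumulator that mimics the `parse_chunk` / finish shape of the commit parser (the names and the buffering strategy are assumptions for illustration; the real `JsonCommitRequestParser` parses incrementally rather than buffering):

```cpp
#include <cassert>
#include <cstddef>
#include <string>

// Streaming body consumer: each on_body callback contributes one partial
// chunk, and only the finish call marks the body as complete.
struct ChunkAccumulator {
  std::string buffer;
  bool finished = false;

  // Called once per on_body invocation with a partial slice.
  void parse_chunk(const char *at, size_t length) { buffer.append(at, length); }

  // Called from on_message_complete, when no more chunks will arrive.
  const std::string &finish() {
    finished = true;
    return buffer;
  }
};
```

A true streaming parser improves on this sketch by consuming each chunk immediately, so peak memory stays bounded by the parser state rather than the body size.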
int HttpHandler::onMessageComplete(llhttp_t *parser) {
  auto *state = static_cast<HttpConnectionState *>(parser->data);
  state->message_complete = true;
  if (state->parsing_commit && state->commit_parser) {
    // Finish streaming parse
    auto status = state->commit_parser->finish_streaming_parse();
    if (status == CommitRequestParser::ParseStatus::Error) {
      return -1; // Signal parsing error to llhttp
    }
  }
  return 0;
}
// Pipeline stage implementations (batch-based)
bool HttpHandler::process_sequence_batch(BatchType &batch) {
// Stage 0: Sequence assignment
// This stage performs ONLY work that requires serial processing:
// - Version/sequence number assignment (must be sequential)
// - Request ID banned list management
for (auto &entry : batch) {
// Pattern match on pipeline entry variant
bool should_shutdown = std::visit(
[&](auto &&e) -> bool {
using T = std::decay_t<decltype(e)>;
if constexpr (std::is_same_v<T, ShutdownEntry>) {
return true; // Signal shutdown
} else if constexpr (std::is_same_v<T, CommitEntry>) {
// Process commit entry: check banned list, assign version
auto &commit_entry = e;
auto *state = static_cast<HttpConnectionState *>(
commit_entry.connection->user_data);
if (!state || !state->commit_request) {
// Should not happen - basic validation was done on I/O thread
send_error_response(*commit_entry.connection, 500,
"Internal server error", true);
return false;
}
// Check if request_id is banned (for status queries)
// Only check CommitRequest request_id, not HTTP header
if (state->commit_request &&
state->commit_request->request_id().has_value()) {
auto commit_request_id =
state->commit_request->request_id().value();
if (banned_request_ids.find(commit_request_id) !=
banned_request_ids.end()) {
// Request ID is banned, this commit should fail
send_json_response(
*commit_entry.connection, 409,
R"({"status": "not_committed", "error": "request_id_banned"})",
state->connection_close);
return false;
}
}
// Assign sequential version number
commit_entry.assigned_version = next_version++;
TRACE_EVENT("http", "sequence_commit",
perfetto::Flow::Global(state->http_request_id));
return false; // Continue processing
} else if constexpr (std::is_same_v<T, StatusEntry>) {
// Process status entry: add request_id to banned list, get version
// upper bound
auto &status_entry = e;
auto *state = static_cast<HttpConnectionState *>(
status_entry.connection->user_data);
if (state && !state->status_request_id.empty()) {
// Add request_id to banned list - store the string in arena and
// use string_view
char *arena_chars = banned_request_arena.allocate<char>(
state->status_request_id.size());
std::memcpy(arena_chars, state->status_request_id.data(),
state->status_request_id.size());
std::string_view request_id_view(arena_chars,
state->status_request_id.size());
banned_request_ids.insert(request_id_view);
// Update memory usage metric
banned_request_ids_memory_gauge.set(
banned_request_arena.total_allocated());
// Set version upper bound to current highest assigned version
status_entry.version_upper_bound = next_version - 1;
}
if (state) {
TRACE_EVENT("http", "sequence_status",
perfetto::Flow::Global(state->http_request_id));
// TODO: Transfer to status threadpool - for now just respond
// not_committed
send_json_response(*status_entry.connection, 200,
R"({"status": "not_committed"})",
state->connection_close);
}
return false; // Continue processing
}
return false; // Unknown type, continue
},
entry);
if (should_shutdown) {
return true;
}
}
return false; // Continue processing
}
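Each stage uses the same dispatch idiom: `PipelineEntry` is a `std::variant`, and `std::visit` with `if constexpr` on the alternative's type plays the role of a pattern match. A compact, self-contained sketch of that idiom (the entry types here are simplified stand-ins for the project's `CommitEntry`/`StatusEntry`/`ShutdownEntry`, which also carry connections):

```cpp
#include <cassert>
#include <type_traits>
#include <variant>

struct ShutdownEntry {};
struct CommitEntry { long assigned_version = 0; };
struct StatusEntry { long version_upper_bound = 0; };
using PipelineEntry = std::variant<ShutdownEntry, CommitEntry, StatusEntry>;

// Returns true when the stage should shut down, mirroring the
// process_*_batch return convention.
bool process_entry(PipelineEntry &entry, long &next_version) {
  return std::visit(
      [&](auto &e) -> bool {
        using T = std::decay_t<decltype(e)>;
        if constexpr (std::is_same_v<T, ShutdownEntry>) {
          return true; // Sentinel: stop this stage.
        } else if constexpr (std::is_same_v<T, CommitEntry>) {
          e.assigned_version = next_version++; // Serial version assignment.
          return false;
        } else {
          // Status request observes the highest version assigned so far.
          e.version_upper_bound = next_version - 1;
          return false;
        }
      },
      entry);
}
```

Because the visitor takes `auto &`, commit and status entries are mutated in place inside the variant, which is exactly how the sequence stage stamps `assigned_version` and `version_upper_bound` on the pipeline entries.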
bool HttpHandler::process_resolve_batch(BatchType &batch) {
// Stage 1: Precondition resolution
// This stage must be serialized to maintain consistent database state view
// - Validate preconditions against current database state
// - Check for conflicts with other transactions
for (auto &entry : batch) {
// Pattern match on pipeline entry variant
bool should_shutdown = std::visit(
[&](auto &&e) -> bool {
using T = std::decay_t<decltype(e)>;
if constexpr (std::is_same_v<T, ShutdownEntry>) {
return true; // Signal shutdown
} else if constexpr (std::is_same_v<T, CommitEntry>) {
// Process commit entry: accept all commits (simplified
// implementation)
auto &commit_entry = e;
auto *state = static_cast<HttpConnectionState *>(
commit_entry.connection->user_data);
if (!state || !state->commit_request) {
// Skip processing for failed sequence stage
return false;
}
// Accept all commits (simplified implementation)
commit_entry.resolve_success = true;
TRACE_EVENT("http", "resolve_commit",
perfetto::Flow::Global(state->http_request_id));
return false; // Continue processing
} else if constexpr (std::is_same_v<T, StatusEntry>) {
// Status entries are not processed in resolve stage
// They were already handled in sequence stage
return false;
}
return false; // Unknown type, continue
},
entry);
if (should_shutdown) {
return true;
}
}
return false; // Continue processing
}
bool HttpHandler::process_persist_batch(BatchType &batch) {
// Stage 2: Transaction persistence
// Mark everything as durable immediately (simplified implementation)
// In real implementation: batch S3 writes, update subscribers, etc.
for (auto &entry : batch) {
// Pattern match on pipeline entry variant
bool should_shutdown = std::visit(
[&](auto &&e) -> bool {
using T = std::decay_t<decltype(e)>;
if constexpr (std::is_same_v<T, ShutdownEntry>) {
return true; // Signal shutdown
} else if constexpr (std::is_same_v<T, CommitEntry>) {
// Process commit entry: mark as durable, generate response
auto &commit_entry = e;
auto *state = static_cast<HttpConnectionState *>(
commit_entry.connection->user_data);
// Skip if resolve failed or connection is in error state
if (!state || !state->commit_request ||
!commit_entry.resolve_success) {
return false;
}
// Mark as persisted and update committed version high water mark
commit_entry.persist_success = true;
committed_version.store(commit_entry.assigned_version,
std::memory_order_seq_cst);
TRACE_EVENT("http", "persist_commit",
perfetto::Flow::Global(state->http_request_id));
const CommitRequest &commit_request = *state->commit_request;
ArenaAllocator &arena = commit_entry.connection->get_arena();
std::string_view response;
// Generate success response with actual assigned version
if (commit_request.request_id().has_value()) {
response = format(
arena,
R"({"request_id":"%.*s","status":"committed","version":%ld,"leader_id":"leader123"})",
static_cast<int>(commit_request.request_id().value().size()),
commit_request.request_id().value().data(),
commit_entry.assigned_version);
} else {
response = format(
arena,
R"({"status":"committed","version":%ld,"leader_id":"leader123"})",
commit_entry.assigned_version);
}
send_json_response(*commit_entry.connection, 200, response,
state->connection_close);
return false; // Continue processing
} else if constexpr (std::is_same_v<T, StatusEntry>) {
// Status entries are not processed in persist stage
// They were already handled in sequence stage
return false;
}
return false; // Unknown type, continue
},
entry);
if (should_shutdown) {
return true;
}
}
return false; // Continue processing
}
bool HttpHandler::process_release_batch(BatchType &batch) {
// Stage 3: Connection release
// Return connections to server for response transmission
for (auto &entry : batch) {
// Pattern match on pipeline entry variant
bool should_shutdown = std::visit(
[&](auto &&e) -> bool {
using T = std::decay_t<decltype(e)>;
if constexpr (std::is_same_v<T, ShutdownEntry>) {
return true; // Signal shutdown
} else if constexpr (std::is_same_v<T, CommitEntry>) {
// Process commit entry: return connection to server
auto &commit_entry = e;
auto *state = static_cast<HttpConnectionState *>(
commit_entry.connection->user_data);
if (state) {
TRACE_EVENT("http", "release_commit",
perfetto::Flow::Global(state->http_request_id));
}
// Return connection to server for further processing or cleanup
Server::release_back_to_server(std::move(commit_entry.connection));
return false; // Continue processing
} else if constexpr (std::is_same_v<T, StatusEntry>) {
// Process status entry: return connection to server
auto &status_entry = e;
auto *state = static_cast<HttpConnectionState *>(
status_entry.connection->user_data);
if (state) {
TRACE_EVENT("http", "release_status",
perfetto::Flow::Global(state->http_request_id));
}
// Return connection to server for further processing or cleanup
Server::release_back_to_server(std::move(status_entry.connection));
return false; // Continue processing
}
return false; // Unknown type, continue
},
entry);
if (should_shutdown) {
return true;
}
}
return false; // Continue processing
}


@@ -1,18 +1,26 @@
#pragma once
#include <atomic>
#include <memory>
#include <string_view>
#include <thread>
#include <unordered_set>
#include <llhttp.h>
#include "arena_allocator.hpp"
#include "connection.hpp"
#include "connection_handler.hpp"
#include "loop_iterations.hpp"
#include "perfetto_categories.hpp"
#include "pipeline_entry.hpp"
#include "server.hpp"
#include "thread_pipeline.hpp"
// Forward declarations
struct CommitRequest;
struct JsonCommitRequestParser;
/**
 * HTTP routes supported by WeaselDB server.
 * Using enum for efficient switch-based routing.
@@ -48,13 +56,25 @@ struct HttpConnectionState {
  bool connection_close = false; // Client requested connection close
  HttpRoute route = HttpRoute::NotFound;
  // Status request data
  std::string_view status_request_id; // Request ID extracted from the
                                      // /v1/status?request_id= query parameter
  // Header accumulation buffers (arena-allocated)
  using ArenaString =
      std::basic_string<char, std::char_traits<char>, ArenaStlAllocator<char>>;
  ArenaString current_header_field_buf;
  ArenaString current_header_value_buf;
  bool header_field_complete = false;
  int64_t http_request_id =
      0; // X-Request-Id header value (for tracing/logging)
  // Streaming parser for POST requests
  std::unique_ptr<JsonCommitRequestParser> commit_parser;
  std::unique_ptr<CommitRequest> commit_request;
  bool parsing_commit = false;
  bool basic_validation_passed =
      false; // Set to true if basic validation passes
  explicit HttpConnectionState(ArenaAllocator &arena);
};
@@ -64,70 +84,66 @@ struct HttpConnectionState {
 * Supports the WeaselDB REST API endpoints with enum-based routing.
 */
struct HttpHandler : ConnectionHandler {
  HttpHandler()
      : banned_request_ids(
            ArenaStlAllocator<std::string_view>(&banned_request_arena)) {
    // Stage 0: Sequence assignment thread
    sequenceThread = std::thread{[this]() {
      pthread_setname_np(pthread_self(), "txn-sequence");
      for (;;) {
        auto guard = commitPipeline.acquire<0, 0>();
        if (process_sequence_batch(guard.batch)) {
          return; // Shutdown signal received
        }
      }
    }};
    // Stage 1: Precondition resolution thread
    resolveThread = std::thread{[this]() {
      pthread_setname_np(pthread_self(), "txn-resolve");
      for (;;) {
        auto guard = commitPipeline.acquire<1, 0>();
        if (process_resolve_batch(guard.batch)) {
          return; // Shutdown signal received
        }
      }
    }};
    // Stage 2: Transaction persistence thread
    persistThread = std::thread{[this]() {
      pthread_setname_np(pthread_self(), "txn-persist");
      for (;;) {
        auto guard = commitPipeline.acquire<2, 0>();
        if (process_persist_batch(guard.batch)) {
          return; // Shutdown signal received
        }
      }
    }};
    // Stage 3: Connection return to server thread
    releaseThread = std::thread{[this]() {
      pthread_setname_np(pthread_self(), "txn-release");
      for (;;) {
        auto guard = commitPipeline.acquire<3, 0>();
        if (process_release_batch(guard.batch)) {
          return; // Shutdown signal received
        }
      }
    }};
  }
  ~HttpHandler() {
    // Send single shutdown signal that flows through all pipeline stages
    {
      auto guard = commitPipeline.push(1, true);
      guard.batch[0] =
          ShutdownEntry{}; // Single ShutdownEntry flows through all stages
    }
    // Join all pipeline threads
    sequenceThread.join();
    resolveThread.join();
    persistThread.join();
    releaseThread.join();
  }
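The destructor relies on a single-sentinel shutdown: one `ShutdownEntry` pushed into stage 0 terminates every stage in turn, because each stage stops only after seeing the sentinel, and the sentinel occupies a pipeline slot that all downstream stages eventually acquire. A toy model of that protocol, with plain `std::deque` queues standing in for the lock-free pipeline (an illustration only; the real stages do not explicitly forward entries — slots flow through `StaticThreadPipeline` automatically):

```cpp
#include <cassert>
#include <deque>
#include <variant>

struct ShutdownEntry {};
struct WorkEntry {};
using Entry = std::variant<ShutdownEntry, WorkEntry>;

// Drains `in` into `out`, returning the number of work entries processed.
// Stops after forwarding the shutdown sentinel so the next stage also stops.
int run_stage(std::deque<Entry> &in, std::deque<Entry> &out) {
  int processed = 0;
  while (!in.empty()) {
    Entry e = in.front();
    in.pop_front();
    bool shutdown = std::holds_alternative<ShutdownEntry>(e);
    out.push_back(e); // Forward everything, including the sentinel.
    if (shutdown) {
      return processed;
    }
    ++processed;
  }
  return processed;
}
```

The property worth noting is ordering: because the sentinel is pushed after all real work, every stage finishes the work already in flight before it returns, so the subsequent `join()` calls cannot lose entries.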
  void on_connection_established(Connection &conn) override;
@@ -153,17 +169,49 @@ struct HttpHandler : ConnectionHandler {
private:
  static constexpr int lg_size = 16;
  // Pipeline state (sequence thread only)
  int64_t next_version = 1; // Next version to assign (sequence thread only)
  // Pipeline state (persist thread writes, I/O threads read)
  std::atomic<int64_t> committed_version{
      0}; // Highest committed version (persist thread writes, I/O threads read)
// Arena for banned request IDs and related data structures (sequence thread
// only)
ArenaAllocator banned_request_arena;
using BannedRequestIdSet =
std::unordered_set<std::string_view, std::hash<std::string_view>,
std::equal_to<std::string_view>,
ArenaStlAllocator<std::string_view>>;
BannedRequestIdSet banned_request_ids; // Request IDs that should not commit
// (string_views into arena)
// Main commit processing pipeline: sequence -> resolve -> persist -> release
StaticThreadPipeline<PipelineEntry, WaitStrategy::WaitIfUpstreamIdle, 1, 1, 1,
1>
commitPipeline{lg_size};
// Pipeline stage threads
std::thread sequenceThread;
std::thread resolveThread;
std::thread persistThread;
std::thread releaseThread;
// Pipeline stage processing methods (batch-based)
using BatchType =
StaticThreadPipeline<PipelineEntry, WaitStrategy::WaitIfUpstreamIdle, 1,
1, 1, 1>::Batch;
bool process_sequence_batch(BatchType &batch);
bool process_resolve_batch(BatchType &batch);
bool process_persist_batch(BatchType &batch);
bool process_release_batch(BatchType &batch);
// Route handlers // Route handlers
void handleGetVersion(Connection &conn, const HttpConnectionState &state); void handleGetVersion(Connection &conn, const HttpConnectionState &state);
void handlePostCommit(Connection &conn, const HttpConnectionState &state); void handlePostCommit(Connection &conn, const HttpConnectionState &state);
void handleGetSubscribe(Connection &conn, const HttpConnectionState &state); void handleGetSubscribe(Connection &conn, const HttpConnectionState &state);
void handleGetStatus(Connection &conn, const HttpConnectionState &state); void handleGetStatus(Connection &conn, HttpConnectionState &state);
void handlePutRetention(Connection &conn, const HttpConnectionState &state); void handlePutRetention(Connection &conn, const HttpConnectionState &state);
void handleGetRetention(Connection &conn, const HttpConnectionState &state); void handleGetRetention(Connection &conn, const HttpConnectionState &state);
void handleDeleteRetention(Connection &conn, void handleDeleteRetention(Connection &conn,
@@ -176,10 +224,10 @@ private:
   static void sendResponse(Connection &conn, int status_code,
                            std::string_view content_type, std::string_view body,
                            bool close_connection = false);
-  static void sendJsonResponse(Connection &conn, int status_code,
-                               std::string_view json,
-                               bool close_connection = false);
-  static void sendErrorResponse(Connection &conn, int status_code,
-                                std::string_view message,
-                                bool close_connection = false);
+  static void send_json_response(Connection &conn, int status_code,
+                                 std::string_view json,
+                                 bool close_connection = false);
+  static void send_error_response(Connection &conn, int status_code,
+                                  std::string_view message,
+                                  bool close_connection = false);
 };
@@ -29,12 +29,7 @@ void signal_handler(int sig) {
   }
 }

-std::vector<int> create_listen_sockets(const weaseldb::Config &config) {
-  std::vector<int> listen_fds;
-
-  // Check if unix socket path is specified
-  if (!config.server.unix_socket_path.empty()) {
-    // Create unix socket
-    int sfd = socket(AF_UNIX, SOCK_STREAM, 0);
+int create_unix_socket(const std::string &path) {
+  int sfd = socket(AF_UNIX, SOCK_STREAM, 0);
   if (sfd == -1) {
     perror("socket");
@@ -42,19 +37,18 @@ std::vector<int> create_listen_sockets(const weaseldb::Config &config) {
   }

   // Remove existing socket file if it exists
-  unlink(config.server.unix_socket_path.c_str());
+  unlink(path.c_str());

   struct sockaddr_un addr;
   std::memset(&addr, 0, sizeof(addr));
   addr.sun_family = AF_UNIX;
-  if (config.server.unix_socket_path.length() >= sizeof(addr.sun_path)) {
-    std::fprintf(stderr, "Unix socket path too long\n");
+  if (path.length() >= sizeof(addr.sun_path)) {
+    std::fprintf(stderr, "Unix socket path too long: %s\n", path.c_str());
     std::abort();
   }
-  std::strncpy(addr.sun_path, config.server.unix_socket_path.c_str(),
-               sizeof(addr.sun_path) - 1);
+  std::strncpy(addr.sun_path, path.c_str(), sizeof(addr.sun_path) - 1);

   if (bind(sfd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
     perror("bind");
@@ -66,11 +60,10 @@ std::vector<int> create_listen_sockets(const weaseldb::Config &config) {
     std::abort();
   }

-  listen_fds.push_back(sfd);
-  return listen_fds;
+  return sfd;
 }

-// TCP socket creation
+int create_tcp_socket(const std::string &address, int port) {
   struct addrinfo hints;
   struct addrinfo *result, *rp;
   int s;
@@ -84,8 +77,8 @@ std::vector<int> create_listen_sockets(const weaseldb::Config &config) {
   hints.ai_addr = nullptr;
   hints.ai_next = nullptr;

-  s = getaddrinfo(config.server.bind_address.c_str(),
-                  std::to_string(config.server.port).c_str(), &hints, &result);
+  s = getaddrinfo(address.c_str(), std::to_string(port).c_str(), &hints,
+                  &result);
   if (s != 0) {
     std::fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(s));
     std::abort();
@@ -94,18 +87,13 @@ std::vector<int> create_listen_sockets(const weaseldb::Config &config) {
   int sfd = -1;
   for (rp = result; rp != nullptr; rp = rp->ai_next) {
     sfd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
-    if (sfd == -1) {
+    if (sfd == -1)
       continue;
-    }

     int val = 1;
     if (setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)) == -1) {
       perror("setsockopt SO_REUSEADDR");
-      int e = close(sfd);
-      if (e == -1 && errno != EINTR) {
-        perror("close sfd (SO_REUSEADDR failed)");
-        std::abort();
-      }
+      close(sfd);
       continue;
     }
@@ -113,40 +101,56 @@ std::vector<int> create_listen_sockets(const weaseldb::Config &config) {
     if (rp->ai_family == AF_INET || rp->ai_family == AF_INET6) {
       if (setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val)) == -1) {
         perror("setsockopt TCP_NODELAY");
-        int e = close(sfd);
-        if (e == -1 && errno != EINTR) {
-          perror("close sfd (TCP_NODELAY failed)");
-          std::abort();
-        }
+        close(sfd);
         continue;
       }
     }

     if (bind(sfd, rp->ai_addr, rp->ai_addrlen) == 0) {
+      if (listen(sfd, SOMAXCONN) == -1) {
+        perror("listen");
+        close(sfd);
+        freeaddrinfo(result);
+        std::abort();
+      }
       break; /* Success */
     }

-    int e = close(sfd);
-    if (e == -1 && errno != EINTR) {
-      perror("close sfd (bind failed)");
-      std::abort();
-    }
+    close(sfd);
     sfd = -1;
   }

   freeaddrinfo(result);
-  if (rp == nullptr || sfd == -1) {
-    std::fprintf(stderr, "Could not bind to any address\n");
+  if (sfd == -1) {
+    std::fprintf(stderr, "Could not bind to %s:%d\n", address.c_str(), port);
     std::abort();
   }

-  if (listen(sfd, SOMAXCONN) == -1) {
-    perror("listen");
-    std::abort();
-  }
-  listen_fds.push_back(sfd);
+  return sfd;
+}
+
+std::vector<int> create_listen_sockets(const weaseldb::Config &config) {
+  std::vector<int> listen_fds;
+  for (const auto &iface : config.server.interfaces) {
+    int fd;
+    if (iface.type == weaseldb::ListenInterface::Type::TCP) {
+      fd = create_tcp_socket(iface.address, iface.port);
+      std::cout << "Listening on TCP " << iface.address << ":" << iface.port
+                << std::endl;
+    } else {
+      fd = create_unix_socket(iface.path);
+      std::cout << "Listening on Unix socket " << iface.path << std::endl;
+    }
+    listen_fds.push_back(fd);
+  }
+  if (listen_fds.empty()) {
+    std::fprintf(stderr, "No interfaces configured\n");
+    std::abort();
+  }
   return listen_fds;
 }

@@ -218,13 +222,13 @@ int main(int argc, char *argv[]) {
   }

   std::cout << "Configuration loaded successfully:" << std::endl;
-  if (!config->server.unix_socket_path.empty()) {
-    std::cout << "Unix socket path: " << config->server.unix_socket_path
-              << std::endl;
-  } else {
-    std::cout << "Server bind address: " << config->server.bind_address
-              << std::endl;
-    std::cout << "Server port: " << config->server.port << std::endl;
+  std::cout << "Interfaces: " << config->server.interfaces.size() << std::endl;
+  for (const auto &iface : config->server.interfaces) {
+    if (iface.type == weaseldb::ListenInterface::Type::TCP) {
+      std::cout << "  TCP: " << iface.address << ":" << iface.port << std::endl;
+    } else {
+      std::cout << "  Unix socket: " << iface.path << std::endl;
+    }
   }
   std::cout << "Max request size: " << config->server.max_request_size_bytes
             << " bytes" << std::endl;
src/pipeline_entry.hpp Normal file
@@ -0,0 +1,47 @@
#pragma once
#include "connection.hpp"
#include <memory>
#include <variant>
/**
* Pipeline entry for commit requests that need full 4-stage processing.
* Contains connection with parsed CommitRequest.
*/
struct CommitEntry {
std::unique_ptr<Connection> connection;
int64_t assigned_version = 0; // Set by sequence stage
bool resolve_success = false; // Set by resolve stage
bool persist_success = false; // Set by persist stage
CommitEntry() = default; // Default constructor for variant
explicit CommitEntry(std::unique_ptr<Connection> conn)
: connection(std::move(conn)) {}
};
/**
* Pipeline entry for status requests that need sequence stage processing
* then transfer to status threadpool.
*/
struct StatusEntry {
std::unique_ptr<Connection> connection;
int64_t version_upper_bound = 0; // Set by sequence stage
StatusEntry() = default; // Default constructor for variant
explicit StatusEntry(std::unique_ptr<Connection> conn)
: connection(std::move(conn)) {}
};
/**
* Pipeline entry for coordinated shutdown of all stages.
* Flows through all stages to ensure proper cleanup.
*/
struct ShutdownEntry {
// Empty struct - presence indicates shutdown
};
/**
* Pipeline entry variant type used by the commit processing pipeline.
* Each stage pattern-matches on the variant type to handle appropriately.
*/
using PipelineEntry = std::variant<CommitEntry, StatusEntry, ShutdownEntry>;
@@ -98,9 +98,11 @@ Server::~Server() {
   }
 }

-  // Clean up unix socket file if it exists
-  if (!config_.server.unix_socket_path.empty()) {
-    unlink(config_.server.unix_socket_path.c_str());
+  // Clean up unix socket files if they exist
+  for (const auto &iface : config_.server.interfaces) {
+    if (iface.type == weaseldb::ListenInterface::Type::Unix) {
+      unlink(iface.path.c_str());
+    }
   }
 }
@@ -102,6 +102,27 @@ auto addr = reinterpret_cast<uintptr_t>(ptr); // Pointer to integer conv
 - **String views** with `std::string_view` to minimize unnecessary copying
 - **Arena allocation** for efficient memory management (~1ns vs ~20-270ns for malloc)
### String Formatting
- **Always use `format.hpp` functions** - formats directly into arena-allocated memory
- **Use `static_format()` for performance-sensitive code** - faster but less flexible than `format()`
- **Use `format()` function with arena allocator** for printf-style formatting
```cpp
// Most performance-sensitive - compile-time optimized concatenation
std::string_view response = static_format(arena,
"HTTP/1.1 ", status_code, " OK\r\n",
"Content-Length: ", body.size(), "\r\n",
"\r\n", body);
// Printf-style formatting - runtime flexible
ArenaAllocator& arena = conn.get_arena();
std::string_view response = format(arena,
"HTTP/1.1 %d OK\r\n"
"Content-Length: %zu\r\n"
"\r\n%.*s",
status_code, body.size(),
static_cast<int>(body.size()), body.data());
```
 ### Complexity Control
 - **Encapsulation is the main tool for controlling complexity**
 - **Header files define the interface** - they are the contract with users of your code
@@ -1,9 +1,11 @@
 # WeaselDB Configuration File

 [server]
-unix_socket_path = "weaseldb.sock"
-bind_address = "127.0.0.1"
-port = 8080
+# Network interfaces to listen on - both TCP for external access and Unix socket for high-performance local testing
+interfaces = [
+    { type = "tcp", address = "127.0.0.1", port = 8080 },
+    { type = "unix", path = "weaseldb.sock" }
+]

 # Maximum request size in bytes (for 413 Content Too Large responses)
 max_request_size_bytes = 1048576 # 1MB

 # Number of I/O threads for handling connections and network events