Compare commits
7 Commits
b8eb00e313
...
55069c0c79
| SHA1 |
|---|
| 55069c0c79 |
| 96aae52853 |
| 8b6736127a |
| 9272048108 |
| b2ffe3bfab |
| 978861c430 |
| 46edb7cd26 |
@@ -156,13 +156,25 @@ add_executable(
test_http_handler
tests/test_http_handler.cpp
src/http_handler.cpp
src/server.cpp
src/config.cpp
src/json_commit_request_parser.cpp
src/arena_allocator.cpp
src/format.cpp
src/connection.cpp
src/connection_registry.cpp
src/metric.cpp)
target_link_libraries(test_http_handler doctest::doctest llhttp_static
Threads::Threads perfetto simdutf::simdutf)
src/metric.cpp
${CMAKE_BINARY_DIR}/json_tokens.cpp)
add_dependencies(test_http_handler generate_json_tokens)
target_link_libraries(
test_http_handler
doctest::doctest
llhttp_static
Threads::Threads
toml11::toml11
perfetto
simdutf::simdutf
weaseljson)
target_include_directories(test_http_handler PRIVATE src)
target_compile_definitions(test_http_handler
PRIVATE DOCTEST_CONFIG_IMPLEMENT_WITH_MAIN)

@@ -177,6 +189,7 @@ add_executable(
src/arena_allocator.cpp
src/config.cpp
src/http_handler.cpp
src/json_commit_request_parser.cpp
src/format.cpp
src/metric.cpp
${CMAKE_BINARY_DIR}/json_tokens.cpp)

commit_pipeline.md (new file, 499 lines)
@@ -0,0 +1,499 @@
# Commit Processing Pipeline

## Overview

WeaselDB implements a high-performance 4-stage commit processing pipeline that transforms HTTP commit requests into durable transactions. The pipeline provides strict serialization where needed while maximizing throughput through batching and asynchronous processing.

## Architecture

The commit processing pipeline consists of four sequential stages, each running on a dedicated thread:

```
HTTP I/O Threads → [Sequence] → [Resolve] → [Persist] → [Release] → HTTP I/O Threads
```

### Pipeline Flow

1. **HTTP I/O Threads**: Parse and validate incoming commit requests
2. **Sequence Stage**: Assign sequential version numbers to commits
3. **Resolve Stage**: Validate preconditions and check for conflicts
4. **Persist Stage**: Write commits to durable storage and notify subscribers
5. **Release Stage**: Return connections to HTTP I/O threads for response handling

## Stage Details

### Stage 0: Sequence Assignment

**Thread**: `txn-sequence`
**Purpose**: Version assignment and request ID management
**Serialization**: **Required** - Must be single-threaded

**Responsibilities**:
- **For CommitEntry**: Check request_id against banned list, assign sequential version number if not banned, forward to resolve stage
- **For StatusEntry**: Add request_id to banned list, note current highest assigned version as upper bound, transfer connection to status threadpool
- Record version assignments for transaction tracking

**Why Serialization is Required**:
- Version numbers must be strictly sequential without gaps
- Banned list updates must be atomic with version assignment
- Status requests must get accurate upper bound on potential commit versions

**Request ID Banned List**:
- Purpose: Guarantee that a queried request_id can no longer commit (it is no longer in-flight) and establish a version upper bound for status queries
- Lifecycle: Grows indefinitely until process restart (leader change)
- Removal: Only on process restart/leader change, which invalidates all old request IDs (see the sketch below)
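
A minimal sketch of this bookkeeping, assuming an in-memory set plus a monotonically increasing counter. The names `next_version`, `banned_request_ids`, and `version_upper_bound` follow the handler code in this change; the container choice and the helper struct are illustrative only:

```cpp
#include <cstdint>
#include <string_view>
#include <unordered_set>

// Illustrative only: not the actual WeaselDB class layout.
struct SequenceState {
  int64_t next_version = 1;                                  // next version to hand out
  std::unordered_set<std::string_view> banned_request_ids;   // grows until restart

  // CommitEntry path: reject banned request ids, otherwise assign the next version.
  bool try_assign(std::string_view request_id, int64_t &assigned_version) {
    if (banned_request_ids.count(request_id)) return false;  // already banned
    assigned_version = next_version++;
    return true;
  }

  // StatusEntry path: ban the id and report the version upper bound.
  int64_t ban(std::string_view request_id) {
    banned_request_ids.insert(request_id);  // the real code copies the id into an arena first
    return next_version - 1;                // highest version that could have been assigned
  }
};
```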

**Current Implementation**:
```cpp
bool HttpHandler::process_sequence_batch(BatchType &batch) {
  for (auto &entry : batch) {
    if (std::holds_alternative<ShutdownEntry>(entry)) {
      return true; // Shutdown signal
    }
    // TODO: Pattern match on CommitEntry vs StatusEntry
    // TODO: Implement sequence assignment logic for each type
  }
  return false; // Continue processing
}
```

### Stage 1: Precondition Resolution

**Thread**: `txn-resolve`
**Purpose**: Validate preconditions and detect conflicts
**Serialization**: **Required** - Must be single-threaded

**Responsibilities**:
- **For CommitEntry**: Check preconditions against in-memory recent writes set, add writes to recent writes set if accepted
- **For StatusEntry**: N/A (transferred to status threadpool after sequence stage)
- Mark failed commits with failure information (including which preconditions failed)

**Why Serialization is Required**:
- Must maintain consistent view of in-memory recent writes set
- Conflict detection requires atomic evaluation of all preconditions against recent writes
- Recent writes set updates must be synchronized

**Transaction State Transitions**:
- **Assigned Version** (from sequence) → **Semi-committed** (resolve accepts) → **Committed** (persist completes)
- Failed transactions continue through pipeline with failure information for client response

**Current Implementation**:
```cpp
bool HttpHandler::process_resolve_batch(BatchType &batch) {
  // TODO: Implement precondition resolution logic:
  // 1. For CommitEntry: Check preconditions against in-memory recent writes set
  // 2. If accepted: Add writes to in-memory recent writes set, mark as semi-committed
  // 3. If failed: Mark with failure info (which preconditions failed)
  // 4. For StatusEntry: N/A (already transferred to status threadpool)
  return false; // Continue processing (placeholder until the TODOs above are implemented)
}
```
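
To make the TODOs above concrete, here is a rough sketch of precondition resolution against an in-memory recent-writes map. The `Precondition` shape ("key unchanged since read_version") and the map layout are assumptions for illustration, not the actual WeaselDB structures:

```cpp
#include <cstdint>
#include <map>
#include <string>
#include <vector>

struct Precondition { std::string key; int64_t read_version; };

struct RecentWrites {
  std::map<std::string, int64_t> last_write_version;  // key -> version of last accepted write

  // Returns true (semi-committed) if no precondition conflicts with a recent write.
  bool resolve(const std::vector<Precondition> &preconditions,
               const std::vector<std::string> &write_keys, int64_t assigned_version) {
    for (const auto &p : preconditions) {
      auto it = last_write_version.find(p.key);
      if (it != last_write_version.end() && it->second > p.read_version)
        return false;  // a conflicting write landed after the client's read
    }
    for (const auto &k : write_keys)
      last_write_version[k] = assigned_version;  // record our writes for later commits
    return true;
  }
};
```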

### Stage 2: Transaction Persistence

**Thread**: `txn-persist`
**Purpose**: Write semi-committed transactions to durable storage
**Serialization**: **Required** - Must mark batches durable in order

**Responsibilities**:
- **For CommitEntry**: Apply operations to persistent storage, update committed version high water mark
- **For StatusEntry**: N/A (transferred to status threadpool after sequence stage)
- Generate durability events for `/v1/subscribe` when committed version advances
- Batch multiple commits for efficient persistence operations

**Why Serialization is Required**:
- Batches must be marked durable in sequential version order
- High water mark updates must reflect strict ordering of committed versions
- Ensures consistency guarantees across all endpoints

**Committed Version High Water Mark**:
- Global atomic value tracking the highest durably committed version
- Updated after each batch commits: set to the highest version in the batch
- Read by the `/v1/version` endpoint using atomic seq_cst reads
- Enables `/v1/subscribe` durability events when the high water mark advances (see the sketch below)
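
A sketch of that handshake, assuming the `committed_version` atomic used by the handler code in this change; the free functions here are illustrative wrappers, not the actual API:

```cpp
#include <atomic>
#include <cstdint>

std::atomic<int64_t> committed_version{0};  // highest durably committed version

// Persist stage: after a batch is durable, publish the highest version it contained.
void publish_batch_durable(int64_t highest_version_in_batch) {
  committed_version.store(highest_version_in_batch, std::memory_order_seq_cst);
}

// /v1/version (any thread): one seq_cst load, no pipeline interaction required.
int64_t read_committed_version() {
  return committed_version.load(std::memory_order_seq_cst);
}
```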

**Batching Strategy**:
- Multiple semi-committed transactions can be persisted in a single batch
- High water mark updated once per batch to highest version in that batch
- See `persistence.md` for detailed persistence design

**Current Implementation**:
```cpp
bool HttpHandler::process_persist_batch(BatchType &batch) {
  // TODO: Implement actual persistence logic:
  // 1. For CommitEntry: Apply operations to persistent storage
  // 2. Update committed version high water mark to highest version in batch
  // 3. Generate durability events for /v1/subscribe
  // 4. For StatusEntry: N/A (already transferred to status threadpool)
  return false; // Continue processing (placeholder until the TODOs above are implemented)
}
```

### Stage 3: Connection Release

**Thread**: `txn-release`
**Purpose**: Return connections to HTTP server for client response
**Serialization**: Not required - Independent connection handling

**Responsibilities**:
- Return processed connections to HTTP server for all request types
- Connection carries response data (success/failure) and status information
- Trigger response transmission to clients

**Response Handling**:
- **CommitRequests**: Response generated by persist stage (success with version, or failure with conflicting preconditions)
- **StatusRequests**: Response generated by separate status lookup logic (not part of pipeline)
- Failed transactions carry failure information through entire pipeline for proper client response

**Implementation**:
```cpp
bool HttpHandler::process_release_batch(BatchType &batch) {
  // Stage 3: Connection release
  for (auto &conn : batch) {
    if (!conn) {
      return true; // Shutdown signal
    }
    // Return connection to server for further processing or cleanup
    Server::release_back_to_server(std::move(conn));
  }
  return false; // Continue processing
}
```

## Threading Model

### Thread Pipeline Configuration

```cpp
// Pipeline entry type (to be implemented)
using PipelineEntry = std::variant<CommitEntry, StatusEntry, ShutdownEntry>;

// 4-stage pipeline: sequence -> resolve -> persist -> release
// TODO: Update pipeline type from std::unique_ptr<Connection> to PipelineEntry variant
StaticThreadPipeline<PipelineEntry, // Was: std::unique_ptr<Connection>
                     WaitStrategy::WaitIfUpstreamIdle, 1, 1, 1, 1>
    commitPipeline{lg_size};
```

### Thread Creation and Management

```cpp
HttpHandler() {
  // Stage 0: Sequence assignment thread
  sequenceThread = std::thread{[this]() {
    pthread_setname_np(pthread_self(), "txn-sequence");
    for (;;) {
      auto guard = commitPipeline.acquire<0, 0>();
      if (process_sequence_batch(guard.batch)) {
        return; // Shutdown signal received
      }
    }
  }};

  // Similar pattern for resolve, persist, and release threads...
}
```

### Batch Processing

Each stage processes connections in batches using RAII guards:

```cpp
auto guard = commitPipeline.acquire<STAGE_NUM, 0>();
// Process batch
for (auto &conn : guard.batch) {
  // Stage-specific processing
}
// Guard destructor automatically publishes batch to next stage
```

## Flow Control

### Pipeline Entry

Commit requests enter the pipeline via `HttpHandler::on_batch_complete()`:

```cpp
void HttpHandler::on_batch_complete(std::span<std::unique_ptr<Connection>> batch) {
  // Collect commit requests that passed basic validation for 4-stage pipeline processing
  int commit_count = 0;
  for (auto &conn : batch) {
    if (conn && conn->user_data) {
      auto *state = static_cast<HttpConnectionState *>(conn->user_data);
      if (state->route == HttpRoute::POST_commit &&
          state->commit_request &&
          state->parsing_commit) {
        commit_count++;
      }
    }
  }

  // Send commit requests to 4-stage pipeline in batch
  if (commit_count > 0) {
    auto guard = commitPipeline.push(commit_count, true);
    // Move qualifying connections into pipeline
  }
}
```

### Backpressure Handling

The pipeline implements natural backpressure:
- Each stage blocks if downstream stages are full
- `WaitIfUpstreamIdle` strategy balances latency vs throughput
- Ring buffer size (`lg_size = 16`) controls maximum queued batches

### Shutdown Coordination

Pipeline shutdown is coordinated by sending a single ShutdownEntry that flows through all stages:

```cpp
~HttpHandler() {
  // Send single shutdown signal that flows through all pipeline stages
  {
    auto guard = commitPipeline.push(1, true);
    guard.batch[0] = ShutdownEntry{}; // Single ShutdownEntry flows through all stages
  }

  // Join all pipeline threads
  sequenceThread.join();
  resolveThread.join();
  persistThread.join();
  releaseThread.join();
}
```

**Note**: Multiple entries would only be needed if stages had multiple threads, with each thread needing its own shutdown signal.

## Error Handling

### Stage-Level Error Handling

Each stage handles different entry types:

```cpp
// Pattern matching on pipeline entry variant
bool should_shutdown = std::visit([&](auto &&entry) -> bool {
  using T = std::decay_t<decltype(entry)>;
  if constexpr (std::is_same_v<T, ShutdownEntry>) {
    return true; // Signal shutdown
  } else if constexpr (std::is_same_v<T, CommitEntry>) {
    // Process commit entry
    return false;
  } else if constexpr (std::is_same_v<T, StatusEntry>) {
    // Handle status entry (or skip if transferred)
    return false;
  }
  return false; // Unknown type, continue
}, pipeline_entry);
```

### Connection Error States

- Failed CommitEntries are passed through the pipeline with error information
- Downstream stages skip processing for error connections but forward them
- Error responses are sent when connection reaches release stage
- Connection ownership is always transferred to ensure cleanup

### Pipeline Integrity

- ShutdownEntry signals shutdown to all stages
- Each stage checks for ShutdownEntry and returns true to signal shutdown
- RAII guards ensure entries are always published downstream
- No entries are lost even during error conditions

## Performance Characteristics

### Throughput Optimization

- **Batching**: Multiple connections processed per stage activation
- **Lock-Free Communication**: Ring buffer between stages
- **Minimal Context Switching**: Dedicated threads per stage
- **Arena Allocation**: Efficient memory management throughout pipeline

### Latency Optimization

- **Single-Pass Processing**: Each connection flows through all stages once
- **Streaming Design**: Stages process concurrently
- **Minimal Copying**: Connection ownership transfer, not data copying
- **Direct Response**: Release stage triggers immediate response transmission

### Scalability Characteristics

- **Batch Size Tuning**: Ring buffer size controls memory vs latency tradeoff
- **Thread Affinity**: Dedicated threads reduce scheduling overhead
- **NUMA Awareness**: Can pin threads to specific CPU cores (see the sketch below)
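
For illustration, pinning the current thread to a core on Linux can be done with `pthread_setaffinity_np`; this helper is hypothetical and not part of WeaselDB:

```cpp
// Requires _GNU_SOURCE; Linux-specific.
#include <pthread.h>
#include <sched.h>

// Hypothetical helper: pin the calling pipeline thread to one CPU core.
bool pin_current_thread_to_cpu(int cpu) {
  cpu_set_t set;
  CPU_ZERO(&set);
  CPU_SET(cpu, &set);
  return pthread_setaffinity_np(pthread_self(), sizeof(set), &set) == 0;
}
```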

## Configuration

### Pipeline Parameters

```cpp
private:
  static constexpr int lg_size = 16; // Ring buffer size = 2^16 entries

  // 4-stage pipeline configuration
  StaticThreadPipeline<std::unique_ptr<Connection>,
                       WaitStrategy::WaitIfUpstreamIdle, 1, 1, 1, 1>
      commitPipeline{lg_size};
```

### Tuning Considerations

- **Ring Buffer Size**: Larger buffers increase memory usage but improve batching
- **Wait Strategy**: `WaitIfUpstreamIdle` balances CPU usage vs latency
- **Thread Affinity**: OS scheduling vs explicit CPU pinning tradeoffs

## Pipeline Entry Types

The pipeline processes different types of entries using a variant type instead of `std::unique_ptr<Connection>`; a sketch of the entry structs follows the list below:

### Pipeline Entry Variants

- **CommitEntry**: Contains `std::unique_ptr<Connection>` with CommitRequest and connection state
- **StatusEntry**: Contains `std::unique_ptr<Connection>` with StatusRequest (transferred to status threadpool after sequence)
- **ShutdownEntry**: Signals pipeline shutdown to all stages
- **Future types**: Pipeline design supports additional entry types
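
A sketch of these entry types; the field names mirror those used by the pipeline stages in this change (`assigned_version`, `resolve_success`, `persist_success`, `version_upper_bound`), but the authoritative definitions live in `pipeline_entry.hpp`:

```cpp
#include <cstdint>
#include <memory>
#include <variant>

struct Connection;  // defined in connection.hpp

struct CommitEntry {
  std::unique_ptr<Connection> connection;  // owns the HTTP connection and its CommitRequest
  int64_t assigned_version = 0;            // set by the sequence stage
  bool resolve_success = false;            // set by the resolve stage
  bool persist_success = false;            // set by the persist stage
};

struct StatusEntry {
  std::unique_ptr<Connection> connection;  // owns the HTTP connection and its StatusRequest
  int64_t version_upper_bound = 0;         // set by the sequence stage
};

struct ShutdownEntry {};                   // sentinel that flows through every stage

using PipelineEntry = std::variant<CommitEntry, StatusEntry, ShutdownEntry>;
```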

### Stage Processing by Type

| Stage | CommitEntry | StatusEntry | ShutdownEntry | Serialization |
|-------|-------------|-------------|---------------|---------------|
| **Sequence** | Check banned list, assign version | Add to banned list, transfer to status threadpool | Return true (shutdown) | **Required** |
| **Resolve** | Check preconditions, update recent writes | N/A (transferred) | Return true (shutdown) | **Required** |
| **Persist** | Apply operations, update high water mark | N/A (transferred) | Return true (shutdown) | **Required** |
| **Release** | Return connection to HTTP threads | N/A (transferred) | Return true (shutdown) | Not required |

## API Endpoint Integration

### `/v1/commit` - Transaction Submission

**Pipeline Interaction**: Full pipeline traversal as CommitEntry

#### Request Processing Flow

1. **HTTP I/O Thread Processing** (`src/http_handler.cpp:210-273`):
   ```cpp
   void HttpHandler::handlePostCommit(Connection &conn, HttpConnectionState &state) {
     // Parse and validate anything that doesn't need serialization:
     // - JSON parsing and CommitRequest construction
     // - Basic validation: leader_id check, operation format validation
     // - Check that we have at least one operation

     // If validation fails, send error response immediately and return
     // If validation succeeds, connection will enter pipeline in on_batch_complete()
   }
   ```

2. **Pipeline Entry**: Successfully parsed connections enter pipeline as CommitEntry (containing the connection with CommitRequest)

3. **Pipeline Processing**:
   - **Sequence**: Check banned list → assign version (or reject)
   - **Resolve**: Check preconditions against in-memory recent writes → mark semi-committed (or failed with conflict details)
   - **Persist**: Apply operations → mark committed, update high water mark
   - **Release**: Return connection with response data

4. **Response Generation**: Based on pipeline results
   - **Success**: `{"status": "committed", "version": N, "leader_id": "...", "request_id": "..."}`
   - **Failure**: `{"status": "not_committed", "conflicts": [...], "version": N, "leader_id": "..."}`

### `/v1/status` - Commit Status Lookup

**Pipeline Interaction**: StatusEntry through sequence stage, then transfer to status threadpool

#### Request Processing Flow

1. **HTTP I/O Thread Processing**:
   ```cpp
   void HttpHandler::handleGetStatus(Connection &conn, const HttpConnectionState &state) {
     // TODO: Extract request_id from URL and min_version from query params
     // Current: Returns placeholder static response
   }
   ```

2. **Two-Phase Processing**:
   - **Phase 1 - Sequence Stage**: StatusEntry enters pipeline to add request_id to banned list and get version upper bound
   - **Phase 2 - Status Threadpool**: Connection transferred from sequence stage to dedicated status threadpool for actual status lookup logic

3. **Status Lookup Logic**: Performed in status threadpool - scan transaction log to determine actual commit status of the now-banned request_id

### `/v1/subscribe` - Real-time Transaction Stream

**Pipeline Integration**: Consumes events from resolve and persist stages

#### Event Sources

- **Resolve Stage**: Semi-committed transactions (accepted preconditions) for low-latency streaming
- **Persist Stage**: Durability events when committed version high water mark advances

#### Current Implementation

```cpp
void HttpHandler::handleGetSubscribe(Connection &conn, const HttpConnectionState &state) {
  // TODO: Parse query parameters (after, durable)
  // TODO: Establish Server-Sent Events stream
  // TODO: Subscribe to resolve stage (semi-committed) and persist stage (durability) events
}
```
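
For reference, Server-Sent Events frame each event as one or more `data:` lines followed by a blank line. A durability event on this stream might look like the following; the JSON payload shape is an assumption, not a defined format:

```
data: {"version": 43, "durable": true}

```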

### `/v1/version` - Version Information

**Pipeline Integration**: Direct atomic read, no pipeline interaction

```cpp
// TODO: Implement direct atomic read of committed version high water mark
// No pipeline interaction needed - seq_cst atomic read
// Leader ID is process-lifetime constant
```

**Response**: `{"version": <high_water_mark>, "leader_id": "<process_leader_id>"}`
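
The handler change elsewhere in this compare already implements the read path along these lines (note that the current code emits an empty `leader` field rather than a populated `leader_id`):

```cpp
void HttpHandler::handleGetVersion(Connection &conn, const HttpConnectionState &state) {
  version_counter.inc();
  send_json_response(
      conn, 200,
      format(conn.get_arena(), R"({"version":%ld,"leader":""})",
             this->committed_version.load(std::memory_order_seq_cst)),
      state.connection_close);
}
```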

## Integration Points

### HTTP Handler Integration

The pipeline integrates with the HTTP handler at two points:

1. **Entry**: `on_batch_complete()` feeds connections into sequence stage
2. **Exit**: Release stage calls `Server::release_back_to_server()`

### Persistence Layer Integration

The persist stage interfaces with:
- **S3 Backend**: Batch writes for durability (see `persistence.md`)
- **Subscriber System**: Real-time change stream notifications
- **Metrics System**: Transaction throughput and latency tracking

### Database State Integration

- **Sequence Stage**: Updates version number generator
- **Resolve Stage**: Queries current database state for precondition validation
- **Persist Stage**: Applies mutations to authoritative database state

## Future Optimizations

### Potential Enhancements

1. **Dynamic Thread Counts**: Make resolve and release thread counts configurable
2. **NUMA Optimization**: Pin pipeline threads to specific CPU cores
3. **Batch Size Tuning**: Dynamic batch size based on load
4. **Stage Bypassing**: Skip resolve stage for transactions without preconditions
5. **Persistence Batching**: Aggregate multiple commits into larger S3 writes

### Monitoring and Observability

1. **Stage Metrics**: Throughput, latency, and queue depth per stage
2. **Error Tracking**: Error rates and types by stage
3. **Resource Utilization**: CPU and memory usage per pipeline thread
4. **Flow Control Events**: Backpressure and stall detection

## Implementation Status

### Current State

- ✅ Pipeline structure implemented with 4 stages
- ✅ Thread creation and management
- ✅ RAII batch processing
- ✅ Error handling framework
- ✅ Shutdown coordination

### TODO Items

- ⏳ Sequence assignment logic implementation
- ⏳ Precondition resolution implementation
- ⏳ S3 persistence batching implementation
- ⏳ Subscriber notification system
- ⏳ Performance monitoring and metrics
- ⏳ Configuration tuning and optimization
@@ -1,8 +1,10 @@
|
||||
# WeaselDB Configuration File
|
||||
|
||||
[server]
|
||||
bind_address = "127.0.0.1"
|
||||
port = 8080
|
||||
# Network interfaces to listen on - production config with just TCP
|
||||
interfaces = [
|
||||
{ type = "tcp", address = "127.0.0.1", port = 8080 }
|
||||
]
|
||||
# Maximum request size in bytes (for 413 Content Too Large responses)
|
||||
max_request_size_bytes = 1048576 # 1MB
|
||||
# Number of I/O threads for handling connections and network events
|
||||
|
||||
@@ -79,9 +79,31 @@ void ConfigParser::parse_section(const auto &toml_data,
|
||||
void ConfigParser::parse_server_config(const auto &toml_data,
|
||||
ServerConfig &config) {
|
||||
parse_section(toml_data, "server", [&](const auto &srv) {
|
||||
parse_field(srv, "bind_address", config.bind_address);
|
||||
parse_field(srv, "port", config.port);
|
||||
parse_field(srv, "unix_socket_path", config.unix_socket_path);
|
||||
// Parse interfaces array
|
||||
if (srv.contains("interfaces")) {
|
||||
auto interfaces = srv.at("interfaces");
|
||||
if (interfaces.is_array()) {
|
||||
for (const auto &iface : interfaces.as_array()) {
|
||||
if (iface.contains("type")) {
|
||||
std::string type = iface.at("type").as_string();
|
||||
if (type == "tcp") {
|
||||
std::string address = iface.at("address").as_string();
|
||||
int port = iface.at("port").as_integer();
|
||||
config.interfaces.push_back(ListenInterface::tcp(address, port));
|
||||
} else if (type == "unix") {
|
||||
std::string path = iface.at("path").as_string();
|
||||
config.interfaces.push_back(ListenInterface::unix_socket(path));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// If no interfaces configured, use default TCP interface
|
||||
if (config.interfaces.empty()) {
|
||||
config.interfaces.push_back(ListenInterface::tcp("127.0.0.1", 8080));
|
||||
}
|
||||
|
||||
parse_field(srv, "max_request_size_bytes", config.max_request_size_bytes);
|
||||
parse_field(srv, "io_threads", config.io_threads);
|
||||
|
||||
@@ -127,24 +149,37 @@ void ConfigParser::parse_subscription_config(const auto &toml_data,
|
||||
bool ConfigParser::validate_config(const Config &config) {
|
||||
bool valid = true;
|
||||
|
||||
// Validate server configuration
|
||||
if (config.server.unix_socket_path.empty()) {
|
||||
// TCP mode validation
|
||||
if (config.server.port <= 0 || config.server.port > 65535) {
|
||||
std::cerr << "Configuration error: server.port must be between 1 and "
|
||||
"65535, got "
|
||||
<< config.server.port << std::endl;
|
||||
valid = false;
|
||||
}
|
||||
} else {
|
||||
// Unix socket mode validation
|
||||
if (config.server.unix_socket_path.length() >
|
||||
107) { // UNIX_PATH_MAX is typically 108
|
||||
std::cerr << "Configuration error: unix_socket_path too long (max 107 "
|
||||
"chars), got "
|
||||
<< config.server.unix_socket_path.length() << " chars"
|
||||
<< std::endl;
|
||||
valid = false;
|
||||
// Validate server interfaces
|
||||
if (config.server.interfaces.empty()) {
|
||||
std::cerr << "Configuration error: no interfaces configured" << std::endl;
|
||||
valid = false;
|
||||
}
|
||||
|
||||
for (const auto &iface : config.server.interfaces) {
|
||||
if (iface.type == ListenInterface::Type::TCP) {
|
||||
if (iface.port <= 0 || iface.port > 65535) {
|
||||
std::cerr << "Configuration error: TCP port must be between 1 and "
|
||||
"65535, got "
|
||||
<< iface.port << std::endl;
|
||||
valid = false;
|
||||
}
|
||||
if (iface.address.empty()) {
|
||||
std::cerr << "Configuration error: TCP address cannot be empty"
|
||||
<< std::endl;
|
||||
valid = false;
|
||||
}
|
||||
} else { // Unix socket
|
||||
if (iface.path.empty()) {
|
||||
std::cerr << "Configuration error: Unix socket path cannot be empty"
|
||||
<< std::endl;
|
||||
valid = false;
|
||||
}
|
||||
if (iface.path.length() > 107) { // UNIX_PATH_MAX is typically 108
|
||||
std::cerr << "Configuration error: Unix socket path too long (max 107 "
|
||||
"chars), got "
|
||||
<< iface.path.length() << " chars" << std::endl;
|
||||
valid = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -3,19 +3,40 @@
|
||||
#include <chrono>
|
||||
#include <optional>
|
||||
#include <string>
|
||||
#include <vector>
|
||||
|
||||
namespace weaseldb {
|
||||
|
||||
/**
|
||||
* @brief Configuration for a single network interface to listen on.
|
||||
*/
|
||||
struct ListenInterface {
|
||||
enum class Type { TCP, Unix };
|
||||
|
||||
Type type;
|
||||
/// For TCP: IP address to bind to (e.g., "127.0.0.1", "0.0.0.0")
|
||||
std::string address;
|
||||
/// For TCP: port number
|
||||
int port = 0;
|
||||
/// For Unix: socket file path
|
||||
std::string path;
|
||||
|
||||
// Factory methods for cleaner config creation
|
||||
static ListenInterface tcp(const std::string &addr, int port_num) {
|
||||
return {Type::TCP, addr, port_num, ""};
|
||||
}
|
||||
|
||||
static ListenInterface unix_socket(const std::string &socket_path) {
|
||||
return {Type::Unix, "", 0, socket_path};
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* @brief Configuration settings for the WeaselDB server component.
|
||||
*/
|
||||
struct ServerConfig {
|
||||
/// IP address to bind the server to (default: localhost)
|
||||
std::string bind_address = "127.0.0.1";
|
||||
/// TCP port number for the server to listen on
|
||||
int port = 8080;
|
||||
/// Unix socket path (if specified, takes precedence over TCP)
|
||||
std::string unix_socket_path;
|
||||
/// Network interfaces to listen on (TCP and/or Unix sockets)
|
||||
std::vector<ListenInterface> interfaces;
|
||||
/// Maximum size in bytes for incoming HTTP requests (default: 1MB)
|
||||
int64_t max_request_size_bytes = 1024 * 1024;
|
||||
/// Number of I/O threads for handling connections and network events
|
||||
|
||||
@@ -209,7 +209,7 @@ struct Connection {
|
||||
* metrics.recordQueueDepth(conn->get_id(), conn->outgoingBytesQueued());
|
||||
* ```
|
||||
*/
|
||||
int64_t outgoingBytesQueued() const {
|
||||
int64_t outgoing_bytes_queued() const {
|
||||
#ifndef NDEBUG
|
||||
// Debug build: validate counter accuracy
|
||||
int64_t computed_total = 0;
|
||||
|
||||
@@ -79,6 +79,8 @@
|
||||
* - **Simple concatenation**: Basic string + number + string combinations
|
||||
* - **Compile-time optimization**: When all types/values known at compile time
|
||||
* - **Template contexts**: Where compile-time buffer sizing is beneficial
|
||||
* - **IMPORTANT**: Only works with compile-time string literals, NOT runtime
|
||||
* const char*
|
||||
*
|
||||
* ## Optimization Details:
|
||||
* The function uses `ArenaAllocator::allocate_remaining_space()` to claim all
|
||||
@@ -206,10 +208,12 @@ inline constexpr DoubleTerm term(double s) { return DoubleTerm(s); }
|
||||
* optimized term writers for maximum speed.
|
||||
*
|
||||
* ## Supported Types:
|
||||
* - **String literals**: C-style string literals and arrays
|
||||
* - **String literals**: C-style string literals and arrays ("Hello", "World")
|
||||
* - **Integers**: All integral types (int, int64_t, uint32_t, etc.)
|
||||
* - **Floating point**: double (uses high-precision Grisu2 algorithm)
|
||||
* - **Custom types**: Via specialization of `detail::term()`
|
||||
* - **NOT supported**: const char* variables, std::string, std::string_view
|
||||
* variables
|
||||
*
|
||||
* ## Performance Characteristics:
|
||||
* - **Compile-time buffer sizing**: Buffer size calculated at compile time (no
|
||||
@@ -245,16 +249,23 @@ inline constexpr DoubleTerm term(double s) { return DoubleTerm(s); }
|
||||
*
|
||||
* ## When to Use:
|
||||
* - **Hot paths**: Performance-critical code where formatting speed matters
|
||||
* - **Known types**: When argument types are known at compile time
|
||||
* - **Compile-time string literals**: All string arguments must be string
|
||||
* literals (e.g., "Hello")
|
||||
* - **Simple formatting**: Concatenation and basic type conversion
|
||||
* - **Template code**: Where compile-time optimization is beneficial
|
||||
* - **CANNOT use runtime strings**: No const char*, std::string, or string_view
|
||||
* variables
|
||||
*
|
||||
* ## When to Use format() Instead:
|
||||
* - **Printf-style formatting**: When you need format specifiers like "%d",
|
||||
* "%.2f"
|
||||
* - **Runtime flexibility**: When format strings come from variables/config
|
||||
* - **Complex formatting**: When you need padding, precision, etc.
|
||||
* - **Convenience**: For quick debugging or non-critical paths
|
||||
* - **Runtime strings**: When you have const char*, std::string, or string_view
|
||||
* variables
|
||||
* - **Dynamic content**: When format strings come from variables/config/user
|
||||
* input
|
||||
* - **Complex formatting**: When you need padding, precision, width specifiers
|
||||
* - **Mixed literal/runtime**: When combining string literals with runtime
|
||||
* string data
|
||||
*
|
||||
* @note All arguments are passed by forwarding reference for optimal
|
||||
* performance
|
||||
|
||||
@@ -1,19 +1,37 @@
|
||||
#include "http_handler.hpp"
|
||||
|
||||
#include <atomic>
|
||||
#include <cstring>
|
||||
#include <string>
|
||||
#include <strings.h>
|
||||
|
||||
#include "arena_allocator.hpp"
|
||||
#include "format.hpp"
|
||||
#include "json_commit_request_parser.hpp"
|
||||
#include "metric.hpp"
|
||||
#include "perfetto_categories.hpp"
|
||||
#include "pipeline_entry.hpp"
|
||||
#include "server.hpp"
|
||||
|
||||
auto requests_counter_family = metric::create_counter(
|
||||
"weaseldb_http_requests_total", "Total http requests");
|
||||
thread_local auto metrics_counter =
|
||||
requests_counter_family.create({{"path", "/metrics"}});
|
||||
|
||||
// API endpoint request counters
|
||||
thread_local auto commit_counter =
|
||||
requests_counter_family.create({{"path", "/v1/commit"}});
|
||||
thread_local auto status_counter =
|
||||
requests_counter_family.create({{"path", "/v1/status"}});
|
||||
thread_local auto version_counter =
|
||||
requests_counter_family.create({{"path", "/v1/version"}});
|
||||
|
||||
// Metric for banned request IDs memory usage
|
||||
auto banned_request_ids_memory_gauge =
|
||||
metric::create_gauge("weaseldb_banned_request_ids_memory_bytes",
|
||||
"Memory used by banned request IDs arena")
|
||||
.create({});
|
||||
|
||||
// HttpConnectionState implementation
|
||||
HttpConnectionState::HttpConnectionState(ArenaAllocator &arena)
|
||||
: current_header_field_buf(ArenaStlAllocator<char>(&arena)),
|
||||
@@ -61,16 +79,46 @@ void HttpHandler::on_write_buffer_drained(
|
||||
|
||||
void HttpHandler::on_batch_complete(
|
||||
std::span<std::unique_ptr<Connection>> batch) {
|
||||
int readyCount = 0;
|
||||
for (int i = 0; i < int(batch.size()); ++i) {
|
||||
readyCount += batch[i] && batch[i]->outgoingBytesQueued() > 0;
|
||||
// Collect commit requests and status requests for pipeline processing
|
||||
int pipeline_count = 0;
|
||||
|
||||
// Count both commit and status requests
|
||||
for (auto &conn : batch) {
|
||||
if (conn && conn->user_data) {
|
||||
auto *state = static_cast<HttpConnectionState *>(conn->user_data);
|
||||
|
||||
// Count commit requests that passed basic validation
|
||||
if (state->route == HttpRoute::POST_commit && state->commit_request &&
|
||||
state->parsing_commit && state->basic_validation_passed) {
|
||||
pipeline_count++;
|
||||
}
|
||||
// Count status requests
|
||||
else if (state->route == HttpRoute::GET_status &&
|
||||
// Error message not already queued
|
||||
conn->outgoing_bytes_queued() == 0) {
|
||||
pipeline_count++;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (readyCount > 0) {
|
||||
auto guard = pipeline.push(readyCount, /*block=*/true);
|
||||
auto outIter = guard.batch.begin();
|
||||
for (int i = 0; i < int(batch.size()); ++i) {
|
||||
if (batch[i] && batch[i]->outgoingBytesQueued() > 0) {
|
||||
*outIter++ = std::move(batch[i]);
|
||||
|
||||
// Send requests to 4-stage pipeline in batch
|
||||
if (pipeline_count > 0) {
|
||||
auto guard = commitPipeline.push(pipeline_count, true);
|
||||
auto out_iter = guard.batch.begin();
|
||||
|
||||
for (auto &conn : batch) {
|
||||
if (conn && conn->user_data) {
|
||||
auto *state = static_cast<HttpConnectionState *>(conn->user_data);
|
||||
|
||||
// Create CommitEntry for commit requests
|
||||
if (state->route == HttpRoute::POST_commit && state->commit_request &&
|
||||
state->parsing_commit && state->basic_validation_passed) {
|
||||
*out_iter++ = CommitEntry{std::move(conn)};
|
||||
}
|
||||
// Create StatusEntry for status requests
|
||||
else if (state->route == HttpRoute::GET_status) {
|
||||
*out_iter++ = StatusEntry{std::move(conn)};
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -80,7 +128,7 @@ void HttpHandler::on_data_arrived(std::string_view data,
|
||||
std::unique_ptr<Connection> &conn_ptr) {
|
||||
auto *state = static_cast<HttpConnectionState *>(conn_ptr->user_data);
|
||||
if (!state) {
|
||||
sendErrorResponse(*conn_ptr, 500, "Internal server error", true);
|
||||
send_error_response(*conn_ptr, 500, "Internal server error", true);
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -94,7 +142,7 @@ void HttpHandler::on_data_arrived(std::string_view data,
|
||||
llhttp_execute(&state->parser, data.data(), data.size());
|
||||
|
||||
if (err != HPE_OK) {
|
||||
sendErrorResponse(*conn_ptr, 400, "Bad request", true);
|
||||
send_error_response(*conn_ptr, 400, "Bad request", true);
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -181,57 +229,161 @@ HttpRoute HttpHandler::parseRoute(std::string_view method,
|
||||
// Route handlers (basic implementations)
|
||||
void HttpHandler::handleGetVersion(Connection &conn,
|
||||
const HttpConnectionState &state) {
|
||||
sendJsonResponse(
|
||||
version_counter.inc();
|
||||
send_json_response(
|
||||
conn, 200,
|
||||
R"({"version":"0.0.1","leader":"node-1","committed_version":42})",
|
||||
format(conn.get_arena(), R"({"version":%ld,"leader":""})",
|
||||
this->committed_version.load(std::memory_order_seq_cst)),
|
||||
state.connection_close);
|
||||
}
|
||||
|
||||
void HttpHandler::handlePostCommit(Connection &conn,
|
||||
const HttpConnectionState &state) {
|
||||
// TODO: Parse commit request from state.body and process
|
||||
sendJsonResponse(
|
||||
conn, 200,
|
||||
R"({"request_id":"example","status":"committed","version":43})",
|
||||
state.connection_close);
|
||||
commit_counter.inc();
|
||||
// Check if streaming parse was successful
|
||||
if (!state.commit_request || !state.parsing_commit) {
|
||||
const char *error = state.commit_parser
|
||||
? state.commit_parser->get_parse_error()
|
||||
: "No parser initialized";
|
||||
ArenaAllocator &arena = conn.get_arena();
|
||||
std::string_view error_msg =
|
||||
format(arena, "Parse failed: %s", error ? error : "Unknown error");
|
||||
send_error_response(conn, 400, error_msg, state.connection_close);
|
||||
return;
|
||||
}
|
||||
|
||||
const CommitRequest &commit_request = *state.commit_request;
|
||||
|
||||
// Perform basic validation that doesn't require serialization (done on I/O
|
||||
// threads)
|
||||
bool valid = true;
|
||||
std::string_view error_msg;
|
||||
|
||||
// Check that we have at least one operation
|
||||
if (commit_request.operations().empty()) {
|
||||
valid = false;
|
||||
error_msg = "Commit request must contain at least one operation";
|
||||
}
|
||||
|
||||
// Check leader_id is not empty
|
||||
if (valid && commit_request.leader_id().empty()) {
|
||||
valid = false;
|
||||
error_msg = "Commit request must specify a leader_id";
|
||||
}
|
||||
|
||||
// Check operations are well-formed
|
||||
if (valid) {
|
||||
for (const auto &op : commit_request.operations()) {
|
||||
if (op.param1.empty()) {
|
||||
valid = false;
|
||||
error_msg = "Operation key cannot be empty";
|
||||
break;
|
||||
}
|
||||
if (op.type == Operation::Type::Write && op.param2.empty()) {
|
||||
valid = false;
|
||||
error_msg = "Write operation value cannot be empty";
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!valid) {
|
||||
send_error_response(conn, 400, error_msg, state.connection_close);
|
||||
return;
|
||||
}
|
||||
|
||||
// Basic validation passed - mark for 4-stage pipeline processing
|
||||
const_cast<HttpConnectionState &>(state).basic_validation_passed = true;
|
||||
// Response will be sent after 4-stage pipeline processing is complete
|
||||
}
|
||||
|
||||
void HttpHandler::handleGetSubscribe(Connection &conn,
|
||||
const HttpConnectionState &state) {
|
||||
// TODO: Implement subscription streaming
|
||||
sendJsonResponse(
|
||||
send_json_response(
|
||||
conn, 200,
|
||||
R"({"message":"Subscription endpoint - streaming not yet implemented"})",
|
||||
state.connection_close);
|
||||
}
|
||||
|
||||
void HttpHandler::handleGetStatus(Connection &conn,
|
||||
const HttpConnectionState &state) {
|
||||
// TODO: Extract request_id from URL and check status
|
||||
sendJsonResponse(
|
||||
conn, 200,
|
||||
R"({"request_id":"example","status":"committed","version":43})",
|
||||
state.connection_close);
|
||||
HttpConnectionState &state) {
|
||||
status_counter.inc();
|
||||
// Status requests are processed through the pipeline
|
||||
// Response will be generated in the sequence stage
|
||||
// This handler extracts request_id from query parameters and prepares for
|
||||
// pipeline processing
|
||||
|
||||
// Extract request_id from query parameters:
|
||||
// /v1/status?request_id=<ID>&min_version=<VERSION>
|
||||
std::string_view url = state.url;
|
||||
|
||||
// Find query parameters
|
||||
size_t query_pos = url.find('?');
|
||||
if (query_pos == std::string_view::npos) {
|
||||
// No query parameters
|
||||
send_error_response(conn, 400,
|
||||
"Missing required query parameter: request_id",
|
||||
state.connection_close);
|
||||
return;
|
||||
}
|
||||
|
||||
std::string_view query_string = url.substr(query_pos + 1);
|
||||
|
||||
// Simple query parameter parsing for request_id
|
||||
// Look for "request_id=" in the query string
|
||||
size_t request_id_pos = query_string.find("request_id=");
|
||||
if (request_id_pos == std::string_view::npos) {
|
||||
send_error_response(conn, 400,
|
||||
"Missing required query parameter: request_id",
|
||||
state.connection_close);
|
||||
return;
|
||||
}
|
||||
|
||||
// Extract the request_id value
|
||||
size_t value_start = request_id_pos + 11; // length of "request_id="
|
||||
if (value_start >= query_string.length()) {
|
||||
send_error_response(conn, 400, "Empty request_id parameter",
|
||||
state.connection_close);
|
||||
return;
|
||||
}
|
||||
|
||||
// Find the end of the request_id value (next & or end of string)
|
||||
size_t value_end = query_string.find('&', value_start);
|
||||
if (value_end == std::string_view::npos) {
|
||||
value_end = query_string.length();
|
||||
}
|
||||
|
||||
state.status_request_id =
|
||||
query_string.substr(value_start, value_end - value_start);
|
||||
|
||||
if (state.status_request_id.empty()) {
|
||||
send_error_response(conn, 400, "Empty request_id parameter",
|
||||
state.connection_close);
|
||||
return;
|
||||
}
|
||||
|
||||
// Ready for pipeline processing
|
||||
}
|
||||
|
||||
void HttpHandler::handlePutRetention(Connection &conn,
|
||||
const HttpConnectionState &state) {
|
||||
// TODO: Parse retention policy from body and store
|
||||
sendJsonResponse(conn, 200, R"({"policy_id":"example","status":"created"})",
|
||||
state.connection_close);
|
||||
send_json_response(conn, 200, R"({"policy_id":"example","status":"created"})",
|
||||
state.connection_close);
|
||||
}
|
||||
|
||||
void HttpHandler::handleGetRetention(Connection &conn,
|
||||
const HttpConnectionState &state) {
|
||||
// TODO: Extract policy_id from URL or return all policies
|
||||
sendJsonResponse(conn, 200, R"({"policies":[]})", state.connection_close);
|
||||
send_json_response(conn, 200, R"({"policies":[]})", state.connection_close);
|
||||
}
|
||||
|
||||
void HttpHandler::handleDeleteRetention(Connection &conn,
|
||||
const HttpConnectionState &state) {
|
||||
// TODO: Extract policy_id from URL and delete
|
||||
sendJsonResponse(conn, 200, R"({"policy_id":"example","status":"deleted"})",
|
||||
state.connection_close);
|
||||
send_json_response(conn, 200, R"({"policy_id":"example","status":"deleted"})",
|
||||
state.connection_close);
|
||||
}
|
||||
|
||||
void HttpHandler::handleGetMetrics(Connection &conn,
|
||||
@@ -255,16 +407,16 @@ void HttpHandler::handleGetMetrics(Connection &conn,
|
||||
arena, "HTTP/1.1 200 OK\r\n",
|
||||
"Content-Type: text/plain; version=0.0.4\r\n",
|
||||
"Content-Length: ", static_cast<uint64_t>(total_size), "\r\n",
|
||||
"X-Response-ID: ", static_cast<int64_t>(http_state->request_id), "\r\n",
|
||||
"Connection: close\r\n", "\r\n");
|
||||
"X-Response-ID: ", static_cast<int64_t>(http_state->http_request_id),
|
||||
"\r\n", "Connection: close\r\n", "\r\n");
|
||||
conn.close_after_send();
|
||||
} else {
|
||||
headers = static_format(
|
||||
arena, "HTTP/1.1 200 OK\r\n",
|
||||
"Content-Type: text/plain; version=0.0.4\r\n",
|
||||
"Content-Length: ", static_cast<uint64_t>(total_size), "\r\n",
|
||||
"X-Response-ID: ", static_cast<int64_t>(http_state->request_id), "\r\n",
|
||||
"Connection: keep-alive\r\n", "\r\n");
|
||||
"X-Response-ID: ", static_cast<int64_t>(http_state->http_request_id),
|
||||
"\r\n", "Connection: keep-alive\r\n", "\r\n");
|
||||
}
|
||||
|
||||
// Send headers
|
||||
@@ -278,91 +430,81 @@ void HttpHandler::handleGetMetrics(Connection &conn,
|
||||
|
||||
void HttpHandler::handleGetOk(Connection &conn,
|
||||
const HttpConnectionState &state) {
|
||||
TRACE_EVENT("http", "GET /ok", perfetto::Flow::Global(state.request_id));
|
||||
TRACE_EVENT("http", "GET /ok", perfetto::Flow::Global(state.http_request_id));
|
||||
|
||||
sendResponse(conn, 200, "text/plain", "OK", state.connection_close);
|
||||
}
|
||||
|
||||
void HttpHandler::handleNotFound(Connection &conn,
|
||||
const HttpConnectionState &state) {
|
||||
sendErrorResponse(conn, 404, "Not found", state.connection_close);
|
||||
send_error_response(conn, 404, "Not found", state.connection_close);
|
||||
}
|
||||
|
||||
// HTTP utility methods
|
||||
void HttpHandler::sendResponse(Connection &conn, int status_code,
|
||||
std::string_view content_type,
|
||||
std::string_view body, bool close_connection) {
|
||||
[[maybe_unused]] ArenaAllocator &arena = conn.get_arena();
|
||||
|
||||
// Build HTTP response using arena
|
||||
std::string response;
|
||||
response.reserve(256 + body.size());
|
||||
|
||||
response += "HTTP/1.1 ";
|
||||
response += std::to_string(status_code);
|
||||
response += " ";
|
||||
|
||||
// Status text
|
||||
switch (status_code) {
|
||||
case 200:
|
||||
response += "OK";
|
||||
break;
|
||||
case 400:
|
||||
response += "Bad Request";
|
||||
break;
|
||||
case 404:
|
||||
response += "Not Found";
|
||||
break;
|
||||
case 500:
|
||||
response += "Internal Server Error";
|
||||
break;
|
||||
default:
|
||||
response += "Unknown";
|
||||
break;
|
||||
}
|
||||
|
||||
ArenaAllocator &arena = conn.get_arena();
|
||||
auto *state = static_cast<HttpConnectionState *>(conn.user_data);
|
||||
|
||||
response += "\r\n";
|
||||
response += "Content-Type: ";
|
||||
response += content_type;
|
||||
response += "\r\n";
|
||||
response += "Content-Length: ";
|
||||
response += std::to_string(body.size());
|
||||
response += "\r\n";
|
||||
response += "X-Response-ID: ";
|
||||
response += std::to_string(state->request_id);
|
||||
response += "\r\n";
|
||||
|
||||
if (close_connection) {
|
||||
response += "Connection: close\r\n";
|
||||
conn.close_after_send(); // Signal connection should be closed after sending
|
||||
} else {
|
||||
response += "Connection: keep-alive\r\n";
|
||||
// Status text
|
||||
std::string_view status_text;
|
||||
switch (status_code) {
|
||||
case 200:
|
||||
status_text = "OK";
|
||||
break;
|
||||
case 400:
|
||||
status_text = "Bad Request";
|
||||
break;
|
||||
case 404:
|
||||
status_text = "Not Found";
|
||||
break;
|
||||
case 500:
|
||||
status_text = "Internal Server Error";
|
||||
break;
|
||||
default:
|
||||
status_text = "Unknown";
|
||||
break;
|
||||
}
|
||||
|
||||
response += "\r\n";
|
||||
response += body;
|
||||
const char *connection_header = close_connection ? "close" : "keep-alive";
|
||||
|
||||
std::string_view response =
|
||||
format(arena,
|
||||
"HTTP/1.1 %d %.*s\r\n"
|
||||
"Content-Type: %.*s\r\n"
|
||||
"Content-Length: %zu\r\n"
|
||||
"X-Response-ID: %ld\r\n"
|
||||
"Connection: %s\r\n"
|
||||
"\r\n%.*s",
|
||||
status_code, static_cast<int>(status_text.size()),
|
||||
status_text.data(), static_cast<int>(content_type.size()),
|
||||
content_type.data(), body.size(), state->http_request_id,
|
||||
connection_header, static_cast<int>(body.size()), body.data());
|
||||
|
||||
if (close_connection) {
|
||||
conn.close_after_send();
|
||||
}
|
||||
|
||||
conn.append_message(response);
|
||||
}
|
||||
|
||||
void HttpHandler::sendJsonResponse(Connection &conn, int status_code,
|
||||
std::string_view json,
|
||||
bool close_connection) {
|
||||
void HttpHandler::send_json_response(Connection &conn, int status_code,
|
||||
std::string_view json,
|
||||
bool close_connection) {
|
||||
sendResponse(conn, status_code, "application/json", json, close_connection);
|
||||
}
|
||||
|
||||
void HttpHandler::sendErrorResponse(Connection &conn, int status_code,
|
||||
std::string_view message,
|
||||
bool close_connection) {
|
||||
[[maybe_unused]] ArenaAllocator &arena = conn.get_arena();
|
||||
void HttpHandler::send_error_response(Connection &conn, int status_code,
|
||||
std::string_view message,
|
||||
bool close_connection) {
|
||||
ArenaAllocator &arena = conn.get_arena();
|
||||
|
||||
std::string json = R"({"error":")";
|
||||
json += message;
|
||||
json += R"("})";
|
||||
std::string_view json =
|
||||
format(arena, R"({"error":"%.*s"})", static_cast<int>(message.size()),
|
||||
message.data());
|
||||
|
||||
sendJsonResponse(conn, status_code, json, close_connection);
|
||||
send_json_response(conn, status_code, json, close_connection);
|
||||
}
|
||||
|
||||
// llhttp callbacks
|
||||
@@ -423,7 +565,7 @@ int HttpHandler::onHeaderValueComplete(llhttp_t *parser) {
|
||||
id = id * 10 + (c - '0');
|
||||
}
|
||||
}
|
||||
state->request_id = id;
|
||||
state->http_request_id = id;
|
||||
}
|
||||
|
||||
// Clear buffers for next header
|
||||
@@ -443,19 +585,330 @@ int HttpHandler::onHeadersComplete(llhttp_t *parser) {
|
||||
llhttp_method_name(static_cast<llhttp_method_t>(parser->method));
|
||||
state->method = std::string_view(method_str);
|
||||
|
||||
// Check if this looks like a POST to /v1/commit to initialize streaming
|
||||
// parser
|
||||
if (state->method == "POST" && state->url.find("/v1/commit") == 0) {
|
||||
// Initialize streaming commit request parsing
|
||||
state->commit_parser = std::make_unique<JsonCommitRequestParser>();
|
||||
state->commit_request = std::make_unique<CommitRequest>();
|
||||
state->parsing_commit =
|
||||
state->commit_parser->begin_streaming_parse(*state->commit_request);
|
||||
|
||||
if (!state->parsing_commit) {
|
||||
return -1; // Signal parsing error to llhttp
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int HttpHandler::onBody(llhttp_t *parser, const char *at, size_t length) {
|
||||
[[maybe_unused]] auto *state =
|
||||
static_cast<HttpConnectionState *>(parser->data);
|
||||
(void)at;
|
||||
(void)length;
|
||||
auto *state = static_cast<HttpConnectionState *>(parser->data);
|
||||
|
||||
if (state->parsing_commit && state->commit_parser) {
|
||||
// Stream data to commit request parser
|
||||
auto status =
|
||||
state->commit_parser->parse_chunk(const_cast<char *>(at), length);
|
||||
if (status == CommitRequestParser::ParseStatus::Error) {
|
||||
return -1; // Signal parsing error to llhttp
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int HttpHandler::onMessageComplete(llhttp_t *parser) {
|
||||
auto *state = static_cast<HttpConnectionState *>(parser->data);
|
||||
state->message_complete = true;
|
||||
|
||||
if (state->parsing_commit && state->commit_parser) {
|
||||
// Finish streaming parse
|
||||
auto status = state->commit_parser->finish_streaming_parse();
|
||||
if (status == CommitRequestParser::ParseStatus::Error) {
|
||||
return -1; // Signal parsing error to llhttp
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
// Pipeline stage implementations (batch-based)
|
||||
bool HttpHandler::process_sequence_batch(BatchType &batch) {
|
||||
// Stage 0: Sequence assignment
|
||||
// This stage performs ONLY work that requires serial processing:
|
||||
// - Version/sequence number assignment (must be sequential)
|
||||
// - Request ID banned list management
|
||||
|
||||
for (auto &entry : batch) {
|
||||
// Pattern match on pipeline entry variant
|
||||
bool should_shutdown = std::visit(
|
||||
[&](auto &&e) -> bool {
|
||||
using T = std::decay_t<decltype(e)>;
|
||||
|
||||
if constexpr (std::is_same_v<T, ShutdownEntry>) {
|
||||
return true; // Signal shutdown
|
||||
} else if constexpr (std::is_same_v<T, CommitEntry>) {
|
||||
// Process commit entry: check banned list, assign version
|
||||
auto &commit_entry = e;
|
||||
auto *state = static_cast<HttpConnectionState *>(
|
||||
commit_entry.connection->user_data);
|
||||
|
||||
if (!state || !state->commit_request) {
|
||||
// Should not happen - basic validation was done on I/O thread
|
||||
send_error_response(*commit_entry.connection, 500,
|
||||
"Internal server error", true);
|
||||
return false;
|
||||
}
|
||||
|
||||
// Check if request_id is banned (for status queries)
|
||||
// Only check CommitRequest request_id, not HTTP header
|
||||
if (state->commit_request &&
|
||||
state->commit_request->request_id().has_value()) {
|
||||
auto commit_request_id =
|
||||
state->commit_request->request_id().value();
|
||||
if (banned_request_ids.find(commit_request_id) !=
|
||||
banned_request_ids.end()) {
|
||||
                // Request ID is banned, this commit should fail
                send_json_response(
                    *commit_entry.connection, 409,
                    R"({"status": "not_committed", "error": "request_id_banned"})",
                    state->connection_close);
                return false;
              }
            }

            // Assign sequential version number
            commit_entry.assigned_version = next_version++;

            TRACE_EVENT("http", "sequence_commit",
                        perfetto::Flow::Global(state->http_request_id));

            return false; // Continue processing
          } else if constexpr (std::is_same_v<T, StatusEntry>) {
            // Process status entry: add request_id to banned list, get version
            // upper bound
            auto &status_entry = e;
            auto *state = static_cast<HttpConnectionState *>(
                status_entry.connection->user_data);

            if (state && !state->status_request_id.empty()) {
              // Add request_id to banned list - store the string in arena and
              // use string_view
              char *arena_chars = banned_request_arena.allocate<char>(
                  state->status_request_id.size());
              std::memcpy(arena_chars, state->status_request_id.data(),
                          state->status_request_id.size());
              std::string_view request_id_view(arena_chars,
                                               state->status_request_id.size());
              banned_request_ids.insert(request_id_view);

              // Update memory usage metric
              banned_request_ids_memory_gauge.set(
                  banned_request_arena.total_allocated());

              // Set version upper bound to current highest assigned version
              status_entry.version_upper_bound = next_version - 1;
            }

            TRACE_EVENT("http", "sequence_status",
                        perfetto::Flow::Global(state->http_request_id));

            // TODO: Transfer to status threadpool - for now just respond
            // not_committed
            send_json_response(*status_entry.connection, 200,
                               R"({"status": "not_committed"})",
                               state->connection_close);

            return false; // Continue processing
          }

          return false; // Unknown type, continue
        },
        entry);

    if (should_shutdown) {
      return true;
    }
  }
  return false; // Continue processing
}

bool HttpHandler::process_resolve_batch(BatchType &batch) {
  // Stage 1: Precondition resolution
  // This stage must be serialized to maintain consistent database state view
  // - Validate preconditions against current database state
  // - Check for conflicts with other transactions

  for (auto &entry : batch) {
    // Pattern match on pipeline entry variant
    bool should_shutdown = std::visit(
        [&](auto &&e) -> bool {
          using T = std::decay_t<decltype(e)>;

          if constexpr (std::is_same_v<T, ShutdownEntry>) {
            return true; // Signal shutdown
          } else if constexpr (std::is_same_v<T, CommitEntry>) {
            // Process commit entry: accept all commits (simplified
            // implementation)
            auto &commit_entry = e;
            auto *state = static_cast<HttpConnectionState *>(
                commit_entry.connection->user_data);

            if (!state || !state->commit_request) {
              // Skip processing for failed sequence stage
              return false;
            }

            // Accept all commits (simplified implementation)
            commit_entry.resolve_success = true;

            TRACE_EVENT("http", "resolve_commit",
                        perfetto::Flow::Global(state->http_request_id));

            return false; // Continue processing
          } else if constexpr (std::is_same_v<T, StatusEntry>) {
            // Status entries are not processed in resolve stage
            // They were already handled in sequence stage
            return false;
          }

          return false; // Unknown type, continue
        },
        entry);

    if (should_shutdown) {
      return true;
    }
  }
  return false; // Continue processing
}

bool HttpHandler::process_persist_batch(BatchType &batch) {
  // Stage 2: Transaction persistence
  // Mark everything as durable immediately (simplified implementation)
  // In real implementation: batch S3 writes, update subscribers, etc.

  for (auto &entry : batch) {
    // Pattern match on pipeline entry variant
    bool should_shutdown = std::visit(
        [&](auto &&e) -> bool {
          using T = std::decay_t<decltype(e)>;

          if constexpr (std::is_same_v<T, ShutdownEntry>) {
            return true; // Signal shutdown
          } else if constexpr (std::is_same_v<T, CommitEntry>) {
            // Process commit entry: mark as durable, generate response
            auto &commit_entry = e;
            auto *state = static_cast<HttpConnectionState *>(
                commit_entry.connection->user_data);

            // Skip if resolve failed or connection is in error state
            if (!state || !state->commit_request ||
                !commit_entry.resolve_success) {
              return false;
            }

            // Mark as persisted and update committed version high water mark
            commit_entry.persist_success = true;
            committed_version.store(commit_entry.assigned_version,
                                    std::memory_order_seq_cst);

            TRACE_EVENT("http", "persist_commit",
                        perfetto::Flow::Global(state->http_request_id));

            const CommitRequest &commit_request = *state->commit_request;
            ArenaAllocator &arena = commit_entry.connection->get_arena();
            std::string_view response;

            // Generate success response with actual assigned version
            if (commit_request.request_id().has_value()) {
              response = format(
                  arena,
                  R"({"request_id":"%.*s","status":"committed","version":%ld,"leader_id":"leader123"})",
                  static_cast<int>(commit_request.request_id().value().size()),
                  commit_request.request_id().value().data(),
                  commit_entry.assigned_version);
            } else {
              response = format(
                  arena,
                  R"({"status":"committed","version":%ld,"leader_id":"leader123"})",
                  commit_entry.assigned_version);
            }

            send_json_response(*commit_entry.connection, 200, response,
                               state->connection_close);

            return false; // Continue processing
          } else if constexpr (std::is_same_v<T, StatusEntry>) {
            // Status entries are not processed in persist stage
            // They were already handled in sequence stage
            return false;
          }

          return false; // Unknown type, continue
        },
        entry);

    if (should_shutdown) {
      return true;
    }
  }

  return false; // Continue processing
}

bool HttpHandler::process_release_batch(BatchType &batch) {
  // Stage 3: Connection release
  // Return connections to server for response transmission

  for (auto &entry : batch) {
    // Pattern match on pipeline entry variant
    bool should_shutdown = std::visit(
        [&](auto &&e) -> bool {
          using T = std::decay_t<decltype(e)>;

          if constexpr (std::is_same_v<T, ShutdownEntry>) {
            return true; // Signal shutdown
          } else if constexpr (std::is_same_v<T, CommitEntry>) {
            // Process commit entry: return connection to server
            auto &commit_entry = e;
            auto *state = static_cast<HttpConnectionState *>(
                commit_entry.connection->user_data);

            if (state) {
              TRACE_EVENT("http", "release_commit",
                          perfetto::Flow::Global(state->http_request_id));
            }

            // Return connection to server for further processing or cleanup
            Server::release_back_to_server(std::move(commit_entry.connection));

            return false; // Continue processing
          } else if constexpr (std::is_same_v<T, StatusEntry>) {
            // Process status entry: return connection to server
            auto &status_entry = e;
            auto *state = static_cast<HttpConnectionState *>(
                status_entry.connection->user_data);

            if (state) {
              TRACE_EVENT("http", "release_status",
                          perfetto::Flow::Global(state->http_request_id));
            }

            // Return connection to server for further processing or cleanup
            Server::release_back_to_server(std::move(status_entry.connection));

            return false; // Continue processing
          }

          return false; // Unknown type, continue
        },
        entry);

    if (should_shutdown) {
      return true;
    }
  }

  return false; // Continue processing
}

src/http_handler.hpp
@@ -1,18 +1,26 @@
#pragma once

#include <atomic>
#include <memory>
#include <string_view>
#include <thread>
#include <unordered_set>

#include <llhttp.h>

#include "arena_allocator.hpp"
#include "connection.hpp"
#include "connection_handler.hpp"
#include "loop_iterations.hpp"
#include "perfetto_categories.hpp"
#include "pipeline_entry.hpp"
#include "server.hpp"
#include "thread_pipeline.hpp"

// Forward declarations
struct CommitRequest;
struct JsonCommitRequestParser;

/**
 * HTTP routes supported by WeaselDB server.
 * Using enum for efficient switch-based routing.
@@ -48,13 +56,25 @@ struct HttpConnectionState {
  bool connection_close = false; // Client requested connection close
  HttpRoute route = HttpRoute::NotFound;

  // Status request data
  std::string_view
      status_request_id; // Request ID extracted from /v1/status/{id} URL

  // Header accumulation buffers (arena-allocated)
  using ArenaString =
      std::basic_string<char, std::char_traits<char>, ArenaStlAllocator<char>>;
  ArenaString current_header_field_buf;
  ArenaString current_header_value_buf;
  bool header_field_complete = false;
  int64_t request_id = 0; // X-Request-Id header value
  int64_t http_request_id =
      0; // X-Request-Id header value (for tracing/logging)

  // Streaming parser for POST requests
  std::unique_ptr<JsonCommitRequestParser> commit_parser;
  std::unique_ptr<CommitRequest> commit_request;
  bool parsing_commit = false;
  bool basic_validation_passed =
      false; // Set to true if basic validation passes

  explicit HttpConnectionState(ArenaAllocator &arena);
};
@@ -64,70 +84,66 @@ struct HttpConnectionState {
 * Supports the WeaselDB REST API endpoints with enum-based routing.
 */
struct HttpHandler : ConnectionHandler {
  HttpHandler() {
    finalStageThreads.emplace_back([this]() {
      pthread_setname_np(pthread_self(), "stage-1-0");
  HttpHandler()
      : banned_request_ids(
            ArenaStlAllocator<std::string_view>(&banned_request_arena)) {
    // Stage 0: Sequence assignment thread
    sequenceThread = std::thread{[this]() {
      pthread_setname_np(pthread_self(), "txn-sequence");
      for (;;) {
        auto guard = pipeline.acquire<1, 0>();
        for (auto it = guard.batch.begin(); it != guard.batch.end(); ++it) {
          if ((it.index() % 2) == 0) { // Thread 0 handles even indices
            auto &c = *it;
            if (!c) {
              return;
            }
            auto *state = static_cast<HttpConnectionState *>(c->user_data);
            TRACE_EVENT("http", "release",
                        perfetto::Flow::Global(state->request_id));
            Server::release_back_to_server(std::move(c));
          }
        auto guard = commitPipeline.acquire<0, 0>();
        if (process_sequence_batch(guard.batch)) {
          return; // Shutdown signal received
        }
      }
    });
    finalStageThreads.emplace_back([this]() {
      pthread_setname_np(pthread_self(), "stage-1-1");
    }};

    // Stage 1: Precondition resolution thread
    resolveThread = std::thread{[this]() {
      pthread_setname_np(pthread_self(), "txn-resolve");
      for (;;) {
        auto guard = pipeline.acquire<1, 1>();
        for (auto it = guard.batch.begin(); it != guard.batch.end(); ++it) {
          if ((it.index() % 2) == 1) { // Thread 1 handles odd indices
            auto &c = *it;
            if (!c) {
              return;
            }
            auto *state = static_cast<HttpConnectionState *>(c->user_data);
            TRACE_EVENT("http", "release",
                        perfetto::Flow::Global(state->request_id));
            Server::release_back_to_server(std::move(c));
          }
        auto guard = commitPipeline.acquire<1, 0>();
        if (process_resolve_batch(guard.batch)) {
          return; // Shutdown signal received
        }
      }
    });
    stage0Thread = std::thread{[this]() {
      pthread_setname_np(pthread_self(), "stage-0");
      int nulls = 0;
    }};

    // Stage 2: Transaction persistence thread
    persistThread = std::thread{[this]() {
      pthread_setname_np(pthread_self(), "txn-persist");
      for (;;) {
        auto guard = pipeline.acquire<0, 0>(1);
        for (auto &c : guard.batch) {
          nulls += !c;
          if (nulls == 2) {
            return;
          }
          for (volatile int i = 0; i < loop_iterations; i = i + 1)
            ;
        auto guard = commitPipeline.acquire<2, 0>();
        if (process_persist_batch(guard.batch)) {
          return; // Shutdown signal received
        }
      }
    }};

    // Stage 3: Connection return to server thread
    releaseThread = std::thread{[this]() {
      pthread_setname_np(pthread_self(), "txn-release");
      for (;;) {
        auto guard = commitPipeline.acquire<3, 0>();
        if (process_release_batch(guard.batch)) {
          return; // Shutdown signal received
        }
      }
    }};
  }
  ~HttpHandler() {
    // Send single shutdown signal that flows through all pipeline stages
    {
      auto guard = pipeline.push(2, true);
      for (auto &c : guard.batch) {
        c = {};
      }
    }
    stage0Thread.join();
    for (auto &thread : finalStageThreads) {
      thread.join();
      auto guard = commitPipeline.push(1, true);
      guard.batch[0] =
          ShutdownEntry{}; // Single ShutdownEntry flows through all stages
    }

    // Join all pipeline threads
    sequenceThread.join();
    resolveThread.join();
    persistThread.join();
    releaseThread.join();
  }

  void on_connection_established(Connection &conn) override;
@@ -153,17 +169,49 @@ struct HttpHandler : ConnectionHandler {

private:
  static constexpr int lg_size = 16;
  StaticThreadPipeline<std::unique_ptr<Connection>,
                       WaitStrategy::WaitIfUpstreamIdle, 1, 2>
      pipeline{lg_size};
  std::thread stage0Thread;
  std::vector<std::thread> finalStageThreads;

  // Pipeline state (sequence thread only)
  int64_t next_version = 1; // Next version to assign (sequence thread only)

  // Pipeline state (persist thread writes, I/O threads read)
  std::atomic<int64_t> committed_version{
      0}; // Highest committed version (persist thread writes, I/O threads read)

  // Arena for banned request IDs and related data structures (sequence thread
  // only)
  ArenaAllocator banned_request_arena;
  using BannedRequestIdSet =
      std::unordered_set<std::string_view, std::hash<std::string_view>,
                         std::equal_to<std::string_view>,
                         ArenaStlAllocator<std::string_view>>;
  BannedRequestIdSet banned_request_ids; // Request IDs that should not commit
                                         // (string_views into arena)

  // Main commit processing pipeline: sequence -> resolve -> persist -> release
  StaticThreadPipeline<PipelineEntry, WaitStrategy::WaitIfUpstreamIdle, 1, 1, 1,
                       1>
      commitPipeline{lg_size};

  // Pipeline stage threads
  std::thread sequenceThread;
  std::thread resolveThread;
  std::thread persistThread;
  std::thread releaseThread;

  // Pipeline stage processing methods (batch-based)
  using BatchType =
      StaticThreadPipeline<PipelineEntry, WaitStrategy::WaitIfUpstreamIdle, 1,
                           1, 1, 1>::Batch;
  bool process_sequence_batch(BatchType &batch);
  bool process_resolve_batch(BatchType &batch);
  bool process_persist_batch(BatchType &batch);
  bool process_release_batch(BatchType &batch);

  // Route handlers
  void handleGetVersion(Connection &conn, const HttpConnectionState &state);
  void handlePostCommit(Connection &conn, const HttpConnectionState &state);
  void handleGetSubscribe(Connection &conn, const HttpConnectionState &state);
  void handleGetStatus(Connection &conn, const HttpConnectionState &state);
  void handleGetStatus(Connection &conn, HttpConnectionState &state);
  void handlePutRetention(Connection &conn, const HttpConnectionState &state);
  void handleGetRetention(Connection &conn, const HttpConnectionState &state);
  void handleDeleteRetention(Connection &conn,
@@ -176,10 +224,10 @@ private:
  static void sendResponse(Connection &conn, int status_code,
                           std::string_view content_type, std::string_view body,
                           bool close_connection = false);
  static void sendJsonResponse(Connection &conn, int status_code,
                               std::string_view json,
                               bool close_connection = false);
  static void sendErrorResponse(Connection &conn, int status_code,
                                std::string_view message,
                                bool close_connection = false);
  static void send_json_response(Connection &conn, int status_code,
                                 std::string_view json,
                                 bool close_connection = false);
  static void send_error_response(Connection &conn, int status_code,
                                  std::string_view message,
                                  bool close_connection = false);
};

146
src/main.cpp
@@ -29,48 +29,41 @@ void signal_handler(int sig) {
  }
}

std::vector<int> create_listen_sockets(const weaseldb::Config &config) {
  std::vector<int> listen_fds;

  // Check if unix socket path is specified
  if (!config.server.unix_socket_path.empty()) {
    // Create unix socket
    int sfd = socket(AF_UNIX, SOCK_STREAM, 0);
    if (sfd == -1) {
      perror("socket");
      std::abort();
    }

    // Remove existing socket file if it exists
    unlink(config.server.unix_socket_path.c_str());

    struct sockaddr_un addr;
    std::memset(&addr, 0, sizeof(addr));
    addr.sun_family = AF_UNIX;

    if (config.server.unix_socket_path.length() >= sizeof(addr.sun_path)) {
      std::fprintf(stderr, "Unix socket path too long\n");
      std::abort();
    }

    std::strncpy(addr.sun_path, config.server.unix_socket_path.c_str(),
                 sizeof(addr.sun_path) - 1);

    if (bind(sfd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
      perror("bind");
      std::abort();
    }

    if (listen(sfd, SOMAXCONN) == -1) {
      perror("listen");
      std::abort();
    }

    listen_fds.push_back(sfd);
    return listen_fds;
int create_unix_socket(const std::string &path) {
  int sfd = socket(AF_UNIX, SOCK_STREAM, 0);
  if (sfd == -1) {
    perror("socket");
    std::abort();
  }

  // TCP socket creation
  // Remove existing socket file if it exists
  unlink(path.c_str());

  struct sockaddr_un addr;
  std::memset(&addr, 0, sizeof(addr));
  addr.sun_family = AF_UNIX;

  if (path.length() >= sizeof(addr.sun_path)) {
    std::fprintf(stderr, "Unix socket path too long: %s\n", path.c_str());
    std::abort();
  }

  std::strncpy(addr.sun_path, path.c_str(), sizeof(addr.sun_path) - 1);

  if (bind(sfd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
    perror("bind");
    std::abort();
  }

  if (listen(sfd, SOMAXCONN) == -1) {
    perror("listen");
    std::abort();
  }

  return sfd;
}

int create_tcp_socket(const std::string &address, int port) {
  struct addrinfo hints;
  struct addrinfo *result, *rp;
  int s;
@@ -84,8 +77,8 @@ std::vector<int> create_listen_sockets(const weaseldb::Config &config) {
  hints.ai_addr = nullptr;
  hints.ai_next = nullptr;

  s = getaddrinfo(config.server.bind_address.c_str(),
                  std::to_string(config.server.port).c_str(), &hints, &result);
  s = getaddrinfo(address.c_str(), std::to_string(port).c_str(), &hints,
                  &result);
  if (s != 0) {
    std::fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(s));
    std::abort();
@@ -94,18 +87,13 @@ std::vector<int> create_listen_sockets(const weaseldb::Config &config) {
  int sfd = -1;
  for (rp = result; rp != nullptr; rp = rp->ai_next) {
    sfd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
    if (sfd == -1) {
    if (sfd == -1)
      continue;
    }

    int val = 1;
    if (setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)) == -1) {
      perror("setsockopt SO_REUSEADDR");
      int e = close(sfd);
      if (e == -1 && errno != EINTR) {
        perror("close sfd (SO_REUSEADDR failed)");
        std::abort();
      }
      close(sfd);
      continue;
    }

@@ -113,40 +101,56 @@ std::vector<int> create_listen_sockets(const weaseldb::Config &config) {
    if (rp->ai_family == AF_INET || rp->ai_family == AF_INET6) {
      if (setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val)) == -1) {
        perror("setsockopt TCP_NODELAY");
        int e = close(sfd);
        if (e == -1 && errno != EINTR) {
          perror("close sfd (TCP_NODELAY failed)");
          std::abort();
        }
        close(sfd);
        continue;
      }
    }

    if (bind(sfd, rp->ai_addr, rp->ai_addrlen) == 0) {
      if (listen(sfd, SOMAXCONN) == -1) {
        perror("listen");
        close(sfd);
        freeaddrinfo(result);
        std::abort();
      }
      break; /* Success */
    }

    int e = close(sfd);
    if (e == -1 && errno != EINTR) {
      perror("close sfd (bind failed)");
      std::abort();
    }
    close(sfd);
    sfd = -1;
  }

  freeaddrinfo(result);

  if (rp == nullptr || sfd == -1) {
    std::fprintf(stderr, "Could not bind to any address\n");
  if (sfd == -1) {
    std::fprintf(stderr, "Could not bind to %s:%d\n", address.c_str(), port);
    std::abort();
  }

  if (listen(sfd, SOMAXCONN) == -1) {
    perror("listen");
  return sfd;
}

std::vector<int> create_listen_sockets(const weaseldb::Config &config) {
  std::vector<int> listen_fds;

  for (const auto &iface : config.server.interfaces) {
    int fd;
    if (iface.type == weaseldb::ListenInterface::Type::TCP) {
      fd = create_tcp_socket(iface.address, iface.port);
      std::cout << "Listening on TCP " << iface.address << ":" << iface.port
                << std::endl;
    } else {
      fd = create_unix_socket(iface.path);
      std::cout << "Listening on Unix socket " << iface.path << std::endl;
    }
    listen_fds.push_back(fd);
  }

  if (listen_fds.empty()) {
    std::fprintf(stderr, "No interfaces configured\n");
    std::abort();
  }

    listen_fds.push_back(sfd);
  return listen_fds;
}
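
The new `create_listen_sockets()` above iterates `config.server.interfaces`, whose type is defined in the config code rather than in this diff. A rough sketch of the shape implied by the call sites (field names inferred from usage here; treat it as an assumption, not the actual definition in the real config header) might be:

```cpp
#include <string>
#include <vector>

// Hypothetical sketch of the config types consumed by create_listen_sockets();
// inferred from how they are used in this diff, not copied from the project.
namespace weaseldb {

struct ListenInterface {
  enum class Type { TCP, Unix };
  Type type = Type::TCP;
  std::string address; // used when type == Type::TCP
  int port = 0;        // used when type == Type::TCP
  std::string path;    // used when type == Type::Unix
};

struct ServerConfig {
  std::vector<ListenInterface> interfaces; // e.g. one TCP and one Unix entry
};

struct Config {
  ServerConfig server;
};

} // namespace weaseldb
```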

@@ -218,13 +222,13 @@ int main(int argc, char *argv[]) {
  }

  std::cout << "Configuration loaded successfully:" << std::endl;
  if (!config->server.unix_socket_path.empty()) {
    std::cout << "Unix socket path: " << config->server.unix_socket_path
              << std::endl;
  } else {
    std::cout << "Server bind address: " << config->server.bind_address
              << std::endl;
    std::cout << "Server port: " << config->server.port << std::endl;
  std::cout << "Interfaces: " << config->server.interfaces.size() << std::endl;
  for (const auto &iface : config->server.interfaces) {
    if (iface.type == weaseldb::ListenInterface::Type::TCP) {
      std::cout << " TCP: " << iface.address << ":" << iface.port << std::endl;
    } else {
      std::cout << " Unix socket: " << iface.path << std::endl;
    }
  }
  std::cout << "Max request size: " << config->server.max_request_size_bytes
            << " bytes" << std::endl;

47
src/pipeline_entry.hpp
Normal file
@@ -0,0 +1,47 @@
#pragma once

#include "connection.hpp"
#include <memory>
#include <variant>

/**
 * Pipeline entry for commit requests that need full 4-stage processing.
 * Contains connection with parsed CommitRequest.
 */
struct CommitEntry {
  std::unique_ptr<Connection> connection;
  int64_t assigned_version = 0; // Set by sequence stage
  bool resolve_success = false; // Set by resolve stage
  bool persist_success = false; // Set by persist stage

  CommitEntry() = default; // Default constructor for variant
  explicit CommitEntry(std::unique_ptr<Connection> conn)
      : connection(std::move(conn)) {}
};

/**
 * Pipeline entry for status requests that need sequence stage processing
 * then transfer to status threadpool.
 */
struct StatusEntry {
  std::unique_ptr<Connection> connection;
  int64_t version_upper_bound = 0; // Set by sequence stage

  StatusEntry() = default; // Default constructor for variant
  explicit StatusEntry(std::unique_ptr<Connection> conn)
      : connection(std::move(conn)) {}
};

/**
 * Pipeline entry for coordinated shutdown of all stages.
 * Flows through all stages to ensure proper cleanup.
 */
struct ShutdownEntry {
  // Empty struct - presence indicates shutdown
};

/**
 * Pipeline entry variant type used by the commit processing pipeline.
 * Each stage pattern-matches on the variant type to handle appropriately.
 */
using PipelineEntry = std::variant<CommitEntry, StatusEntry, ShutdownEntry>;
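
Each pipeline stage consumes `PipelineEntry` values by pattern-matching on the variant, as the handler code earlier in this diff does with `std::visit` and `if constexpr`. The following is a minimal, self-contained sketch of that dispatch shape only; the entry structs are simplified stand-ins and `process_entry` is a hypothetical helper, not part of this commit:

```cpp
#include <iostream>
#include <type_traits>
#include <variant>

// Simplified stand-ins for the real CommitEntry / StatusEntry / ShutdownEntry,
// which carry a Connection pointer in the actual header.
struct CommitEntry { long assigned_version = 0; };
struct StatusEntry { long version_upper_bound = 0; };
struct ShutdownEntry {};
using PipelineEntry = std::variant<CommitEntry, StatusEntry, ShutdownEntry>;

// Returns true when a ShutdownEntry is seen, mirroring how each
// process_*_batch() method signals its stage loop to exit.
bool process_entry(PipelineEntry &entry) {
  return std::visit(
      [](auto &e) -> bool {
        using T = std::decay_t<decltype(e)>;
        if constexpr (std::is_same_v<T, ShutdownEntry>) {
          return true; // stop the stage loop
        } else if constexpr (std::is_same_v<T, CommitEntry>) {
          std::cout << "commit assigned version " << e.assigned_version << "\n";
        } else {
          std::cout << "status upper bound " << e.version_upper_bound << "\n";
        }
        return false; // keep processing
      },
      entry);
}

int main() {
  PipelineEntry entries[] = {CommitEntry{42}, StatusEntry{41}, ShutdownEntry{}};
  for (auto &e : entries) {
    if (process_entry(e)) {
      break;
    }
  }
}
```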

src/server.cpp
@@ -98,9 +98,11 @@ Server::~Server() {
    }
  }

  // Clean up unix socket file if it exists
  if (!config_.server.unix_socket_path.empty()) {
    unlink(config_.server.unix_socket_path.c_str());
  // Clean up unix socket files if they exist
  for (const auto &iface : config_.server.interfaces) {
    if (iface.type == weaseldb::ListenInterface::Type::Unix) {
      unlink(iface.path.c_str());
    }
  }
}

21
style.md
@@ -102,6 +102,27 @@ auto addr = reinterpret_cast<uintptr_t>(ptr); // Pointer to integer conv
- **String views** with `std::string_view` to minimize unnecessary copying
- **Arena allocation** for efficient memory management (~1ns vs ~20-270ns for malloc)

### String Formatting
- **Always use `format.hpp` functions** - formats directly into arena-allocated memory
- **Use `static_format()` for performance-sensitive code** - faster but less flexible than `format()`
- **Use `format()` function with arena allocator** for printf-style formatting
```cpp
// Most performance-sensitive - compile-time optimized concatenation
std::string_view response = static_format(arena,
    "HTTP/1.1 ", status_code, " OK\r\n",
    "Content-Length: ", body.size(), "\r\n",
    "\r\n", body);

// Printf-style formatting - runtime flexible
ArenaAllocator& arena = conn.get_arena();
std::string_view response = format(arena,
    "HTTP/1.1 %d OK\r\n"
    "Content-Length: %zu\r\n"
    "\r\n%.*s",
    status_code, body.size(),
    static_cast<int>(body.size()), body.data());
```

### Complexity Control
- **Encapsulation is the main tool for controlling complexity**
- **Header files define the interface** - they are the contract with users of your code

@@ -1,9 +1,11 @@
# WeaselDB Configuration File

[server]
unix_socket_path = "weaseldb.sock"
bind_address = "127.0.0.1"
port = 8080
# Network interfaces to listen on - both TCP for external access and Unix socket for high-performance local testing
interfaces = [
  { type = "tcp", address = "127.0.0.1", port = 8080 },
  { type = "unix", path = "weaseldb.sock" }
]
# Maximum request size in bytes (for 413 Content Too Large responses)
max_request_size_bytes = 1048576 # 1MB
# Number of I/O threads for handling connections and network events
