Compare commits
7 Commits
b8eb00e313
...
55069c0c79
| Author | SHA1 | Date | |
|---|---|---|---|
| 55069c0c79 | |||
| 96aae52853 | |||
| 8b6736127a | |||
| 9272048108 | |||
| b2ffe3bfab | |||
| 978861c430 | |||
| 46edb7cd26 |
@@ -156,13 +156,25 @@ add_executable(
|
|||||||
test_http_handler
|
test_http_handler
|
||||||
tests/test_http_handler.cpp
|
tests/test_http_handler.cpp
|
||||||
src/http_handler.cpp
|
src/http_handler.cpp
|
||||||
|
src/server.cpp
|
||||||
|
src/config.cpp
|
||||||
|
src/json_commit_request_parser.cpp
|
||||||
src/arena_allocator.cpp
|
src/arena_allocator.cpp
|
||||||
src/format.cpp
|
src/format.cpp
|
||||||
src/connection.cpp
|
src/connection.cpp
|
||||||
src/connection_registry.cpp
|
src/connection_registry.cpp
|
||||||
src/metric.cpp)
|
src/metric.cpp
|
||||||
target_link_libraries(test_http_handler doctest::doctest llhttp_static
|
${CMAKE_BINARY_DIR}/json_tokens.cpp)
|
||||||
Threads::Threads perfetto simdutf::simdutf)
|
add_dependencies(test_http_handler generate_json_tokens)
|
||||||
|
target_link_libraries(
|
||||||
|
test_http_handler
|
||||||
|
doctest::doctest
|
||||||
|
llhttp_static
|
||||||
|
Threads::Threads
|
||||||
|
toml11::toml11
|
||||||
|
perfetto
|
||||||
|
simdutf::simdutf
|
||||||
|
weaseljson)
|
||||||
target_include_directories(test_http_handler PRIVATE src)
|
target_include_directories(test_http_handler PRIVATE src)
|
||||||
target_compile_definitions(test_http_handler
|
target_compile_definitions(test_http_handler
|
||||||
PRIVATE DOCTEST_CONFIG_IMPLEMENT_WITH_MAIN)
|
PRIVATE DOCTEST_CONFIG_IMPLEMENT_WITH_MAIN)
|
||||||
@@ -177,6 +189,7 @@ add_executable(
|
|||||||
src/arena_allocator.cpp
|
src/arena_allocator.cpp
|
||||||
src/config.cpp
|
src/config.cpp
|
||||||
src/http_handler.cpp
|
src/http_handler.cpp
|
||||||
|
src/json_commit_request_parser.cpp
|
||||||
src/format.cpp
|
src/format.cpp
|
||||||
src/metric.cpp
|
src/metric.cpp
|
||||||
${CMAKE_BINARY_DIR}/json_tokens.cpp)
|
${CMAKE_BINARY_DIR}/json_tokens.cpp)
|
||||||
|
|||||||
499
commit_pipeline.md
Normal file
499
commit_pipeline.md
Normal file
@@ -0,0 +1,499 @@
|
|||||||
|
# Commit Processing Pipeline
|
||||||
|
|
||||||
|
## Overview
|
||||||
|
|
||||||
|
WeaselDB implements a high-performance 4-stage commit processing pipeline that transforms HTTP commit requests into durable transactions. The pipeline provides strict serialization where needed while maximizing throughput through batching and asynchronous processing.
|
||||||
|
|
||||||
|
## Architecture
|
||||||
|
|
||||||
|
The commit processing pipeline consists of four sequential stages, each running on a dedicated thread:
|
||||||
|
|
||||||
|
```
|
||||||
|
HTTP I/O Threads → [Sequence] → [Resolve] → [Persist] → [Release] → HTTP I/O Threads
|
||||||
|
```
|
||||||
|
|
||||||
|
### Pipeline Flow
|
||||||
|
|
||||||
|
1. **HTTP I/O Threads**: Parse and validate incoming commit requests
|
||||||
|
2. **Sequence Stage**: Assign sequential version numbers to commits
|
||||||
|
3. **Resolve Stage**: Validate preconditions and check for conflicts
|
||||||
|
4. **Persist Stage**: Write commits to durable storage and notify subscribers
|
||||||
|
5. **Release Stage**: Return connections to HTTP I/O threads for response handling
|
||||||
|
|
||||||
|
## Stage Details
|
||||||
|
|
||||||
|
### Stage 0: Sequence Assignment
|
||||||
|
|
||||||
|
**Thread**: `txn-sequence`
|
||||||
|
**Purpose**: Version assignment and request ID management
|
||||||
|
**Serialization**: **Required** - Must be single-threaded
|
||||||
|
|
||||||
|
**Responsibilities**:
|
||||||
|
- **For CommitEntry**: Check request_id against banned list, assign sequential version number if not banned, forward to resolve stage
|
||||||
|
- **For StatusEntry**: Add request_id to banned list, note current highest assigned version as upper bound, transfer connection to status threadpool
|
||||||
|
- Record version assignments for transaction tracking
|
||||||
|
|
||||||
|
**Why Serialization is Required**:
|
||||||
|
- Version numbers must be strictly sequential without gaps
|
||||||
|
- Banned list updates must be atomic with version assignment
|
||||||
|
- Status requests must get accurate upper bound on potential commit versions
|
||||||
|
|
||||||
|
**Request ID Banned List**:
|
||||||
|
- Purpose: Make transactions no longer in-flight and establish version upper bounds for status queries
|
||||||
|
- Lifecycle: Grows indefinitely until process restart (leader change)
|
||||||
|
- Removal: Only on process restart/leader change, which invalidates all old request IDs
|
||||||
|
|
||||||
|
**Current Implementation**:
|
||||||
|
```cpp
|
||||||
|
bool HttpHandler::process_sequence_batch(BatchType &batch) {
|
||||||
|
for (auto &entry : batch) {
|
||||||
|
if (std::holds_alternative<ShutdownEntry>(entry)) {
|
||||||
|
return true; // Shutdown signal
|
||||||
|
}
|
||||||
|
// TODO: Pattern match on CommitEntry vs StatusEntry
|
||||||
|
// TODO: Implement sequence assignment logic for each type
|
||||||
|
}
|
||||||
|
return false; // Continue processing
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
### Stage 1: Precondition Resolution
|
||||||
|
|
||||||
|
**Thread**: `txn-resolve`
|
||||||
|
**Purpose**: Validate preconditions and detect conflicts
|
||||||
|
**Serialization**: **Required** - Must be single-threaded
|
||||||
|
|
||||||
|
**Responsibilities**:
|
||||||
|
- **For CommitEntry**: Check preconditions against in-memory recent writes set, add writes to recent writes set if accepted
|
||||||
|
- **For StatusEntry**: N/A (transferred to status threadpool after sequence stage)
|
||||||
|
- Mark failed commits with failure information (including which preconditions failed)
|
||||||
|
|
||||||
|
**Why Serialization is Required**:
|
||||||
|
- Must maintain consistent view of in-memory recent writes set
|
||||||
|
- Conflict detection requires atomic evaluation of all preconditions against recent writes
|
||||||
|
- Recent writes set updates must be synchronized
|
||||||
|
|
||||||
|
**Transaction State Transitions**:
|
||||||
|
- **Assigned Version** (from sequence) → **Semi-committed** (resolve accepts) → **Committed** (persist completes)
|
||||||
|
- Failed transactions continue through pipeline with failure information for client response
|
||||||
|
|
||||||
|
**Current Implementation**:
|
||||||
|
```cpp
|
||||||
|
bool HttpHandler::process_resolve_batch(BatchType &batch) {
|
||||||
|
// TODO: Implement precondition resolution logic:
|
||||||
|
// 1. For CommitEntry: Check preconditions against in-memory recent writes set
|
||||||
|
// 2. If accepted: Add writes to in-memory recent writes set, mark as semi-committed
|
||||||
|
// 3. If failed: Mark with failure info (which preconditions failed)
|
||||||
|
// 4. For StatusEntry: N/A (already transferred to status threadpool)
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
### Stage 2: Transaction Persistence
|
||||||
|
|
||||||
|
**Thread**: `txn-persist`
|
||||||
|
**Purpose**: Write semi-committed transactions to durable storage
|
||||||
|
**Serialization**: **Required** - Must mark batches durable in order
|
||||||
|
|
||||||
|
**Responsibilities**:
|
||||||
|
- **For CommitEntry**: Apply operations to persistent storage, update committed version high water mark
|
||||||
|
- **For StatusEntry**: N/A (transferred to status threadpool after sequence stage)
|
||||||
|
- Generate durability events for `/v1/subscribe` when committed version advances
|
||||||
|
- Batch multiple commits for efficient persistence operations
|
||||||
|
|
||||||
|
**Why Serialization is Required**:
|
||||||
|
- Batches must be marked durable in sequential version order
|
||||||
|
- High water mark updates must reflect strict ordering of committed versions
|
||||||
|
- Ensures consistency guarantees across all endpoints
|
||||||
|
|
||||||
|
**Committed Version High Water Mark**:
|
||||||
|
- Global atomic value tracking highest durably committed version
|
||||||
|
- Updated after each batch commits: set to highest version in the batch
|
||||||
|
- Read by `/v1/version` endpoint using atomic seq_cst reads
|
||||||
|
- Enables `/v1/subscribe` durability events when high water mark advances
|
||||||
|
|
||||||
|
**Batching Strategy**:
|
||||||
|
- Multiple semi-committed transactions can be persisted in a single batch
|
||||||
|
- High water mark updated once per batch to highest version in that batch
|
||||||
|
- See `persistence.md` for detailed persistence design
|
||||||
|
|
||||||
|
**Current Implementation**:
|
||||||
|
```cpp
|
||||||
|
bool HttpHandler::process_persist_batch(BatchType &batch) {
|
||||||
|
// TODO: Implement actual persistence logic:
|
||||||
|
// 1. For CommitEntry: Apply operations to persistent storage
|
||||||
|
// 2. Update committed version high water mark to highest version in batch
|
||||||
|
// 3. Generate durability events for /v1/subscribe
|
||||||
|
// 4. For StatusEntry: N/A (already transferred to status threadpool)
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
### Stage 3: Connection Release
|
||||||
|
|
||||||
|
**Thread**: `txn-release`
|
||||||
|
**Purpose**: Return connections to HTTP server for client response
|
||||||
|
**Serialization**: Not required - Independent connection handling
|
||||||
|
|
||||||
|
**Responsibilities**:
|
||||||
|
- Return processed connections to HTTP server for all request types
|
||||||
|
- Connection carries response data (success/failure) and status information
|
||||||
|
- Trigger response transmission to clients
|
||||||
|
|
||||||
|
**Response Handling**:
|
||||||
|
- **CommitRequests**: Response generated by persist stage (success with version, or failure with conflicting preconditions)
|
||||||
|
- **StatusRequests**: Response generated by separate status lookup logic (not part of pipeline)
|
||||||
|
- Failed transactions carry failure information through entire pipeline for proper client response
|
||||||
|
|
||||||
|
**Implementation**:
|
||||||
|
```cpp
|
||||||
|
bool HttpHandler::process_release_batch(BatchType &batch) {
|
||||||
|
// Stage 3: Connection release
|
||||||
|
for (auto &conn : batch) {
|
||||||
|
if (!conn) {
|
||||||
|
return true; // Shutdown signal
|
||||||
|
}
|
||||||
|
// Return connection to server for further processing or cleanup
|
||||||
|
Server::release_back_to_server(std::move(conn));
|
||||||
|
}
|
||||||
|
return false; // Continue processing
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
## Threading Model
|
||||||
|
|
||||||
|
### Thread Pipeline Configuration
|
||||||
|
|
||||||
|
```cpp
|
||||||
|
// 4-stage pipeline: sequence -> resolve -> persist -> release
|
||||||
|
// TODO: Update pipeline type from std::unique_ptr<Connection> to PipelineEntry variant
|
||||||
|
StaticThreadPipeline<PipelineEntry, // Was: std::unique_ptr<Connection>
|
||||||
|
WaitStrategy::WaitIfUpstreamIdle, 1, 1, 1, 1>
|
||||||
|
commitPipeline{lg_size};
|
||||||
|
|
||||||
|
// Pipeline entry type (to be implemented)
|
||||||
|
using PipelineEntry = std::variant<CommitEntry, StatusEntry, ShutdownEntry>;
|
||||||
|
```
|
||||||
|
|
||||||
|
### Thread Creation and Management
|
||||||
|
|
||||||
|
```cpp
|
||||||
|
HttpHandler() {
|
||||||
|
// Stage 0: Sequence assignment thread
|
||||||
|
sequenceThread = std::thread{[this]() {
|
||||||
|
pthread_setname_np(pthread_self(), "txn-sequence");
|
||||||
|
for (;;) {
|
||||||
|
auto guard = commitPipeline.acquire<0, 0>();
|
||||||
|
if (process_sequence_batch(guard.batch)) {
|
||||||
|
return; // Shutdown signal received
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}};
|
||||||
|
|
||||||
|
// Similar pattern for resolve, persist, and release threads...
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
### Batch Processing
|
||||||
|
|
||||||
|
Each stage processes connections in batches using RAII guards:
|
||||||
|
|
||||||
|
```cpp
|
||||||
|
auto guard = commitPipeline.acquire<STAGE_NUM, 0>();
|
||||||
|
// Process batch
|
||||||
|
for (auto &conn : guard.batch) {
|
||||||
|
// Stage-specific processing
|
||||||
|
}
|
||||||
|
// Guard destructor automatically publishes batch to next stage
|
||||||
|
```
|
||||||
|
|
||||||
|
## Flow Control
|
||||||
|
|
||||||
|
### Pipeline Entry
|
||||||
|
|
||||||
|
Commit requests enter the pipeline via `HttpHandler::on_batch_complete()`:
|
||||||
|
|
||||||
|
```cpp
|
||||||
|
void HttpHandler::on_batch_complete(std::span<std::unique_ptr<Connection>> batch) {
|
||||||
|
// Collect commit requests that passed basic validation for 4-stage pipeline processing
|
||||||
|
int commit_count = 0;
|
||||||
|
for (auto &conn : batch) {
|
||||||
|
if (conn && conn->user_data) {
|
||||||
|
auto *state = static_cast<HttpConnectionState *>(conn->user_data);
|
||||||
|
if (state->route == HttpRoute::POST_commit &&
|
||||||
|
state->commit_request &&
|
||||||
|
state->parsing_commit) {
|
||||||
|
commit_count++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send commit requests to 4-stage pipeline in batch
|
||||||
|
if (commit_count > 0) {
|
||||||
|
auto guard = commitPipeline.push(commit_count, true);
|
||||||
|
// Move qualifying connections into pipeline
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
### Backpressure Handling
|
||||||
|
|
||||||
|
The pipeline implements natural backpressure:
|
||||||
|
- Each stage blocks if downstream stages are full
|
||||||
|
- `WaitIfUpstreamIdle` strategy balances latency vs throughput
|
||||||
|
- Ring buffer size (`lg_size = 16`) controls maximum queued batches
|
||||||
|
|
||||||
|
### Shutdown Coordination
|
||||||
|
|
||||||
|
Pipeline shutdown is coordinated by sending a single ShutdownEntry that flows through all stages:
|
||||||
|
|
||||||
|
```cpp
|
||||||
|
~HttpHandler() {
|
||||||
|
// Send single shutdown signal that flows through all pipeline stages
|
||||||
|
{
|
||||||
|
auto guard = commitPipeline.push(1, true);
|
||||||
|
guard.batch[0] = ShutdownEntry{}; // Single ShutdownEntry flows through all stages
|
||||||
|
}
|
||||||
|
|
||||||
|
// Join all pipeline threads
|
||||||
|
sequenceThread.join();
|
||||||
|
resolveThread.join();
|
||||||
|
persistThread.join();
|
||||||
|
releaseThread.join();
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
**Note**: Multiple entries would only be needed if stages had multiple threads, with each thread needing its own shutdown signal.
|
||||||
|
|
||||||
|
## Error Handling
|
||||||
|
|
||||||
|
### Stage-Level Error Handling
|
||||||
|
|
||||||
|
Each stage handles different entry types:
|
||||||
|
|
||||||
|
```cpp
|
||||||
|
// Pattern matching on pipeline entry variant
|
||||||
|
std::visit([&](auto&& entry) {
|
||||||
|
using T = std::decay_t<decltype(entry)>;
|
||||||
|
if constexpr (std::is_same_v<T, ShutdownEntry>) {
|
||||||
|
return true; // Signal shutdown
|
||||||
|
} else if constexpr (std::is_same_v<T, CommitEntry>) {
|
||||||
|
// Process commit entry
|
||||||
|
} else if constexpr (std::is_same_v<T, StatusEntry>) {
|
||||||
|
// Handle status entry (or skip if transferred)
|
||||||
|
}
|
||||||
|
}, pipeline_entry);
|
||||||
|
```
|
||||||
|
|
||||||
|
### Connection Error States
|
||||||
|
|
||||||
|
- Failed CommitEntries are passed through the pipeline with error information
|
||||||
|
- Downstream stages skip processing for error connections but forward them
|
||||||
|
- Error responses are sent when connection reaches release stage
|
||||||
|
- Connection ownership is always transferred to ensure cleanup
|
||||||
|
|
||||||
|
### Pipeline Integrity
|
||||||
|
|
||||||
|
- ShutdownEntry signals shutdown to all stages
|
||||||
|
- Each stage checks for ShutdownEntry and returns true to signal shutdown
|
||||||
|
- RAII guards ensure entries are always published downstream
|
||||||
|
- No entries are lost even during error conditions
|
||||||
|
|
||||||
|
## Performance Characteristics
|
||||||
|
|
||||||
|
### Throughput Optimization
|
||||||
|
|
||||||
|
- **Batching**: Multiple connections processed per stage activation
|
||||||
|
- **Lock-Free Communication**: Ring buffer between stages
|
||||||
|
- **Minimal Context Switching**: Dedicated threads per stage
|
||||||
|
- **Arena Allocation**: Efficient memory management throughout pipeline
|
||||||
|
|
||||||
|
### Latency Optimization
|
||||||
|
|
||||||
|
- **Single-Pass Processing**: Each connection flows through all stages once
|
||||||
|
- **Streaming Design**: Stages process concurrently
|
||||||
|
- **Minimal Copying**: Connection ownership transfer, not data copying
|
||||||
|
- **Direct Response**: Release stage triggers immediate response transmission
|
||||||
|
|
||||||
|
### Scalability Characteristics
|
||||||
|
|
||||||
|
- **Batch Size Tuning**: Ring buffer size controls memory vs latency tradeoff
|
||||||
|
- **Thread Affinity**: Dedicated threads reduce scheduling overhead
|
||||||
|
- **NUMA Awareness**: Can pin threads to specific CPU cores
|
||||||
|
|
||||||
|
## Configuration
|
||||||
|
|
||||||
|
### Pipeline Parameters
|
||||||
|
|
||||||
|
```cpp
|
||||||
|
private:
|
||||||
|
static constexpr int lg_size = 16; // Ring buffer size = 2^16 entries
|
||||||
|
|
||||||
|
// 4-stage pipeline configuration
|
||||||
|
StaticThreadPipeline<std::unique_ptr<Connection>,
|
||||||
|
WaitStrategy::WaitIfUpstreamIdle, 1, 1, 1, 1>
|
||||||
|
commitPipeline{lg_size};
|
||||||
|
```
|
||||||
|
|
||||||
|
### Tuning Considerations
|
||||||
|
|
||||||
|
- **Ring Buffer Size**: Larger buffers increase memory usage but improve batching
|
||||||
|
- **Wait Strategy**: `WaitIfUpstreamIdle` balances CPU usage vs latency
|
||||||
|
- **Thread Affinity**: OS scheduling vs explicit CPU pinning tradeoffs
|
||||||
|
|
||||||
|
## Pipeline Entry Types
|
||||||
|
|
||||||
|
The pipeline processes different types of entries using a variant/union type system instead of `std::unique_ptr<Connection>`:
|
||||||
|
|
||||||
|
### Pipeline Entry Variants
|
||||||
|
- **CommitEntry**: Contains `std::unique_ptr<Connection>` with CommitRequest and connection state
|
||||||
|
- **StatusEntry**: Contains `std::unique_ptr<Connection>` with StatusRequest (transferred to status threadpool after sequence)
|
||||||
|
- **ShutdownEntry**: Signals pipeline shutdown to all stages
|
||||||
|
- **Future types**: Pipeline design supports additional entry types
|
||||||
|
|
||||||
|
### Stage Processing by Type
|
||||||
|
|
||||||
|
| Stage | CommitEntry | StatusEntry | ShutdownEntry | Serialization |
|
||||||
|
|-------|-------------|-------------|---------------|---------------|
|
||||||
|
| **Sequence** | Check banned list, assign version | Add to banned list, transfer to status threadpool | Return true (shutdown) | **Required** |
|
||||||
|
| **Resolve** | Check preconditions, update recent writes | N/A (transferred) | Return true (shutdown) | **Required** |
|
||||||
|
| **Persist** | Apply operations, update high water mark | N/A (transferred) | Return true (shutdown) | **Required** |
|
||||||
|
| **Release** | Return connection to HTTP threads | N/A (transferred) | Return true (shutdown) | Not required |
|
||||||
|
|
||||||
|
## API Endpoint Integration
|
||||||
|
|
||||||
|
### `/v1/commit` - Transaction Submission
|
||||||
|
|
||||||
|
**Pipeline Interaction**: Full pipeline traversal as CommitEntry
|
||||||
|
|
||||||
|
#### Request Processing Flow
|
||||||
|
|
||||||
|
1. **HTTP I/O Thread Processing** (`src/http_handler.cpp:210-273`):
|
||||||
|
```cpp
|
||||||
|
void HttpHandler::handlePostCommit(Connection &conn, HttpConnectionState &state) {
|
||||||
|
// Parse and validate anything that doesn't need serialization:
|
||||||
|
// - JSON parsing and CommitRequest construction
|
||||||
|
// - Basic validation: leader_id check, operation format validation
|
||||||
|
// - Check that we have at least one operation
|
||||||
|
|
||||||
|
// If validation fails, send error response immediately and return
|
||||||
|
// If validation succeeds, connection will enter pipeline in on_batch_complete()
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
2. **Pipeline Entry**: Successfully parsed connections enter pipeline as CommitEntry (containing the connection with CommitRequest)
|
||||||
|
|
||||||
|
3. **Pipeline Processing**:
|
||||||
|
- **Sequence**: Check banned list → assign version (or reject)
|
||||||
|
- **Resolve**: Check preconditions against in-memory recent writes → mark semi-committed (or failed with conflict details)
|
||||||
|
- **Persist**: Apply operations → mark committed, update high water mark
|
||||||
|
- **Release**: Return connection with response data
|
||||||
|
|
||||||
|
4. **Response Generation**: Based on pipeline results
|
||||||
|
- **Success**: `{"status": "committed", "version": N, "leader_id": "...", "request_id": "..."}`
|
||||||
|
- **Failure**: `{"status": "not_committed", "conflicts": [...], "version": N, "leader_id": "..."}`
|
||||||
|
|
||||||
|
### `/v1/status` - Commit Status Lookup
|
||||||
|
|
||||||
|
**Pipeline Interaction**: StatusEntry through sequence stage, then transfer to status threadpool
|
||||||
|
|
||||||
|
#### Request Processing Flow
|
||||||
|
|
||||||
|
1. **HTTP I/O Thread Processing**:
|
||||||
|
```cpp
|
||||||
|
void HttpHandler::handleGetStatus(Connection &conn, const HttpConnectionState &state) {
|
||||||
|
// TODO: Extract request_id from URL and min_version from query params
|
||||||
|
// Current: Returns placeholder static response
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
2. **Two-Phase Processing**:
|
||||||
|
- **Phase 1 - Sequence Stage**: StatusEntry enters pipeline to add request_id to banned list and get version upper bound
|
||||||
|
- **Phase 2 - Status Threadpool**: Connection transferred from sequence stage to dedicated status threadpool for actual status lookup logic
|
||||||
|
|
||||||
|
3. **Status Lookup Logic**: Performed in status threadpool - scan transaction log to determine actual commit status of the now-banned request_id
|
||||||
|
|
||||||
|
### `/v1/subscribe` - Real-time Transaction Stream
|
||||||
|
|
||||||
|
**Pipeline Integration**: Consumes events from resolve and persist stages
|
||||||
|
|
||||||
|
#### Event Sources
|
||||||
|
- **Resolve Stage**: Semi-committed transactions (accepted preconditions) for low-latency streaming
|
||||||
|
- **Persist Stage**: Durability events when committed version high water mark advances
|
||||||
|
|
||||||
|
#### Current Implementation
|
||||||
|
```cpp
|
||||||
|
void HttpHandler::handleGetSubscribe(Connection &conn, const HttpConnectionState &state) {
|
||||||
|
// TODO: Parse query parameters (after, durable)
|
||||||
|
// TODO: Establish Server-Sent Events stream
|
||||||
|
// TODO: Subscribe to resolve stage (semi-committed) and persist stage (durability) events
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
### `/v1/version` - Version Information
|
||||||
|
|
||||||
|
**Pipeline Integration**: Direct atomic read, no pipeline interaction
|
||||||
|
|
||||||
|
```cpp
|
||||||
|
// TODO: Implement direct atomic read of committed version high water mark
|
||||||
|
// No pipeline interaction needed - seq_cst atomic read
|
||||||
|
// Leader ID is process-lifetime constant
|
||||||
|
```
|
||||||
|
|
||||||
|
**Response**: `{"version": <high_water_mark>, "leader_id": "<process_leader_id>"}`
|
||||||
|
|
||||||
|
## Integration Points
|
||||||
|
|
||||||
|
### HTTP Handler Integration
|
||||||
|
|
||||||
|
The pipeline integrates with the HTTP handler at two points:
|
||||||
|
|
||||||
|
1. **Entry**: `on_batch_complete()` feeds connections into sequence stage
|
||||||
|
2. **Exit**: Release stage calls `Server::release_back_to_server()`
|
||||||
|
|
||||||
|
### Persistence Layer Integration
|
||||||
|
|
||||||
|
The persist stage interfaces with:
|
||||||
|
- **S3 Backend**: Batch writes for durability (see `persistence.md`)
|
||||||
|
- **Subscriber System**: Real-time change stream notifications
|
||||||
|
- **Metrics System**: Transaction throughput and latency tracking
|
||||||
|
|
||||||
|
### Database State Integration
|
||||||
|
|
||||||
|
- **Sequence Stage**: Updates version number generator
|
||||||
|
- **Resolve Stage**: Queries current database state for precondition validation
|
||||||
|
- **Persist Stage**: Applies mutations to authoritative database state
|
||||||
|
|
||||||
|
## Future Optimizations
|
||||||
|
|
||||||
|
### Potential Enhancements
|
||||||
|
|
||||||
|
1. **Dynamic Thread Counts**: Make resolve and release thread counts configurable
|
||||||
|
2. **NUMA Optimization**: Pin pipeline threads to specific CPU cores
|
||||||
|
3. **Batch Size Tuning**: Dynamic batch size based on load
|
||||||
|
4. **Stage Bypassing**: Skip resolve stage for transactions without preconditions
|
||||||
|
5. **Persistence Batching**: Aggregate multiple commits into larger S3 writes
|
||||||
|
|
||||||
|
### Monitoring and Observability
|
||||||
|
|
||||||
|
1. **Stage Metrics**: Throughput, latency, and queue depth per stage
|
||||||
|
2. **Error Tracking**: Error rates and types by stage
|
||||||
|
3. **Resource Utilization**: CPU and memory usage per pipeline thread
|
||||||
|
4. **Flow Control Events**: Backpressure and stall detection
|
||||||
|
|
||||||
|
## Implementation Status
|
||||||
|
|
||||||
|
### Current State
|
||||||
|
|
||||||
|
- ✅ Pipeline structure implemented with 4 stages
|
||||||
|
- ✅ Thread creation and management
|
||||||
|
- ✅ RAII batch processing
|
||||||
|
- ✅ Error handling framework
|
||||||
|
- ✅ Shutdown coordination
|
||||||
|
|
||||||
|
### TODO Items
|
||||||
|
|
||||||
|
- ⏳ Sequence assignment logic implementation
|
||||||
|
- ⏳ Precondition resolution implementation
|
||||||
|
- ⏳ S3 persistence batching implementation
|
||||||
|
- ⏳ Subscriber notification system
|
||||||
|
- ⏳ Performance monitoring and metrics
|
||||||
|
- ⏳ Configuration tuning and optimization
|
||||||
@@ -1,8 +1,10 @@
|
|||||||
# WeaselDB Configuration File
|
# WeaselDB Configuration File
|
||||||
|
|
||||||
[server]
|
[server]
|
||||||
bind_address = "127.0.0.1"
|
# Network interfaces to listen on - production config with just TCP
|
||||||
port = 8080
|
interfaces = [
|
||||||
|
{ type = "tcp", address = "127.0.0.1", port = 8080 }
|
||||||
|
]
|
||||||
# Maximum request size in bytes (for 413 Content Too Large responses)
|
# Maximum request size in bytes (for 413 Content Too Large responses)
|
||||||
max_request_size_bytes = 1048576 # 1MB
|
max_request_size_bytes = 1048576 # 1MB
|
||||||
# Number of I/O threads for handling connections and network events
|
# Number of I/O threads for handling connections and network events
|
||||||
|
|||||||
@@ -79,9 +79,31 @@ void ConfigParser::parse_section(const auto &toml_data,
|
|||||||
void ConfigParser::parse_server_config(const auto &toml_data,
|
void ConfigParser::parse_server_config(const auto &toml_data,
|
||||||
ServerConfig &config) {
|
ServerConfig &config) {
|
||||||
parse_section(toml_data, "server", [&](const auto &srv) {
|
parse_section(toml_data, "server", [&](const auto &srv) {
|
||||||
parse_field(srv, "bind_address", config.bind_address);
|
// Parse interfaces array
|
||||||
parse_field(srv, "port", config.port);
|
if (srv.contains("interfaces")) {
|
||||||
parse_field(srv, "unix_socket_path", config.unix_socket_path);
|
auto interfaces = srv.at("interfaces");
|
||||||
|
if (interfaces.is_array()) {
|
||||||
|
for (const auto &iface : interfaces.as_array()) {
|
||||||
|
if (iface.contains("type")) {
|
||||||
|
std::string type = iface.at("type").as_string();
|
||||||
|
if (type == "tcp") {
|
||||||
|
std::string address = iface.at("address").as_string();
|
||||||
|
int port = iface.at("port").as_integer();
|
||||||
|
config.interfaces.push_back(ListenInterface::tcp(address, port));
|
||||||
|
} else if (type == "unix") {
|
||||||
|
std::string path = iface.at("path").as_string();
|
||||||
|
config.interfaces.push_back(ListenInterface::unix_socket(path));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// If no interfaces configured, use default TCP interface
|
||||||
|
if (config.interfaces.empty()) {
|
||||||
|
config.interfaces.push_back(ListenInterface::tcp("127.0.0.1", 8080));
|
||||||
|
}
|
||||||
|
|
||||||
parse_field(srv, "max_request_size_bytes", config.max_request_size_bytes);
|
parse_field(srv, "max_request_size_bytes", config.max_request_size_bytes);
|
||||||
parse_field(srv, "io_threads", config.io_threads);
|
parse_field(srv, "io_threads", config.io_threads);
|
||||||
|
|
||||||
@@ -127,24 +149,37 @@ void ConfigParser::parse_subscription_config(const auto &toml_data,
|
|||||||
bool ConfigParser::validate_config(const Config &config) {
|
bool ConfigParser::validate_config(const Config &config) {
|
||||||
bool valid = true;
|
bool valid = true;
|
||||||
|
|
||||||
// Validate server configuration
|
// Validate server interfaces
|
||||||
if (config.server.unix_socket_path.empty()) {
|
if (config.server.interfaces.empty()) {
|
||||||
// TCP mode validation
|
std::cerr << "Configuration error: no interfaces configured" << std::endl;
|
||||||
if (config.server.port <= 0 || config.server.port > 65535) {
|
valid = false;
|
||||||
std::cerr << "Configuration error: server.port must be between 1 and "
|
}
|
||||||
"65535, got "
|
|
||||||
<< config.server.port << std::endl;
|
for (const auto &iface : config.server.interfaces) {
|
||||||
valid = false;
|
if (iface.type == ListenInterface::Type::TCP) {
|
||||||
}
|
if (iface.port <= 0 || iface.port > 65535) {
|
||||||
} else {
|
std::cerr << "Configuration error: TCP port must be between 1 and "
|
||||||
// Unix socket mode validation
|
"65535, got "
|
||||||
if (config.server.unix_socket_path.length() >
|
<< iface.port << std::endl;
|
||||||
107) { // UNIX_PATH_MAX is typically 108
|
valid = false;
|
||||||
std::cerr << "Configuration error: unix_socket_path too long (max 107 "
|
}
|
||||||
"chars), got "
|
if (iface.address.empty()) {
|
||||||
<< config.server.unix_socket_path.length() << " chars"
|
std::cerr << "Configuration error: TCP address cannot be empty"
|
||||||
<< std::endl;
|
<< std::endl;
|
||||||
valid = false;
|
valid = false;
|
||||||
|
}
|
||||||
|
} else { // Unix socket
|
||||||
|
if (iface.path.empty()) {
|
||||||
|
std::cerr << "Configuration error: Unix socket path cannot be empty"
|
||||||
|
<< std::endl;
|
||||||
|
valid = false;
|
||||||
|
}
|
||||||
|
if (iface.path.length() > 107) { // UNIX_PATH_MAX is typically 108
|
||||||
|
std::cerr << "Configuration error: Unix socket path too long (max 107 "
|
||||||
|
"chars), got "
|
||||||
|
<< iface.path.length() << " chars" << std::endl;
|
||||||
|
valid = false;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -3,19 +3,40 @@
|
|||||||
#include <chrono>
|
#include <chrono>
|
||||||
#include <optional>
|
#include <optional>
|
||||||
#include <string>
|
#include <string>
|
||||||
|
#include <vector>
|
||||||
|
|
||||||
namespace weaseldb {
|
namespace weaseldb {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief Configuration for a single network interface to listen on.
|
||||||
|
*/
|
||||||
|
struct ListenInterface {
|
||||||
|
enum class Type { TCP, Unix };
|
||||||
|
|
||||||
|
Type type;
|
||||||
|
/// For TCP: IP address to bind to (e.g., "127.0.0.1", "0.0.0.0")
|
||||||
|
std::string address;
|
||||||
|
/// For TCP: port number
|
||||||
|
int port = 0;
|
||||||
|
/// For Unix: socket file path
|
||||||
|
std::string path;
|
||||||
|
|
||||||
|
// Factory methods for cleaner config creation
|
||||||
|
static ListenInterface tcp(const std::string &addr, int port_num) {
|
||||||
|
return {Type::TCP, addr, port_num, ""};
|
||||||
|
}
|
||||||
|
|
||||||
|
static ListenInterface unix_socket(const std::string &socket_path) {
|
||||||
|
return {Type::Unix, "", 0, socket_path};
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief Configuration settings for the WeaselDB server component.
|
* @brief Configuration settings for the WeaselDB server component.
|
||||||
*/
|
*/
|
||||||
struct ServerConfig {
|
struct ServerConfig {
|
||||||
/// IP address to bind the server to (default: localhost)
|
/// Network interfaces to listen on (TCP and/or Unix sockets)
|
||||||
std::string bind_address = "127.0.0.1";
|
std::vector<ListenInterface> interfaces;
|
||||||
/// TCP port number for the server to listen on
|
|
||||||
int port = 8080;
|
|
||||||
/// Unix socket path (if specified, takes precedence over TCP)
|
|
||||||
std::string unix_socket_path;
|
|
||||||
/// Maximum size in bytes for incoming HTTP requests (default: 1MB)
|
/// Maximum size in bytes for incoming HTTP requests (default: 1MB)
|
||||||
int64_t max_request_size_bytes = 1024 * 1024;
|
int64_t max_request_size_bytes = 1024 * 1024;
|
||||||
/// Number of I/O threads for handling connections and network events
|
/// Number of I/O threads for handling connections and network events
|
||||||
|
|||||||
@@ -209,7 +209,7 @@ struct Connection {
|
|||||||
* metrics.recordQueueDepth(conn->get_id(), conn->outgoingBytesQueued());
|
* metrics.recordQueueDepth(conn->get_id(), conn->outgoingBytesQueued());
|
||||||
* ```
|
* ```
|
||||||
*/
|
*/
|
||||||
int64_t outgoingBytesQueued() const {
|
int64_t outgoing_bytes_queued() const {
|
||||||
#ifndef NDEBUG
|
#ifndef NDEBUG
|
||||||
// Debug build: validate counter accuracy
|
// Debug build: validate counter accuracy
|
||||||
int64_t computed_total = 0;
|
int64_t computed_total = 0;
|
||||||
|
|||||||
@@ -79,6 +79,8 @@
|
|||||||
* - **Simple concatenation**: Basic string + number + string combinations
|
* - **Simple concatenation**: Basic string + number + string combinations
|
||||||
* - **Compile-time optimization**: When all types/values known at compile time
|
* - **Compile-time optimization**: When all types/values known at compile time
|
||||||
* - **Template contexts**: Where compile-time buffer sizing is beneficial
|
* - **Template contexts**: Where compile-time buffer sizing is beneficial
|
||||||
|
* - **IMPORTANT**: Only works with compile-time string literals, NOT runtime
|
||||||
|
* const char*
|
||||||
*
|
*
|
||||||
* ## Optimization Details:
|
* ## Optimization Details:
|
||||||
* The function uses `ArenaAllocator::allocate_remaining_space()` to claim all
|
* The function uses `ArenaAllocator::allocate_remaining_space()` to claim all
|
||||||
@@ -206,10 +208,12 @@ inline constexpr DoubleTerm term(double s) { return DoubleTerm(s); }
|
|||||||
* optimized term writers for maximum speed.
|
* optimized term writers for maximum speed.
|
||||||
*
|
*
|
||||||
* ## Supported Types:
|
* ## Supported Types:
|
||||||
* - **String literals**: C-style string literals and arrays
|
* - **String literals**: C-style string literals and arrays ("Hello", "World")
|
||||||
* - **Integers**: All integral types (int, int64_t, uint32_t, etc.)
|
* - **Integers**: All integral types (int, int64_t, uint32_t, etc.)
|
||||||
* - **Floating point**: double (uses high-precision Grisu2 algorithm)
|
* - **Floating point**: double (uses high-precision Grisu2 algorithm)
|
||||||
* - **Custom types**: Via specialization of `detail::term()`
|
* - **Custom types**: Via specialization of `detail::term()`
|
||||||
|
* - **NOT supported**: const char* variables, std::string, std::string_view
|
||||||
|
* variables
|
||||||
*
|
*
|
||||||
* ## Performance Characteristics:
|
* ## Performance Characteristics:
|
||||||
* - **Compile-time buffer sizing**: Buffer size calculated at compile time (no
|
* - **Compile-time buffer sizing**: Buffer size calculated at compile time (no
|
||||||
@@ -245,16 +249,23 @@ inline constexpr DoubleTerm term(double s) { return DoubleTerm(s); }
|
|||||||
*
|
*
|
||||||
* ## When to Use:
|
* ## When to Use:
|
||||||
* - **Hot paths**: Performance-critical code where formatting speed matters
|
* - **Hot paths**: Performance-critical code where formatting speed matters
|
||||||
* - **Known types**: When argument types are known at compile time
|
* - **Compile-time string literals**: All string arguments must be string
|
||||||
|
* literals (e.g., "Hello")
|
||||||
* - **Simple formatting**: Concatenation and basic type conversion
|
* - **Simple formatting**: Concatenation and basic type conversion
|
||||||
* - **Template code**: Where compile-time optimization is beneficial
|
* - **Template code**: Where compile-time optimization is beneficial
|
||||||
|
* - **CANNOT use runtime strings**: No const char*, std::string, or string_view
|
||||||
|
* variables
|
||||||
*
|
*
|
||||||
* ## When to Use format() Instead:
|
* ## When to Use format() Instead:
|
||||||
* - **Printf-style formatting**: When you need format specifiers like "%d",
|
* - **Printf-style formatting**: When you need format specifiers like "%d",
|
||||||
* "%.2f"
|
* "%.2f"
|
||||||
* - **Runtime flexibility**: When format strings come from variables/config
|
* - **Runtime strings**: When you have const char*, std::string, or string_view
|
||||||
* - **Complex formatting**: When you need padding, precision, etc.
|
* variables
|
||||||
* - **Convenience**: For quick debugging or non-critical paths
|
* - **Dynamic content**: When format strings come from variables/config/user
|
||||||
|
* input
|
||||||
|
* - **Complex formatting**: When you need padding, precision, width specifiers
|
||||||
|
* - **Mixed literal/runtime**: When combining string literals with runtime
|
||||||
|
* string data
|
||||||
*
|
*
|
||||||
* @note All arguments are passed by forwarding reference for optimal
|
* @note All arguments are passed by forwarding reference for optimal
|
||||||
* performance
|
* performance
|
||||||
|
|||||||
@@ -1,19 +1,37 @@
|
|||||||
#include "http_handler.hpp"
|
#include "http_handler.hpp"
|
||||||
|
|
||||||
|
#include <atomic>
|
||||||
#include <cstring>
|
#include <cstring>
|
||||||
#include <string>
|
#include <string>
|
||||||
#include <strings.h>
|
#include <strings.h>
|
||||||
|
|
||||||
#include "arena_allocator.hpp"
|
#include "arena_allocator.hpp"
|
||||||
#include "format.hpp"
|
#include "format.hpp"
|
||||||
|
#include "json_commit_request_parser.hpp"
|
||||||
#include "metric.hpp"
|
#include "metric.hpp"
|
||||||
#include "perfetto_categories.hpp"
|
#include "perfetto_categories.hpp"
|
||||||
|
#include "pipeline_entry.hpp"
|
||||||
|
#include "server.hpp"
|
||||||
|
|
||||||
auto requests_counter_family = metric::create_counter(
|
auto requests_counter_family = metric::create_counter(
|
||||||
"weaseldb_http_requests_total", "Total http requests");
|
"weaseldb_http_requests_total", "Total http requests");
|
||||||
thread_local auto metrics_counter =
|
thread_local auto metrics_counter =
|
||||||
requests_counter_family.create({{"path", "/metrics"}});
|
requests_counter_family.create({{"path", "/metrics"}});
|
||||||
|
|
||||||
|
// API endpoint request counters
|
||||||
|
thread_local auto commit_counter =
|
||||||
|
requests_counter_family.create({{"path", "/v1/commit"}});
|
||||||
|
thread_local auto status_counter =
|
||||||
|
requests_counter_family.create({{"path", "/v1/status"}});
|
||||||
|
thread_local auto version_counter =
|
||||||
|
requests_counter_family.create({{"path", "/v1/version"}});
|
||||||
|
|
||||||
|
// Metric for banned request IDs memory usage
|
||||||
|
auto banned_request_ids_memory_gauge =
|
||||||
|
metric::create_gauge("weaseldb_banned_request_ids_memory_bytes",
|
||||||
|
"Memory used by banned request IDs arena")
|
||||||
|
.create({});
|
||||||
|
|
||||||
// HttpConnectionState implementation
|
// HttpConnectionState implementation
|
||||||
HttpConnectionState::HttpConnectionState(ArenaAllocator &arena)
|
HttpConnectionState::HttpConnectionState(ArenaAllocator &arena)
|
||||||
: current_header_field_buf(ArenaStlAllocator<char>(&arena)),
|
: current_header_field_buf(ArenaStlAllocator<char>(&arena)),
|
||||||
@@ -61,16 +79,46 @@ void HttpHandler::on_write_buffer_drained(
|
|||||||
|
|
||||||
void HttpHandler::on_batch_complete(
|
void HttpHandler::on_batch_complete(
|
||||||
std::span<std::unique_ptr<Connection>> batch) {
|
std::span<std::unique_ptr<Connection>> batch) {
|
||||||
int readyCount = 0;
|
// Collect commit requests and status requests for pipeline processing
|
||||||
for (int i = 0; i < int(batch.size()); ++i) {
|
int pipeline_count = 0;
|
||||||
readyCount += batch[i] && batch[i]->outgoingBytesQueued() > 0;
|
|
||||||
|
// Count both commit and status requests
|
||||||
|
for (auto &conn : batch) {
|
||||||
|
if (conn && conn->user_data) {
|
||||||
|
auto *state = static_cast<HttpConnectionState *>(conn->user_data);
|
||||||
|
|
||||||
|
// Count commit requests that passed basic validation
|
||||||
|
if (state->route == HttpRoute::POST_commit && state->commit_request &&
|
||||||
|
state->parsing_commit && state->basic_validation_passed) {
|
||||||
|
pipeline_count++;
|
||||||
|
}
|
||||||
|
// Count status requests
|
||||||
|
else if (state->route == HttpRoute::GET_status &&
|
||||||
|
// Error message not already queued
|
||||||
|
conn->outgoing_bytes_queued() == 0) {
|
||||||
|
pipeline_count++;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if (readyCount > 0) {
|
|
||||||
auto guard = pipeline.push(readyCount, /*block=*/true);
|
// Send requests to 4-stage pipeline in batch
|
||||||
auto outIter = guard.batch.begin();
|
if (pipeline_count > 0) {
|
||||||
for (int i = 0; i < int(batch.size()); ++i) {
|
auto guard = commitPipeline.push(pipeline_count, true);
|
||||||
if (batch[i] && batch[i]->outgoingBytesQueued() > 0) {
|
auto out_iter = guard.batch.begin();
|
||||||
*outIter++ = std::move(batch[i]);
|
|
||||||
|
for (auto &conn : batch) {
|
||||||
|
if (conn && conn->user_data) {
|
||||||
|
auto *state = static_cast<HttpConnectionState *>(conn->user_data);
|
||||||
|
|
||||||
|
// Create CommitEntry for commit requests
|
||||||
|
if (state->route == HttpRoute::POST_commit && state->commit_request &&
|
||||||
|
state->parsing_commit && state->basic_validation_passed) {
|
||||||
|
*out_iter++ = CommitEntry{std::move(conn)};
|
||||||
|
}
|
||||||
|
// Create StatusEntry for status requests
|
||||||
|
else if (state->route == HttpRoute::GET_status) {
|
||||||
|
*out_iter++ = StatusEntry{std::move(conn)};
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -80,7 +128,7 @@ void HttpHandler::on_data_arrived(std::string_view data,
|
|||||||
std::unique_ptr<Connection> &conn_ptr) {
|
std::unique_ptr<Connection> &conn_ptr) {
|
||||||
auto *state = static_cast<HttpConnectionState *>(conn_ptr->user_data);
|
auto *state = static_cast<HttpConnectionState *>(conn_ptr->user_data);
|
||||||
if (!state) {
|
if (!state) {
|
||||||
sendErrorResponse(*conn_ptr, 500, "Internal server error", true);
|
send_error_response(*conn_ptr, 500, "Internal server error", true);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -94,7 +142,7 @@ void HttpHandler::on_data_arrived(std::string_view data,
|
|||||||
llhttp_execute(&state->parser, data.data(), data.size());
|
llhttp_execute(&state->parser, data.data(), data.size());
|
||||||
|
|
||||||
if (err != HPE_OK) {
|
if (err != HPE_OK) {
|
||||||
sendErrorResponse(*conn_ptr, 400, "Bad request", true);
|
send_error_response(*conn_ptr, 400, "Bad request", true);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -181,57 +229,161 @@ HttpRoute HttpHandler::parseRoute(std::string_view method,
|
|||||||
// Route handlers (basic implementations)
|
// Route handlers (basic implementations)
|
||||||
void HttpHandler::handleGetVersion(Connection &conn,
|
void HttpHandler::handleGetVersion(Connection &conn,
|
||||||
const HttpConnectionState &state) {
|
const HttpConnectionState &state) {
|
||||||
sendJsonResponse(
|
version_counter.inc();
|
||||||
|
send_json_response(
|
||||||
conn, 200,
|
conn, 200,
|
||||||
R"({"version":"0.0.1","leader":"node-1","committed_version":42})",
|
format(conn.get_arena(), R"({"version":%ld,"leader":""})",
|
||||||
|
this->committed_version.load(std::memory_order_seq_cst)),
|
||||||
state.connection_close);
|
state.connection_close);
|
||||||
}
|
}
|
||||||
|
|
||||||
void HttpHandler::handlePostCommit(Connection &conn,
|
void HttpHandler::handlePostCommit(Connection &conn,
|
||||||
const HttpConnectionState &state) {
|
const HttpConnectionState &state) {
|
||||||
// TODO: Parse commit request from state.body and process
|
commit_counter.inc();
|
||||||
sendJsonResponse(
|
// Check if streaming parse was successful
|
||||||
conn, 200,
|
if (!state.commit_request || !state.parsing_commit) {
|
||||||
R"({"request_id":"example","status":"committed","version":43})",
|
const char *error = state.commit_parser
|
||||||
state.connection_close);
|
? state.commit_parser->get_parse_error()
|
||||||
|
: "No parser initialized";
|
||||||
|
ArenaAllocator &arena = conn.get_arena();
|
||||||
|
std::string_view error_msg =
|
||||||
|
format(arena, "Parse failed: %s", error ? error : "Unknown error");
|
||||||
|
send_error_response(conn, 400, error_msg, state.connection_close);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
const CommitRequest &commit_request = *state.commit_request;
|
||||||
|
|
||||||
|
// Perform basic validation that doesn't require serialization (done on I/O
|
||||||
|
// threads)
|
||||||
|
bool valid = true;
|
||||||
|
std::string_view error_msg;
|
||||||
|
|
||||||
|
// Check that we have at least one operation
|
||||||
|
if (commit_request.operations().empty()) {
|
||||||
|
valid = false;
|
||||||
|
error_msg = "Commit request must contain at least one operation";
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check leader_id is not empty
|
||||||
|
if (valid && commit_request.leader_id().empty()) {
|
||||||
|
valid = false;
|
||||||
|
error_msg = "Commit request must specify a leader_id";
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check operations are well-formed
|
||||||
|
if (valid) {
|
||||||
|
for (const auto &op : commit_request.operations()) {
|
||||||
|
if (op.param1.empty()) {
|
||||||
|
valid = false;
|
||||||
|
error_msg = "Operation key cannot be empty";
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
if (op.type == Operation::Type::Write && op.param2.empty()) {
|
||||||
|
valid = false;
|
||||||
|
error_msg = "Write operation value cannot be empty";
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!valid) {
|
||||||
|
send_error_response(conn, 400, error_msg, state.connection_close);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Basic validation passed - mark for 4-stage pipeline processing
|
||||||
|
const_cast<HttpConnectionState &>(state).basic_validation_passed = true;
|
||||||
|
// Response will be sent after 4-stage pipeline processing is complete
|
||||||
}
|
}
|
||||||
|
|
||||||
void HttpHandler::handleGetSubscribe(Connection &conn,
|
void HttpHandler::handleGetSubscribe(Connection &conn,
|
||||||
const HttpConnectionState &state) {
|
const HttpConnectionState &state) {
|
||||||
// TODO: Implement subscription streaming
|
// TODO: Implement subscription streaming
|
||||||
sendJsonResponse(
|
send_json_response(
|
||||||
conn, 200,
|
conn, 200,
|
||||||
R"({"message":"Subscription endpoint - streaming not yet implemented"})",
|
R"({"message":"Subscription endpoint - streaming not yet implemented"})",
|
||||||
state.connection_close);
|
state.connection_close);
|
||||||
}
|
}
|
||||||
|
|
||||||
void HttpHandler::handleGetStatus(Connection &conn,
|
void HttpHandler::handleGetStatus(Connection &conn,
|
||||||
const HttpConnectionState &state) {
|
HttpConnectionState &state) {
|
||||||
// TODO: Extract request_id from URL and check status
|
status_counter.inc();
|
||||||
sendJsonResponse(
|
// Status requests are processed through the pipeline
|
||||||
conn, 200,
|
// Response will be generated in the sequence stage
|
||||||
R"({"request_id":"example","status":"committed","version":43})",
|
// This handler extracts request_id from query parameters and prepares for
|
||||||
state.connection_close);
|
// pipeline processing
|
||||||
|
|
||||||
|
// Extract request_id from query parameters:
|
||||||
|
// /v1/status?request_id=<ID>&min_version=<VERSION>
|
||||||
|
std::string_view url = state.url;
|
||||||
|
|
||||||
|
// Find query parameters
|
||||||
|
size_t query_pos = url.find('?');
|
||||||
|
if (query_pos == std::string_view::npos) {
|
||||||
|
// No query parameters
|
||||||
|
send_error_response(conn, 400,
|
||||||
|
"Missing required query parameter: request_id",
|
||||||
|
state.connection_close);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
std::string_view query_string = url.substr(query_pos + 1);
|
||||||
|
|
||||||
|
// Simple query parameter parsing for request_id
|
||||||
|
// Look for "request_id=" in the query string
|
||||||
|
size_t request_id_pos = query_string.find("request_id=");
|
||||||
|
if (request_id_pos == std::string_view::npos) {
|
||||||
|
send_error_response(conn, 400,
|
||||||
|
"Missing required query parameter: request_id",
|
||||||
|
state.connection_close);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Extract the request_id value
|
||||||
|
size_t value_start = request_id_pos + 11; // length of "request_id="
|
||||||
|
if (value_start >= query_string.length()) {
|
||||||
|
send_error_response(conn, 400, "Empty request_id parameter",
|
||||||
|
state.connection_close);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Find the end of the request_id value (next & or end of string)
|
||||||
|
size_t value_end = query_string.find('&', value_start);
|
||||||
|
if (value_end == std::string_view::npos) {
|
||||||
|
value_end = query_string.length();
|
||||||
|
}
|
||||||
|
|
||||||
|
state.status_request_id =
|
||||||
|
query_string.substr(value_start, value_end - value_start);
|
||||||
|
|
||||||
|
if (state.status_request_id.empty()) {
|
||||||
|
send_error_response(conn, 400, "Empty request_id parameter",
|
||||||
|
state.connection_close);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Ready for pipeline processing
|
||||||
}
|
}
|
||||||
|
|
||||||
void HttpHandler::handlePutRetention(Connection &conn,
|
void HttpHandler::handlePutRetention(Connection &conn,
|
||||||
const HttpConnectionState &state) {
|
const HttpConnectionState &state) {
|
||||||
// TODO: Parse retention policy from body and store
|
// TODO: Parse retention policy from body and store
|
||||||
sendJsonResponse(conn, 200, R"({"policy_id":"example","status":"created"})",
|
send_json_response(conn, 200, R"({"policy_id":"example","status":"created"})",
|
||||||
state.connection_close);
|
state.connection_close);
|
||||||
}
|
}
|
||||||
|
|
||||||
void HttpHandler::handleGetRetention(Connection &conn,
|
void HttpHandler::handleGetRetention(Connection &conn,
|
||||||
const HttpConnectionState &state) {
|
const HttpConnectionState &state) {
|
||||||
// TODO: Extract policy_id from URL or return all policies
|
// TODO: Extract policy_id from URL or return all policies
|
||||||
sendJsonResponse(conn, 200, R"({"policies":[]})", state.connection_close);
|
send_json_response(conn, 200, R"({"policies":[]})", state.connection_close);
|
||||||
}
|
}
|
||||||
|
|
||||||
void HttpHandler::handleDeleteRetention(Connection &conn,
|
void HttpHandler::handleDeleteRetention(Connection &conn,
|
||||||
const HttpConnectionState &state) {
|
const HttpConnectionState &state) {
|
||||||
// TODO: Extract policy_id from URL and delete
|
// TODO: Extract policy_id from URL and delete
|
||||||
sendJsonResponse(conn, 200, R"({"policy_id":"example","status":"deleted"})",
|
send_json_response(conn, 200, R"({"policy_id":"example","status":"deleted"})",
|
||||||
state.connection_close);
|
state.connection_close);
|
||||||
}
|
}
|
||||||
|
|
||||||
void HttpHandler::handleGetMetrics(Connection &conn,
|
void HttpHandler::handleGetMetrics(Connection &conn,
|
||||||
@@ -255,16 +407,16 @@ void HttpHandler::handleGetMetrics(Connection &conn,
|
|||||||
arena, "HTTP/1.1 200 OK\r\n",
|
arena, "HTTP/1.1 200 OK\r\n",
|
||||||
"Content-Type: text/plain; version=0.0.4\r\n",
|
"Content-Type: text/plain; version=0.0.4\r\n",
|
||||||
"Content-Length: ", static_cast<uint64_t>(total_size), "\r\n",
|
"Content-Length: ", static_cast<uint64_t>(total_size), "\r\n",
|
||||||
"X-Response-ID: ", static_cast<int64_t>(http_state->request_id), "\r\n",
|
"X-Response-ID: ", static_cast<int64_t>(http_state->http_request_id),
|
||||||
"Connection: close\r\n", "\r\n");
|
"\r\n", "Connection: close\r\n", "\r\n");
|
||||||
conn.close_after_send();
|
conn.close_after_send();
|
||||||
} else {
|
} else {
|
||||||
headers = static_format(
|
headers = static_format(
|
||||||
arena, "HTTP/1.1 200 OK\r\n",
|
arena, "HTTP/1.1 200 OK\r\n",
|
||||||
"Content-Type: text/plain; version=0.0.4\r\n",
|
"Content-Type: text/plain; version=0.0.4\r\n",
|
||||||
"Content-Length: ", static_cast<uint64_t>(total_size), "\r\n",
|
"Content-Length: ", static_cast<uint64_t>(total_size), "\r\n",
|
||||||
"X-Response-ID: ", static_cast<int64_t>(http_state->request_id), "\r\n",
|
"X-Response-ID: ", static_cast<int64_t>(http_state->http_request_id),
|
||||||
"Connection: keep-alive\r\n", "\r\n");
|
"\r\n", "Connection: keep-alive\r\n", "\r\n");
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send headers
|
// Send headers
|
||||||
@@ -278,91 +430,81 @@ void HttpHandler::handleGetMetrics(Connection &conn,
|
|||||||
|
|
||||||
void HttpHandler::handleGetOk(Connection &conn,
|
void HttpHandler::handleGetOk(Connection &conn,
|
||||||
const HttpConnectionState &state) {
|
const HttpConnectionState &state) {
|
||||||
TRACE_EVENT("http", "GET /ok", perfetto::Flow::Global(state.request_id));
|
TRACE_EVENT("http", "GET /ok", perfetto::Flow::Global(state.http_request_id));
|
||||||
|
|
||||||
sendResponse(conn, 200, "text/plain", "OK", state.connection_close);
|
sendResponse(conn, 200, "text/plain", "OK", state.connection_close);
|
||||||
}
|
}
|
||||||
|
|
||||||
void HttpHandler::handleNotFound(Connection &conn,
|
void HttpHandler::handleNotFound(Connection &conn,
|
||||||
const HttpConnectionState &state) {
|
const HttpConnectionState &state) {
|
||||||
sendErrorResponse(conn, 404, "Not found", state.connection_close);
|
send_error_response(conn, 404, "Not found", state.connection_close);
|
||||||
}
|
}
|
||||||
|
|
||||||
// HTTP utility methods
|
// HTTP utility methods
|
||||||
void HttpHandler::sendResponse(Connection &conn, int status_code,
|
void HttpHandler::sendResponse(Connection &conn, int status_code,
|
||||||
std::string_view content_type,
|
std::string_view content_type,
|
||||||
std::string_view body, bool close_connection) {
|
std::string_view body, bool close_connection) {
|
||||||
[[maybe_unused]] ArenaAllocator &arena = conn.get_arena();
|
ArenaAllocator &arena = conn.get_arena();
|
||||||
|
|
||||||
// Build HTTP response using arena
|
|
||||||
std::string response;
|
|
||||||
response.reserve(256 + body.size());
|
|
||||||
|
|
||||||
response += "HTTP/1.1 ";
|
|
||||||
response += std::to_string(status_code);
|
|
||||||
response += " ";
|
|
||||||
|
|
||||||
// Status text
|
|
||||||
switch (status_code) {
|
|
||||||
case 200:
|
|
||||||
response += "OK";
|
|
||||||
break;
|
|
||||||
case 400:
|
|
||||||
response += "Bad Request";
|
|
||||||
break;
|
|
||||||
case 404:
|
|
||||||
response += "Not Found";
|
|
||||||
break;
|
|
||||||
case 500:
|
|
||||||
response += "Internal Server Error";
|
|
||||||
break;
|
|
||||||
default:
|
|
||||||
response += "Unknown";
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
auto *state = static_cast<HttpConnectionState *>(conn.user_data);
|
auto *state = static_cast<HttpConnectionState *>(conn.user_data);
|
||||||
|
|
||||||
response += "\r\n";
|
// Status text
|
||||||
response += "Content-Type: ";
|
std::string_view status_text;
|
||||||
response += content_type;
|
switch (status_code) {
|
||||||
response += "\r\n";
|
case 200:
|
||||||
response += "Content-Length: ";
|
status_text = "OK";
|
||||||
response += std::to_string(body.size());
|
break;
|
||||||
response += "\r\n";
|
case 400:
|
||||||
response += "X-Response-ID: ";
|
status_text = "Bad Request";
|
||||||
response += std::to_string(state->request_id);
|
break;
|
||||||
response += "\r\n";
|
case 404:
|
||||||
|
status_text = "Not Found";
|
||||||
if (close_connection) {
|
break;
|
||||||
response += "Connection: close\r\n";
|
case 500:
|
||||||
conn.close_after_send(); // Signal connection should be closed after sending
|
status_text = "Internal Server Error";
|
||||||
} else {
|
break;
|
||||||
response += "Connection: keep-alive\r\n";
|
default:
|
||||||
|
status_text = "Unknown";
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
response += "\r\n";
|
const char *connection_header = close_connection ? "close" : "keep-alive";
|
||||||
response += body;
|
|
||||||
|
std::string_view response =
|
||||||
|
format(arena,
|
||||||
|
"HTTP/1.1 %d %.*s\r\n"
|
||||||
|
"Content-Type: %.*s\r\n"
|
||||||
|
"Content-Length: %zu\r\n"
|
||||||
|
"X-Response-ID: %ld\r\n"
|
||||||
|
"Connection: %s\r\n"
|
||||||
|
"\r\n%.*s",
|
||||||
|
status_code, static_cast<int>(status_text.size()),
|
||||||
|
status_text.data(), static_cast<int>(content_type.size()),
|
||||||
|
content_type.data(), body.size(), state->http_request_id,
|
||||||
|
connection_header, static_cast<int>(body.size()), body.data());
|
||||||
|
|
||||||
|
if (close_connection) {
|
||||||
|
conn.close_after_send();
|
||||||
|
}
|
||||||
|
|
||||||
conn.append_message(response);
|
conn.append_message(response);
|
||||||
}
|
}
|
||||||
|
|
||||||
void HttpHandler::sendJsonResponse(Connection &conn, int status_code,
|
void HttpHandler::send_json_response(Connection &conn, int status_code,
|
||||||
std::string_view json,
|
std::string_view json,
|
||||||
bool close_connection) {
|
bool close_connection) {
|
||||||
sendResponse(conn, status_code, "application/json", json, close_connection);
|
sendResponse(conn, status_code, "application/json", json, close_connection);
|
||||||
}
|
}
|
||||||
|
|
||||||
void HttpHandler::sendErrorResponse(Connection &conn, int status_code,
|
void HttpHandler::send_error_response(Connection &conn, int status_code,
|
||||||
std::string_view message,
|
std::string_view message,
|
||||||
bool close_connection) {
|
bool close_connection) {
|
||||||
[[maybe_unused]] ArenaAllocator &arena = conn.get_arena();
|
ArenaAllocator &arena = conn.get_arena();
|
||||||
|
|
||||||
std::string json = R"({"error":")";
|
std::string_view json =
|
||||||
json += message;
|
format(arena, R"({"error":"%.*s"})", static_cast<int>(message.size()),
|
||||||
json += R"("})";
|
message.data());
|
||||||
|
|
||||||
sendJsonResponse(conn, status_code, json, close_connection);
|
send_json_response(conn, status_code, json, close_connection);
|
||||||
}
|
}
|
||||||
|
|
||||||
// llhttp callbacks
|
// llhttp callbacks
|
||||||
@@ -423,7 +565,7 @@ int HttpHandler::onHeaderValueComplete(llhttp_t *parser) {
|
|||||||
id = id * 10 + (c - '0');
|
id = id * 10 + (c - '0');
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
state->request_id = id;
|
state->http_request_id = id;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Clear buffers for next header
|
// Clear buffers for next header
|
||||||
@@ -443,19 +585,330 @@ int HttpHandler::onHeadersComplete(llhttp_t *parser) {
|
|||||||
llhttp_method_name(static_cast<llhttp_method_t>(parser->method));
|
llhttp_method_name(static_cast<llhttp_method_t>(parser->method));
|
||||||
state->method = std::string_view(method_str);
|
state->method = std::string_view(method_str);
|
||||||
|
|
||||||
|
// Check if this looks like a POST to /v1/commit to initialize streaming
|
||||||
|
// parser
|
||||||
|
if (state->method == "POST" && state->url.find("/v1/commit") == 0) {
|
||||||
|
// Initialize streaming commit request parsing
|
||||||
|
state->commit_parser = std::make_unique<JsonCommitRequestParser>();
|
||||||
|
state->commit_request = std::make_unique<CommitRequest>();
|
||||||
|
state->parsing_commit =
|
||||||
|
state->commit_parser->begin_streaming_parse(*state->commit_request);
|
||||||
|
|
||||||
|
if (!state->parsing_commit) {
|
||||||
|
return -1; // Signal parsing error to llhttp
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int HttpHandler::onBody(llhttp_t *parser, const char *at, size_t length) {
|
int HttpHandler::onBody(llhttp_t *parser, const char *at, size_t length) {
|
||||||
[[maybe_unused]] auto *state =
|
auto *state = static_cast<HttpConnectionState *>(parser->data);
|
||||||
static_cast<HttpConnectionState *>(parser->data);
|
|
||||||
(void)at;
|
if (state->parsing_commit && state->commit_parser) {
|
||||||
(void)length;
|
// Stream data to commit request parser
|
||||||
|
auto status =
|
||||||
|
state->commit_parser->parse_chunk(const_cast<char *>(at), length);
|
||||||
|
if (status == CommitRequestParser::ParseStatus::Error) {
|
||||||
|
return -1; // Signal parsing error to llhttp
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int HttpHandler::onMessageComplete(llhttp_t *parser) {
|
int HttpHandler::onMessageComplete(llhttp_t *parser) {
|
||||||
auto *state = static_cast<HttpConnectionState *>(parser->data);
|
auto *state = static_cast<HttpConnectionState *>(parser->data);
|
||||||
state->message_complete = true;
|
state->message_complete = true;
|
||||||
|
|
||||||
|
if (state->parsing_commit && state->commit_parser) {
|
||||||
|
// Finish streaming parse
|
||||||
|
auto status = state->commit_parser->finish_streaming_parse();
|
||||||
|
if (status == CommitRequestParser::ParseStatus::Error) {
|
||||||
|
return -1; // Signal parsing error to llhttp
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Pipeline stage implementations (batch-based)
|
||||||
|
bool HttpHandler::process_sequence_batch(BatchType &batch) {
|
||||||
|
// Stage 0: Sequence assignment
|
||||||
|
// This stage performs ONLY work that requires serial processing:
|
||||||
|
// - Version/sequence number assignment (must be sequential)
|
||||||
|
// - Request ID banned list management
|
||||||
|
|
||||||
|
for (auto &entry : batch) {
|
||||||
|
// Pattern match on pipeline entry variant
|
||||||
|
bool should_shutdown = std::visit(
|
||||||
|
[&](auto &&e) -> bool {
|
||||||
|
using T = std::decay_t<decltype(e)>;
|
||||||
|
|
||||||
|
if constexpr (std::is_same_v<T, ShutdownEntry>) {
|
||||||
|
return true; // Signal shutdown
|
||||||
|
} else if constexpr (std::is_same_v<T, CommitEntry>) {
|
||||||
|
// Process commit entry: check banned list, assign version
|
||||||
|
auto &commit_entry = e;
|
||||||
|
auto *state = static_cast<HttpConnectionState *>(
|
||||||
|
commit_entry.connection->user_data);
|
||||||
|
|
||||||
|
if (!state || !state->commit_request) {
|
||||||
|
// Should not happen - basic validation was done on I/O thread
|
||||||
|
send_error_response(*commit_entry.connection, 500,
|
||||||
|
"Internal server error", true);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if request_id is banned (for status queries)
|
||||||
|
// Only check CommitRequest request_id, not HTTP header
|
||||||
|
if (state->commit_request &&
|
||||||
|
state->commit_request->request_id().has_value()) {
|
||||||
|
auto commit_request_id =
|
||||||
|
state->commit_request->request_id().value();
|
||||||
|
if (banned_request_ids.find(commit_request_id) !=
|
||||||
|
banned_request_ids.end()) {
|
||||||
|
// Request ID is banned, this commit should fail
|
||||||
|
send_json_response(
|
||||||
|
*commit_entry.connection, 409,
|
||||||
|
R"({"status": "not_committed", "error": "request_id_banned"})",
|
||||||
|
state->connection_close);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Assign sequential version number
|
||||||
|
commit_entry.assigned_version = next_version++;
|
||||||
|
|
||||||
|
TRACE_EVENT("http", "sequence_commit",
|
||||||
|
perfetto::Flow::Global(state->http_request_id));
|
||||||
|
|
||||||
|
return false; // Continue processing
|
||||||
|
} else if constexpr (std::is_same_v<T, StatusEntry>) {
|
||||||
|
// Process status entry: add request_id to banned list, get version
|
||||||
|
// upper bound
|
||||||
|
auto &status_entry = e;
|
||||||
|
auto *state = static_cast<HttpConnectionState *>(
|
||||||
|
status_entry.connection->user_data);
|
||||||
|
|
||||||
|
if (state && !state->status_request_id.empty()) {
|
||||||
|
// Add request_id to banned list - store the string in arena and
|
||||||
|
// use string_view
|
||||||
|
char *arena_chars = banned_request_arena.allocate<char>(
|
||||||
|
state->status_request_id.size());
|
||||||
|
std::memcpy(arena_chars, state->status_request_id.data(),
|
||||||
|
state->status_request_id.size());
|
||||||
|
std::string_view request_id_view(arena_chars,
|
||||||
|
state->status_request_id.size());
|
||||||
|
banned_request_ids.insert(request_id_view);
|
||||||
|
|
||||||
|
// Update memory usage metric
|
||||||
|
banned_request_ids_memory_gauge.set(
|
||||||
|
banned_request_arena.total_allocated());
|
||||||
|
|
||||||
|
// Set version upper bound to current highest assigned version
|
||||||
|
status_entry.version_upper_bound = next_version - 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
TRACE_EVENT("http", "sequence_status",
|
||||||
|
perfetto::Flow::Global(state->http_request_id));
|
||||||
|
|
||||||
|
// TODO: Transfer to status threadpool - for now just respond
|
||||||
|
// not_committed
|
||||||
|
send_json_response(*status_entry.connection, 200,
|
||||||
|
R"({"status": "not_committed"})",
|
||||||
|
state->connection_close);
|
||||||
|
|
||||||
|
return false; // Continue processing
|
||||||
|
}
|
||||||
|
|
||||||
|
return false; // Unknown type, continue
|
||||||
|
},
|
||||||
|
entry);
|
||||||
|
|
||||||
|
if (should_shutdown) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false; // Continue processing
|
||||||
|
}
|
||||||
|
|
||||||
|
bool HttpHandler::process_resolve_batch(BatchType &batch) {
|
||||||
|
// Stage 1: Precondition resolution
|
||||||
|
// This stage must be serialized to maintain consistent database state view
|
||||||
|
// - Validate preconditions against current database state
|
||||||
|
// - Check for conflicts with other transactions
|
||||||
|
|
||||||
|
for (auto &entry : batch) {
|
||||||
|
// Pattern match on pipeline entry variant
|
||||||
|
bool should_shutdown = std::visit(
|
||||||
|
[&](auto &&e) -> bool {
|
||||||
|
using T = std::decay_t<decltype(e)>;
|
||||||
|
|
||||||
|
if constexpr (std::is_same_v<T, ShutdownEntry>) {
|
||||||
|
return true; // Signal shutdown
|
||||||
|
} else if constexpr (std::is_same_v<T, CommitEntry>) {
|
||||||
|
// Process commit entry: accept all commits (simplified
|
||||||
|
// implementation)
|
||||||
|
auto &commit_entry = e;
|
||||||
|
auto *state = static_cast<HttpConnectionState *>(
|
||||||
|
commit_entry.connection->user_data);
|
||||||
|
|
||||||
|
if (!state || !state->commit_request) {
|
||||||
|
// Skip processing for failed sequence stage
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Accept all commits (simplified implementation)
|
||||||
|
commit_entry.resolve_success = true;
|
||||||
|
|
||||||
|
TRACE_EVENT("http", "resolve_commit",
|
||||||
|
perfetto::Flow::Global(state->http_request_id));
|
||||||
|
|
||||||
|
return false; // Continue processing
|
||||||
|
} else if constexpr (std::is_same_v<T, StatusEntry>) {
|
||||||
|
// Status entries are not processed in resolve stage
|
||||||
|
// They were already handled in sequence stage
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
return false; // Unknown type, continue
|
||||||
|
},
|
||||||
|
entry);
|
||||||
|
|
||||||
|
if (should_shutdown) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false; // Continue processing
|
||||||
|
}
|
||||||
|
|
||||||
|
bool HttpHandler::process_persist_batch(BatchType &batch) {
|
||||||
|
// Stage 2: Transaction persistence
|
||||||
|
// Mark everything as durable immediately (simplified implementation)
|
||||||
|
// In real implementation: batch S3 writes, update subscribers, etc.
|
||||||
|
|
||||||
|
for (auto &entry : batch) {
|
||||||
|
// Pattern match on pipeline entry variant
|
||||||
|
bool should_shutdown = std::visit(
|
||||||
|
[&](auto &&e) -> bool {
|
||||||
|
using T = std::decay_t<decltype(e)>;
|
||||||
|
|
||||||
|
if constexpr (std::is_same_v<T, ShutdownEntry>) {
|
||||||
|
return true; // Signal shutdown
|
||||||
|
} else if constexpr (std::is_same_v<T, CommitEntry>) {
|
||||||
|
// Process commit entry: mark as durable, generate response
|
||||||
|
auto &commit_entry = e;
|
||||||
|
auto *state = static_cast<HttpConnectionState *>(
|
||||||
|
commit_entry.connection->user_data);
|
||||||
|
|
||||||
|
// Skip if resolve failed or connection is in error state
|
||||||
|
if (!state || !state->commit_request ||
|
||||||
|
!commit_entry.resolve_success) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Mark as persisted and update committed version high water mark
|
||||||
|
commit_entry.persist_success = true;
|
||||||
|
committed_version.store(commit_entry.assigned_version,
|
||||||
|
std::memory_order_seq_cst);
|
||||||
|
|
||||||
|
TRACE_EVENT("http", "persist_commit",
|
||||||
|
perfetto::Flow::Global(state->http_request_id));
|
||||||
|
|
||||||
|
const CommitRequest &commit_request = *state->commit_request;
|
||||||
|
ArenaAllocator &arena = commit_entry.connection->get_arena();
|
||||||
|
std::string_view response;
|
||||||
|
|
||||||
|
// Generate success response with actual assigned version
|
||||||
|
if (commit_request.request_id().has_value()) {
|
||||||
|
response = format(
|
||||||
|
arena,
|
||||||
|
R"({"request_id":"%.*s","status":"committed","version":%ld,"leader_id":"leader123"})",
|
||||||
|
static_cast<int>(commit_request.request_id().value().size()),
|
||||||
|
commit_request.request_id().value().data(),
|
||||||
|
commit_entry.assigned_version);
|
||||||
|
} else {
|
||||||
|
response = format(
|
||||||
|
arena,
|
||||||
|
R"({"status":"committed","version":%ld,"leader_id":"leader123"})",
|
||||||
|
commit_entry.assigned_version);
|
||||||
|
}
|
||||||
|
|
||||||
|
send_json_response(*commit_entry.connection, 200, response,
|
||||||
|
state->connection_close);
|
||||||
|
|
||||||
|
return false; // Continue processing
|
||||||
|
} else if constexpr (std::is_same_v<T, StatusEntry>) {
|
||||||
|
// Status entries are not processed in persist stage
|
||||||
|
// They were already handled in sequence stage
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
return false; // Unknown type, continue
|
||||||
|
},
|
||||||
|
entry);
|
||||||
|
|
||||||
|
if (should_shutdown) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return false; // Continue processing
|
||||||
|
}
|
||||||
|
|
||||||
|
bool HttpHandler::process_release_batch(BatchType &batch) {
|
||||||
|
// Stage 3: Connection release
|
||||||
|
// Return connections to server for response transmission
|
||||||
|
|
||||||
|
for (auto &entry : batch) {
|
||||||
|
// Pattern match on pipeline entry variant
|
||||||
|
bool should_shutdown = std::visit(
|
||||||
|
[&](auto &&e) -> bool {
|
||||||
|
using T = std::decay_t<decltype(e)>;
|
||||||
|
|
||||||
|
if constexpr (std::is_same_v<T, ShutdownEntry>) {
|
||||||
|
return true; // Signal shutdown
|
||||||
|
} else if constexpr (std::is_same_v<T, CommitEntry>) {
|
||||||
|
// Process commit entry: return connection to server
|
||||||
|
auto &commit_entry = e;
|
||||||
|
auto *state = static_cast<HttpConnectionState *>(
|
||||||
|
commit_entry.connection->user_data);
|
||||||
|
|
||||||
|
if (state) {
|
||||||
|
TRACE_EVENT("http", "release_commit",
|
||||||
|
perfetto::Flow::Global(state->http_request_id));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Return connection to server for further processing or cleanup
|
||||||
|
Server::release_back_to_server(std::move(commit_entry.connection));
|
||||||
|
|
||||||
|
return false; // Continue processing
|
||||||
|
} else if constexpr (std::is_same_v<T, StatusEntry>) {
|
||||||
|
// Process status entry: return connection to server
|
||||||
|
auto &status_entry = e;
|
||||||
|
auto *state = static_cast<HttpConnectionState *>(
|
||||||
|
status_entry.connection->user_data);
|
||||||
|
|
||||||
|
if (state) {
|
||||||
|
TRACE_EVENT("http", "release_status",
|
||||||
|
perfetto::Flow::Global(state->http_request_id));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Return connection to server for further processing or cleanup
|
||||||
|
Server::release_back_to_server(std::move(status_entry.connection));
|
||||||
|
|
||||||
|
return false; // Continue processing
|
||||||
|
}
|
||||||
|
|
||||||
|
return false; // Unknown type, continue
|
||||||
|
},
|
||||||
|
entry);
|
||||||
|
|
||||||
|
if (should_shutdown) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return false; // Continue processing
|
||||||
|
}
|
||||||
|
|||||||
@@ -1,18 +1,26 @@
|
|||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
|
#include <atomic>
|
||||||
#include <memory>
|
#include <memory>
|
||||||
#include <string_view>
|
#include <string_view>
|
||||||
#include <thread>
|
#include <thread>
|
||||||
|
#include <unordered_set>
|
||||||
|
|
||||||
#include <llhttp.h>
|
#include <llhttp.h>
|
||||||
|
|
||||||
|
#include "arena_allocator.hpp"
|
||||||
#include "connection.hpp"
|
#include "connection.hpp"
|
||||||
#include "connection_handler.hpp"
|
#include "connection_handler.hpp"
|
||||||
#include "loop_iterations.hpp"
|
#include "loop_iterations.hpp"
|
||||||
#include "perfetto_categories.hpp"
|
#include "perfetto_categories.hpp"
|
||||||
|
#include "pipeline_entry.hpp"
|
||||||
#include "server.hpp"
|
#include "server.hpp"
|
||||||
#include "thread_pipeline.hpp"
|
#include "thread_pipeline.hpp"
|
||||||
|
|
||||||
|
// Forward declarations
|
||||||
|
struct CommitRequest;
|
||||||
|
struct JsonCommitRequestParser;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* HTTP routes supported by WeaselDB server.
|
* HTTP routes supported by WeaselDB server.
|
||||||
* Using enum for efficient switch-based routing.
|
* Using enum for efficient switch-based routing.
|
||||||
@@ -48,13 +56,25 @@ struct HttpConnectionState {
|
|||||||
bool connection_close = false; // Client requested connection close
|
bool connection_close = false; // Client requested connection close
|
||||||
HttpRoute route = HttpRoute::NotFound;
|
HttpRoute route = HttpRoute::NotFound;
|
||||||
|
|
||||||
|
// Status request data
|
||||||
|
std::string_view
|
||||||
|
status_request_id; // Request ID extracted from /v1/status/{id} URL
|
||||||
|
|
||||||
// Header accumulation buffers (arena-allocated)
|
// Header accumulation buffers (arena-allocated)
|
||||||
using ArenaString =
|
using ArenaString =
|
||||||
std::basic_string<char, std::char_traits<char>, ArenaStlAllocator<char>>;
|
std::basic_string<char, std::char_traits<char>, ArenaStlAllocator<char>>;
|
||||||
ArenaString current_header_field_buf;
|
ArenaString current_header_field_buf;
|
||||||
ArenaString current_header_value_buf;
|
ArenaString current_header_value_buf;
|
||||||
bool header_field_complete = false;
|
bool header_field_complete = false;
|
||||||
int64_t request_id = 0; // X-Request-Id header value
|
int64_t http_request_id =
|
||||||
|
0; // X-Request-Id header value (for tracing/logging)
|
||||||
|
|
||||||
|
// Streaming parser for POST requests
|
||||||
|
std::unique_ptr<JsonCommitRequestParser> commit_parser;
|
||||||
|
std::unique_ptr<CommitRequest> commit_request;
|
||||||
|
bool parsing_commit = false;
|
||||||
|
bool basic_validation_passed =
|
||||||
|
false; // Set to true if basic validation passes
|
||||||
|
|
||||||
explicit HttpConnectionState(ArenaAllocator &arena);
|
explicit HttpConnectionState(ArenaAllocator &arena);
|
||||||
};
|
};
|
||||||
@@ -64,70 +84,66 @@ struct HttpConnectionState {
|
|||||||
* Supports the WeaselDB REST API endpoints with enum-based routing.
|
* Supports the WeaselDB REST API endpoints with enum-based routing.
|
||||||
*/
|
*/
|
||||||
struct HttpHandler : ConnectionHandler {
|
struct HttpHandler : ConnectionHandler {
|
||||||
HttpHandler() {
|
HttpHandler()
|
||||||
finalStageThreads.emplace_back([this]() {
|
: banned_request_ids(
|
||||||
pthread_setname_np(pthread_self(), "stage-1-0");
|
ArenaStlAllocator<std::string_view>(&banned_request_arena)) {
|
||||||
|
// Stage 0: Sequence assignment thread
|
||||||
|
sequenceThread = std::thread{[this]() {
|
||||||
|
pthread_setname_np(pthread_self(), "txn-sequence");
|
||||||
for (;;) {
|
for (;;) {
|
||||||
auto guard = pipeline.acquire<1, 0>();
|
auto guard = commitPipeline.acquire<0, 0>();
|
||||||
for (auto it = guard.batch.begin(); it != guard.batch.end(); ++it) {
|
if (process_sequence_batch(guard.batch)) {
|
||||||
if ((it.index() % 2) == 0) { // Thread 0 handles even indices
|
return; // Shutdown signal received
|
||||||
auto &c = *it;
|
|
||||||
if (!c) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
auto *state = static_cast<HttpConnectionState *>(c->user_data);
|
|
||||||
TRACE_EVENT("http", "release",
|
|
||||||
perfetto::Flow::Global(state->request_id));
|
|
||||||
Server::release_back_to_server(std::move(c));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
}};
|
||||||
finalStageThreads.emplace_back([this]() {
|
|
||||||
pthread_setname_np(pthread_self(), "stage-1-1");
|
// Stage 1: Precondition resolution thread
|
||||||
|
resolveThread = std::thread{[this]() {
|
||||||
|
pthread_setname_np(pthread_self(), "txn-resolve");
|
||||||
for (;;) {
|
for (;;) {
|
||||||
auto guard = pipeline.acquire<1, 1>();
|
auto guard = commitPipeline.acquire<1, 0>();
|
||||||
for (auto it = guard.batch.begin(); it != guard.batch.end(); ++it) {
|
if (process_resolve_batch(guard.batch)) {
|
||||||
if ((it.index() % 2) == 1) { // Thread 1 handles odd indices
|
return; // Shutdown signal received
|
||||||
auto &c = *it;
|
|
||||||
if (!c) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
auto *state = static_cast<HttpConnectionState *>(c->user_data);
|
|
||||||
TRACE_EVENT("http", "release",
|
|
||||||
perfetto::Flow::Global(state->request_id));
|
|
||||||
Server::release_back_to_server(std::move(c));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
}};
|
||||||
stage0Thread = std::thread{[this]() {
|
|
||||||
pthread_setname_np(pthread_self(), "stage-0");
|
// Stage 2: Transaction persistence thread
|
||||||
int nulls = 0;
|
persistThread = std::thread{[this]() {
|
||||||
|
pthread_setname_np(pthread_self(), "txn-persist");
|
||||||
for (;;) {
|
for (;;) {
|
||||||
auto guard = pipeline.acquire<0, 0>(1);
|
auto guard = commitPipeline.acquire<2, 0>();
|
||||||
for (auto &c : guard.batch) {
|
if (process_persist_batch(guard.batch)) {
|
||||||
nulls += !c;
|
return; // Shutdown signal received
|
||||||
if (nulls == 2) {
|
}
|
||||||
return;
|
}
|
||||||
}
|
}};
|
||||||
for (volatile int i = 0; i < loop_iterations; i = i + 1)
|
|
||||||
;
|
// Stage 3: Connection return to server thread
|
||||||
|
releaseThread = std::thread{[this]() {
|
||||||
|
pthread_setname_np(pthread_self(), "txn-release");
|
||||||
|
for (;;) {
|
||||||
|
auto guard = commitPipeline.acquire<3, 0>();
|
||||||
|
if (process_release_batch(guard.batch)) {
|
||||||
|
return; // Shutdown signal received
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}};
|
}};
|
||||||
}
|
}
|
||||||
~HttpHandler() {
|
~HttpHandler() {
|
||||||
|
// Send single shutdown signal that flows through all pipeline stages
|
||||||
{
|
{
|
||||||
auto guard = pipeline.push(2, true);
|
auto guard = commitPipeline.push(1, true);
|
||||||
for (auto &c : guard.batch) {
|
guard.batch[0] =
|
||||||
c = {};
|
ShutdownEntry{}; // Single ShutdownEntry flows through all stages
|
||||||
}
|
|
||||||
}
|
|
||||||
stage0Thread.join();
|
|
||||||
for (auto &thread : finalStageThreads) {
|
|
||||||
thread.join();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Join all pipeline threads
|
||||||
|
sequenceThread.join();
|
||||||
|
resolveThread.join();
|
||||||
|
persistThread.join();
|
||||||
|
releaseThread.join();
|
||||||
}
|
}
|
||||||
|
|
||||||
void on_connection_established(Connection &conn) override;
|
void on_connection_established(Connection &conn) override;
|
||||||
@@ -153,17 +169,49 @@ struct HttpHandler : ConnectionHandler {
|
|||||||
|
|
||||||
private:
|
private:
|
||||||
static constexpr int lg_size = 16;
|
static constexpr int lg_size = 16;
|
||||||
StaticThreadPipeline<std::unique_ptr<Connection>,
|
|
||||||
WaitStrategy::WaitIfUpstreamIdle, 1, 2>
|
// Pipeline state (sequence thread only)
|
||||||
pipeline{lg_size};
|
int64_t next_version = 1; // Next version to assign (sequence thread only)
|
||||||
std::thread stage0Thread;
|
|
||||||
std::vector<std::thread> finalStageThreads;
|
// Pipeline state (persist thread writes, I/O threads read)
|
||||||
|
std::atomic<int64_t> committed_version{
|
||||||
|
0}; // Highest committed version (persist thread writes, I/O threads read)
|
||||||
|
|
||||||
|
// Arena for banned request IDs and related data structures (sequence thread
|
||||||
|
// only)
|
||||||
|
ArenaAllocator banned_request_arena;
|
||||||
|
using BannedRequestIdSet =
|
||||||
|
std::unordered_set<std::string_view, std::hash<std::string_view>,
|
||||||
|
std::equal_to<std::string_view>,
|
||||||
|
ArenaStlAllocator<std::string_view>>;
|
||||||
|
BannedRequestIdSet banned_request_ids; // Request IDs that should not commit
|
||||||
|
// (string_views into arena)
|
||||||
|
|
||||||
|
// Main commit processing pipeline: sequence -> resolve -> persist -> release
|
||||||
|
StaticThreadPipeline<PipelineEntry, WaitStrategy::WaitIfUpstreamIdle, 1, 1, 1,
|
||||||
|
1>
|
||||||
|
commitPipeline{lg_size};
|
||||||
|
|
||||||
|
// Pipeline stage threads
|
||||||
|
std::thread sequenceThread;
|
||||||
|
std::thread resolveThread;
|
||||||
|
std::thread persistThread;
|
||||||
|
std::thread releaseThread;
|
||||||
|
|
||||||
|
// Pipeline stage processing methods (batch-based)
|
||||||
|
using BatchType =
|
||||||
|
StaticThreadPipeline<PipelineEntry, WaitStrategy::WaitIfUpstreamIdle, 1,
|
||||||
|
1, 1, 1>::Batch;
|
||||||
|
bool process_sequence_batch(BatchType &batch);
|
||||||
|
bool process_resolve_batch(BatchType &batch);
|
||||||
|
bool process_persist_batch(BatchType &batch);
|
||||||
|
bool process_release_batch(BatchType &batch);
|
||||||
|
|
||||||
// Route handlers
|
// Route handlers
|
||||||
void handleGetVersion(Connection &conn, const HttpConnectionState &state);
|
void handleGetVersion(Connection &conn, const HttpConnectionState &state);
|
||||||
void handlePostCommit(Connection &conn, const HttpConnectionState &state);
|
void handlePostCommit(Connection &conn, const HttpConnectionState &state);
|
||||||
void handleGetSubscribe(Connection &conn, const HttpConnectionState &state);
|
void handleGetSubscribe(Connection &conn, const HttpConnectionState &state);
|
||||||
void handleGetStatus(Connection &conn, const HttpConnectionState &state);
|
void handleGetStatus(Connection &conn, HttpConnectionState &state);
|
||||||
void handlePutRetention(Connection &conn, const HttpConnectionState &state);
|
void handlePutRetention(Connection &conn, const HttpConnectionState &state);
|
||||||
void handleGetRetention(Connection &conn, const HttpConnectionState &state);
|
void handleGetRetention(Connection &conn, const HttpConnectionState &state);
|
||||||
void handleDeleteRetention(Connection &conn,
|
void handleDeleteRetention(Connection &conn,
|
||||||
@@ -176,10 +224,10 @@ private:
|
|||||||
static void sendResponse(Connection &conn, int status_code,
|
static void sendResponse(Connection &conn, int status_code,
|
||||||
std::string_view content_type, std::string_view body,
|
std::string_view content_type, std::string_view body,
|
||||||
bool close_connection = false);
|
bool close_connection = false);
|
||||||
static void sendJsonResponse(Connection &conn, int status_code,
|
static void send_json_response(Connection &conn, int status_code,
|
||||||
std::string_view json,
|
std::string_view json,
|
||||||
bool close_connection = false);
|
bool close_connection = false);
|
||||||
static void sendErrorResponse(Connection &conn, int status_code,
|
static void send_error_response(Connection &conn, int status_code,
|
||||||
std::string_view message,
|
std::string_view message,
|
||||||
bool close_connection = false);
|
bool close_connection = false);
|
||||||
};
|
};
|
||||||
|
|||||||
146
src/main.cpp
146
src/main.cpp
@@ -29,48 +29,41 @@ void signal_handler(int sig) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
std::vector<int> create_listen_sockets(const weaseldb::Config &config) {
|
int create_unix_socket(const std::string &path) {
|
||||||
std::vector<int> listen_fds;
|
int sfd = socket(AF_UNIX, SOCK_STREAM, 0);
|
||||||
|
if (sfd == -1) {
|
||||||
// Check if unix socket path is specified
|
perror("socket");
|
||||||
if (!config.server.unix_socket_path.empty()) {
|
std::abort();
|
||||||
// Create unix socket
|
|
||||||
int sfd = socket(AF_UNIX, SOCK_STREAM, 0);
|
|
||||||
if (sfd == -1) {
|
|
||||||
perror("socket");
|
|
||||||
std::abort();
|
|
||||||
}
|
|
||||||
|
|
||||||
// Remove existing socket file if it exists
|
|
||||||
unlink(config.server.unix_socket_path.c_str());
|
|
||||||
|
|
||||||
struct sockaddr_un addr;
|
|
||||||
std::memset(&addr, 0, sizeof(addr));
|
|
||||||
addr.sun_family = AF_UNIX;
|
|
||||||
|
|
||||||
if (config.server.unix_socket_path.length() >= sizeof(addr.sun_path)) {
|
|
||||||
std::fprintf(stderr, "Unix socket path too long\n");
|
|
||||||
std::abort();
|
|
||||||
}
|
|
||||||
|
|
||||||
std::strncpy(addr.sun_path, config.server.unix_socket_path.c_str(),
|
|
||||||
sizeof(addr.sun_path) - 1);
|
|
||||||
|
|
||||||
if (bind(sfd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
|
|
||||||
perror("bind");
|
|
||||||
std::abort();
|
|
||||||
}
|
|
||||||
|
|
||||||
if (listen(sfd, SOMAXCONN) == -1) {
|
|
||||||
perror("listen");
|
|
||||||
std::abort();
|
|
||||||
}
|
|
||||||
|
|
||||||
listen_fds.push_back(sfd);
|
|
||||||
return listen_fds;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// TCP socket creation
|
// Remove existing socket file if it exists
|
||||||
|
unlink(path.c_str());
|
||||||
|
|
||||||
|
struct sockaddr_un addr;
|
||||||
|
std::memset(&addr, 0, sizeof(addr));
|
||||||
|
addr.sun_family = AF_UNIX;
|
||||||
|
|
||||||
|
if (path.length() >= sizeof(addr.sun_path)) {
|
||||||
|
std::fprintf(stderr, "Unix socket path too long: %s\n", path.c_str());
|
||||||
|
std::abort();
|
||||||
|
}
|
||||||
|
|
||||||
|
std::strncpy(addr.sun_path, path.c_str(), sizeof(addr.sun_path) - 1);
|
||||||
|
|
||||||
|
if (bind(sfd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
|
||||||
|
perror("bind");
|
||||||
|
std::abort();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (listen(sfd, SOMAXCONN) == -1) {
|
||||||
|
perror("listen");
|
||||||
|
std::abort();
|
||||||
|
}
|
||||||
|
|
||||||
|
return sfd;
|
||||||
|
}
|
||||||
|
|
||||||
|
int create_tcp_socket(const std::string &address, int port) {
|
||||||
struct addrinfo hints;
|
struct addrinfo hints;
|
||||||
struct addrinfo *result, *rp;
|
struct addrinfo *result, *rp;
|
||||||
int s;
|
int s;
|
||||||
@@ -84,8 +77,8 @@ std::vector<int> create_listen_sockets(const weaseldb::Config &config) {
|
|||||||
hints.ai_addr = nullptr;
|
hints.ai_addr = nullptr;
|
||||||
hints.ai_next = nullptr;
|
hints.ai_next = nullptr;
|
||||||
|
|
||||||
s = getaddrinfo(config.server.bind_address.c_str(),
|
s = getaddrinfo(address.c_str(), std::to_string(port).c_str(), &hints,
|
||||||
std::to_string(config.server.port).c_str(), &hints, &result);
|
&result);
|
||||||
if (s != 0) {
|
if (s != 0) {
|
||||||
std::fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(s));
|
std::fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(s));
|
||||||
std::abort();
|
std::abort();
|
||||||
@@ -94,18 +87,13 @@ std::vector<int> create_listen_sockets(const weaseldb::Config &config) {
|
|||||||
int sfd = -1;
|
int sfd = -1;
|
||||||
for (rp = result; rp != nullptr; rp = rp->ai_next) {
|
for (rp = result; rp != nullptr; rp = rp->ai_next) {
|
||||||
sfd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
|
sfd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
|
||||||
if (sfd == -1) {
|
if (sfd == -1)
|
||||||
continue;
|
continue;
|
||||||
}
|
|
||||||
|
|
||||||
int val = 1;
|
int val = 1;
|
||||||
if (setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)) == -1) {
|
if (setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)) == -1) {
|
||||||
perror("setsockopt SO_REUSEADDR");
|
perror("setsockopt SO_REUSEADDR");
|
||||||
int e = close(sfd);
|
close(sfd);
|
||||||
if (e == -1 && errno != EINTR) {
|
|
||||||
perror("close sfd (SO_REUSEADDR failed)");
|
|
||||||
std::abort();
|
|
||||||
}
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -113,40 +101,56 @@ std::vector<int> create_listen_sockets(const weaseldb::Config &config) {
|
|||||||
if (rp->ai_family == AF_INET || rp->ai_family == AF_INET6) {
|
if (rp->ai_family == AF_INET || rp->ai_family == AF_INET6) {
|
||||||
if (setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val)) == -1) {
|
if (setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val)) == -1) {
|
||||||
perror("setsockopt TCP_NODELAY");
|
perror("setsockopt TCP_NODELAY");
|
||||||
int e = close(sfd);
|
close(sfd);
|
||||||
if (e == -1 && errno != EINTR) {
|
|
||||||
perror("close sfd (TCP_NODELAY failed)");
|
|
||||||
std::abort();
|
|
||||||
}
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (bind(sfd, rp->ai_addr, rp->ai_addrlen) == 0) {
|
if (bind(sfd, rp->ai_addr, rp->ai_addrlen) == 0) {
|
||||||
|
if (listen(sfd, SOMAXCONN) == -1) {
|
||||||
|
perror("listen");
|
||||||
|
close(sfd);
|
||||||
|
freeaddrinfo(result);
|
||||||
|
std::abort();
|
||||||
|
}
|
||||||
break; /* Success */
|
break; /* Success */
|
||||||
}
|
}
|
||||||
|
|
||||||
int e = close(sfd);
|
close(sfd);
|
||||||
if (e == -1 && errno != EINTR) {
|
|
||||||
perror("close sfd (bind failed)");
|
|
||||||
std::abort();
|
|
||||||
}
|
|
||||||
sfd = -1;
|
sfd = -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
freeaddrinfo(result);
|
freeaddrinfo(result);
|
||||||
|
|
||||||
if (rp == nullptr || sfd == -1) {
|
if (sfd == -1) {
|
||||||
std::fprintf(stderr, "Could not bind to any address\n");
|
std::fprintf(stderr, "Could not bind to %s:%d\n", address.c_str(), port);
|
||||||
std::abort();
|
std::abort();
|
||||||
}
|
}
|
||||||
|
|
||||||
if (listen(sfd, SOMAXCONN) == -1) {
|
return sfd;
|
||||||
perror("listen");
|
}
|
||||||
|
|
||||||
|
std::vector<int> create_listen_sockets(const weaseldb::Config &config) {
|
||||||
|
std::vector<int> listen_fds;
|
||||||
|
|
||||||
|
for (const auto &iface : config.server.interfaces) {
|
||||||
|
int fd;
|
||||||
|
if (iface.type == weaseldb::ListenInterface::Type::TCP) {
|
||||||
|
fd = create_tcp_socket(iface.address, iface.port);
|
||||||
|
std::cout << "Listening on TCP " << iface.address << ":" << iface.port
|
||||||
|
<< std::endl;
|
||||||
|
} else {
|
||||||
|
fd = create_unix_socket(iface.path);
|
||||||
|
std::cout << "Listening on Unix socket " << iface.path << std::endl;
|
||||||
|
}
|
||||||
|
listen_fds.push_back(fd);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (listen_fds.empty()) {
|
||||||
|
std::fprintf(stderr, "No interfaces configured\n");
|
||||||
std::abort();
|
std::abort();
|
||||||
}
|
}
|
||||||
|
|
||||||
listen_fds.push_back(sfd);
|
|
||||||
return listen_fds;
|
return listen_fds;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -218,13 +222,13 @@ int main(int argc, char *argv[]) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
std::cout << "Configuration loaded successfully:" << std::endl;
|
std::cout << "Configuration loaded successfully:" << std::endl;
|
||||||
if (!config->server.unix_socket_path.empty()) {
|
std::cout << "Interfaces: " << config->server.interfaces.size() << std::endl;
|
||||||
std::cout << "Unix socket path: " << config->server.unix_socket_path
|
for (const auto &iface : config->server.interfaces) {
|
||||||
<< std::endl;
|
if (iface.type == weaseldb::ListenInterface::Type::TCP) {
|
||||||
} else {
|
std::cout << " TCP: " << iface.address << ":" << iface.port << std::endl;
|
||||||
std::cout << "Server bind address: " << config->server.bind_address
|
} else {
|
||||||
<< std::endl;
|
std::cout << " Unix socket: " << iface.path << std::endl;
|
||||||
std::cout << "Server port: " << config->server.port << std::endl;
|
}
|
||||||
}
|
}
|
||||||
std::cout << "Max request size: " << config->server.max_request_size_bytes
|
std::cout << "Max request size: " << config->server.max_request_size_bytes
|
||||||
<< " bytes" << std::endl;
|
<< " bytes" << std::endl;
|
||||||
|
|||||||
47
src/pipeline_entry.hpp
Normal file
47
src/pipeline_entry.hpp
Normal file
@@ -0,0 +1,47 @@
|
|||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include "connection.hpp"
|
||||||
|
#include <memory>
|
||||||
|
#include <variant>
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Pipeline entry for commit requests that need full 4-stage processing.
|
||||||
|
* Contains connection with parsed CommitRequest.
|
||||||
|
*/
|
||||||
|
struct CommitEntry {
|
||||||
|
std::unique_ptr<Connection> connection;
|
||||||
|
int64_t assigned_version = 0; // Set by sequence stage
|
||||||
|
bool resolve_success = false; // Set by resolve stage
|
||||||
|
bool persist_success = false; // Set by persist stage
|
||||||
|
|
||||||
|
CommitEntry() = default; // Default constructor for variant
|
||||||
|
explicit CommitEntry(std::unique_ptr<Connection> conn)
|
||||||
|
: connection(std::move(conn)) {}
|
||||||
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Pipeline entry for status requests that need sequence stage processing
|
||||||
|
* then transfer to status threadpool.
|
||||||
|
*/
|
||||||
|
struct StatusEntry {
|
||||||
|
std::unique_ptr<Connection> connection;
|
||||||
|
int64_t version_upper_bound = 0; // Set by sequence stage
|
||||||
|
|
||||||
|
StatusEntry() = default; // Default constructor for variant
|
||||||
|
explicit StatusEntry(std::unique_ptr<Connection> conn)
|
||||||
|
: connection(std::move(conn)) {}
|
||||||
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Pipeline entry for coordinated shutdown of all stages.
|
||||||
|
* Flows through all stages to ensure proper cleanup.
|
||||||
|
*/
|
||||||
|
struct ShutdownEntry {
|
||||||
|
// Empty struct - presence indicates shutdown
|
||||||
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Pipeline entry variant type used by the commit processing pipeline.
|
||||||
|
* Each stage pattern-matches on the variant type to handle appropriately.
|
||||||
|
*/
|
||||||
|
using PipelineEntry = std::variant<CommitEntry, StatusEntry, ShutdownEntry>;
|
||||||
@@ -98,9 +98,11 @@ Server::~Server() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Clean up unix socket file if it exists
|
// Clean up unix socket files if they exist
|
||||||
if (!config_.server.unix_socket_path.empty()) {
|
for (const auto &iface : config_.server.interfaces) {
|
||||||
unlink(config_.server.unix_socket_path.c_str());
|
if (iface.type == weaseldb::ListenInterface::Type::Unix) {
|
||||||
|
unlink(iface.path.c_str());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
21
style.md
21
style.md
@@ -102,6 +102,27 @@ auto addr = reinterpret_cast<uintptr_t>(ptr); // Pointer to integer conv
|
|||||||
- **String views** with `std::string_view` to minimize unnecessary copying
|
- **String views** with `std::string_view` to minimize unnecessary copying
|
||||||
- **Arena allocation** for efficient memory management (~1ns vs ~20-270ns for malloc)
|
- **Arena allocation** for efficient memory management (~1ns vs ~20-270ns for malloc)
|
||||||
|
|
||||||
|
### String Formatting
|
||||||
|
- **Always use `format.hpp` functions** - formats directly into arena-allocated memory
|
||||||
|
- **Use `static_format()` for performance-sensitive code** - faster but less flexible than `format()`
|
||||||
|
- **Use `format()` function with arena allocator** for printf-style formatting
|
||||||
|
```cpp
|
||||||
|
// Most performance-sensitive - compile-time optimized concatenation
|
||||||
|
std::string_view response = static_format(arena,
|
||||||
|
"HTTP/1.1 ", status_code, " OK\r\n",
|
||||||
|
"Content-Length: ", body.size(), "\r\n",
|
||||||
|
"\r\n", body);
|
||||||
|
|
||||||
|
// Printf-style formatting - runtime flexible
|
||||||
|
ArenaAllocator& arena = conn.get_arena();
|
||||||
|
std::string_view response = format(arena,
|
||||||
|
"HTTP/1.1 %d OK\r\n"
|
||||||
|
"Content-Length: %zu\r\n"
|
||||||
|
"\r\n%.*s",
|
||||||
|
status_code, body.size(),
|
||||||
|
static_cast<int>(body.size()), body.data());
|
||||||
|
```
|
||||||
|
|
||||||
### Complexity Control
|
### Complexity Control
|
||||||
- **Encapsulation is the main tool for controlling complexity**
|
- **Encapsulation is the main tool for controlling complexity**
|
||||||
- **Header files define the interface** - they are the contract with users of your code
|
- **Header files define the interface** - they are the contract with users of your code
|
||||||
|
|||||||
@@ -1,9 +1,11 @@
|
|||||||
# WeaselDB Configuration File
|
# WeaselDB Configuration File
|
||||||
|
|
||||||
[server]
|
[server]
|
||||||
unix_socket_path = "weaseldb.sock"
|
# Network interfaces to listen on - both TCP for external access and Unix socket for high-performance local testing
|
||||||
bind_address = "127.0.0.1"
|
interfaces = [
|
||||||
port = 8080
|
{ type = "tcp", address = "127.0.0.1", port = 8080 },
|
||||||
|
{ type = "unix", path = "weaseldb.sock" }
|
||||||
|
]
|
||||||
# Maximum request size in bytes (for 413 Content Too Large responses)
|
# Maximum request size in bytes (for 413 Content Too Large responses)
|
||||||
max_request_size_bytes = 1048576 # 1MB
|
max_request_size_bytes = 1048576 # 1MB
|
||||||
# Number of I/O threads for handling connections and network events
|
# Number of I/O threads for handling connections and network events
|
||||||
|
|||||||
Reference in New Issue
Block a user