Update documentation with new networking model
This commit is contained in:
118
design.md
118
design.md
@@ -259,37 +259,37 @@ CommitRequest {
|
|||||||
|
|
||||||
> **Note**: Call `conn->reset()` periodically to reclaim arena memory. Best practice is after all outgoing bytes have been written.
|
> **Note**: Call `conn->reset()` periodically to reclaim arena memory. Best practice is after all outgoing bytes have been written.
|
||||||
|
|
||||||
#### Threading Model and EPOLLONESHOT
|
#### Threading Model and Server-Owned Connections
|
||||||
|
|
||||||
**EPOLLONESHOT Design Rationale:**
|
**Server-Owned Connection Design:**
|
||||||
WeaselDB uses `EPOLLONESHOT` for all connection file descriptors to enable safe multi-threaded ownership transfer without complex synchronization:
|
WeaselDB uses a server-owned connection model where the server retains ownership of all connections while providing safe concurrent access to handlers:
|
||||||
|
|
||||||
**Key Benefits:**
|
**Key Benefits:**
|
||||||
|
|
||||||
1. **Automatic fd disarming** - When epoll triggers an event, the fd is automatically removed from epoll monitoring
|
1. **Simplified ownership** - Server always owns connections, eliminating complex ownership transfers
|
||||||
1. **Race-free ownership transfer** - Handlers can safely take connection ownership and move to other threads
|
1. **Safe concurrent access** - Connection mutexes synchronize access between I/O threads and handlers
|
||||||
1. **Zero-coordination async processing** - No manual synchronization needed between network threads and handler threads
|
1. **WeakRef pattern** - Handlers use WeakRef<Connection> for safe async processing without ownership
|
||||||
|
|
||||||
**Threading Flow:**
|
**Threading Flow:**
|
||||||
|
|
||||||
1. **Event Trigger**: Network thread gets epoll event → connection auto-disarmed via ONESHOT
|
1. **Event Trigger**: Network thread gets epoll event and processes data
|
||||||
1. **Safe Transfer**: Handler can take ownership (`std::move(conn_ptr)`) with no epoll interference
|
1. **Handler Invocation**: Handler receives Connection& reference - server retains ownership
|
||||||
1. **Async Processing**: Connection processed on handler thread while epoll cannot trigger spurious events
|
1. **Async Processing**: Handler obtains WeakRef<Connection> for safe background processing
|
||||||
1. **Return & Re-arm**: Internal server method re-arms fd with `epoll_ctl(EPOLL_CTL_MOD)` via `Server::release_back_to_server()`
|
1. **Connection Cleanup**: Server manages connection lifecycle including file descriptor operations
|
||||||
|
|
||||||
**Performance Trade-off:**
|
**Performance Benefits:**
|
||||||
|
|
||||||
- **Cost**: One `epoll_ctl(MOD)` syscall per connection return (~100-200ns)
|
- **Reduced syscalls**: Eliminates epoll_ctl(MOD) calls needed for ownership transfer
|
||||||
- **Benefit**: Eliminates complex thread synchronization and prevents race conditions
|
- **Simplified synchronization**: Connection mutexes provide clear concurrent access patterns
|
||||||
- **Alternative cost**: Manual `EPOLL_CTL_DEL`/`ADD` + locking would be significantly higher
|
- **Memory efficiency**: No unique_ptr overhead for ownership management
|
||||||
|
|
||||||
**Without EPOLLONESHOT risks:**
|
**Safe Async Processing:**
|
||||||
|
|
||||||
- Multiple threads processing same fd simultaneously
|
- WeakRef<Connection> prevents use-after-free in background threads
|
||||||
- Use-after-move when network thread accesses transferred connection
|
- Connection mutex ensures thread-safe access to connection state
|
||||||
- Complex synchronization between epoll events and ownership transfers
|
- Server handles all file descriptor management automatically
|
||||||
|
|
||||||
This design enables the async handler pattern where connections can be safely moved between threads for background processing while maintaining high performance and thread safety.
|
This design provides high performance concurrent processing while maintaining thread safety through clear ownership boundaries and synchronization primitives.
|
||||||
|
|
||||||
### API Endpoints
|
### API Endpoints
|
||||||
|
|
||||||
@@ -317,13 +317,13 @@ See [style.md](style.md) for comprehensive C++ coding standards and conventions.
|
|||||||
|
|
||||||
- **Server Creation**: Always use `Server::create()` factory method - direct construction is impossible
|
- **Server Creation**: Always use `Server::create()` factory method - direct construction is impossible
|
||||||
- **Connection Creation**: Only the Server can create connections - no public constructor or factory method
|
- **Connection Creation**: Only the Server can create connections - no public constructor or factory method
|
||||||
- **Connection Ownership**: Use unique_ptr semantics for safe ownership transfer between components
|
- **Connection Ownership**: Server retains ownership, handlers use Connection& references
|
||||||
- **Arena Allocator Pattern**: Always use `Arena` for temporary allocations within request processing
|
- **Arena Allocator Pattern**: Always use `Arena` for temporary allocations within request processing
|
||||||
- **String View Usage**: Prefer `std::string_view` over `std::string` when pointing to arena-allocated memory
|
- **String View Usage**: Prefer `std::string_view` over `std::string` when pointing to arena-allocated memory
|
||||||
- **Ownership Transfer**: Use `Server::release_back_to_server()` for returning connections to server from handlers
|
- **Async Processing**: Use `conn.get_weak_ref()` for safe background processing without ownership
|
||||||
- **JSON Token Lookup**: Use the gperf-generated perfect hash table in `json_tokens.hpp` for O(1) key recognition
|
- **JSON Token Lookup**: Use the gperf-generated perfect hash table in `json_tokens.hpp` for O(1) key recognition
|
||||||
- **Base64 Handling**: Always use simdutf for base64 encoding/decoding for performance
|
- **Base64 Handling**: Always use simdutf for base64 encoding/decoding for performance
|
||||||
- **Thread Safety**: Connection ownership transfers are designed to be thread-safe with proper RAII cleanup
|
- **Thread Safety**: Connection mutexes provide safe concurrent access between threads
|
||||||
|
|
||||||
### Project Structure
|
### Project Structure
|
||||||
|
|
||||||
@@ -338,12 +338,12 @@ See [style.md](style.md) for comprehensive C++ coding standards and conventions.
|
|||||||
#### Adding New Protocol Handlers
|
#### Adding New Protocol Handlers
|
||||||
|
|
||||||
1. Inherit from `ConnectionHandler` in `src/connection_handler.hpp`
|
1. Inherit from `ConnectionHandler` in `src/connection_handler.hpp`
|
||||||
1. Implement `on_data_arrived()` with proper ownership semantics
|
1. Implement `on_data_arrived()` using Connection& reference parameter
|
||||||
1. Use connection's arena allocator for temporary allocations: `conn->get_arena()`
|
1. Use connection's arena allocator for temporary allocations: `conn.get_arena()`
|
||||||
1. Handle partial messages and streaming protocols appropriately
|
1. Handle partial messages and streaming protocols appropriately
|
||||||
1. Use `Server::release_back_to_server()` if taking ownership for async processing
|
1. Use `conn.get_weak_ref()` for safe async processing without ownership transfer
|
||||||
1. Add corresponding test cases and integration tests
|
1. Add corresponding test cases and integration tests
|
||||||
1. Consider performance implications of ownership transfers
|
1. Consider performance implications of concurrent access patterns
|
||||||
|
|
||||||
#### Adding New Parsers
|
#### Adding New Parsers
|
||||||
|
|
||||||
@@ -395,33 +395,32 @@ Only Server can create connections (using private constructor via friend access)
|
|||||||
```cpp
|
```cpp
|
||||||
class HttpHandler : public ConnectionHandler {
|
class HttpHandler : public ConnectionHandler {
|
||||||
public:
|
public:
|
||||||
void on_data_arrived(std::string_view data, std::unique_ptr<Connection>& conn_ptr) override {
|
void on_data_arrived(std::string_view data, Connection& conn) override {
|
||||||
// Parse HTTP request using connection's arena
|
// Parse HTTP request using connection's arena
|
||||||
Arena& arena = conn_ptr->get_arena();
|
Arena& arena = conn.get_arena();
|
||||||
|
|
||||||
// Generate response
|
// Generate response
|
||||||
conn_ptr->append_message("HTTP/1.1 200 OK\r\n\r\nHello World");
|
conn.append_message("HTTP/1.1 200 OK\r\n\r\nHello World");
|
||||||
|
|
||||||
// Server retains ownership
|
// Server retains ownership
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
```
|
```
|
||||||
|
|
||||||
#### Async Handler with Ownership Transfer
|
#### Async Handler with WeakRef
|
||||||
|
|
||||||
```cpp
|
```cpp
|
||||||
class AsyncHandler : public ConnectionHandler {
|
class AsyncHandler : public ConnectionHandler {
|
||||||
public:
|
public:
|
||||||
void on_data_arrived(std::string_view data, std::unique_ptr<Connection>& conn_ptr) override {
|
void on_data_arrived(std::string_view data, Connection& conn) override {
|
||||||
// Take ownership for async processing
|
// Get weak reference for async processing
|
||||||
auto connection = std::move(conn_ptr); // conn_ptr is now null
|
auto weak_conn = conn.get_weak_ref();
|
||||||
|
|
||||||
work_queue.push([connection = std::move(connection)](std::string_view data) mutable {
|
work_queue.push([weak_conn, data = std::string(data)]() {
|
||||||
// Process asynchronously
|
// Process asynchronously - connection may be closed
|
||||||
connection->append_message("Async response");
|
if (auto conn_ref = weak_conn.lock()) {
|
||||||
|
conn_ref->append_message("Async response");
|
||||||
// Return ownership to server when done
|
}
|
||||||
Server::release_back_to_server(std::move(connection));
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
@@ -442,21 +441,20 @@ public:
|
|||||||
delete static_cast<MyProtocolData*>(conn.user_data);
|
delete static_cast<MyProtocolData*>(conn.user_data);
|
||||||
}
|
}
|
||||||
|
|
||||||
void on_data_arrived(std::string_view data,
|
void on_data_arrived(std::string_view data, Connection& conn) override {
|
||||||
std::unique_ptr<Connection> &conn_ptr) override {
|
|
||||||
// Process data and maybe store some results in the user_data
|
// Process data and maybe store some results in the user_data
|
||||||
auto* proto_data = static_cast<MyProtocolData*>(conn_ptr->user_data);
|
auto* proto_data = static_cast<MyProtocolData*>(conn.user_data);
|
||||||
proto_data->process(data);
|
proto_data->process(data);
|
||||||
}
|
}
|
||||||
|
|
||||||
void on_batch_complete(std::span<std::unique_ptr<Connection>> batch) override {
|
void on_batch_complete(std::span<Connection *const> batch) override {
|
||||||
// Process a batch of connections
|
// Process a batch of connections
|
||||||
for (auto& conn_ptr : batch) {
|
for (auto* conn : batch) {
|
||||||
if (conn_ptr) {
|
if (conn) {
|
||||||
auto* proto_data = static_cast<MyProtocolData*>(conn_ptr->user_data);
|
auto* proto_data = static_cast<MyProtocolData*>(conn->user_data);
|
||||||
if (proto_data->is_ready()) {
|
if (proto_data->is_ready()) {
|
||||||
// This connection is ready for the next stage, move it to the pipeline
|
// This connection is ready for the next stage, get weak ref for pipeline
|
||||||
pipeline_.push(std::move(conn_ptr));
|
pipeline_.push(conn->get_weak_ref());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -477,12 +475,12 @@ public:
|
|||||||
conn.append_message("y\n");
|
conn.append_message("y\n");
|
||||||
}
|
}
|
||||||
|
|
||||||
void on_write_progress(std::unique_ptr<Connection> &conn) override {
|
void on_write_progress(Connection &conn) override {
|
||||||
if (conn->outgoing_bytes_queued() == 0) {
|
if (conn.outgoing_bytes_queued() == 0) {
|
||||||
// Don't use an unbounded amount of memory
|
// Don't use an unbounded amount of memory
|
||||||
conn->reset();
|
conn.reset();
|
||||||
// Write "y\n" repeatedly
|
// Write "y\n" repeatedly
|
||||||
conn->append_message("y\n");
|
conn.append_message("y\n");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
@@ -500,19 +498,19 @@ std::string_view process_json_key(const char* data, Arena& arena);
|
|||||||
std::string process_json_key(const char* data);
|
std::string process_json_key(const char* data);
|
||||||
```
|
```
|
||||||
|
|
||||||
#### Safe Connection Ownership Transfer
|
#### Safe Async Connection Processing
|
||||||
|
|
||||||
```cpp
|
```cpp
|
||||||
// In handler - take ownership for background processing
|
// In handler - get weak reference for background processing
|
||||||
Connection* raw_conn = conn_ptr.release();
|
auto weak_conn = conn.get_weak_ref();
|
||||||
|
|
||||||
// Process on worker thread
|
// Process on worker thread
|
||||||
background_processor.submit([raw_conn]() {
|
background_processor.submit([weak_conn]() {
|
||||||
// Do work...
|
// Do work...
|
||||||
raw_conn->append_message("Background result");
|
if (auto conn_ref = weak_conn.lock()) {
|
||||||
|
conn_ref->append_message("Background result");
|
||||||
// Return to server safely (handles server destruction)
|
}
|
||||||
Server::release_back_to_server(std::unique_ptr<Connection>(raw_conn));
|
// Connection automatically cleaned up by server
|
||||||
});
|
});
|
||||||
```
|
```
|
||||||
|
|
||||||
|
|||||||
@@ -69,7 +69,7 @@ I/O Threads (8) → HttpHandler::on_batch_complete() → Commit Pipeline
|
|||||||
| Stage 3: Release (connection return)
|
| Stage 3: Release (connection return)
|
||||||
| (optimized futex wake)
|
| (optimized futex wake)
|
||||||
| ↓
|
| ↓
|
||||||
└─────────────────────── Server::release_back_to_server()
|
└─────────────────────── Connection returned to server pool
|
||||||
```
|
```
|
||||||
|
|
||||||
## Test Configuration
|
## Test Configuration
|
||||||
|
|||||||
11
todo.md
11
todo.md
@@ -4,10 +4,11 @@
|
|||||||
|
|
||||||
### Core Database Features
|
### Core Database Features
|
||||||
|
|
||||||
- [ ] Design commit pipeline architecture with three-stage processing
|
- [ ] Design commit pipeline architecture with four-stage processing
|
||||||
- [ ] Stage 1: Version assignment and precondition validation thread
|
- [ ] Stage 0: Sequence assignment and request validation
|
||||||
- [ ] Stage 2: Transaction persistence and subscriber streaming thread
|
- [ ] Stage 1: Precondition resolution and conflict detection
|
||||||
- [ ] Stage 3: Connection return to server thread
|
- [ ] Stage 2: Transaction persistence and subscriber streaming
|
||||||
|
- [ ] Stage 3: Response generation and connection cleanup
|
||||||
- [ ] Use ThreadPipeline for inter-stage communication
|
- [ ] Use ThreadPipeline for inter-stage communication
|
||||||
- [ ] Design persistence interface for pluggable storage backends (S3, local disk)
|
- [ ] Design persistence interface for pluggable storage backends (S3, local disk)
|
||||||
- [ ] Integrate https://git.weaselab.dev/weaselab/conflict-set for optimistic concurrency control
|
- [ ] Integrate https://git.weaselab.dev/weaselab/conflict-set for optimistic concurrency control
|
||||||
@@ -84,6 +85,6 @@
|
|||||||
- [x] Built streaming JSON parser for commit requests with high-performance parsing
|
- [x] Built streaming JSON parser for commit requests with high-performance parsing
|
||||||
- [x] Implemented HTTP server with multi-threaded networking using multiple epoll instances
|
- [x] Implemented HTTP server with multi-threaded networking using multiple epoll instances
|
||||||
- [x] Created threading model with pipeline for serial request processing for optimistic concurrency control
|
- [x] Created threading model with pipeline for serial request processing for optimistic concurrency control
|
||||||
- [x] Designed connection ownership transfer system to enable the serial processing model
|
- [x] Implemented server-owned connection model with WeakRef pattern for safe concurrent access
|
||||||
- [x] Implemented arena-per-connection memory model for clean memory lifetime management
|
- [x] Implemented arena-per-connection memory model for clean memory lifetime management
|
||||||
- [x] Built TOML configuration system for server settings
|
- [x] Built TOML configuration system for server settings
|
||||||
|
|||||||
Reference in New Issue
Block a user