diff --git a/design.md b/design.md
index e48329c..b4897ed 100644
--- a/design.md
+++ b/design.md
@@ -259,37 +259,37 @@ CommitRequest {
 
 > **Note**: Call `conn->reset()` periodically to reclaim arena memory. Best practice is after all outgoing bytes have been written.
 
-#### Threading Model and EPOLLONESHOT
+#### Threading Model and Server-Owned Connections
 
-**EPOLLONESHOT Design Rationale:**
-WeaselDB uses `EPOLLONESHOT` for all connection file descriptors to enable safe multi-threaded ownership transfer without complex synchronization:
+**Server-Owned Connection Design:**
+WeaselDB uses a server-owned connection model where the server retains ownership of all connections while providing safe concurrent access to handlers:
 
 **Key Benefits:**
 
-1. **Automatic fd disarming** - When epoll triggers an event, the fd is automatically removed from epoll monitoring
-1. **Race-free ownership transfer** - Handlers can safely take connection ownership and move to other threads
-1. **Zero-coordination async processing** - No manual synchronization needed between network threads and handler threads
+1. **Simplified ownership** - Server always owns connections, eliminating complex ownership transfers
+1. **Safe concurrent access** - Connection mutexes synchronize access between I/O threads and handlers
+1. **WeakRef pattern** - Handlers use WeakRef for safe async processing without ownership
 
 **Threading Flow:**
 
-1. **Event Trigger**: Network thread gets epoll event → connection auto-disarmed via ONESHOT
-1. **Safe Transfer**: Handler can take ownership (`std::move(conn_ptr)`) with no epoll interference
-1. **Async Processing**: Connection processed on handler thread while epoll cannot trigger spurious events
-1. **Return & Re-arm**: Internal server method re-arms fd with `epoll_ctl(EPOLL_CTL_MOD)` via `Server::release_back_to_server()`
+1. **Event Trigger**: Network thread gets epoll event and processes data
+1. **Handler Invocation**: Handler receives Connection& reference - server retains ownership
+1. **Async Processing**: Handler obtains WeakRef for safe background processing
+1. **Connection Cleanup**: Server manages connection lifecycle including file descriptor operations
 
-**Performance Trade-off:**
+**Performance Benefits:**
 
-- **Cost**: One `epoll_ctl(MOD)` syscall per connection return (~100-200ns)
-- **Benefit**: Eliminates complex thread synchronization and prevents race conditions
-- **Alternative cost**: Manual `EPOLL_CTL_DEL`/`ADD` + locking would be significantly higher
+- **Reduced syscalls**: Eliminates epoll_ctl(MOD) calls needed for ownership transfer
+- **Simplified synchronization**: Connection mutexes provide clear concurrent access patterns
+- **Memory efficiency**: No unique_ptr overhead for ownership management
 
-**Without EPOLLONESHOT risks:**
+**Safe Async Processing:**
 
-- Multiple threads processing same fd simultaneously
-- Use-after-move when network thread accesses transferred connection
-- Complex synchronization between epoll events and ownership transfers
+- WeakRef prevents use-after-free in background threads
+- Connection mutex ensures thread-safe access to connection state
+- Server handles all file descriptor management automatically
 
-This design enables the async handler pattern where connections can be safely moved between threads for background processing while maintaining high performance and thread safety.
+This design provides high performance concurrent processing while maintaining thread safety through clear ownership boundaries and synchronization primitives.
 
 ### API Endpoints
 
@@ -317,13 +317,13 @@ See [style.md](style.md) for comprehensive C++ coding standards and conventions.
 
 - **Server Creation**: Always use `Server::create()` factory method - direct construction is impossible
 - **Connection Creation**: Only the Server can create connections - no public constructor or factory method
-- **Connection Ownership**: Use unique_ptr semantics for safe ownership transfer between components
+- **Connection Ownership**: Server retains ownership, handlers use Connection& references
 - **Arena Allocator Pattern**: Always use `Arena` for temporary allocations within request processing
 - **String View Usage**: Prefer `std::string_view` over `std::string` when pointing to arena-allocated memory
-- **Ownership Transfer**: Use `Server::release_back_to_server()` for returning connections to server from handlers
+- **Async Processing**: Use `conn.get_weak_ref()` for safe background processing without ownership
 - **JSON Token Lookup**: Use the gperf-generated perfect hash table in `json_tokens.hpp` for O(1) key recognition
 - **Base64 Handling**: Always use simdutf for base64 encoding/decoding for performance
-- **Thread Safety**: Connection ownership transfers are designed to be thread-safe with proper RAII cleanup
+- **Thread Safety**: Connection mutexes provide safe concurrent access between threads
 
 ### Project Structure
 
@@ -338,12 +338,12 @@ See [style.md](style.md) for comprehensive C++ coding standards and conventions.
 #### Adding New Protocol Handlers
 
 1. Inherit from `ConnectionHandler` in `src/connection_handler.hpp`
-1. Implement `on_data_arrived()` with proper ownership semantics
-1. Use connection's arena allocator for temporary allocations: `conn->get_arena()`
+1. Implement `on_data_arrived()` using Connection& reference parameter
+1. Use connection's arena allocator for temporary allocations: `conn.get_arena()`
 1. Handle partial messages and streaming protocols appropriately
-1. Use `Server::release_back_to_server()` if taking ownership for async processing
+1. Use `conn.get_weak_ref()` for safe async processing without ownership transfer
 1. Add corresponding test cases and integration tests
-1. Consider performance implications of ownership transfers
+1. Consider performance implications of concurrent access patterns
 
 #### Adding New Parsers
 
@@ -395,33 +395,32 @@ Only Server can create connections (using private constructor via friend access)
 ```cpp
 class HttpHandler : public ConnectionHandler {
 public:
-  void on_data_arrived(std::string_view data, std::unique_ptr& conn_ptr) override {
+  void on_data_arrived(std::string_view data, Connection& conn) override {
     // Parse HTTP request using connection's arena
-    Arena& arena = conn_ptr->get_arena();
+    Arena& arena = conn.get_arena();
 
     // Generate response
-    conn_ptr->append_message("HTTP/1.1 200 OK\r\n\r\nHello World");
+    conn.append_message("HTTP/1.1 200 OK\r\n\r\nHello World");
 
     // Server retains ownership
   }
 };
 ```
 
-#### Async Handler with Ownership Transfer
+#### Async Handler with WeakRef
 
 ```cpp
 class AsyncHandler : public ConnectionHandler {
 public:
-  void on_data_arrived(std::string_view data, std::unique_ptr& conn_ptr) override {
-    // Take ownership for async processing
-    auto connection = std::move(conn_ptr); // conn_ptr is now null
+  void on_data_arrived(std::string_view data, Connection& conn) override {
+    // Get weak reference for async processing
+    auto weak_conn = conn.get_weak_ref();
 
-    work_queue.push([connection = std::move(connection)](std::string_view data) mutable {
-      // Process asynchronously
-      connection->append_message("Async response");
-
-      // Return ownership to server when done
-      Server::release_back_to_server(std::move(connection));
+    work_queue.push([weak_conn, data = std::string(data)]() {
+      // Process asynchronously - connection may be closed
+      if (auto conn_ref = weak_conn.lock()) {
+        conn_ref->append_message("Async response");
+      }
     });
   }
 };
@@ -442,21 +441,20 @@ public:
     delete static_cast(conn.user_data);
   }
 
-  void on_data_arrived(std::string_view data,
-                       std::unique_ptr &conn_ptr) override {
+  void on_data_arrived(std::string_view data, Connection& conn) override {
     // Process data and maybe store some results in the user_data
-    auto* proto_data = static_cast(conn_ptr->user_data);
+    auto* proto_data = static_cast(conn.user_data);
     proto_data->process(data);
   }
 
-  void on_batch_complete(std::span> batch) override {
+  void on_batch_complete(std::span batch) override {
     // Process a batch of connections
-    for (auto& conn_ptr : batch) {
-      if (conn_ptr) {
-        auto* proto_data = static_cast(conn_ptr->user_data);
+    for (auto* conn : batch) {
+      if (conn) {
+        auto* proto_data = static_cast(conn->user_data);
         if (proto_data->is_ready()) {
-          // This connection is ready for the next stage, move it to the pipeline
-          pipeline_.push(std::move(conn_ptr));
+          // This connection is ready for the next stage, get weak ref for pipeline
+          pipeline_.push(conn->get_weak_ref());
         }
       }
     }
@@ -477,12 +475,12 @@ public:
     conn.append_message("y\n");
   }
 
-  void on_write_progress(std::unique_ptr &conn) override {
-    if (conn->outgoing_bytes_queued() == 0) {
+  void on_write_progress(Connection &conn) override {
+    if (conn.outgoing_bytes_queued() == 0) {
       // Don't use an unbounded amount of memory
-      conn->reset();
+      conn.reset();
       // Write "y\n" repeatedly
-      conn->append_message("y\n");
+      conn.append_message("y\n");
     }
   }
 };
@@ -500,19 +498,19 @@ std::string_view process_json_key(const char* data, Arena& arena);
 std::string process_json_key(const char* data);
 ```
 
-#### Safe Connection Ownership Transfer
+#### Safe Async Connection Processing
 
 ```cpp
-// In handler - take ownership for background processing
-Connection* raw_conn = conn_ptr.release();
+// In handler - get weak reference for background processing
+auto weak_conn = conn.get_weak_ref();
 
 // Process on worker thread
-background_processor.submit([raw_conn]() {
+background_processor.submit([weak_conn]() {
   // Do work...
-  raw_conn->append_message("Background result");
-
-  // Return to server safely (handles server destruction)
-  Server::release_back_to_server(std::unique_ptr(raw_conn));
+  if (auto conn_ref = weak_conn.lock()) {
+    conn_ref->append_message("Background result");
+  }
+  // Connection automatically cleaned up by server
 });
 ```
 
diff --git a/threading_performance_report.md b/threading_performance_report.md
index 8c715fe..ce52578 100644
--- a/threading_performance_report.md
+++ b/threading_performance_report.md
@@ -69,7 +69,7 @@ I/O Threads (8) → HttpHandler::on_batch_complete() → Commit Pipeline
 | Stage 3: Release (connection return)
 | (optimized futex wake)
 | ↓
-└─────────────────────── Server::release_back_to_server()
+└─────────────────────── Connection returned to server pool
 ```
 
 ## Test Configuration
diff --git a/todo.md b/todo.md
index f5a6632..ac04186 100644
--- a/todo.md
+++ b/todo.md
@@ -4,10 +4,11 @@
 
 ### Core Database Features
 
-- [ ] Design commit pipeline architecture with three-stage processing
-  - [ ] Stage 1: Version assignment and precondition validation thread
-  - [ ] Stage 2: Transaction persistence and subscriber streaming thread
-  - [ ] Stage 3: Connection return to server thread
+- [ ] Design commit pipeline architecture with four-stage processing
+  - [ ] Stage 0: Sequence assignment and request validation
+  - [ ] Stage 1: Precondition resolution and conflict detection
+  - [ ] Stage 2: Transaction persistence and subscriber streaming
+  - [ ] Stage 3: Response generation and connection cleanup
   - [ ] Use ThreadPipeline for inter-stage communication
 - [ ] Design persistence interface for pluggable storage backends (S3, local disk)
 - [ ] Integrate https://git.weaselab.dev/weaselab/conflict-set for optimistic concurrency control
@@ -84,6 +85,6 @@
 - [x] Built streaming JSON parser for commit requests with high-performance parsing
 - [x] Implemented HTTP server with multi-threaded networking using multiple epoll instances
 - [x] Created threading model with pipeline for serial request processing for optimistic concurrency control
-- [x] Designed connection ownership transfer system to enable the serial processing model
+- [x] Implemented server-owned connection model with WeakRef pattern for safe concurrent access
 - [x] Implemented arena-per-connection memory model for clean memory lifetime management
 - [x] Built TOML configuration system for server settings
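
Reviewer sketch (not part of the patch): the WeakRef flow this change documents can be illustrated with standard-library types. This is a minimal sketch under stated assumptions, not WeaselDB's implementation: `std::weak_ptr` stands in for the project's `WeakRef`, the `Connection` class below is a hypothetical stand-in that only mirrors `append_message()` and the per-connection mutex, and `queued()` and `deliver_async_response()` are illustrative names that do not appear in the diff.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <mutex>
#include <string>
#include <string_view>
#include <vector>

// Hypothetical stand-in for the real Connection class. Only the idea of a
// mutex-guarded append_message() is taken from the documentation above.
class Connection {
public:
  void append_message(std::string_view msg) {
    std::lock_guard<std::mutex> lock(mutex_);  // serialize I/O and handler access
    outgoing_.emplace_back(msg);
  }

  // Illustrative accessor (assumption): number of queued outgoing messages.
  std::size_t queued() const {
    std::lock_guard<std::mutex> lock(mutex_);
    return outgoing_.size();
  }

private:
  mutable std::mutex mutex_;
  std::vector<std::string> outgoing_;
};

// Assumption: std::weak_ptr plays the role of the project's WeakRef type.
using WeakConn = std::weak_ptr<Connection>;

// Background work that may run after the owner has closed the connection.
// Returns true if the response was queued, false if the connection is gone.
bool deliver_async_response(const WeakConn& weak_conn) {
  if (auto conn_ref = weak_conn.lock()) {
    conn_ref->append_message("Async response");
    return true;
  }
  return false;  // owner already released the connection; nothing to do
}
```

In this sketch the owner (the server, in the documented model) holds the only strong reference, so dropping it makes every later `lock()` return null and the background task degrades to a no-op instead of touching freed memory.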