From 130ff2062a88e50064aac771778c6323c166694d Mon Sep 17 00:00:00 2001
From: Andrew Noyes
Date: Wed, 20 Aug 2025 16:50:54 -0400
Subject: [PATCH] Unify accept and network threads into io threads

---
 config.md          |  16 +--
 config.toml        |   6 +-
 design.md          |   8 +-
 src/config.cpp     |  19 +--
 src/config.hpp     |   8 +-
 src/connection.hpp |  13 ++-
 src/main.cpp       |   4 +-
 src/server.cpp     | 282 ++++++++++++++++++++-------------------
 src/server.hpp     |  11 +-
 test_config.toml   |   6 +-
 10 files changed, 157 insertions(+), 216 deletions(-)

diff --git a/config.md b/config.md
index b67f293..87e4db7 100644
--- a/config.md
+++ b/config.md
@@ -21,10 +21,9 @@ Controls server networking, threading, and request handling behavior.
 | `bind_address` | string | `"127.0.0.1"` | IP address to bind the server to |
 | `port` | integer | `8080` | Port number to listen on |
 | `max_request_size_bytes` | integer | `1048576` (1MB) | Maximum size for incoming requests. Requests exceeding this limit receive a `413 Content Too Large` response |
-| `accept_threads` | integer | `1` | Number of dedicated threads for accepting incoming connections |
-| `network_threads` | integer | `1` | Number of threads for epoll-based network I/O processing |
+| `io_threads` | integer | `1` | Number of I/O threads for handling connections and network events |
 | `event_batch_size` | integer | `32` | Number of events to process in each epoll batch |
-| `max_connections` | integer | `1000` | Maximum number of concurrent connections (0 = unlimited) |
+| `max_connections` | integer | `50000` | Maximum number of concurrent connections (0 = unlimited). Note: a race between connection acceptance and cleanup means this limit can trip before that many connections actually exist, especially under high connection churn. |
 
 ### Commit Configuration (`[commit]`)
 
@@ -54,10 +53,9 @@
 bind_address = "0.0.0.0"
 port = 8080
 max_request_size_bytes = 2097152 # 2MB
-accept_threads = 2
-network_threads = 8
+io_threads = 8
 event_batch_size = 64
-max_connections = 10000
+max_connections = 50000
 
 [commit]
 min_request_id_length = 32
@@ -84,8 +82,7 @@ WeaselDB uses the `toml11` library for configuration parsing with robust error handling
 These configuration parameters directly affect server and API behavior:
 
 **Server Performance:**
-- **`accept_threads`**: Controls parallelism for accepting new connections. More threads can handle higher connection rates
-- **`network_threads`**: Controls I/O processing parallelism. Should typically match CPU core count for optimal performance
+- **`io_threads`**: Controls parallelism for both accepting new connections and I/O processing. Should typically match CPU core count for optimal performance
 - **`event_batch_size`**: Larger batches reduce syscall overhead but may increase latency under light load
 - **`max_connections`**: Prevents resource exhaustion by limiting concurrent connections
 
@@ -107,8 +104,7 @@ The configuration system includes comprehensive validation with specific bounds
 ### Server Configuration Limits
 - **`port`**: Must be between 1 and 65535
 - **`max_request_size_bytes`**: Must be > 0 and ≤ 100MB
-- **`accept_threads`**: Must be between 1 and 100
-- **`network_threads`**: Must be between 1 and 1000
+- **`io_threads`**: Must be between 1 and 1000
 - **`event_batch_size`**: Must be between 1 and 10000
 - **`max_connections`**: Must be between 0 and 100000 (0 = unlimited)
diff --git a/config.toml b/config.toml
index 6556a33..0f4148e 100644
--- a/config.toml
+++ b/config.toml
@@ -5,10 +5,8 @@
 bind_address = "127.0.0.1"
 port = 8080
 # Maximum request size in bytes (for 413 Content Too Large responses)
 max_request_size_bytes = 1048576 # 1MB
-# Number of accept threads for handling incoming connections
-accept_threads = 1
-# Number of network I/O threads for epoll processing
-network_threads = 1
+# Number of I/O threads for handling connections and network events
+io_threads = 1
 # Event batch size for epoll processing
 event_batch_size = 32
diff --git a/design.md b/design.md
index 4b8bdac..4046888 100644
--- a/design.md
+++ b/design.md
@@ -19,7 +19,7 @@ WeaselDB is a high-performance write-side database component designed for system
 
 - **Ultra-fast arena allocation** (~1ns vs ~20-270ns for malloc)
 - **High-performance JSON parsing** with streaming support and SIMD optimization
-- **Multi-threaded networking** using epoll with thread pools
+- **Multi-threaded networking** using epoll with a unified I/O thread pool
 - **Zero-copy design** throughout the pipeline
 - **Factory pattern safety** ensuring correct object lifecycle management
 
@@ -90,13 +90,13 @@ Ultra-fast memory allocator optimized for request/response patterns:
 #### **Networking Layer**
 
 **Server** (`src/server.{hpp,cpp}`):
-- **High-performance multi-threaded networking** using epoll with thread pools
+- **High-performance multi-threaded networking** using epoll with a unified I/O thread pool
 - **Factory pattern construction** via `Server::create()` ensures proper shared_ptr semantics
 - **Safe shutdown mechanism** with async-signal-safe shutdown() method
 - **Connection ownership management** with automatic cleanup on server destruction
 - **Pluggable protocol handlers** via ConnectionHandler interface
-- **Multi-threaded architecture:** separate accept and network thread pools
-- **EPOLL_EXCLUSIVE** load balancing across accept threads
+- **Unified I/O architecture:** a single thread pool handles both connection acceptance and I/O processing
+- **EPOLL_EXCLUSIVE** on the listen socket prevents thundering herd across I/O threads
 
 **Connection** (`src/connection.{hpp,cpp}`):
 - **Efficient per-connection state management** with arena-based memory allocation
diff --git a/src/config.cpp b/src/config.cpp
index 339e195..ac9612d 100644
--- a/src/config.cpp
+++ b/src/config.cpp
@@ -83,8 +83,7 @@ void ConfigParser::parse_server_config(const auto &toml_data,
   parse_field(srv, "port", config.port);
   parse_field(srv, "unix_socket_path", config.unix_socket_path);
   parse_field(srv, "max_request_size_bytes", config.max_request_size_bytes);
-  parse_field(srv, "accept_threads", config.accept_threads);
-  parse_field(srv, "network_threads", config.network_threads);
+  parse_field(srv, "io_threads", config.io_threads);
"io_threads", config.io_threads); parse_field(srv, "event_batch_size", config.event_batch_size); parse_field(srv, "max_connections", config.max_connections); parse_field(srv, "read_buffer_size", config.read_buffer_size); @@ -149,18 +148,10 @@ bool ConfigParser::validate_config(const Config &config) { valid = false; } - if (config.server.accept_threads < 1 || config.server.accept_threads > 100) { - std::cerr << "Configuration error: server.accept_threads must be between 1 " - "and 100, got " - << config.server.accept_threads << std::endl; - valid = false; - } - - if (config.server.network_threads < 1 || - config.server.network_threads > 1000) { - std::cerr << "Configuration error: server.network_threads must be between " - "1 and 1000, got " - << config.server.network_threads << std::endl; + if (config.server.io_threads < 1 || config.server.io_threads > 1000) { + std::cerr << "Configuration error: server.io_threads must be between 1 " + "and 1000, got " + << config.server.io_threads << std::endl; valid = false; } diff --git a/src/config.hpp b/src/config.hpp index 59b9c42..44c9386 100644 --- a/src/config.hpp +++ b/src/config.hpp @@ -18,14 +18,12 @@ struct ServerConfig { std::string unix_socket_path; /// Maximum size in bytes for incoming HTTP requests (default: 1MB) size_t max_request_size_bytes = 1024 * 1024; - /// Number of accept threads for handling incoming connections - int accept_threads = 1; - /// Number of network I/O threads for epoll processing - int network_threads = 1; + /// Number of I/O threads for handling connections and network events + int io_threads = 1; /// Event batch size for epoll processing int event_batch_size = 32; /// Maximum number of concurrent connections (0 = unlimited) - int max_connections = 1000; + int max_connections = 50000; /// Buffer size for reading from socket connections (default: 16KB) size_t read_buffer_size = 16 * 1024; }; diff --git a/src/connection.hpp b/src/connection.hpp index 59ba54c..fec1fa6 100644 --- a/src/connection.hpp +++ b/src/connection.hpp @@ -21,12 +21,13 @@ extern std::atomic activeConnections; * Represents a single client connection with efficient memory management. * * Connection ownership model: - * - Created by accept thread, transferred to epoll via raw pointer - * - Network threads claim ownership by wrapping raw pointer in unique_ptr - * - Network thread optionally passes ownership to a thread pipeline + * - Created by I/O thread, processed immediately, then transferred to epoll via + * raw pointer + * - I/O threads claim ownership by wrapping raw pointer in unique_ptr + * - I/O thread optionally passes ownership to a thread pipeline * - Owner eventually transfers back to epoll by releasing unique_ptr to raw * pointer - * - RAII cleanup happens if network thread doesn't transfer back + * - RAII cleanup happens if I/O thread doesn't transfer back * * Arena allocator thread safety: * Each Connection contains its own ArenaAllocator instance that is accessed @@ -68,7 +69,7 @@ struct Connection { * @brief Queue a message to be sent to the client. * * Adds data to the connection's outgoing message queue. The data will be sent - * asynchronously by the server's network threads using efficient vectored + * asynchronously by the server's I/O threads using efficient vectored * I/O. 
  *
  * @param s The data to send (string view for zero-copy efficiency)
@@ -342,7 +343,7 @@ private:
  * @return std::unique_ptr to the newly created connection
  *
  * @note This method is only accessible to the Server class and should be used
- * exclusively by accept threads when new connections arrive.
+ * exclusively by I/O threads when new connections arrive.
  */
 static std::unique_ptr<Connection> createForServer(struct sockaddr_storage addr,
                                                    int fd, int64_t id,
diff --git a/src/main.cpp b/src/main.cpp
index 4590542..5cd45b8 100644
--- a/src/main.cpp
+++ b/src/main.cpp
@@ -100,9 +100,7 @@ int main(int argc, char *argv[]) {
   }
   std::cout << "Max request size: " << config->server.max_request_size_bytes
             << " bytes" << std::endl;
-  std::cout << "Accept threads: " << config->server.accept_threads << std::endl;
-  std::cout << "Network threads: " << config->server.network_threads
-            << std::endl;
+  std::cout << "I/O threads: " << config->server.io_threads << std::endl;
   std::cout << "Event batch size: " << config->server.event_batch_size
             << std::endl;
   std::cout << "Max connections: " << config->server.max_connections
diff --git a/src/server.cpp b/src/server.cpp
index 637a0cb..5bfb0a0 100644
--- a/src/server.cpp
+++ b/src/server.cpp
@@ -33,48 +33,34 @@ void Server::run() {
 
   listen_sockfd_ = create_listen_socket();
 
-  // Create epoll instances
-  network_epollfd_ = epoll_create1(EPOLL_CLOEXEC);
-  if (network_epollfd_ == -1) {
-    perror("epoll_create network");
-    throw std::runtime_error("Failed to create network epoll instance");
+  // Create single epoll instance
+  epollfd_ = epoll_create1(EPOLL_CLOEXEC);
+  if (epollfd_ == -1) {
+    perror("epoll_create");
+    throw std::runtime_error("Failed to create epoll instance");
   }
 
-  accept_epollfd_ = epoll_create1(EPOLL_CLOEXEC);
-  if (accept_epollfd_ == -1) {
-    perror("epoll_create accept");
-    throw std::runtime_error("Failed to create accept epoll instance");
-  }
-
-  // Add shutdown pipe to both epoll instances
+  // Add shutdown pipe to epoll instance
   struct epoll_event shutdown_event;
   shutdown_event.events = EPOLLIN;
   shutdown_event.data.fd = shutdown_pipe_[0];
-  if (epoll_ctl(network_epollfd_, EPOLL_CTL_ADD, shutdown_pipe_[0],
-                &shutdown_event) == -1) {
-    perror("epoll_ctl add shutdown to network");
-    throw std::runtime_error("Failed to add shutdown pipe to network epoll");
+  if (epoll_ctl(epollfd_, EPOLL_CTL_ADD, shutdown_pipe_[0], &shutdown_event) ==
+      -1) {
+    perror("epoll_ctl shutdown pipe");
+    throw std::runtime_error("Failed to add shutdown pipe to epoll");
   }
 
-  if (epoll_ctl(accept_epollfd_, EPOLL_CTL_ADD, shutdown_pipe_[0],
-                &shutdown_event) == -1) {
-    perror("epoll_ctl add shutdown to accept");
-    throw std::runtime_error("Failed to add shutdown pipe to accept epoll");
-  }
-
-  // Add listen socket to accept epoll
+  // Add listen socket to epoll with EPOLLEXCLUSIVE to prevent thundering herd
   struct epoll_event listen_event;
   listen_event.events = EPOLLIN | EPOLLEXCLUSIVE;
   listen_event.data.fd = listen_sockfd_;
-  if (epoll_ctl(accept_epollfd_, EPOLL_CTL_ADD, listen_sockfd_,
-                &listen_event) == -1) {
-    perror("epoll_ctl add listen socket");
-    throw std::runtime_error("Failed to add listen socket to accept epoll");
+  if (epoll_ctl(epollfd_, EPOLL_CTL_ADD, listen_sockfd_, &listen_event) == -1) {
+    perror("epoll_ctl listen socket");
+    throw std::runtime_error("Failed to add listen socket to epoll");
   }
 
-  start_network_threads();
-  start_accept_threads();
+  start_io_threads();
 
   // Wait for all threads to complete
   for (auto &thread : threads_) {
@@ -124,8 +110,7 @@ void Server::receiveConnectionBack(Connection *connection) {
   connection->tsan_release();
   event.data.ptr = connection;
 
-  if (epoll_ctl(network_epollfd_, EPOLL_CTL_ADD, connection->getFd(), &event) ==
-      -1) {
+  if (epoll_ctl(epollfd_, EPOLL_CTL_ADD, connection->getFd(), &event) == -1) {
     perror("epoll_ctl ADD in receiveConnectionBack");
     delete connection; // Clean up on failure
   }
@@ -278,20 +263,23 @@ int Server::create_listen_socket() {
   return sfd;
 }
 
-void Server::start_network_threads() {
-  int network_threads = config_.server.network_threads;
+void Server::start_io_threads() {
+  int io_threads = config_.server.io_threads;
 
-  for (int thread_id = 0; thread_id < network_threads; ++thread_id) {
+  for (int thread_id = 0; thread_id < io_threads; ++thread_id) {
     threads_.emplace_back([this, thread_id]() {
       pthread_setname_np(pthread_self(),
-                         ("network-" + std::to_string(thread_id)).c_str());
+                         ("io-" + std::to_string(thread_id)).c_str());
 
       struct epoll_event events[config_.server.event_batch_size];
       std::unique_ptr<Connection> batch[config_.server.event_batch_size];
+      bool batch_is_new[config_.server.event_batch_size]; // Track if connection
+                                                          // is new (ADD) or
+                                                          // existing (MOD)
 
      for (;;) {
-        int event_count = epoll_wait(network_epollfd_, events,
-                                     config_.server.event_batch_size, -1);
+        int event_count =
+            epoll_wait(epollfd_, events, config_.server.event_batch_size, -1);
         if (event_count == -1) {
           if (errno == EINTR) {
             continue;
@@ -307,85 +295,9 @@ void Server::start_network_threads() {
           return;
         }
 
-          // Take ownership from epoll: raw pointer -> unique_ptr
-          std::unique_ptr<Connection> conn{
-              static_cast<Connection *>(events[i].data.ptr)};
-          conn->tsan_acquire();
-          events[i].data.ptr = nullptr;
-
-          if (events[i].events & EPOLLERR) {
-            continue; // Connection closed - unique_ptr destructor cleans up
-          }
-
-          // Process I/O using shared helper function
-          bool should_continue = process_connection_io(conn, events[i].events);
-          if (!should_continue || !conn) {
-            continue; // Connection closed or handler took ownership
-          }
-          batch[batch_count++] = std::move(conn);
-        }
-
-        handler_.on_post_batch({batch, (size_t)batch_count});
-
-        for (int i = 0; i < batch_count; ++i) {
-          auto &conn = batch[i];
-          if (!conn) {
-            continue;
-          }
-          // Determine next epoll interest
-          struct epoll_event event{};
-          if (!conn->hasMessages()) {
-            event.events = EPOLLIN | EPOLLONESHOT | EPOLLRDHUP;
-          } else {
-            event.events = EPOLLOUT | EPOLLONESHOT | EPOLLRDHUP;
-          }
-
-          // Transfer ownership back to epoll
-          conn->tsan_release();
-          Connection *raw_conn = conn.release();
-          event.data.ptr = raw_conn;
-
-          if (epoll_ctl(network_epollfd_, EPOLL_CTL_MOD, raw_conn->getFd(),
-                        &event) == -1) {
-            perror("epoll_ctl MOD");
-            delete raw_conn;
-            continue;
-          }
-        }
-      }
-    });
-  }
-}
-
-void Server::start_accept_threads() {
-  int accept_threads = config_.server.accept_threads;
-
-  for (int thread_id = 0; thread_id < accept_threads; ++thread_id) {
-    threads_.emplace_back([this, thread_id]() {
-      pthread_setname_np(pthread_self(),
-                         ("accept-" + std::to_string(thread_id)).c_str());
-
-      for (;;) {
-        struct epoll_event events[2]; // listen socket + shutdown pipe
-        int ready = epoll_wait(accept_epollfd_, events, 2, -1);
-
-        if (ready == -1) {
-          if (errno == EINTR)
-            continue;
-          perror("epoll_wait accept");
-          abort();
-        }
-
-        for (int i = 0; i < ready; ++i) {
-          if (events[i].data.fd == shutdown_pipe_[0]) {
-            return; // Shutdown signal
-          }
-
+          // Handle listen socket events (new connections)
           if (events[i].data.fd == listen_sockfd_) {
             // Accept new connections and batch them
-            std::unique_ptr<Connection> batch[config_.server.event_batch_size];
-            int batch_count = 0;
-
             for (;;) {
               struct sockaddr_storage addr;
               socklen_t addrlen = sizeof(addr);
@@ -421,8 +333,7 @@ void Server::start_accept_threads() {
                   connection_id_.fetch_add(1, std::memory_order_relaxed),
                   &handler_, weak_from_this());
 
-              // Try to process I/O once in the accept thread before handing off
-              // to network threads
+              // Try to process I/O once immediately for better latency
               bool should_continue = process_connection_io(conn, EPOLLIN);
 
               if (!should_continue) {
@@ -432,21 +343,17 @@ void Server::start_accept_threads() {
 
               // Add to batch if we still have the connection
               if (conn) {
-                batch[batch_count++] = std::move(conn);
-
-                // If batch is full, process it and start a new batch
+                // If batch is full, process it first
                 if (batch_count >= config_.server.event_batch_size) {
                   handler_.on_post_batch({batch, (size_t)batch_count});
-                  // Transfer all batched connections to network epoll
+                  // Re-add all batched connections to epoll
                   for (int j = 0; j < batch_count; ++j) {
                     auto &batched_conn = batch[j];
                     if (!batched_conn)
                       continue;
                     struct epoll_event event{};
-                    // Determine next epoll interest based on whether we have
-                    // messages to send
                     if (!batched_conn->hasMessages()) {
                       event.events = EPOLLIN | EPOLLONESHOT | EPOLLRDHUP;
                     } else {
                       event.events = EPOLLOUT | EPOLLONESHOT | EPOLLRDHUP;
                     }
@@ -458,49 +365,110 @@ void Server::start_accept_threads() {
                     Connection *raw_conn = batched_conn.release();
                     event.data.ptr = raw_conn;
 
-                    if (epoll_ctl(network_epollfd_, EPOLL_CTL_ADD, fd,
-                                  &event) == -1) {
-                      perror("epoll_ctl ADD");
+                    int epoll_op =
+                        batch_is_new[j] ? EPOLL_CTL_ADD : EPOLL_CTL_MOD;
+                    if (epoll_ctl(epollfd_, epoll_op, fd, &event) == -1) {
+                      perror(batch_is_new[j] ? "epoll_ctl ADD"
+                                             : "epoll_ctl MOD");
                       delete raw_conn;
-                      continue;
                     }
                   }
                   batch_count = 0; // Reset batch
                 }
+
+                // Add current connection to batch
+                batch[batch_count] = std::move(conn);
+                batch_is_new[batch_count] =
+                    true; // New connection, needs EPOLL_CTL_ADD
+                batch_count++;
              }
            }
+            continue;
+          }
 
-          // Process any remaining connections in the batch
-          if (batch_count > 0) {
-            handler_.on_post_batch({batch, (size_t)batch_count});
+          // Handle existing connection events
+          std::unique_ptr<Connection> conn{
+              static_cast<Connection *>(events[i].data.ptr)};
+          conn->tsan_acquire();
+          events[i].data.ptr = nullptr;
 
-            // Transfer remaining batched connections to network epoll
-            for (int j = 0; j < batch_count; ++j) {
-              auto &batched_conn = batch[j];
-              if (!batched_conn)
-                continue;
+          if (events[i].events & EPOLLERR) {
+            continue; // Connection closed - unique_ptr destructor cleans up
+          }
 
-              struct epoll_event event{};
-              // Determine next epoll interest based on whether we have
-              // messages to send
-              if (!batched_conn->hasMessages()) {
-                event.events = EPOLLIN | EPOLLONESHOT | EPOLLRDHUP;
-              } else {
-                event.events = EPOLLOUT | EPOLLONESHOT | EPOLLRDHUP;
-              }
+          // Process I/O using shared helper function
+          bool should_continue = process_connection_io(conn, events[i].events);
+          if (!should_continue || !conn) {
+            continue; // Connection closed or handler took ownership
+          }
 
-              int fd = batched_conn->getFd();
-              batched_conn->tsan_release();
-              Connection *raw_conn = batched_conn.release();
-              event.data.ptr = raw_conn;
+          // If batch is full, process it first
+          if (batch_count >= config_.server.event_batch_size) {
+            handler_.on_post_batch({batch, (size_t)batch_count});
 
-              if (epoll_ctl(network_epollfd_, EPOLL_CTL_ADD, fd, &event) ==
-                  -1) {
-                perror("epoll_ctl ADD");
-                delete raw_conn;
-                continue;
-              }
+            // Re-add all batched connections to epoll
+            for (int j = 0; j < batch_count; ++j) {
+              auto &batched_conn = batch[j];
+              if (!batched_conn)
+                continue;
+
+              struct epoll_event event{};
+              if (!batched_conn->hasMessages()) {
+                event.events = EPOLLIN | EPOLLONESHOT | EPOLLRDHUP;
+              } else {
+                event.events = EPOLLOUT | EPOLLONESHOT | EPOLLRDHUP;
+              }
+
+              int fd = batched_conn->getFd();
+              batched_conn->tsan_release();
+              Connection *raw_conn = batched_conn.release();
+              event.data.ptr = raw_conn;
+
+              int epoll_op = batch_is_new[j] ? EPOLL_CTL_ADD : EPOLL_CTL_MOD;
+              if (epoll_ctl(epollfd_, epoll_op, fd, &event) == -1) {
+                perror(batch_is_new[j] ? "epoll_ctl ADD" : "epoll_ctl MOD");
+                delete raw_conn;
+              }
+            }
+            batch_count = 0; // Reset batch
+          }
+
+          // Add current connection to batch
+          batch[batch_count] = std::move(conn);
+          batch_is_new[batch_count] =
+              false; // Existing connection, needs EPOLL_CTL_MOD
+          batch_count++;
+        }
+
+        // Process batch if we have any connections
+        if (batch_count > 0) {
+          handler_.on_post_batch({batch, (size_t)batch_count});
+
+          for (int i = 0; i < batch_count; ++i) {
+            auto &conn = batch[i];
+            if (!conn) {
+              continue;
+            }
+            // Determine next epoll interest
+            struct epoll_event event{};
+            if (!conn->hasMessages()) {
+              event.events = EPOLLIN | EPOLLONESHOT | EPOLLRDHUP;
+            } else {
+              event.events = EPOLLOUT | EPOLLONESHOT | EPOLLRDHUP;
+            }
+
+            // Transfer ownership back to epoll
+            int fd = conn->getFd();
+            conn->tsan_release();
+            Connection *raw_conn = conn.release();
+            event.data.ptr = raw_conn;
+
+            // Use ADD for new connections, MOD for existing connections
+            int epoll_op = batch_is_new[i] ? EPOLL_CTL_ADD : EPOLL_CTL_MOD;
+            if (epoll_ctl(epollfd_, epoll_op, fd, &event) == -1) {
+              perror(batch_is_new[i] ? "epoll_ctl ADD" : "epoll_ctl MOD");
+              delete raw_conn;
+              continue;
            }
          }
        }
@@ -571,13 +539,9 @@ void Server::cleanup_resources() {
     close(shutdown_pipe_[1]);
     shutdown_pipe_[1] = -1;
   }
-  if (network_epollfd_ != -1) {
-    close(network_epollfd_);
-    network_epollfd_ = -1;
-  }
-  if (accept_epollfd_ != -1) {
-    close(accept_epollfd_);
-    accept_epollfd_ = -1;
+  if (epollfd_ != -1) {
+    close(epollfd_);
+    epollfd_ = -1;
   }
   if (listen_sockfd_ != -1) {
     close(listen_sockfd_);
diff --git a/src/server.hpp b/src/server.hpp
index 600aeaf..4fdb615 100644
--- a/src/server.hpp
+++ b/src/server.hpp
@@ -105,21 +105,18 @@ private:
   // Shutdown coordination
   int shutdown_pipe_[2] = {-1, -1};
 
-  // Epoll file descriptors
-  int network_epollfd_ = -1;
-  int accept_epollfd_ = -1;
+  // Epoll file descriptor
+  int epollfd_ = -1;
   int listen_sockfd_ = -1;
 
   // Private helper methods
   void setup_shutdown_pipe();
   void setup_signal_handling();
   int create_listen_socket();
-  void start_network_threads();
-  void start_accept_threads();
+  void start_io_threads();
   void cleanup_resources();
 
-  // Helper for processing connection I/O (shared between accept and network
-  // threads)
+  // Helper for processing connection I/O
   bool process_connection_io(std::unique_ptr<Connection> &conn, int events);
 
   /**
diff --git a/test_config.toml b/test_config.toml
index 196063a..3ca32d9 100644
--- a/test_config.toml
+++ b/test_config.toml
@@ -6,10 +6,8 @@
 bind_address = "127.0.0.1"
 port = 8080
 # Maximum request size in bytes (for 413 Content Too Large responses)
 max_request_size_bytes = 1048576 # 1MB
-# Number of accept threads for handling incoming connections
-accept_threads = 4
-# Number of network I/O threads for epoll processing
-network_threads = 4
+# Number of I/O threads for handling connections and network events
+io_threads = 8
 # Event batch size for epoll processing
 event_batch_size = 32
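
Editor's note: below is a minimal standalone sketch (not part of the patch, and not WeaselDB's actual API) of the unified-I/O pattern this commit adopts: every I/O thread runs the same loop on one shared epoll fd, EPOLLEXCLUSIVE on the listen socket wakes only one thread per incoming connection, and EPOLLONESHOT on client sockets guarantees a single thread services a given connection at a time, re-arming it when done. The port, thread count, and echo behavior are invented for illustration; the real server additionally batches connections, moves Connection ownership through unique_ptr, and calls handler_.on_post_batch.

// Illustrative sketch only. Requires Linux 4.5+ for EPOLLEXCLUSIVE.
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>
#include <thread>
#include <vector>

int main() {
  int listen_fd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);
  int one = 1;
  setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &one, sizeof(one));
  sockaddr_in addr{};
  addr.sin_family = AF_INET;
  addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
  addr.sin_port = htons(8080); // placeholder port
  bind(listen_fd, reinterpret_cast<sockaddr *>(&addr), sizeof(addr));
  listen(listen_fd, SOMAXCONN);

  int epfd = epoll_create1(EPOLL_CLOEXEC);
  epoll_event ev{};
  ev.events = EPOLLIN | EPOLLEXCLUSIVE; // only one thread woken per connection
  ev.data.fd = listen_fd;
  epoll_ctl(epfd, EPOLL_CTL_ADD, listen_fd, &ev);

  auto io_loop = [&] {
    epoll_event events[32];
    for (;;) {
      int n = epoll_wait(epfd, events, 32, -1);
      for (int i = 0; i < n; ++i) {
        if (events[i].data.fd == listen_fd) {
          // Accept until EAGAIN; register each client with EPOLLONESHOT.
          for (;;) {
            int cfd = accept4(listen_fd, nullptr, nullptr, SOCK_NONBLOCK);
            if (cfd < 0)
              break; // drained, or another thread got there first
            epoll_event cev{};
            cev.events = EPOLLIN | EPOLLONESHOT | EPOLLRDHUP;
            cev.data.fd = cfd;
            epoll_ctl(epfd, EPOLL_CTL_ADD, cfd, &cev);
          }
          continue;
        }
        // ONESHOT: this thread owns the fd until it re-arms it below.
        int cfd = events[i].data.fd;
        char buf[4096];
        ssize_t r = read(cfd, buf, sizeof(buf));
        if (r <= 0 || (events[i].events & (EPOLLRDHUP | EPOLLERR))) {
          close(cfd); // closing also removes cfd from the epoll set
          continue;
        }
        (void)!write(cfd, buf, static_cast<size_t>(r)); // naive echo
        epoll_event cev{};
        cev.events = EPOLLIN | EPOLLONESHOT | EPOLLRDHUP;
        cev.data.fd = cfd;
        epoll_ctl(epfd, EPOLL_CTL_MOD, cfd, &cev); // re-arm for the next event
      }
    }
  };

  std::vector<std::thread> threads;
  for (int t = 0; t < 4; ++t) // io_threads = 4, analogous to the unified config
    threads.emplace_back(io_loop);
  for (auto &t : threads)
    t.join();
}

One payoff of the unified design visible in the sketch: the thread that accepts a connection can attempt the first read immediately, which is what the patch's "Try to process I/O once immediately for better latency" comment refers to, instead of handing the fd to a separate network epoll and waiting for another wakeup.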