From 1cce8d995074a5aaa31049775445f6dee71c8ec8 Mon Sep 17 00:00:00 2001 From: Andrew Noyes Date: Thu, 21 Aug 2025 14:13:11 -0400 Subject: [PATCH] Allow multiple epoll instances on server --- src/config.cpp | 16 ++++++++ src/config.hpp | 3 ++ src/server.cpp | 104 ++++++++++++++++++++++++++++++----------------- src/server.hpp | 14 +++++-- test_config.toml | 1 + 5 files changed, 98 insertions(+), 40 deletions(-) diff --git a/src/config.cpp b/src/config.cpp index ac9612d..328aa12 100644 --- a/src/config.cpp +++ b/src/config.cpp @@ -84,9 +84,15 @@ void ConfigParser::parse_server_config(const auto &toml_data, parse_field(srv, "unix_socket_path", config.unix_socket_path); parse_field(srv, "max_request_size_bytes", config.max_request_size_bytes); parse_field(srv, "io_threads", config.io_threads); + parse_field(srv, "epoll_instances", config.epoll_instances); parse_field(srv, "event_batch_size", config.event_batch_size); parse_field(srv, "max_connections", config.max_connections); parse_field(srv, "read_buffer_size", config.read_buffer_size); + + // Clamp epoll_instances to not exceed io_threads + if (config.epoll_instances > config.io_threads) { + config.epoll_instances = config.io_threads; + } }); } @@ -155,6 +161,16 @@ bool ConfigParser::validate_config(const Config &config) { valid = false; } + if (config.server.epoll_instances < 1 || + config.server.epoll_instances > config.server.io_threads) { + std::cerr + << "Configuration error: server.epoll_instances must be between 1 " + "and io_threads (" + << config.server.io_threads << "), got " + << config.server.epoll_instances << std::endl; + valid = false; + } + if (config.server.event_batch_size < 1 || config.server.event_batch_size > 10000) { std::cerr << "Configuration error: server.event_batch_size must be between " diff --git a/src/config.hpp b/src/config.hpp index 44c9386..3802060 100644 --- a/src/config.hpp +++ b/src/config.hpp @@ -20,6 +20,9 @@ struct ServerConfig { size_t max_request_size_bytes = 1024 * 1024; /// Number of I/O threads for handling connections and network events int io_threads = 1; + /// Number of epoll instances to reduce epoll_ctl contention (default: 2, max: + /// io_threads) + int epoll_instances = 2; /// Event batch size for epoll processing int event_batch_size = 32; /// Maximum number of concurrent connections (0 = unlimited) diff --git a/src/server.cpp b/src/server.cpp index a36239c..2f4109f 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -13,6 +13,7 @@ #include #include #include +#include extern std::atomic activeConnections; @@ -33,32 +34,7 @@ void Server::run() { listen_sockfd_ = create_listen_socket(); - // Create single epoll instance - epollfd_ = epoll_create1(EPOLL_CLOEXEC); - if (epollfd_ == -1) { - perror("epoll_create"); - throw std::runtime_error("Failed to create epoll instance"); - } - - // Add shutdown pipe to epoll instance - struct epoll_event shutdown_event; - shutdown_event.events = EPOLLIN; - shutdown_event.data.ptr = const_cast(&shutdown_pipe_tag); - - if (epoll_ctl(epollfd_, EPOLL_CTL_ADD, shutdown_pipe_[0], &shutdown_event) == - -1) { - perror("epoll_ctl shutdown pipe"); - throw std::runtime_error("Failed to add shutdown pipe to epoll"); - } - - // Add listen socket to epoll with EPOLLEXCLUSIVE to prevent thundering herd - struct epoll_event listen_event; - listen_event.events = EPOLLIN | EPOLLEXCLUSIVE; - listen_event.data.ptr = const_cast(&listen_socket_tag); - if (epoll_ctl(epollfd_, EPOLL_CTL_ADD, listen_sockfd_, &listen_event) == -1) { - perror("epoll_ctl listen socket"); - throw std::runtime_error("Failed to add listen socket to epoll"); - } + create_epoll_instances(); start_io_threads(); @@ -110,7 +86,13 @@ void Server::receiveConnectionBack(Connection *connection) { connection->tsan_release(); event.data.ptr = connection; - if (epoll_ctl(epollfd_, EPOLL_CTL_ADD, connection->getFd(), &event) == -1) { + // Distribute connections round-robin across epoll instances + size_t epoll_index = + connection_distribution_counter_.fetch_add(1, std::memory_order_relaxed) % + epoll_fds_.size(); + int epollfd = epoll_fds_[epoll_index]; + + if (epoll_ctl(epollfd, EPOLL_CTL_ADD, connection->getFd(), &event) == -1) { perror("epoll_ctl ADD in receiveConnectionBack"); delete connection; // Clean up on failure } @@ -263,6 +245,46 @@ int Server::create_listen_socket() { return sfd; } +void Server::create_epoll_instances() { + // Create multiple epoll instances to reduce contention + epoll_fds_.resize(config_.server.epoll_instances); + + for (int i = 0; i < config_.server.epoll_instances; ++i) { + epoll_fds_[i] = epoll_create1(EPOLL_CLOEXEC); + if (epoll_fds_[i] == -1) { + perror("epoll_create"); + throw std::runtime_error("Failed to create epoll instance"); + } + + // Add shutdown pipe to each epoll instance + struct epoll_event shutdown_event; + shutdown_event.events = EPOLLIN; + shutdown_event.data.ptr = const_cast(&shutdown_pipe_tag); + + if (epoll_ctl(epoll_fds_[i], EPOLL_CTL_ADD, shutdown_pipe_[0], + &shutdown_event) == -1) { + perror("epoll_ctl shutdown pipe"); + throw std::runtime_error("Failed to add shutdown pipe to epoll"); + } + + // Add listen socket to each epoll instance with EPOLLEXCLUSIVE to prevent + // thundering herd + struct epoll_event listen_event; + listen_event.events = EPOLLIN | EPOLLEXCLUSIVE; + listen_event.data.ptr = const_cast(&listen_socket_tag); + if (epoll_ctl(epoll_fds_[i], EPOLL_CTL_ADD, listen_sockfd_, + &listen_event) == -1) { + perror("epoll_ctl listen socket"); + throw std::runtime_error("Failed to add listen socket to epoll"); + } + } +} + +int Server::get_epoll_for_thread(int thread_id) const { + // Round-robin assignment of threads to epoll instances + return epoll_fds_[thread_id % epoll_fds_.size()]; +} + void Server::start_io_threads() { int io_threads = config_.server.io_threads; @@ -271,13 +293,16 @@ void Server::start_io_threads() { pthread_setname_np(pthread_self(), ("io-" + std::to_string(thread_id)).c_str()); + // Each thread uses its assigned epoll instance (round-robin) + int epollfd = get_epoll_for_thread(thread_id); + struct epoll_event events[config_.server.event_batch_size]; std::unique_ptr batch[config_.server.event_batch_size]; int batch_events[config_.server.event_batch_size]; for (;;) { int event_count = - epoll_wait(epollfd_, events, config_.server.event_batch_size, -1); + epoll_wait(epollfd, events, config_.server.event_batch_size, -1); if (event_count == -1) { if (errno == EINTR) { continue; @@ -317,7 +342,7 @@ void Server::start_io_threads() { // Process existing connections in batch if (batch_count > 0) { - process_connection_batch({batch, (size_t)batch_count}, + process_connection_batch(epollfd, {batch, (size_t)batch_count}, {batch_events, (size_t)batch_count}, false); } @@ -364,7 +389,7 @@ void Server::start_io_threads() { // Process batch if full if (batch_count == config_.server.event_batch_size) { - process_connection_batch({batch, (size_t)batch_count}, + process_connection_batch(epollfd, {batch, (size_t)batch_count}, {batch_events, (size_t)batch_count}, true); batch_count = 0; @@ -373,7 +398,7 @@ void Server::start_io_threads() { // Process remaining accepted connections if (batch_count > 0) { - process_connection_batch({batch, (size_t)batch_count}, + process_connection_batch(epollfd, {batch, (size_t)batch_count}, {batch_events, (size_t)batch_count}, true); batch_count = 0; } @@ -439,8 +464,8 @@ void Server::process_connection_io(std::unique_ptr &conn, } void Server::process_connection_batch( - std::span> batch, std::span events, - bool is_new) { + int epollfd, std::span> batch, + std::span events, bool is_new) { // First process I/O for each connection for (size_t i = 0; i < batch.size(); ++i) { if (batch[i]) { @@ -467,7 +492,7 @@ void Server::process_connection_batch( event.data.ptr = raw_conn; int epoll_op = is_new ? EPOLL_CTL_ADD : EPOLL_CTL_MOD; - if (epoll_ctl(epollfd_, epoll_op, fd, &event) == -1) { + if (epoll_ctl(epollfd, epoll_op, fd, &event) == -1) { perror(is_new ? "epoll_ctl ADD" : "epoll_ctl MOD"); delete raw_conn; } @@ -484,10 +509,15 @@ void Server::cleanup_resources() { close(shutdown_pipe_[1]); shutdown_pipe_[1] = -1; } - if (epollfd_ != -1) { - close(epollfd_); - epollfd_ = -1; + + // Close all epoll instances + for (int epollfd : epoll_fds_) { + if (epollfd != -1) { + close(epollfd); + } } + epoll_fds_.clear(); + if (listen_sockfd_ != -1) { close(listen_sockfd_); listen_sockfd_ = -1; diff --git a/src/server.hpp b/src/server.hpp index 32ed013..9288421 100644 --- a/src/server.hpp +++ b/src/server.hpp @@ -103,11 +103,14 @@ private: std::vector threads_; std::atomic connection_id_{0}; + // Round-robin counter for connection distribution + std::atomic connection_distribution_counter_{0}; + // Shutdown coordination int shutdown_pipe_[2] = {-1, -1}; - // Epoll file descriptor - int epollfd_ = -1; + // Multiple epoll file descriptors to reduce contention + std::vector epoll_fds_; int listen_sockfd_ = -1; // Unique tags for special events to avoid type confusion in epoll data union @@ -118,14 +121,19 @@ private: void setup_shutdown_pipe(); void setup_signal_handling(); int create_listen_socket(); + void create_epoll_instances(); void start_io_threads(); void cleanup_resources(); + // Helper to get epoll fd for a thread using round-robin + int get_epoll_for_thread(int thread_id) const; + // Helper for processing connection I/O void process_connection_io(std::unique_ptr &conn, int events); // Helper for processing a batch of connections with their events - void process_connection_batch(std::span> batch, + void process_connection_batch(int epollfd, + std::span> batch, std::span events, bool is_new); /** diff --git a/test_config.toml b/test_config.toml index 9d17612..7f3d214 100644 --- a/test_config.toml +++ b/test_config.toml @@ -10,6 +10,7 @@ max_request_size_bytes = 1048576 # 1MB io_threads = 6 # Event batch size for epoll processing event_batch_size = 32 +epoll_instances = 2 [commit] # Minimum length for request_id to ensure sufficient entropy