Allow multiple epoll instances on server

This commit is contained in:
2025-08-21 14:13:11 -04:00
parent 22e638e1f9
commit 1cce8d9950
5 changed files with 98 additions and 40 deletions

View File

@@ -13,6 +13,7 @@
#include <sys/socket.h>
#include <sys/un.h>
#include <unistd.h>
#include <vector>
extern std::atomic<int> activeConnections;
@@ -33,32 +34,7 @@ void Server::run() {
listen_sockfd_ = create_listen_socket();
// Create single epoll instance
epollfd_ = epoll_create1(EPOLL_CLOEXEC);
if (epollfd_ == -1) {
perror("epoll_create");
throw std::runtime_error("Failed to create epoll instance");
}
// Add shutdown pipe to epoll instance
struct epoll_event shutdown_event;
shutdown_event.events = EPOLLIN;
shutdown_event.data.ptr = const_cast<char *>(&shutdown_pipe_tag);
if (epoll_ctl(epollfd_, EPOLL_CTL_ADD, shutdown_pipe_[0], &shutdown_event) ==
-1) {
perror("epoll_ctl shutdown pipe");
throw std::runtime_error("Failed to add shutdown pipe to epoll");
}
// Add listen socket to epoll with EPOLLEXCLUSIVE to prevent thundering herd
struct epoll_event listen_event;
listen_event.events = EPOLLIN | EPOLLEXCLUSIVE;
listen_event.data.ptr = const_cast<char *>(&listen_socket_tag);
if (epoll_ctl(epollfd_, EPOLL_CTL_ADD, listen_sockfd_, &listen_event) == -1) {
perror("epoll_ctl listen socket");
throw std::runtime_error("Failed to add listen socket to epoll");
}
create_epoll_instances();
start_io_threads();
@@ -110,7 +86,13 @@ void Server::receiveConnectionBack(Connection *connection) {
connection->tsan_release();
event.data.ptr = connection;
if (epoll_ctl(epollfd_, EPOLL_CTL_ADD, connection->getFd(), &event) == -1) {
// Distribute connections round-robin across epoll instances
size_t epoll_index =
connection_distribution_counter_.fetch_add(1, std::memory_order_relaxed) %
epoll_fds_.size();
int epollfd = epoll_fds_[epoll_index];
if (epoll_ctl(epollfd, EPOLL_CTL_ADD, connection->getFd(), &event) == -1) {
perror("epoll_ctl ADD in receiveConnectionBack");
delete connection; // Clean up on failure
}
@@ -263,6 +245,46 @@ int Server::create_listen_socket() {
return sfd;
}
void Server::create_epoll_instances() {
// Create multiple epoll instances to reduce contention
epoll_fds_.resize(config_.server.epoll_instances);
for (int i = 0; i < config_.server.epoll_instances; ++i) {
epoll_fds_[i] = epoll_create1(EPOLL_CLOEXEC);
if (epoll_fds_[i] == -1) {
perror("epoll_create");
throw std::runtime_error("Failed to create epoll instance");
}
// Add shutdown pipe to each epoll instance
struct epoll_event shutdown_event;
shutdown_event.events = EPOLLIN;
shutdown_event.data.ptr = const_cast<char *>(&shutdown_pipe_tag);
if (epoll_ctl(epoll_fds_[i], EPOLL_CTL_ADD, shutdown_pipe_[0],
&shutdown_event) == -1) {
perror("epoll_ctl shutdown pipe");
throw std::runtime_error("Failed to add shutdown pipe to epoll");
}
// Add listen socket to each epoll instance with EPOLLEXCLUSIVE to prevent
// thundering herd
struct epoll_event listen_event;
listen_event.events = EPOLLIN | EPOLLEXCLUSIVE;
listen_event.data.ptr = const_cast<char *>(&listen_socket_tag);
if (epoll_ctl(epoll_fds_[i], EPOLL_CTL_ADD, listen_sockfd_,
&listen_event) == -1) {
perror("epoll_ctl listen socket");
throw std::runtime_error("Failed to add listen socket to epoll");
}
}
}
int Server::get_epoll_for_thread(int thread_id) const {
// Round-robin assignment of threads to epoll instances
return epoll_fds_[thread_id % epoll_fds_.size()];
}
void Server::start_io_threads() {
int io_threads = config_.server.io_threads;
@@ -271,13 +293,16 @@ void Server::start_io_threads() {
pthread_setname_np(pthread_self(),
("io-" + std::to_string(thread_id)).c_str());
// Each thread uses its assigned epoll instance (round-robin)
int epollfd = get_epoll_for_thread(thread_id);
struct epoll_event events[config_.server.event_batch_size];
std::unique_ptr<Connection> batch[config_.server.event_batch_size];
int batch_events[config_.server.event_batch_size];
for (;;) {
int event_count =
epoll_wait(epollfd_, events, config_.server.event_batch_size, -1);
epoll_wait(epollfd, events, config_.server.event_batch_size, -1);
if (event_count == -1) {
if (errno == EINTR) {
continue;
@@ -317,7 +342,7 @@ void Server::start_io_threads() {
// Process existing connections in batch
if (batch_count > 0) {
process_connection_batch({batch, (size_t)batch_count},
process_connection_batch(epollfd, {batch, (size_t)batch_count},
{batch_events, (size_t)batch_count}, false);
}
@@ -364,7 +389,7 @@ void Server::start_io_threads() {
// Process batch if full
if (batch_count == config_.server.event_batch_size) {
process_connection_batch({batch, (size_t)batch_count},
process_connection_batch(epollfd, {batch, (size_t)batch_count},
{batch_events, (size_t)batch_count},
true);
batch_count = 0;
@@ -373,7 +398,7 @@ void Server::start_io_threads() {
// Process remaining accepted connections
if (batch_count > 0) {
process_connection_batch({batch, (size_t)batch_count},
process_connection_batch(epollfd, {batch, (size_t)batch_count},
{batch_events, (size_t)batch_count}, true);
batch_count = 0;
}
@@ -439,8 +464,8 @@ void Server::process_connection_io(std::unique_ptr<Connection> &conn,
}
void Server::process_connection_batch(
std::span<std::unique_ptr<Connection>> batch, std::span<const int> events,
bool is_new) {
int epollfd, std::span<std::unique_ptr<Connection>> batch,
std::span<const int> events, bool is_new) {
// First process I/O for each connection
for (size_t i = 0; i < batch.size(); ++i) {
if (batch[i]) {
@@ -467,7 +492,7 @@ void Server::process_connection_batch(
event.data.ptr = raw_conn;
int epoll_op = is_new ? EPOLL_CTL_ADD : EPOLL_CTL_MOD;
if (epoll_ctl(epollfd_, epoll_op, fd, &event) == -1) {
if (epoll_ctl(epollfd, epoll_op, fd, &event) == -1) {
perror(is_new ? "epoll_ctl ADD" : "epoll_ctl MOD");
delete raw_conn;
}
@@ -484,10 +509,15 @@ void Server::cleanup_resources() {
close(shutdown_pipe_[1]);
shutdown_pipe_[1] = -1;
}
if (epollfd_ != -1) {
close(epollfd_);
epollfd_ = -1;
// Close all epoll instances
for (int epollfd : epoll_fds_) {
if (epollfd != -1) {
close(epollfd);
}
}
epoll_fds_.clear();
if (listen_sockfd_ != -1) {
close(listen_sockfd_);
listen_sockfd_ = -1;