Unify accept and network threads into io threads

2025-08-20 16:50:54 -04:00
parent 7e28e6503d
commit 130ff2062a
10 changed files with 157 additions and 216 deletions

View File

@@ -83,8 +83,7 @@ void ConfigParser::parse_server_config(const auto &toml_data,
parse_field(srv, "port", config.port);
parse_field(srv, "unix_socket_path", config.unix_socket_path);
parse_field(srv, "max_request_size_bytes", config.max_request_size_bytes);
parse_field(srv, "accept_threads", config.accept_threads);
parse_field(srv, "network_threads", config.network_threads);
parse_field(srv, "io_threads", config.io_threads);
parse_field(srv, "event_batch_size", config.event_batch_size);
parse_field(srv, "max_connections", config.max_connections);
parse_field(srv, "read_buffer_size", config.read_buffer_size);
@@ -149,18 +148,10 @@ bool ConfigParser::validate_config(const Config &config) {
valid = false;
}
if (config.server.accept_threads < 1 || config.server.accept_threads > 100) {
std::cerr << "Configuration error: server.accept_threads must be between 1 "
"and 100, got "
<< config.server.accept_threads << std::endl;
valid = false;
}
if (config.server.network_threads < 1 ||
config.server.network_threads > 1000) {
std::cerr << "Configuration error: server.network_threads must be between "
"1 and 1000, got "
<< config.server.network_threads << std::endl;
if (config.server.io_threads < 1 || config.server.io_threads > 1000) {
std::cerr << "Configuration error: server.io_threads must be between 1 "
"and 1000, got "
<< config.server.io_threads << std::endl;
valid = false;
}

View File

@@ -18,14 +18,12 @@ struct ServerConfig {
std::string unix_socket_path;
/// Maximum size in bytes for incoming HTTP requests (default: 1MB)
size_t max_request_size_bytes = 1024 * 1024;
/// Number of accept threads for handling incoming connections
int accept_threads = 1;
/// Number of network I/O threads for epoll processing
int network_threads = 1;
/// Number of I/O threads for handling connections and network events
int io_threads = 1;
/// Event batch size for epoll processing
int event_batch_size = 32;
/// Maximum number of concurrent connections (0 = unlimited)
int max_connections = 1000;
int max_connections = 50000;
/// Buffer size for reading from socket connections (default: 16KB)
size_t read_buffer_size = 16 * 1024;
};

View File

@@ -21,12 +21,13 @@ extern std::atomic<int> activeConnections;
* Represents a single client connection with efficient memory management.
*
* Connection ownership model:
* - Created by accept thread, transferred to epoll via raw pointer
* - Network threads claim ownership by wrapping raw pointer in unique_ptr
* - Network thread optionally passes ownership to a thread pipeline
* - Created by I/O thread, processed immediately, then transferred to epoll via
* raw pointer
* - I/O threads claim ownership by wrapping raw pointer in unique_ptr
* - I/O thread optionally passes ownership to a thread pipeline
* - Owner eventually transfers back to epoll by releasing unique_ptr to raw
* pointer
* - RAII cleanup happens if network thread doesn't transfer back
* - RAII cleanup happens if I/O thread doesn't transfer back
*
* Arena allocator thread safety:
* Each Connection contains its own ArenaAllocator instance that is accessed
@@ -68,7 +69,7 @@ struct Connection {
* @brief Queue a message to be sent to the client.
*
* Adds data to the connection's outgoing message queue. The data will be sent
* asynchronously by the server's network threads using efficient vectored
* asynchronously by the server's I/O threads using efficient vectored
* I/O.
*
* @param s The data to send (string view for zero-copy efficiency)
@@ -342,7 +343,7 @@ private:
* @return std::unique_ptr<Connection> to the newly created connection
*
* @note This method is only accessible to the Server class and should be used
* exclusively by accept threads when new connections arrive.
* exclusively by I/O threads when new connections arrive.
*/
static std::unique_ptr<Connection>
createForServer(struct sockaddr_storage addr, int fd, int64_t id,
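
The ownership model described in the comments above boils down to moving a raw Connection pointer through epoll_event.data.ptr and re-wrapping it in a std::unique_ptr on the receiving I/O thread. A minimal sketch of that handoff follows; the Connection stand-in and the give_to_epoll / take_from_epoll helpers are illustrative names, not the project's actual API.

```cpp
#include <sys/epoll.h>

#include <cstdint>
#include <cstdio>
#include <memory>

struct Connection {          // stand-in for the project's Connection type
  int fd = -1;
  int getFd() const { return fd; }
};

// Hand a connection to epoll: the unique_ptr is released and the raw
// pointer rides in event.data.ptr until the next event fires.
bool give_to_epoll(int epollfd, std::unique_ptr<Connection> conn,
                   uint32_t interest) {
  struct epoll_event ev{};
  ev.events = interest | EPOLLONESHOT | EPOLLRDHUP;
  Connection *raw = conn.release();
  ev.data.ptr = raw;
  if (epoll_ctl(epollfd, EPOLL_CTL_MOD, raw->getFd(), &ev) == -1) {
    perror("epoll_ctl");
    delete raw;              // transfer failed, clean up here
    return false;
  }
  return true;
}

// Claim a connection back from epoll: wrapping the raw pointer in a
// unique_ptr means RAII destroys it unless it is released to epoll again.
std::unique_ptr<Connection> take_from_epoll(struct epoll_event &ev) {
  std::unique_ptr<Connection> conn{static_cast<Connection *>(ev.data.ptr)};
  ev.data.ptr = nullptr;
  return conn;
}
```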

View File

@@ -100,9 +100,7 @@ int main(int argc, char *argv[]) {
}
std::cout << "Max request size: " << config->server.max_request_size_bytes
<< " bytes" << std::endl;
std::cout << "Accept threads: " << config->server.accept_threads << std::endl;
std::cout << "Network threads: " << config->server.network_threads
<< std::endl;
std::cout << "I/O threads: " << config->server.io_threads << std::endl;
std::cout << "Event batch size: " << config->server.event_batch_size
<< std::endl;
std::cout << "Max connections: " << config->server.max_connections

View File

@@ -33,48 +33,34 @@ void Server::run() {
listen_sockfd_ = create_listen_socket();
// Create epoll instances
network_epollfd_ = epoll_create1(EPOLL_CLOEXEC);
if (network_epollfd_ == -1) {
perror("epoll_create network");
throw std::runtime_error("Failed to create network epoll instance");
// Create single epoll instance
epollfd_ = epoll_create1(EPOLL_CLOEXEC);
if (epollfd_ == -1) {
perror("epoll_create");
throw std::runtime_error("Failed to create epoll instance");
}
accept_epollfd_ = epoll_create1(EPOLL_CLOEXEC);
if (accept_epollfd_ == -1) {
perror("epoll_create accept");
throw std::runtime_error("Failed to create accept epoll instance");
}
// Add shutdown pipe to both epoll instances
// Add shutdown pipe to epoll instance
struct epoll_event shutdown_event;
shutdown_event.events = EPOLLIN;
shutdown_event.data.fd = shutdown_pipe_[0];
if (epoll_ctl(network_epollfd_, EPOLL_CTL_ADD, shutdown_pipe_[0],
&shutdown_event) == -1) {
perror("epoll_ctl add shutdown to network");
throw std::runtime_error("Failed to add shutdown pipe to network epoll");
if (epoll_ctl(epollfd_, EPOLL_CTL_ADD, shutdown_pipe_[0], &shutdown_event) ==
-1) {
perror("epoll_ctl shutdown pipe");
throw std::runtime_error("Failed to add shutdown pipe to epoll");
}
if (epoll_ctl(accept_epollfd_, EPOLL_CTL_ADD, shutdown_pipe_[0],
&shutdown_event) == -1) {
perror("epoll_ctl add shutdown to accept");
throw std::runtime_error("Failed to add shutdown pipe to accept epoll");
}
// Add listen socket to accept epoll
// Add listen socket to epoll with EPOLLEXCLUSIVE to prevent thundering herd
struct epoll_event listen_event;
listen_event.events = EPOLLIN | EPOLLEXCLUSIVE;
listen_event.data.fd = listen_sockfd_;
if (epoll_ctl(accept_epollfd_, EPOLL_CTL_ADD, listen_sockfd_,
&listen_event) == -1) {
perror("epoll_ctl add listen socket");
throw std::runtime_error("Failed to add listen socket to accept epoll");
if (epoll_ctl(epollfd_, EPOLL_CTL_ADD, listen_sockfd_, &listen_event) == -1) {
perror("epoll_ctl listen socket");
throw std::runtime_error("Failed to add listen socket to epoll");
}
start_network_threads();
start_accept_threads();
start_io_threads();
// Wait for all threads to complete
for (auto &thread : threads_) {
@@ -124,8 +110,7 @@ void Server::receiveConnectionBack(Connection *connection) {
connection->tsan_release();
event.data.ptr = connection;
if (epoll_ctl(network_epollfd_, EPOLL_CTL_ADD, connection->getFd(), &event) ==
-1) {
if (epoll_ctl(epollfd_, EPOLL_CTL_ADD, connection->getFd(), &event) == -1) {
perror("epoll_ctl ADD in receiveConnectionBack");
delete connection; // Clean up on failure
}
@@ -278,20 +263,23 @@ int Server::create_listen_socket() {
return sfd;
}
void Server::start_network_threads() {
int network_threads = config_.server.network_threads;
void Server::start_io_threads() {
int io_threads = config_.server.io_threads;
for (int thread_id = 0; thread_id < network_threads; ++thread_id) {
for (int thread_id = 0; thread_id < io_threads; ++thread_id) {
threads_.emplace_back([this, thread_id]() {
pthread_setname_np(pthread_self(),
("network-" + std::to_string(thread_id)).c_str());
("io-" + std::to_string(thread_id)).c_str());
struct epoll_event events[config_.server.event_batch_size];
std::unique_ptr<Connection> batch[config_.server.event_batch_size];
bool batch_is_new[config_.server.event_batch_size]; // Track if connection
// is new (ADD) or
// existing (MOD)
for (;;) {
int event_count = epoll_wait(network_epollfd_, events,
config_.server.event_batch_size, -1);
int event_count =
epoll_wait(epollfd_, events, config_.server.event_batch_size, -1);
if (event_count == -1) {
if (errno == EINTR) {
continue;
@@ -307,85 +295,9 @@ void Server::start_network_threads() {
return;
}
// Take ownership from epoll: raw pointer -> unique_ptr
std::unique_ptr<Connection> conn{
static_cast<Connection *>(events[i].data.ptr)};
conn->tsan_acquire();
events[i].data.ptr = nullptr;
if (events[i].events & EPOLLERR) {
continue; // Connection closed - unique_ptr destructor cleans up
}
// Process I/O using shared helper function
bool should_continue = process_connection_io(conn, events[i].events);
if (!should_continue || !conn) {
continue; // Connection closed or handler took ownership
}
batch[batch_count++] = std::move(conn);
}
handler_.on_post_batch({batch, (size_t)batch_count});
for (int i = 0; i < batch_count; ++i) {
auto &conn = batch[i];
if (!conn) {
continue;
}
// Determine next epoll interest
struct epoll_event event{};
if (!conn->hasMessages()) {
event.events = EPOLLIN | EPOLLONESHOT | EPOLLRDHUP;
} else {
event.events = EPOLLOUT | EPOLLONESHOT | EPOLLRDHUP;
}
// Transfer ownership back to epoll
conn->tsan_release();
Connection *raw_conn = conn.release();
event.data.ptr = raw_conn;
if (epoll_ctl(network_epollfd_, EPOLL_CTL_MOD, raw_conn->getFd(),
&event) == -1) {
perror("epoll_ctl MOD");
delete raw_conn;
continue;
}
}
}
});
}
}
void Server::start_accept_threads() {
int accept_threads = config_.server.accept_threads;
for (int thread_id = 0; thread_id < accept_threads; ++thread_id) {
threads_.emplace_back([this, thread_id]() {
pthread_setname_np(pthread_self(),
("accept-" + std::to_string(thread_id)).c_str());
for (;;) {
struct epoll_event events[2]; // listen socket + shutdown pipe
int ready = epoll_wait(accept_epollfd_, events, 2, -1);
if (ready == -1) {
if (errno == EINTR)
continue;
perror("epoll_wait accept");
abort();
}
for (int i = 0; i < ready; ++i) {
if (events[i].data.fd == shutdown_pipe_[0]) {
return; // Shutdown signal
}
// Handle listen socket events (new connections)
if (events[i].data.fd == listen_sockfd_) {
// Accept new connections and batch them
std::unique_ptr<Connection> batch[config_.server.event_batch_size];
int batch_count = 0;
for (;;) {
struct sockaddr_storage addr;
socklen_t addrlen = sizeof(addr);
@@ -421,8 +333,7 @@ void Server::start_accept_threads() {
connection_id_.fetch_add(1, std::memory_order_relaxed),
&handler_, weak_from_this());
// Try to process I/O once in the accept thread before handing off
// to network threads
// Try to process I/O once immediately for better latency
bool should_continue = process_connection_io(conn, EPOLLIN);
if (!should_continue) {
@@ -432,21 +343,17 @@ void Server::start_accept_threads() {
// Add to batch if we still have the connection
if (conn) {
batch[batch_count++] = std::move(conn);
// If batch is full, process it and start a new batch
// If batch is full, process it first
if (batch_count >= config_.server.event_batch_size) {
handler_.on_post_batch({batch, (size_t)batch_count});
// Transfer all batched connections to network epoll
// Re-add all batched connections to epoll
for (int j = 0; j < batch_count; ++j) {
auto &batched_conn = batch[j];
if (!batched_conn)
continue;
struct epoll_event event{};
// Determine next epoll interest based on whether we have
// messages to send
if (!batched_conn->hasMessages()) {
event.events = EPOLLIN | EPOLLONESHOT | EPOLLRDHUP;
} else {
@@ -458,49 +365,110 @@ void Server::start_accept_threads() {
Connection *raw_conn = batched_conn.release();
event.data.ptr = raw_conn;
if (epoll_ctl(network_epollfd_, EPOLL_CTL_ADD, fd,
&event) == -1) {
perror("epoll_ctl ADD");
int epoll_op =
batch_is_new[j] ? EPOLL_CTL_ADD : EPOLL_CTL_MOD;
if (epoll_ctl(epollfd_, epoll_op, fd, &event) == -1) {
perror(batch_is_new[j] ? "epoll_ctl ADD"
: "epoll_ctl MOD");
delete raw_conn;
continue;
}
}
batch_count = 0; // Reset batch
}
// Add current connection to batch
batch[batch_count] = std::move(conn);
batch_is_new[batch_count] =
true; // New connection, needs EPOLL_CTL_ADD
batch_count++;
}
}
continue;
}
// Process any remaining connections in the batch
if (batch_count > 0) {
handler_.on_post_batch({batch, (size_t)batch_count});
// Handle existing connection events
std::unique_ptr<Connection> conn{
static_cast<Connection *>(events[i].data.ptr)};
conn->tsan_acquire();
events[i].data.ptr = nullptr;
// Transfer remaining batched connections to network epoll
for (int j = 0; j < batch_count; ++j) {
auto &batched_conn = batch[j];
if (!batched_conn)
continue;
if (events[i].events & EPOLLERR) {
continue; // Connection closed - unique_ptr destructor cleans up
}
struct epoll_event event{};
// Determine next epoll interest based on whether we have
// messages to send
if (!batched_conn->hasMessages()) {
event.events = EPOLLIN | EPOLLONESHOT | EPOLLRDHUP;
} else {
event.events = EPOLLOUT | EPOLLONESHOT | EPOLLRDHUP;
}
// Process I/O using shared helper function
bool should_continue = process_connection_io(conn, events[i].events);
if (!should_continue || !conn) {
continue; // Connection closed or handler took ownership
}
int fd = batched_conn->getFd();
batched_conn->tsan_release();
Connection *raw_conn = batched_conn.release();
event.data.ptr = raw_conn;
// If batch is full, process it first
if (batch_count >= config_.server.event_batch_size) {
handler_.on_post_batch({batch, (size_t)batch_count});
if (epoll_ctl(network_epollfd_, EPOLL_CTL_ADD, fd, &event) ==
-1) {
perror("epoll_ctl ADD");
delete raw_conn;
continue;
}
// Re-add all batched connections to epoll
for (int j = 0; j < batch_count; ++j) {
auto &batched_conn = batch[j];
if (!batched_conn)
continue;
struct epoll_event event{};
if (!batched_conn->hasMessages()) {
event.events = EPOLLIN | EPOLLONESHOT | EPOLLRDHUP;
} else {
event.events = EPOLLOUT | EPOLLONESHOT | EPOLLRDHUP;
}
int fd = batched_conn->getFd();
batched_conn->tsan_release();
Connection *raw_conn = batched_conn.release();
event.data.ptr = raw_conn;
int epoll_op = batch_is_new[j] ? EPOLL_CTL_ADD : EPOLL_CTL_MOD;
if (epoll_ctl(epollfd_, epoll_op, fd, &event) == -1) {
perror(batch_is_new[j] ? "epoll_ctl ADD" : "epoll_ctl MOD");
delete raw_conn;
}
}
batch_count = 0; // Reset batch
}
// Add current connection to batch
batch[batch_count] = std::move(conn);
batch_is_new[batch_count] =
false; // Existing connection, needs EPOLL_CTL_MOD
batch_count++;
}
// Process batch if we have any connections
if (batch_count > 0) {
handler_.on_post_batch({batch, (size_t)batch_count});
for (int i = 0; i < batch_count; ++i) {
auto &conn = batch[i];
if (!conn) {
continue;
}
// Determine next epoll interest
struct epoll_event event{};
if (!conn->hasMessages()) {
event.events = EPOLLIN | EPOLLONESHOT | EPOLLRDHUP;
} else {
event.events = EPOLLOUT | EPOLLONESHOT | EPOLLRDHUP;
}
// Transfer ownership back to epoll
int fd = conn->getFd();
conn->tsan_release();
Connection *raw_conn = conn.release();
event.data.ptr = raw_conn;
// Use ADD for new connections, MOD for existing connections
int epoll_op = batch_is_new[i] ? EPOLL_CTL_ADD : EPOLL_CTL_MOD;
if (epoll_ctl(epollfd_, epoll_op, fd, &event) == -1) {
perror(batch_is_new[i] ? "epoll_ctl ADD" : "epoll_ctl MOD");
delete raw_conn;
continue;
}
}
}
@@ -571,13 +539,9 @@ void Server::cleanup_resources() {
close(shutdown_pipe_[1]);
shutdown_pipe_[1] = -1;
}
if (network_epollfd_ != -1) {
close(network_epollfd_);
network_epollfd_ = -1;
}
if (accept_epollfd_ != -1) {
close(accept_epollfd_);
accept_epollfd_ = -1;
if (epollfd_ != -1) {
close(epollfd_);
epollfd_ = -1;
}
if (listen_sockfd_ != -1) {
close(listen_sockfd_);
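
The design above can be reduced to a standalone sketch: a single epoll instance shared by all I/O threads, with the listen socket registered using EPOLLEXCLUSIVE so that only one waiting thread wakes per incoming connection. The sketch is simplified and uses illustrative values (port, thread count, batch size); the real server additionally handles the shutdown pipe, connection ownership, batching, and the handler pipeline shown in the diff.

```cpp
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>

#include <thread>
#include <vector>

int main() {
  // One listening socket and one epoll instance shared by every I/O thread.
  int listen_fd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);
  sockaddr_in addr{};
  addr.sin_family = AF_INET;
  addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
  addr.sin_port = htons(8080);          // illustrative port
  bind(listen_fd, reinterpret_cast<sockaddr *>(&addr), sizeof(addr));
  listen(listen_fd, SOMAXCONN);

  int epollfd = epoll_create1(EPOLL_CLOEXEC);

  // EPOLLEXCLUSIVE wakes only one of the waiting threads per readiness
  // event instead of all of them, avoiding the thundering herd.
  epoll_event ev{};
  ev.events = EPOLLIN | EPOLLEXCLUSIVE;
  ev.data.fd = listen_fd;
  epoll_ctl(epollfd, EPOLL_CTL_ADD, listen_fd, &ev);

  std::vector<std::thread> io_threads;
  for (int t = 0; t < 4; ++t) {         // illustrative thread count
    io_threads.emplace_back([epollfd, listen_fd] {
      epoll_event events[32];           // illustrative batch size
      for (;;) {
        int n = epoll_wait(epollfd, events, 32, -1);
        if (n == -1) continue;          // e.g. EINTR
        for (int i = 0; i < n; ++i) {
          if (events[i].data.fd == listen_fd) {
            // Accept everything that is ready, then keep waiting.
            int client;
            while ((client = accept(listen_fd, nullptr, nullptr)) != -1) {
              close(client);            // real code would register the fd
            }
          }
          // Events for already-accepted client fds would be handled here.
        }
      }
    });
  }
  for (auto &th : io_threads) th.join();
}
```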

View File

@@ -105,21 +105,18 @@ private:
// Shutdown coordination
int shutdown_pipe_[2] = {-1, -1};
// Epoll file descriptors
int network_epollfd_ = -1;
int accept_epollfd_ = -1;
// Epoll file descriptor
int epollfd_ = -1;
int listen_sockfd_ = -1;
// Private helper methods
void setup_shutdown_pipe();
void setup_signal_handling();
int create_listen_socket();
void start_network_threads();
void start_accept_threads();
void start_io_threads();
void cleanup_resources();
// Helper for processing connection I/O (shared between accept and network
// threads)
// Helper for processing connection I/O
bool process_connection_io(std::unique_ptr<Connection> &conn, int events);
/**