Separate phases for processing existing and new connections

This commit is contained in:
2025-08-21 11:58:10 -04:00
parent 78a7549ce5
commit cfddaddb31
2 changed files with 115 additions and 163 deletions

View File

@@ -273,9 +273,7 @@ void Server::start_io_threads() {
struct epoll_event events[config_.server.event_batch_size];
std::unique_ptr<Connection> batch[config_.server.event_batch_size];
bool batch_is_new[config_.server.event_batch_size]; // Track if connection
// is new (ADD) or
// existing (MOD)
int batch_events[config_.server.event_batch_size];
for (;;) {
int event_count =
@@ -288,101 +286,18 @@ void Server::start_io_threads() {
abort();
}
bool listenReady = false;
int batch_count = 0;
for (int i = 0; i < event_count; ++i) {
// Check for shutdown event
// TODO type confusion in data union
if (events[i].data.fd == shutdown_pipe_[0]) {
return;
}
// Handle listen socket events (new connections)
// Check for new connections
// TODO type confusion in data union
if (events[i].data.fd == listen_sockfd_) {
// Accept new connections and batch them
for (;;) {
struct sockaddr_storage addr;
socklen_t addrlen = sizeof(addr);
int fd = accept4(listen_sockfd_, (struct sockaddr *)&addr,
&addrlen, SOCK_NONBLOCK);
if (fd == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK)
break;
if (errno == EINTR)
continue;
perror("accept4");
abort();
}
// Check connection limit
if (config_.server.max_connections > 0 &&
activeConnections.load(std::memory_order_relaxed) >=
config_.server.max_connections) {
close(fd);
continue;
}
// Enable keepalive
int keepalive = 1;
if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &keepalive,
sizeof(keepalive)) == -1) {
perror("setsockopt SO_KEEPALIVE");
}
auto conn = Connection::createForServer(
addr, fd,
connection_id_.fetch_add(1, std::memory_order_relaxed),
&handler_, weak_from_this());
// Try to process I/O once immediately for better latency
bool should_continue = process_connection_io(conn, EPOLLIN);
if (!should_continue) {
// Connection should be closed (error or application decision)
continue;
}
// Add to batch if we still have the connection
if (conn) {
// If batch is full, process it first
if (batch_count >= config_.server.event_batch_size) {
handler_.on_post_batch({batch, (size_t)batch_count});
// Re-add all batched connections to epoll
for (int j = 0; j < batch_count; ++j) {
auto &batched_conn = batch[j];
if (!batched_conn)
continue;
struct epoll_event event{};
if (!batched_conn->hasMessages()) {
event.events = EPOLLIN | EPOLLONESHOT | EPOLLRDHUP;
} else {
event.events = EPOLLOUT | EPOLLONESHOT | EPOLLRDHUP;
}
int fd = batched_conn->getFd();
batched_conn->tsan_release();
Connection *raw_conn = batched_conn.release();
event.data.ptr = raw_conn;
int epoll_op =
batch_is_new[j] ? EPOLL_CTL_ADD : EPOLL_CTL_MOD;
if (epoll_ctl(epollfd_, epoll_op, fd, &event) == -1) {
perror(batch_is_new[j] ? "epoll_ctl ADD"
: "epoll_ctl MOD");
delete raw_conn;
}
}
batch_count = 0; // Reset batch
}
// Add current connection to batch
batch[batch_count] = std::move(conn);
batch_is_new[batch_count] =
true; // New connection, needs EPOLL_CTL_ADD
batch_count++;
}
}
listenReady = true;
continue;
}
@@ -396,89 +311,83 @@ void Server::start_io_threads() {
continue; // Connection closed - unique_ptr destructor cleans up
}
// Process I/O using shared helper function
bool should_continue = process_connection_io(conn, events[i].events);
if (!should_continue || !conn) {
continue; // Connection closed or handler took ownership
}
// If batch is full, process it first
if (batch_count >= config_.server.event_batch_size) {
handler_.on_post_batch({batch, (size_t)batch_count});
// Re-add all batched connections to epoll
for (int j = 0; j < batch_count; ++j) {
auto &batched_conn = batch[j];
if (!batched_conn)
continue;
struct epoll_event event{};
if (!batched_conn->hasMessages()) {
event.events = EPOLLIN | EPOLLONESHOT | EPOLLRDHUP;
} else {
event.events = EPOLLOUT | EPOLLONESHOT | EPOLLRDHUP;
}
int fd = batched_conn->getFd();
batched_conn->tsan_release();
Connection *raw_conn = batched_conn.release();
event.data.ptr = raw_conn;
int epoll_op = batch_is_new[j] ? EPOLL_CTL_ADD : EPOLL_CTL_MOD;
if (epoll_ctl(epollfd_, epoll_op, fd, &event) == -1) {
perror(batch_is_new[j] ? "epoll_ctl ADD" : "epoll_ctl MOD");
delete raw_conn;
}
}
batch_count = 0; // Reset batch
}
// Add current connection to batch
// Add to regular batch - I/O will be processed in batch
batch[batch_count] = std::move(conn);
batch_is_new[batch_count] =
false; // Existing connection, needs EPOLL_CTL_MOD
batch_events[batch_count] = events[i].events;
batch_count++;
}
// Process batch if we have any connections
// Process existing connections in batch
if (batch_count > 0) {
handler_.on_post_batch({batch, (size_t)batch_count});
process_connection_batch({batch, (size_t)batch_count},
{batch_events, (size_t)batch_count}, false);
}
for (int i = 0; i < batch_count; ++i) {
auto &conn = batch[i];
if (!conn) {
// Reuse same batch array for accepting connections
if (listenReady) {
for (;;) {
struct sockaddr_storage addr;
socklen_t addrlen = sizeof(addr);
int fd = accept4(listen_sockfd_, (struct sockaddr *)&addr, &addrlen,
SOCK_NONBLOCK);
if (fd == -1) {
if (errno == EAGAIN || errno == EWOULDBLOCK)
break;
if (errno == EINTR)
continue;
perror("accept4");
abort();
}
// Check connection limit
if (config_.server.max_connections > 0 &&
activeConnections.load(std::memory_order_relaxed) >=
config_.server.max_connections) {
close(fd);
continue;
}
// Determine next epoll interest
struct epoll_event event{};
if (!conn->hasMessages()) {
event.events = EPOLLIN | EPOLLONESHOT | EPOLLRDHUP;
} else {
event.events = EPOLLOUT | EPOLLONESHOT | EPOLLRDHUP;
// Enable keepalive
int keepalive = 1;
if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &keepalive,
sizeof(keepalive)) == -1) {
perror("setsockopt SO_KEEPALIVE");
}
// Transfer ownership back to epoll
int fd = conn->getFd();
conn->tsan_release();
Connection *raw_conn = conn.release();
event.data.ptr = raw_conn;
// Add to batch - I/O will be processed in batch
batch[batch_count] = Connection::createForServer(
addr, fd,
connection_id_.fetch_add(1, std::memory_order_relaxed),
&handler_, weak_from_this());
batch_events[batch_count] =
EPOLLIN; // New connections always start with read
batch_count++;
// Use ADD for new connections, MOD for existing connections
int epoll_op = batch_is_new[i] ? EPOLL_CTL_ADD : EPOLL_CTL_MOD;
if (epoll_ctl(epollfd_, epoll_op, fd, &event) == -1) {
perror(batch_is_new[i] ? "epoll_ctl ADD" : "epoll_ctl MOD");
delete raw_conn;
continue;
// Process batch if full
if (batch_count == config_.server.event_batch_size) {
process_connection_batch({batch, (size_t)batch_count},
{batch_events, (size_t)batch_count},
true);
batch_count = 0;
}
}
// Process remaining accepted connections
if (batch_count > 0) {
process_connection_batch({batch, (size_t)batch_count},
{batch_events, (size_t)batch_count}, true);
batch_count = 0;
}
}
}
});
}
}
bool Server::process_connection_io(std::unique_ptr<Connection> &conn,
void Server::process_connection_io(std::unique_ptr<Connection> &conn,
int events) {
assert(conn);
// Handle EPOLLIN - read data and process it
if (events & EPOLLIN) {
auto buf_size = config_.server.read_buffer_size;
@@ -487,12 +396,13 @@ bool Server::process_connection_io(std::unique_ptr<Connection> &conn,
if (r < 0) {
// Error or EOF - connection should be closed
return false;
conn.reset();
return;
}
if (r == 0) {
// No data available (EAGAIN) - skip read processing but continue
return true;
return;
}
// Call handler with unique_ptr - handler can take ownership if needed
@@ -501,7 +411,7 @@ bool Server::process_connection_io(std::unique_ptr<Connection> &conn,
// If handler took ownership (conn is now null), return true to indicate
// processing is done
if (!conn) {
return true;
return;
}
}
@@ -510,7 +420,8 @@ bool Server::process_connection_io(std::unique_ptr<Connection> &conn,
if ((events & EPOLLOUT) || ((events & EPOLLIN) && conn->hasMessages())) {
bool error = conn->writeBytes();
if (error) {
return false; // Connection should be closed
conn.reset(); // Connection should be closed
return;
}
// Call handler with unique_ptr - handler can take ownership if needed
@@ -518,16 +429,52 @@ bool Server::process_connection_io(std::unique_ptr<Connection> &conn,
// If handler took ownership (conn is now null), return true to indicate
// processing is done
if (!conn) {
return true;
return;
}
// Check if we should close the connection according to application
if (!conn->hasMessages() && conn->closeConnection_) {
return false; // Connection should be closed
conn.reset(); // Connection should be closed
return;
}
}
}
// Process a batch of connections through a single I/O + post-batch + epoll
// re-arm cycle.
//
// @param batch   Owning pointers for each connection in the batch. Entries may
//                be null on entry (already closed) and may become null during
//                processing if process_connection_io closes the connection or
//                the handler takes ownership.
// @param events  Per-connection epoll event masks, parallel to `batch`
//                (events[i] belongs to batch[i]); only the first batch.size()
//                entries are read.
// @param is_new  True for freshly accepted connections (fd not yet registered
//                with epoll → EPOLL_CTL_ADD); false for existing connections
//                being re-armed after a one-shot wakeup → EPOLL_CTL_MOD.
void Server::process_connection_batch(
    std::span<std::unique_ptr<Connection>> batch, std::span<const int> events,
    bool is_new) {
  // Phase 1: run I/O for every still-open connection. process_connection_io
  // resets the unique_ptr when the connection should be closed, so a null
  // entry afterwards means "do not re-arm".
  for (size_t i = 0; i < batch.size(); ++i) {
    if (batch[i]) {
      process_connection_io(batch[i], events[i]);
    }
  }
  // Phase 2: let the application observe the whole batch at once (it may take
  // ownership of individual connections by nulling their entries).
  handler_.on_post_batch(batch);
  // Phase 3: transfer ownership of every surviving connection back to epoll.
  for (auto &conn : batch) {
    if (!conn) {
      continue; // closed above or claimed by the handler
    }
    struct epoll_event event{};
    // Interest depends on pending output: wait for writability if there is
    // queued data, otherwise wait for more input. EPOLLONESHOT keeps each fd
    // owned by exactly one I/O thread between wakeups.
    if (!conn->hasMessages()) {
      event.events = EPOLLIN | EPOLLONESHOT | EPOLLRDHUP;
    } else {
      event.events = EPOLLOUT | EPOLLONESHOT | EPOLLRDHUP;
    }
    int fd = conn->getFd();
    conn->tsan_release();
    // epoll holds a raw pointer while the connection is parked in the kernel;
    // ownership is reclaimed when the event fires (or deleted below on error).
    Connection *raw_conn = conn.release();
    event.data.ptr = raw_conn;
    int epoll_op = is_new ? EPOLL_CTL_ADD : EPOLL_CTL_MOD;
    if (epoll_ctl(epollfd_, epoll_op, fd, &event) == -1) {
      perror(is_new ? "epoll_ctl ADD" : "epoll_ctl MOD");
      delete raw_conn; // registration failed: reclaim ownership, avoid a leak
    }
  }
}
void Server::cleanup_resources() {