Probably going to merge accept and network threads

This commit is contained in:
2025-08-20 16:24:09 -04:00
parent 24a1157f0d
commit 7e28e6503d
6 changed files with 166 additions and 72 deletions

View File

@@ -47,19 +47,25 @@ void Connection::appendMessage(std::string_view s, bool copyToArena) {
} }
} }
std::string_view Connection::readBytes(char *buf, size_t buffer_size) { int Connection::readBytes(char *buf, size_t buffer_size) {
int r = read(fd_, buf, buffer_size); int r;
if (r == -1) { for (;;) {
if (errno == EINTR || errno == EAGAIN) { r = read(fd_, buf, buffer_size);
return {}; // Empty string_view indicates no data or would block if (r == -1) {
if (errno == EINTR) {
continue;
}
if (errno == EAGAIN) {
return 0;
}
perror("read");
return -1;
} }
perror("read"); if (r == 0) {
return {}; // Error - let server handle connection cleanup return -1;
}
return r;
} }
if (r == 0) {
return {}; // EOF - let server handle connection cleanup
}
return {buf, size_t(r)};
} }
bool Connection::writeBytes() { bool Connection::writeBytes() {

View File

@@ -349,7 +349,7 @@ private:
ConnectionHandler *handler, std::weak_ptr<Server> server); ConnectionHandler *handler, std::weak_ptr<Server> server);
// Networking interface - only accessible by Server // Networking interface - only accessible by Server
std::string_view readBytes(char *buf, size_t buffer_size); int readBytes(char *buf, size_t buffer_size);
bool writeBytes(); bool writeBytes();
void tsan_acquire(); void tsan_acquire();
void tsan_release(); void tsan_release();

View File

@@ -37,14 +37,14 @@ public:
* when done * when done
* @note `data` is *not* owned by the connection arena, and its lifetime ends * @note `data` is *not* owned by the connection arena, and its lifetime ends
* after the call to on_data_arrived. * after the call to on_data_arrived.
* @note May be called from an arbitrary network thread. * @note May be called from an arbitrary server thread.
*/ */
virtual void on_data_arrived(std::string_view /*data*/, virtual void on_data_arrived(std::string_view /*data*/,
std::unique_ptr<Connection> &) {}; std::unique_ptr<Connection> &) {};
/** /**
* Successfully wrote data on the connection. * Successfully wrote data on the connection.
* @note May be called from an arbitrary network thread. * @note May be called from an arbitrary server thread.
*/ */
virtual void on_write_progress(std::unique_ptr<Connection> &) {} virtual void on_write_progress(std::unique_ptr<Connection> &) {}
@@ -55,7 +55,7 @@ public:
* *
* Use this for: * Use this for:
* - Connection-specific initialization. * - Connection-specific initialization.
* @note May be called from an arbitrary accept thread. * @note May be called from an arbitrary server thread.
*/ */
virtual void on_connection_established(Connection &) {} virtual void on_connection_established(Connection &) {}
@@ -66,7 +66,7 @@ public:
* *
* Use this for: * Use this for:
* - Cleanup of connection-specific resources. * - Cleanup of connection-specific resources.
* @note May be called from an arbitrary accept thread. * @note May be called from an arbitrary server thread.
*/ */
virtual void on_connection_closed(Connection &) {} virtual void on_connection_closed(Connection &) {}

View File

@@ -317,49 +317,10 @@ void Server::start_network_threads() {
continue; // Connection closed - unique_ptr destructor cleans up continue; // Connection closed - unique_ptr destructor cleans up
} }
if (events[i].events & EPOLLIN) { // Process I/O using shared helper function
auto buf_size = config_.server.read_buffer_size; bool should_continue = process_connection_io(conn, events[i].events);
char buf[buf_size]; if (!should_continue || !conn) {
std::string_view data = conn->readBytes(buf, buf_size); continue; // Connection closed or handler took ownership
if (data.empty()) {
// No data, error, or EOF - connection should be closed
continue;
}
// Call handler with unique_ptr - handler can take ownership if
// needed
handler_.on_data_arrived(data, conn);
// If handler took ownership (conn is now null), don't continue
// processing
if (!conn) {
continue;
}
}
// Send immediately if we already have outgoing messages from
// read callbacks.
if ((events[i].events & EPOLLOUT) ||
((events[i].events & EPOLLIN) && conn->hasMessages())) {
bool error = conn->writeBytes();
if (error) {
continue;
}
// Call handler with unique_ptr - handler can take ownership if
// needed
handler_.on_write_progress(conn);
// If handler took ownership (conn is now null), don't continue
// processing
if (!conn) {
continue;
}
// Check if we should close the connection according to application
if (!conn->hasMessages() && conn->closeConnection_) {
continue;
}
} }
batch[batch_count++] = std::move(conn); batch[batch_count++] = std::move(conn);
} }
@@ -421,7 +382,10 @@ void Server::start_accept_threads() {
} }
if (events[i].data.fd == listen_sockfd_) { if (events[i].data.fd == listen_sockfd_) {
// Accept new connections // Accept new connections and batch them
std::unique_ptr<Connection> batch[config_.server.event_batch_size];
int batch_count = 0;
for (;;) { for (;;) {
struct sockaddr_storage addr; struct sockaddr_storage addr;
socklen_t addrlen = sizeof(addr); socklen_t addrlen = sizeof(addr);
@@ -457,19 +421,86 @@ void Server::start_accept_threads() {
connection_id_.fetch_add(1, std::memory_order_relaxed), connection_id_.fetch_add(1, std::memory_order_relaxed),
&handler_, weak_from_this()); &handler_, weak_from_this());
// Transfer to network epoll // Try to process I/O once in the accept thread before handing off
struct epoll_event event{}; // to network threads
event.events = EPOLLIN | EPOLLONESHOT | EPOLLRDHUP; bool should_continue = process_connection_io(conn, EPOLLIN);
conn->tsan_release();
Connection *raw_conn = conn.release();
event.data.ptr = raw_conn;
if (epoll_ctl(network_epollfd_, EPOLL_CTL_ADD, fd, &event) == if (!should_continue) {
-1) { // Connection should be closed (error or application decision)
perror("epoll_ctl ADD");
delete raw_conn;
continue; continue;
} }
// Add to batch if we still have the connection
if (conn) {
batch[batch_count++] = std::move(conn);
// If batch is full, process it and start a new batch
if (batch_count >= config_.server.event_batch_size) {
handler_.on_post_batch({batch, (size_t)batch_count});
// Transfer all batched connections to network epoll
for (int j = 0; j < batch_count; ++j) {
auto &batched_conn = batch[j];
if (!batched_conn)
continue;
struct epoll_event event{};
// Determine next epoll interest based on whether we have
// messages to send
if (!batched_conn->hasMessages()) {
event.events = EPOLLIN | EPOLLONESHOT | EPOLLRDHUP;
} else {
event.events = EPOLLOUT | EPOLLONESHOT | EPOLLRDHUP;
}
int fd = batched_conn->getFd();
batched_conn->tsan_release();
Connection *raw_conn = batched_conn.release();
event.data.ptr = raw_conn;
if (epoll_ctl(network_epollfd_, EPOLL_CTL_ADD, fd,
&event) == -1) {
perror("epoll_ctl ADD");
delete raw_conn;
continue;
}
}
batch_count = 0; // Reset batch
}
}
}
// Process any remaining connections in the batch
if (batch_count > 0) {
handler_.on_post_batch({batch, (size_t)batch_count});
// Transfer remaining batched connections to network epoll
for (int j = 0; j < batch_count; ++j) {
auto &batched_conn = batch[j];
if (!batched_conn)
continue;
struct epoll_event event{};
// Determine next epoll interest based on whether we have
// messages to send
if (!batched_conn->hasMessages()) {
event.events = EPOLLIN | EPOLLONESHOT | EPOLLRDHUP;
} else {
event.events = EPOLLOUT | EPOLLONESHOT | EPOLLRDHUP;
}
int fd = batched_conn->getFd();
batched_conn->tsan_release();
Connection *raw_conn = batched_conn.release();
event.data.ptr = raw_conn;
if (epoll_ctl(network_epollfd_, EPOLL_CTL_ADD, fd, &event) ==
-1) {
perror("epoll_ctl ADD");
delete raw_conn;
continue;
}
}
} }
} }
} }
@@ -478,6 +509,59 @@ void Server::start_accept_threads() {
} }
} }
bool Server::process_connection_io(std::unique_ptr<Connection> &conn,
int events) {
// Handle EPOLLIN - read data and process it
if (events & EPOLLIN) {
auto buf_size = config_.server.read_buffer_size;
char buf[buf_size];
int r = conn->readBytes(buf, buf_size);
if (r < 0) {
// Error or EOF - connection should be closed
return false;
}
if (r == 0) {
// No data available (EAGAIN) - skip read processing but continue
return true;
}
// Call handler with unique_ptr - handler can take ownership if needed
handler_.on_data_arrived(std::string_view{buf, size_t(r)}, conn);
// If handler took ownership (conn is now null), return true to indicate
// processing is done
if (!conn) {
return true;
}
}
// Send immediately if we have outgoing messages (either from EPOLLOUT or
// after reading)
if ((events & EPOLLOUT) || ((events & EPOLLIN) && conn->hasMessages())) {
bool error = conn->writeBytes();
if (error) {
return false; // Connection should be closed
}
// Call handler with unique_ptr - handler can take ownership if needed
handler_.on_write_progress(conn);
// If handler took ownership (conn is now null), return true to indicate
// processing is done
if (!conn) {
return true;
}
// Check if we should close the connection according to application
if (!conn->hasMessages() && conn->closeConnection_) {
return false; // Connection should be closed
}
}
return true; // Connection should continue
}
void Server::cleanup_resources() { void Server::cleanup_resources() {
if (shutdown_pipe_[0] != -1) { if (shutdown_pipe_[0] != -1) {
close(shutdown_pipe_[0]); close(shutdown_pipe_[0]);

View File

@@ -118,6 +118,10 @@ private:
void start_accept_threads(); void start_accept_threads();
void cleanup_resources(); void cleanup_resources();
// Helper for processing connection I/O (shared between accept and network
// threads)
bool process_connection_io(std::unique_ptr<Connection> &conn, int events);
/** /**
* Called internally to return ownership to the server. * Called internally to return ownership to the server.
* *

View File

@@ -7,9 +7,9 @@ port = 8080
# Maximum request size in bytes (for 413 Content Too Large responses) # Maximum request size in bytes (for 413 Content Too Large responses)
max_request_size_bytes = 1048576 # 1MB max_request_size_bytes = 1048576 # 1MB
# Number of accept threads for handling incoming connections # Number of accept threads for handling incoming connections
accept_threads = 2 accept_threads = 4
# Number of network I/O threads for epoll processing # Number of network I/O threads for epoll processing
network_threads = 8 network_threads = 4
# Event batch size for epoll processing # Event batch size for epoll processing
event_batch_size = 32 event_batch_size = 32