From 7e28e6503d8730c7241b93af7369ac1ac96bb82c Mon Sep 17 00:00:00 2001 From: Andrew Noyes Date: Wed, 20 Aug 2025 16:24:09 -0400 Subject: [PATCH] Probably going to merge accept and network threads --- src/connection.cpp | 28 +++--- src/connection.hpp | 2 +- src/connection_handler.hpp | 8 +- src/server.cpp | 192 ++++++++++++++++++++++++++----------- src/server.hpp | 4 + test_config.toml | 4 +- 6 files changed, 166 insertions(+), 72 deletions(-) diff --git a/src/connection.cpp b/src/connection.cpp index 1039b3a..97ba56c 100644 --- a/src/connection.cpp +++ b/src/connection.cpp @@ -47,19 +47,25 @@ void Connection::appendMessage(std::string_view s, bool copyToArena) { } } -std::string_view Connection::readBytes(char *buf, size_t buffer_size) { - int r = read(fd_, buf, buffer_size); - if (r == -1) { - if (errno == EINTR || errno == EAGAIN) { - return {}; // Empty string_view indicates no data or would block +int Connection::readBytes(char *buf, size_t buffer_size) { + int r; + for (;;) { + r = read(fd_, buf, buffer_size); + if (r == -1) { + if (errno == EINTR) { + continue; + } + if (errno == EAGAIN) { + return 0; + } + perror("read"); + return -1; } - perror("read"); - return {}; // Error - let server handle connection cleanup + if (r == 0) { + return -1; + } + return r; } - if (r == 0) { - return {}; // EOF - let server handle connection cleanup - } - return {buf, size_t(r)}; } bool Connection::writeBytes() { diff --git a/src/connection.hpp b/src/connection.hpp index a7cd794..59ba54c 100644 --- a/src/connection.hpp +++ b/src/connection.hpp @@ -349,7 +349,7 @@ private: ConnectionHandler *handler, std::weak_ptr server); // Networking interface - only accessible by Server - std::string_view readBytes(char *buf, size_t buffer_size); + int readBytes(char *buf, size_t buffer_size); bool writeBytes(); void tsan_acquire(); void tsan_release(); diff --git a/src/connection_handler.hpp b/src/connection_handler.hpp index 0b32083..d05622f 100644 --- a/src/connection_handler.hpp +++ b/src/connection_handler.hpp @@ -37,14 +37,14 @@ public: * when done * @note `data` is *not* owned by the connection arena, and its lifetime ends * after the call to on_data_arrived. - * @note May be called from an arbitrary network thread. + * @note May be called from an arbitrary server thread. */ virtual void on_data_arrived(std::string_view /*data*/, std::unique_ptr &) {}; /** * Successfully wrote data on the connection. - * @note May be called from an arbitrary network thread. + * @note May be called from an arbitrary server thread. */ virtual void on_write_progress(std::unique_ptr &) {} @@ -55,7 +55,7 @@ public: * * Use this for: * - Connection-specific initialization. - * @note May be called from an arbitrary accept thread. + * @note May be called from an arbitrary server thread. */ virtual void on_connection_established(Connection &) {} @@ -66,7 +66,7 @@ public: * * Use this for: * - Cleanup of connection-specific resources. - * @note May be called from an arbitrary accept thread. + * @note May be called from an arbitrary server thread. */ virtual void on_connection_closed(Connection &) {} diff --git a/src/server.cpp b/src/server.cpp index 3eb92ca..637a0cb 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -317,49 +317,10 @@ void Server::start_network_threads() { continue; // Connection closed - unique_ptr destructor cleans up } - if (events[i].events & EPOLLIN) { - auto buf_size = config_.server.read_buffer_size; - char buf[buf_size]; - std::string_view data = conn->readBytes(buf, buf_size); - - if (data.empty()) { - // No data, error, or EOF - connection should be closed - continue; - } - - // Call handler with unique_ptr - handler can take ownership if - // needed - handler_.on_data_arrived(data, conn); - - // If handler took ownership (conn is now null), don't continue - // processing - if (!conn) { - continue; - } - } - - // Send immediately if we already already have outgoing messages from - // read callbacks. - if ((events[i].events & EPOLLOUT) || - ((events[i].events & EPOLLIN) && conn->hasMessages())) { - bool error = conn->writeBytes(); - if (error) { - continue; - } - - // Call handler with unique_ptr - handler can take ownership if - // needed - handler_.on_write_progress(conn); - // If handler took ownership (conn is now null), don't continue - // processing - if (!conn) { - continue; - } - - // Check if we should close the connection according to application - if (!conn->hasMessages() && conn->closeConnection_) { - continue; - } + // Process I/O using shared helper function + bool should_continue = process_connection_io(conn, events[i].events); + if (!should_continue || !conn) { + continue; // Connection closed or handler took ownership } batch[batch_count++] = std::move(conn); } @@ -421,7 +382,10 @@ void Server::start_accept_threads() { } if (events[i].data.fd == listen_sockfd_) { - // Accept new connections + // Accept new connections and batch them + std::unique_ptr batch[config_.server.event_batch_size]; + int batch_count = 0; + for (;;) { struct sockaddr_storage addr; socklen_t addrlen = sizeof(addr); @@ -457,19 +421,86 @@ void Server::start_accept_threads() { connection_id_.fetch_add(1, std::memory_order_relaxed), &handler_, weak_from_this()); - // Transfer to network epoll - struct epoll_event event{}; - event.events = EPOLLIN | EPOLLONESHOT | EPOLLRDHUP; - conn->tsan_release(); - Connection *raw_conn = conn.release(); - event.data.ptr = raw_conn; + // Try to process I/O once in the accept thread before handing off + // to network threads + bool should_continue = process_connection_io(conn, EPOLLIN); - if (epoll_ctl(network_epollfd_, EPOLL_CTL_ADD, fd, &event) == - -1) { - perror("epoll_ctl ADD"); - delete raw_conn; + if (!should_continue) { + // Connection should be closed (error or application decision) continue; } + + // Add to batch if we still have the connection + if (conn) { + batch[batch_count++] = std::move(conn); + + // If batch is full, process it and start a new batch + if (batch_count >= config_.server.event_batch_size) { + handler_.on_post_batch({batch, (size_t)batch_count}); + + // Transfer all batched connections to network epoll + for (int j = 0; j < batch_count; ++j) { + auto &batched_conn = batch[j]; + if (!batched_conn) + continue; + + struct epoll_event event{}; + // Determine next epoll interest based on whether we have + // messages to send + if (!batched_conn->hasMessages()) { + event.events = EPOLLIN | EPOLLONESHOT | EPOLLRDHUP; + } else { + event.events = EPOLLOUT | EPOLLONESHOT | EPOLLRDHUP; + } + + int fd = batched_conn->getFd(); + batched_conn->tsan_release(); + Connection *raw_conn = batched_conn.release(); + event.data.ptr = raw_conn; + + if (epoll_ctl(network_epollfd_, EPOLL_CTL_ADD, fd, + &event) == -1) { + perror("epoll_ctl ADD"); + delete raw_conn; + continue; + } + } + batch_count = 0; // Reset batch + } + } + } + + // Process any remaining connections in the batch + if (batch_count > 0) { + handler_.on_post_batch({batch, (size_t)batch_count}); + + // Transfer remaining batched connections to network epoll + for (int j = 0; j < batch_count; ++j) { + auto &batched_conn = batch[j]; + if (!batched_conn) + continue; + + struct epoll_event event{}; + // Determine next epoll interest based on whether we have + // messages to send + if (!batched_conn->hasMessages()) { + event.events = EPOLLIN | EPOLLONESHOT | EPOLLRDHUP; + } else { + event.events = EPOLLOUT | EPOLLONESHOT | EPOLLRDHUP; + } + + int fd = batched_conn->getFd(); + batched_conn->tsan_release(); + Connection *raw_conn = batched_conn.release(); + event.data.ptr = raw_conn; + + if (epoll_ctl(network_epollfd_, EPOLL_CTL_ADD, fd, &event) == + -1) { + perror("epoll_ctl ADD"); + delete raw_conn; + continue; + } + } } } } @@ -478,6 +509,59 @@ void Server::start_accept_threads() { } } +bool Server::process_connection_io(std::unique_ptr &conn, + int events) { + // Handle EPOLLIN - read data and process it + if (events & EPOLLIN) { + auto buf_size = config_.server.read_buffer_size; + char buf[buf_size]; + int r = conn->readBytes(buf, buf_size); + + if (r < 0) { + // Error or EOF - connection should be closed + return false; + } + + if (r == 0) { + // No data available (EAGAIN) - skip read processing but continue + return true; + } + + // Call handler with unique_ptr - handler can take ownership if needed + handler_.on_data_arrived(std::string_view{buf, size_t(r)}, conn); + + // If handler took ownership (conn is now null), return true to indicate + // processing is done + if (!conn) { + return true; + } + } + + // Send immediately if we have outgoing messages (either from EPOLLOUT or + // after reading) + if ((events & EPOLLOUT) || ((events & EPOLLIN) && conn->hasMessages())) { + bool error = conn->writeBytes(); + if (error) { + return false; // Connection should be closed + } + + // Call handler with unique_ptr - handler can take ownership if needed + handler_.on_write_progress(conn); + // If handler took ownership (conn is now null), return true to indicate + // processing is done + if (!conn) { + return true; + } + + // Check if we should close the connection according to application + if (!conn->hasMessages() && conn->closeConnection_) { + return false; // Connection should be closed + } + } + + return true; // Connection should continue +} + void Server::cleanup_resources() { if (shutdown_pipe_[0] != -1) { close(shutdown_pipe_[0]); diff --git a/src/server.hpp b/src/server.hpp index 96dd448..600aeaf 100644 --- a/src/server.hpp +++ b/src/server.hpp @@ -118,6 +118,10 @@ private: void start_accept_threads(); void cleanup_resources(); + // Helper for processing connection I/O (shared between accept and network + // threads) + bool process_connection_io(std::unique_ptr &conn, int events); + /** * Called internally to return ownership to the server. * diff --git a/test_config.toml b/test_config.toml index 33f2ff2..196063a 100644 --- a/test_config.toml +++ b/test_config.toml @@ -7,9 +7,9 @@ port = 8080 # Maximum request size in bytes (for 413 Content Too Large responses) max_request_size_bytes = 1048576 # 1MB # Number of accept threads for handling incoming connections -accept_threads = 2 +accept_threads = 4 # Number of network I/O threads for epoll processing -network_threads = 8 +network_threads = 4 # Event batch size for epoll processing event_batch_size = 32