diff --git a/src/server.cpp b/src/server.cpp index 5bfb0a0..6b9e57f 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -273,9 +273,7 @@ void Server::start_io_threads() { struct epoll_event events[config_.server.event_batch_size]; std::unique_ptr batch[config_.server.event_batch_size]; - bool batch_is_new[config_.server.event_batch_size]; // Track if connection - // is new (ADD) or - // existing (MOD) + int batch_events[config_.server.event_batch_size]; for (;;) { int event_count = @@ -288,101 +286,18 @@ void Server::start_io_threads() { abort(); } + bool listenReady = false; int batch_count = 0; for (int i = 0; i < event_count; ++i) { // Check for shutdown event + // TODO type confusion in data union if (events[i].data.fd == shutdown_pipe_[0]) { return; } - - // Handle listen socket events (new connections) + // Check for new connections + // TODO type confusion in data union if (events[i].data.fd == listen_sockfd_) { - // Accept new connections and batch them - for (;;) { - struct sockaddr_storage addr; - socklen_t addrlen = sizeof(addr); - int fd = accept4(listen_sockfd_, (struct sockaddr *)&addr, - &addrlen, SOCK_NONBLOCK); - - if (fd == -1) { - if (errno == EAGAIN || errno == EWOULDBLOCK) - break; - if (errno == EINTR) - continue; - perror("accept4"); - abort(); - } - - // Check connection limit - if (config_.server.max_connections > 0 && - activeConnections.load(std::memory_order_relaxed) >= - config_.server.max_connections) { - close(fd); - continue; - } - - // Enable keepalive - int keepalive = 1; - if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &keepalive, - sizeof(keepalive)) == -1) { - perror("setsockopt SO_KEEPALIVE"); - } - - auto conn = Connection::createForServer( - addr, fd, - connection_id_.fetch_add(1, std::memory_order_relaxed), - &handler_, weak_from_this()); - - // Try to process I/O once immediately for better latency - bool should_continue = process_connection_io(conn, EPOLLIN); - - if (!should_continue) { - // Connection should be closed (error or application decision) - continue; - } - - // Add to batch if we still have the connection - if (conn) { - // If batch is full, process it first - if (batch_count >= config_.server.event_batch_size) { - handler_.on_post_batch({batch, (size_t)batch_count}); - - // Re-add all batched connections to epoll - for (int j = 0; j < batch_count; ++j) { - auto &batched_conn = batch[j]; - if (!batched_conn) - continue; - - struct epoll_event event{}; - if (!batched_conn->hasMessages()) { - event.events = EPOLLIN | EPOLLONESHOT | EPOLLRDHUP; - } else { - event.events = EPOLLOUT | EPOLLONESHOT | EPOLLRDHUP; - } - - int fd = batched_conn->getFd(); - batched_conn->tsan_release(); - Connection *raw_conn = batched_conn.release(); - event.data.ptr = raw_conn; - - int epoll_op = - batch_is_new[j] ? EPOLL_CTL_ADD : EPOLL_CTL_MOD; - if (epoll_ctl(epollfd_, epoll_op, fd, &event) == -1) { - perror(batch_is_new[j] ? "epoll_ctl ADD" - : "epoll_ctl MOD"); - delete raw_conn; - } - } - batch_count = 0; // Reset batch - } - - // Add current connection to batch - batch[batch_count] = std::move(conn); - batch_is_new[batch_count] = - true; // New connection, needs EPOLL_CTL_ADD - batch_count++; - } - } + listenReady = true; continue; } @@ -396,89 +311,83 @@ void Server::start_io_threads() { continue; // Connection closed - unique_ptr destructor cleans up } - // Process I/O using shared helper function - bool should_continue = process_connection_io(conn, events[i].events); - if (!should_continue || !conn) { - continue; // Connection closed or handler took ownership - } - - // If batch is full, process it first - if (batch_count >= config_.server.event_batch_size) { - handler_.on_post_batch({batch, (size_t)batch_count}); - - // Re-add all batched connections to epoll - for (int j = 0; j < batch_count; ++j) { - auto &batched_conn = batch[j]; - if (!batched_conn) - continue; - - struct epoll_event event{}; - if (!batched_conn->hasMessages()) { - event.events = EPOLLIN | EPOLLONESHOT | EPOLLRDHUP; - } else { - event.events = EPOLLOUT | EPOLLONESHOT | EPOLLRDHUP; - } - - int fd = batched_conn->getFd(); - batched_conn->tsan_release(); - Connection *raw_conn = batched_conn.release(); - event.data.ptr = raw_conn; - - int epoll_op = batch_is_new[j] ? EPOLL_CTL_ADD : EPOLL_CTL_MOD; - if (epoll_ctl(epollfd_, epoll_op, fd, &event) == -1) { - perror(batch_is_new[j] ? "epoll_ctl ADD" : "epoll_ctl MOD"); - delete raw_conn; - } - } - batch_count = 0; // Reset batch - } - - // Add current connection to batch + // Add to regular batch - I/O will be processed in batch batch[batch_count] = std::move(conn); - batch_is_new[batch_count] = - false; // Existing connection, needs EPOLL_CTL_MOD + batch_events[batch_count] = events[i].events; batch_count++; } - // Process batch if we have any connections + // Process existing connections in batch if (batch_count > 0) { - handler_.on_post_batch({batch, (size_t)batch_count}); + process_connection_batch({batch, (size_t)batch_count}, + {batch_events, (size_t)batch_count}, false); + } - for (int i = 0; i < batch_count; ++i) { - auto &conn = batch[i]; - if (!conn) { + // Reuse same batch array for accepting connections + if (listenReady) { + for (;;) { + struct sockaddr_storage addr; + socklen_t addrlen = sizeof(addr); + int fd = accept4(listen_sockfd_, (struct sockaddr *)&addr, &addrlen, + SOCK_NONBLOCK); + + if (fd == -1) { + if (errno == EAGAIN || errno == EWOULDBLOCK) + break; + if (errno == EINTR) + continue; + perror("accept4"); + abort(); + } + + // Check connection limit + if (config_.server.max_connections > 0 && + activeConnections.load(std::memory_order_relaxed) >= + config_.server.max_connections) { + close(fd); continue; } - // Determine next epoll interest - struct epoll_event event{}; - if (!conn->hasMessages()) { - event.events = EPOLLIN | EPOLLONESHOT | EPOLLRDHUP; - } else { - event.events = EPOLLOUT | EPOLLONESHOT | EPOLLRDHUP; + + // Enable keepalive + int keepalive = 1; + if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &keepalive, + sizeof(keepalive)) == -1) { + perror("setsockopt SO_KEEPALIVE"); } - // Transfer ownership back to epoll - int fd = conn->getFd(); - conn->tsan_release(); - Connection *raw_conn = conn.release(); - event.data.ptr = raw_conn; + // Add to batch - I/O will be processed in batch + batch[batch_count] = Connection::createForServer( + addr, fd, + connection_id_.fetch_add(1, std::memory_order_relaxed), + &handler_, weak_from_this()); + batch_events[batch_count] = + EPOLLIN; // New connections always start with read + batch_count++; - // Use ADD for new connections, MOD for existing connections - int epoll_op = batch_is_new[i] ? EPOLL_CTL_ADD : EPOLL_CTL_MOD; - if (epoll_ctl(epollfd_, epoll_op, fd, &event) == -1) { - perror(batch_is_new[i] ? "epoll_ctl ADD" : "epoll_ctl MOD"); - delete raw_conn; - continue; + // Process batch if full + if (batch_count == config_.server.event_batch_size) { + process_connection_batch({batch, (size_t)batch_count}, + {batch_events, (size_t)batch_count}, + true); + batch_count = 0; } } + + // Process remaining accepted connections + if (batch_count > 0) { + process_connection_batch({batch, (size_t)batch_count}, + {batch_events, (size_t)batch_count}, true); + batch_count = 0; + } } } }); } } -bool Server::process_connection_io(std::unique_ptr &conn, +void Server::process_connection_io(std::unique_ptr &conn, int events) { + assert(conn); // Handle EPOLLIN - read data and process it if (events & EPOLLIN) { auto buf_size = config_.server.read_buffer_size; @@ -487,12 +396,13 @@ bool Server::process_connection_io(std::unique_ptr &conn, if (r < 0) { // Error or EOF - connection should be closed - return false; + conn.reset(); + return; } if (r == 0) { // No data available (EAGAIN) - skip read processing but continue - return true; + return; } // Call handler with unique_ptr - handler can take ownership if needed @@ -501,7 +411,7 @@ bool Server::process_connection_io(std::unique_ptr &conn, // If handler took ownership (conn is now null), return true to indicate // processing is done if (!conn) { - return true; + return; } } @@ -510,7 +420,8 @@ bool Server::process_connection_io(std::unique_ptr &conn, if ((events & EPOLLOUT) || ((events & EPOLLIN) && conn->hasMessages())) { bool error = conn->writeBytes(); if (error) { - return false; // Connection should be closed + conn.reset(); // Connection should be closed + return; } // Call handler with unique_ptr - handler can take ownership if needed @@ -518,16 +429,52 @@ bool Server::process_connection_io(std::unique_ptr &conn, // If handler took ownership (conn is now null), return true to indicate // processing is done if (!conn) { - return true; + return; } // Check if we should close the connection according to application if (!conn->hasMessages() && conn->closeConnection_) { - return false; // Connection should be closed + conn.reset(); // Connection should be closed + return; + } + } +} + +void Server::process_connection_batch( + std::span> batch, std::span events, + bool is_new) { + // First process I/O for each connection + for (size_t i = 0; i < batch.size(); ++i) { + if (batch[i]) { + process_connection_io(batch[i], events[i]); } } - return true; // Connection should continue + // Call post-batch handler + handler_.on_post_batch(batch); + + // Transfer all remaining connections back to epoll + for (auto &conn : batch) { + if (conn) { + struct epoll_event event{}; + if (!conn->hasMessages()) { + event.events = EPOLLIN | EPOLLONESHOT | EPOLLRDHUP; + } else { + event.events = EPOLLOUT | EPOLLONESHOT | EPOLLRDHUP; + } + + int fd = conn->getFd(); + conn->tsan_release(); + Connection *raw_conn = conn.release(); + event.data.ptr = raw_conn; + + int epoll_op = is_new ? EPOLL_CTL_ADD : EPOLL_CTL_MOD; + if (epoll_ctl(epollfd_, epoll_op, fd, &event) == -1) { + perror(is_new ? "epoll_ctl ADD" : "epoll_ctl MOD"); + delete raw_conn; + } + } + } } void Server::cleanup_resources() { diff --git a/src/server.hpp b/src/server.hpp index 4fdb615..b4d7880 100644 --- a/src/server.hpp +++ b/src/server.hpp @@ -4,6 +4,7 @@ #include "connection_handler.hpp" #include #include +#include #include #include @@ -117,7 +118,11 @@ private: void cleanup_resources(); // Helper for processing connection I/O - bool process_connection_io(std::unique_ptr &conn, int events); + void process_connection_io(std::unique_ptr &conn, int events); + + // Helper for processing a batch of connections with their events + void process_connection_batch(std::span> batch, + std::span events, bool is_new); /** * Called internally to return ownership to the server. @@ -134,4 +139,4 @@ private: Server &operator=(const Server &) = delete; Server(Server &&) = delete; Server &operator=(Server &&) = delete; -}; \ No newline at end of file +};