From 6b7cc74a7cd2e8575f0d3e82546d979f0a0c6cbb Mon Sep 17 00:00:00 2001
From: Andrew Noyes
Date: Tue, 19 Aug 2025 15:28:17 -0400
Subject: [PATCH] Add ConnectionHandler::on_post_batch

---
 src/connection.hpp         |  1 +
 src/connection_handler.hpp | 14 ++++++++++++++
 src/server.cpp             | 27 +++++++++++++++++++--------
 3 files changed, 34 insertions(+), 8 deletions(-)

diff --git a/src/connection.hpp b/src/connection.hpp
index 07ee1d4..50f3301 100644
--- a/src/connection.hpp
+++ b/src/connection.hpp
@@ -51,6 +51,7 @@ struct Connection {
     }
     return result;
   }
+  void *user_data = nullptr;
 
   // Note: To release connection back to server, use
   // Server::releaseBackToServer(std::move(connection_ptr))
diff --git a/src/connection_handler.hpp b/src/connection_handler.hpp
index 6e0ec8c..6ceec8b 100644
--- a/src/connection_handler.hpp
+++ b/src/connection_handler.hpp
@@ -1,6 +1,7 @@
 #pragma once
 
 #include <memory>
+#include <span>
 #include
 
 // Forward declaration to avoid circular dependency
@@ -68,4 +69,17 @@ public:
    * @note May be called from an arbitrary accept thread.
    */
   virtual void on_connection_closed(Connection &) {}
+
+  /**
+   * @brief Called after a batch of connections has been processed.
+   *
+   * This hook is called after on_data_arrived or on_write_progress has been
+   * called for each connection in the batch. The handler can take ownership of
+   * the connections by moving the unique_ptr out of the span. Any connections
+   * left in the span will remain owned by the server.
+   *
+   * @param batch A span of unique_ptrs to the connections in the batch.
+   */
+  virtual void on_post_batch(std::span<std::unique_ptr<Connection>> /*batch*/) {
+  }
 };
diff --git a/src/server.cpp b/src/server.cpp
index 678ff79..2122845 100644
--- a/src/server.cpp
+++ b/src/server.cpp
@@ -228,10 +228,11 @@ void Server::start_network_threads() {
       pthread_setname_np(pthread_self(),
                          ("network-" + std::to_string(thread_id)).c_str());
 
-      std::vector<epoll_event> events(config_.server.event_batch_size);
+      struct epoll_event events[config_.server.event_batch_size];
+      std::unique_ptr<Connection> batch[config_.server.event_batch_size];
 
       for (;;) {
-        int event_count = epoll_wait(network_epollfd_, events.data(),
+        int event_count = epoll_wait(network_epollfd_, events,
                                      config_.server.event_batch_size, -1);
         if (event_count == -1) {
           if (errno == EINTR) {
@@ -241,6 +242,7 @@
           abort();
         }
 
+        int batch_count = 0;
         for (int i = 0; i < event_count; ++i) {
           // Check for shutdown event
           if (events[i].data.fd == shutdown_pipe_[0]) {
@@ -252,7 +254,6 @@
               static_cast<Connection *>(events[i].data.ptr)};
           conn->tsan_acquire();
           events[i].data.ptr = nullptr;
-          const int fd = conn->getFd();
 
           if (events[i].events & (EPOLLERR | EPOLLHUP | EPOLLRDHUP)) {
             continue; // Connection closed - unique_ptr destructor cleans up
@@ -299,21 +300,31 @@
               continue;
             }
           }
+          batch[batch_count++] = std::move(conn);
+        }
+        handler_.on_post_batch({batch, (size_t)batch_count});
+
+        for (int i = 0; i < batch_count; ++i) {
+          auto &conn = batch[i];
+          if (!conn) {
+            continue;
+          }
 
           // Determine next epoll interest
+          struct epoll_event event{};
           if (!conn->hasMessages()) {
-            events[i].events = EPOLLIN | EPOLLONESHOT | EPOLLRDHUP;
+            event.events = EPOLLIN | EPOLLONESHOT | EPOLLRDHUP;
           } else {
-            events[i].events = EPOLLOUT | EPOLLONESHOT | EPOLLRDHUP;
+            event.events = EPOLLOUT | EPOLLONESHOT | EPOLLRDHUP;
           }
 
           // Transfer ownership back to epoll
           conn->tsan_release();
           Connection *raw_conn = conn.release();
-          events[i].data.ptr = raw_conn;
+          event.data.ptr = raw_conn;
 
-          if (epoll_ctl(network_epollfd_, EPOLL_CTL_MOD, fd, &events[i]) ==
-              -1) {
+          if (epoll_ctl(network_epollfd_, EPOLL_CTL_MOD, raw_conn->getFd(),
+                        &event) == -1) {
             perror("epoll_ctl MOD");
             delete raw_conn;
             continue;
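
A minimal sketch of how a ConnectionHandler subclass might use the new hook, assuming the headers above are on the include path; the class name, the deferred_ vector, and the selection criterion (an earlier callback tagging a connection through the new Connection::user_data field) are invented for illustration and are not part of this patch:

#include <memory>
#include <span>
#include <vector>

#include "connection.hpp"
#include "connection_handler.hpp"

class DeferringHandler : public ConnectionHandler {
public:
  void on_post_batch(std::span<std::unique_ptr<Connection>> batch) override {
    for (std::unique_ptr<Connection> &conn : batch) {
      if (!conn) {
        continue;
      }
      // Illustrative criterion: an earlier on_data_arrived tagged this
      // connection via Connection::user_data.
      if (conn->user_data != nullptr) {
        // Moving the unique_ptr out of the span transfers ownership to the
        // handler; the server skips the now-empty slot when re-arming epoll.
        deferred_.push_back(std::move(conn));
      }
      // Entries left in the span remain owned by the server and are re-armed
      // with EPOLLONESHOT as before.
    }
  }

private:
  std::vector<std::unique_ptr<Connection>> deferred_;
};

Connections the handler keeps can later be handed back with Server::releaseBackToServer(std::move(connection_ptr)), per the note in connection.hpp.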