diff --git a/src/connection.cpp b/src/connection.cpp index 6d412b1..91dfd32 100644 --- a/src/connection.cpp +++ b/src/connection.cpp @@ -9,18 +9,19 @@ std::unique_ptr Connection::createForServer(struct sockaddr_storage addr, int fd, int64_t id, - ConnectionHandler *handler, + size_t epoll_index, ConnectionHandler *handler, std::weak_ptr server) { // Use unique_ptr constructor with private access (friend function) // We can't use make_unique here because constructor is private return std::unique_ptr( - new Connection(addr, fd, id, handler, server)); + new Connection(addr, fd, id, epoll_index, handler, server)); } Connection::Connection(struct sockaddr_storage addr, int fd, int64_t id, - ConnectionHandler *handler, std::weak_ptr server) - : fd_(fd), id_(id), addr_(addr), arena_(), handler_(handler), - server_(server) { + size_t epoll_index, ConnectionHandler *handler, + std::weak_ptr server) + : fd_(fd), id_(id), epoll_index_(epoll_index), addr_(addr), arena_(), + handler_(handler), server_(server) { if (auto server_ptr = server_.lock()) { server_ptr->active_connections_.fetch_add(1, std::memory_order_relaxed); } diff --git a/src/connection.hpp b/src/connection.hpp index 37b2802..060075c 100644 --- a/src/connection.hpp +++ b/src/connection.hpp @@ -323,7 +323,8 @@ private: * @param server Weak reference to the server for safe cleanup */ Connection(struct sockaddr_storage addr, int fd, int64_t id, - ConnectionHandler *handler, std::weak_ptr server); + size_t epoll_index, ConnectionHandler *handler, + std::weak_ptr server); /** * @brief Server-only factory method for creating connections. @@ -345,7 +346,8 @@ private: */ static std::unique_ptr createForServer(struct sockaddr_storage addr, int fd, int64_t id, - ConnectionHandler *handler, std::weak_ptr server); + size_t epoll_index, ConnectionHandler *handler, + std::weak_ptr server); // Networking interface - only accessible by Server int readBytes(char *buf, size_t buffer_size); @@ -357,8 +359,10 @@ private: int getFd() const { return fd_; } bool hasMessages() const { return !messages_.empty(); } bool shouldClose() const { return closeConnection_; } + size_t getEpollIndex() const { return epoll_index_; } const int fd_; const int64_t id_; + const size_t epoll_index_; // Index of the epoll instance this connection uses struct sockaddr_storage addr_; // sockaddr_storage handles IPv4/IPv6 ArenaAllocator arena_; ConnectionHandler *handler_; diff --git a/src/server.cpp b/src/server.cpp index 6775c93..337d348 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -100,16 +100,19 @@ void Server::releaseBackToServer(std::unique_ptr connection) { // Try to get the server from the connection's weak_ptr if (auto server = connection->server_.lock()) { - // Server still exists - release raw pointer and let server take over - Connection *raw_conn = connection.release(); - server->receiveConnectionBack(raw_conn); + // Server still exists - pass unique_ptr directly + server->receiveConnectionBack(std::move(connection)); } // If server is gone, connection will be automatically cleaned up when // unique_ptr destructs } -void Server::receiveConnectionBack(Connection *connection) { +void Server::receiveConnectionBack(std::unique_ptr connection) { + if (!connection) { + return; // Nothing to process + } + // Re-add the connection to epoll for continued processing struct epoll_event event{}; @@ -120,17 +123,19 @@ void Server::receiveConnectionBack(Connection *connection) { } connection->tsan_release(); - event.data.fd = connection->getFd(); + int fd = connection->getFd(); + event.data.fd = fd; - // Distribute connections round-robin across epoll instances - size_t epoll_index = - connection_distribution_counter_.fetch_add(1, std::memory_order_relaxed) % - epoll_fds_.size(); + // Store connection in registry before adding to epoll + // This mirrors the pattern used in process_connection_batch + size_t epoll_index = connection->getEpollIndex(); int epollfd = epoll_fds_[epoll_index]; + connection_registry_.store(fd, std::move(connection)); - if (epoll_ctl(epollfd, EPOLL_CTL_ADD, connection->getFd(), &event) == -1) { - perror("epoll_ctl ADD in receiveConnectionBack"); - delete connection; // Clean up on failure + if (epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &event) == -1) { + perror("epoll_ctl MOD in receiveConnectionBack"); + // Remove from registry and clean up on failure + (void)connection_registry_.remove(fd); } } @@ -417,10 +422,11 @@ void Server::start_io_threads(std::vector &threads) { } // Transfer ownership from registry to batch processing + size_t epoll_index = thread_id % epoll_fds_.size(); batch[batch_count] = std::unique_ptr(new Connection( addr, fd, connection_id_.fetch_add(1, std::memory_order_relaxed), - &handler_, weak_from_this())); + epoll_index, &handler_, weak_from_this())); batch_events[batch_count] = EPOLLIN; // New connections always start with read batch_count++; diff --git a/src/server.hpp b/src/server.hpp index 6073f7f..373a7b3 100644 --- a/src/server.hpp +++ b/src/server.hpp @@ -142,9 +142,9 @@ private: * This method is thread-safe and can be called from any thread. * The connection will be re-added to the epoll for continued processing. * - * @param connection Raw pointer to the connection being released back + * @param connection Unique pointer to the connection being released back */ - void receiveConnectionBack(Connection *connection); + void receiveConnectionBack(std::unique_ptr connection); // Make non-copyable and non-movable Server(const Server &) = delete;