From 368ec721d531f5b649ded2121b272c416ecd096a Mon Sep 17 00:00:00 2001 From: Andrew Noyes Date: Mon, 18 Aug 2025 15:43:45 -0400 Subject: [PATCH] Full pass over src/main.cpp --- src/main.cpp | 132 +++++++++++++++++++++------------------------------ 1 file changed, 54 insertions(+), 78 deletions(-) diff --git a/src/main.cpp b/src/main.cpp index 06fba1c..1c178fb 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -110,19 +110,13 @@ int getAcceptFd(int listenFd, struct sockaddr_storage *addr) { return fd; } -// Connection lifecycle. Only one of these is the case at a time -// - Created on an accept thread from a call to accept -// - Waiting on connection fd to be readable/writable -// - Owned by a network thread, which drains readable and writable bytes -// - Owned by a thread in the request processing pipeline -// - Closed by a network thread according to http protocol -// // Since only one thread owns a connection at a time, no synchronization is // necessary // Connection ownership model: // - Created by accept thread, transferred to epoll via raw pointer // - Network threads claim ownership by wrapping raw pointer in unique_ptr -// - Network threads transfer back to epoll by releasing unique_ptr to raw +// - Network thread optionally passes ownership to a thread pipeline +// - Owner eventually transfers back to epoll by releasing unique_ptr to raw // pointer // - RAII cleanup happens if network thread doesn't transfer back struct Connection { @@ -268,7 +262,7 @@ int main(int argc, char *argv[]) { shutdown_eventfd = eventfd(0, EFD_CLOEXEC); if (shutdown_eventfd == -1) { perror("eventfd"); - return 1; + abort(); } signal(SIGPIPE, SIG_IGN); @@ -278,8 +272,8 @@ int main(int argc, char *argv[]) { int sockfd = getListenFd(config->server.bind_address.c_str(), std::to_string(config->server.port).c_str()); std::vector threads; - int epollfd = epoll_create1(EPOLL_CLOEXEC); - if (epollfd == -1) { + int network_epollfd = epoll_create1(EPOLL_CLOEXEC); + if (network_epollfd == -1) { perror("epoll_create"); abort(); } @@ -287,39 +281,30 @@ int main(int argc, char *argv[]) { struct epoll_event shutdown_event; shutdown_event.events = EPOLLIN; shutdown_event.data.fd = shutdown_eventfd; - epoll_ctl(epollfd, EPOLL_CTL_ADD, shutdown_eventfd, &shutdown_event); + epoll_ctl(network_epollfd, EPOLL_CTL_ADD, shutdown_eventfd, &shutdown_event); std::atomic connectionId{0}; // Network threads from configuration int networkThreads = config->server.network_threads; - // Event batch size from configuration for (int i = 0; i < networkThreads; ++i) { threads.emplace_back( - [epollfd, i, max_request_size = config->server.max_request_size_bytes, + [network_epollfd, i, + max_request_size = config->server.max_request_size_bytes, event_batch_size = config->server.event_batch_size]() { pthread_setname_np(pthread_self(), ("network-" + std::to_string(i)).c_str()); - while (true) { - std::vector events(event_batch_size); - int eventCount; - for (;;) { - eventCount = epoll_wait(epollfd, events.data(), event_batch_size, - -1 /* no timeout */); - if (eventCount == -1) { - if (errno == EINTR) { - continue; - } - perror("epoll_wait"); - abort(); + std::vector events(event_batch_size); + for (;;) { + int eventCount = epoll_wait(network_epollfd, events.data(), + event_batch_size, -1 /* no timeout */); + if (eventCount == -1) { + if (errno == EINTR) { + continue; } - break; - } - - if (eventCount == 0) { - // Timeout occurred, check shutdown flag again - continue; + perror("epoll_wait"); + abort(); } for (int i = 0; i < eventCount; ++i) { @@ -355,8 +340,6 @@ int main(int argc, char *argv[]) { } if (conn->tasks.empty()) { - // Transfer back to epoll instance. This thread or another - // thread will wake when fd is ready events[i].events = EPOLLIN | EPOLLONESHOT | EPOLLRDHUP; } else { events[i].events = EPOLLOUT | EPOLLONESHOT | EPOLLRDHUP; @@ -365,7 +348,7 @@ int main(int argc, char *argv[]) { conn->tsan_release(); events[i].data.ptr = conn.release(); // epoll now owns the connection - int e = epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &events[i]); + int e = epoll_ctl(network_epollfd, EPOLL_CTL_MOD, fd, &events[i]); if (e == -1) { perror("epoll_ctl"); abort(); // Process termination - OS cleans up leaked connection @@ -377,42 +360,37 @@ int main(int argc, char *argv[]) { // Accept threads from configuration int acceptThreads = config->server.accept_threads; + // epoll instance for accept threads + int accept_epollfd = epoll_create1(EPOLL_CLOEXEC); + if (accept_epollfd == -1) { + perror("epoll_create1"); + abort(); + } + + // Add shutdown eventfd to accept epoll + if (epoll_ctl(accept_epollfd, EPOLL_CTL_ADD, shutdown_eventfd, + &shutdown_event) == -1) { + perror("epoll_ctl shutdown eventfd"); + abort(); + } + + // Add listen socket to accept epoll + struct epoll_event listen_event; + listen_event.events = EPOLLIN; + listen_event.data.fd = sockfd; + if (epoll_ctl(accept_epollfd, EPOLL_CTL_ADD, sockfd, &listen_event) == -1) { + perror("epoll_ctl listen socket"); + abort(); + } + for (int i = 0; i < acceptThreads; ++i) { - threads.emplace_back([epollfd, i, sockfd, &connectionId, - max_connections = config->server.max_connections]() { + threads.emplace_back([network_epollfd, i, sockfd, &connectionId, + max_connections = config->server.max_connections, + accept_epollfd]() { pthread_setname_np(pthread_self(), ("accept-" + std::to_string(i)).c_str()); - // Create dedicated epoll instance for accept thread - int accept_epollfd = epoll_create1(EPOLL_CLOEXEC); - if (accept_epollfd == -1) { - perror("epoll_create1"); - return; - } - - // Add listen socket to accept epoll - struct epoll_event listen_event; - listen_event.events = EPOLLIN; - listen_event.data.fd = sockfd; - if (epoll_ctl(accept_epollfd, EPOLL_CTL_ADD, sockfd, &listen_event) == - -1) { - perror("epoll_ctl listen socket"); - close(accept_epollfd); - return; - } - - // Add shutdown eventfd to accept epoll - struct epoll_event shutdown_event; - shutdown_event.events = EPOLLIN; - shutdown_event.data.fd = shutdown_eventfd; - if (epoll_ctl(accept_epollfd, EPOLL_CTL_ADD, shutdown_eventfd, - &shutdown_event) == -1) { - perror("epoll_ctl shutdown eventfd"); - close(accept_epollfd); - return; - } - - while (true) { + for (;;) { struct epoll_event events[2]; // listen socket + shutdown eventfd int ready = epoll_wait(accept_epollfd, events, 2, -1 /* no timeout */); @@ -420,22 +398,18 @@ int main(int argc, char *argv[]) { if (errno == EINTR) continue; perror("epoll_wait"); - break; + abort(); } - if (ready == 0) - continue; // Timeout - check shutdown flag - for (int j = 0; j < ready; ++j) { if (events[j].data.fd == shutdown_eventfd) { // Don't read - let other threads see it too - close(accept_epollfd); return; } if (events[j].data.fd == sockfd) { // Listen socket ready - accept connections - while (true) { + for (;;) { struct sockaddr_storage addr; socklen_t addrlen = sizeof(addr); int fd = accept4(sockfd, (struct sockaddr *)&addr, &addrlen, @@ -447,10 +421,11 @@ int main(int argc, char *argv[]) { if (errno == EINTR) continue; perror("accept4"); - break; + abort(); } - // Check connection limit (0 means unlimited) + // Check connection limit (0 means unlimited). Limiting + // connections is best effort but it's the best we can do. if (max_connections > 0 && activeConnections.load(std::memory_order_relaxed) >= max_connections) { @@ -469,10 +444,10 @@ int main(int argc, char *argv[]) { conn->tsan_release(); event.data.ptr = conn.release(); // network epoll now owns the connection - int e = epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event); + int e = epoll_ctl(network_epollfd, EPOLL_CTL_ADD, fd, &event); if (e == -1) { perror("epoll_ctl"); - // TODO: Better error handling - connection will be leaked + abort(); } } } @@ -489,7 +464,8 @@ int main(int argc, char *argv[]) { // Cleanup close(shutdown_eventfd); - close(epollfd); + close(accept_epollfd); + close(network_epollfd); close(sockfd); return 0;