Full pass over src/main.cpp

This commit is contained in:
2025-08-18 15:43:45 -04:00
parent 8ef118a17d
commit 368ec721d5

View File

@@ -110,19 +110,13 @@ int getAcceptFd(int listenFd, struct sockaddr_storage *addr) {
return fd; return fd;
} }
// Connection lifecycle. Only one of these is the case at a time
// - Created on an accept thread from a call to accept
// - Waiting on connection fd to be readable/writable
// - Owned by a network thread, which drains readable and writable bytes
// - Owned by a thread in the request processing pipeline
// - Closed by a network thread according to http protocol
//
// Since only one thread owns a connection at a time, no synchronization is // Since only one thread owns a connection at a time, no synchronization is
// necessary // necessary
// Connection ownership model: // Connection ownership model:
// - Created by accept thread, transferred to epoll via raw pointer // - Created by accept thread, transferred to epoll via raw pointer
// - Network threads claim ownership by wrapping raw pointer in unique_ptr // - Network threads claim ownership by wrapping raw pointer in unique_ptr
// - Network threads transfer back to epoll by releasing unique_ptr to raw // - Network thread optionally passes ownership to a thread pipeline
// - Owner eventually transfers back to epoll by releasing unique_ptr to raw
// pointer // pointer
// - RAII cleanup happens if network thread doesn't transfer back // - RAII cleanup happens if network thread doesn't transfer back
struct Connection { struct Connection {
@@ -268,7 +262,7 @@ int main(int argc, char *argv[]) {
shutdown_eventfd = eventfd(0, EFD_CLOEXEC); shutdown_eventfd = eventfd(0, EFD_CLOEXEC);
if (shutdown_eventfd == -1) { if (shutdown_eventfd == -1) {
perror("eventfd"); perror("eventfd");
return 1; abort();
} }
signal(SIGPIPE, SIG_IGN); signal(SIGPIPE, SIG_IGN);
@@ -278,8 +272,8 @@ int main(int argc, char *argv[]) {
int sockfd = getListenFd(config->server.bind_address.c_str(), int sockfd = getListenFd(config->server.bind_address.c_str(),
std::to_string(config->server.port).c_str()); std::to_string(config->server.port).c_str());
std::vector<std::thread> threads; std::vector<std::thread> threads;
int epollfd = epoll_create1(EPOLL_CLOEXEC); int network_epollfd = epoll_create1(EPOLL_CLOEXEC);
if (epollfd == -1) { if (network_epollfd == -1) {
perror("epoll_create"); perror("epoll_create");
abort(); abort();
} }
@@ -287,26 +281,24 @@ int main(int argc, char *argv[]) {
struct epoll_event shutdown_event; struct epoll_event shutdown_event;
shutdown_event.events = EPOLLIN; shutdown_event.events = EPOLLIN;
shutdown_event.data.fd = shutdown_eventfd; shutdown_event.data.fd = shutdown_eventfd;
epoll_ctl(epollfd, EPOLL_CTL_ADD, shutdown_eventfd, &shutdown_event); epoll_ctl(network_epollfd, EPOLL_CTL_ADD, shutdown_eventfd, &shutdown_event);
std::atomic<int64_t> connectionId{0}; std::atomic<int64_t> connectionId{0};
// Network threads from configuration // Network threads from configuration
int networkThreads = config->server.network_threads; int networkThreads = config->server.network_threads;
// Event batch size from configuration
for (int i = 0; i < networkThreads; ++i) { for (int i = 0; i < networkThreads; ++i) {
threads.emplace_back( threads.emplace_back(
[epollfd, i, max_request_size = config->server.max_request_size_bytes, [network_epollfd, i,
max_request_size = config->server.max_request_size_bytes,
event_batch_size = config->server.event_batch_size]() { event_batch_size = config->server.event_batch_size]() {
pthread_setname_np(pthread_self(), pthread_setname_np(pthread_self(),
("network-" + std::to_string(i)).c_str()); ("network-" + std::to_string(i)).c_str());
while (true) {
std::vector<struct epoll_event> events(event_batch_size); std::vector<struct epoll_event> events(event_batch_size);
int eventCount;
for (;;) { for (;;) {
eventCount = epoll_wait(epollfd, events.data(), event_batch_size, int eventCount = epoll_wait(network_epollfd, events.data(),
-1 /* no timeout */); event_batch_size, -1 /* no timeout */);
if (eventCount == -1) { if (eventCount == -1) {
if (errno == EINTR) { if (errno == EINTR) {
continue; continue;
@@ -314,13 +306,6 @@ int main(int argc, char *argv[]) {
perror("epoll_wait"); perror("epoll_wait");
abort(); abort();
} }
break;
}
if (eventCount == 0) {
// Timeout occurred, check shutdown flag again
continue;
}
for (int i = 0; i < eventCount; ++i) { for (int i = 0; i < eventCount; ++i) {
// Check for shutdown event // Check for shutdown event
@@ -355,8 +340,6 @@ int main(int argc, char *argv[]) {
} }
if (conn->tasks.empty()) { if (conn->tasks.empty()) {
// Transfer back to epoll instance. This thread or another
// thread will wake when fd is ready
events[i].events = EPOLLIN | EPOLLONESHOT | EPOLLRDHUP; events[i].events = EPOLLIN | EPOLLONESHOT | EPOLLRDHUP;
} else { } else {
events[i].events = EPOLLOUT | EPOLLONESHOT | EPOLLRDHUP; events[i].events = EPOLLOUT | EPOLLONESHOT | EPOLLRDHUP;
@@ -365,7 +348,7 @@ int main(int argc, char *argv[]) {
conn->tsan_release(); conn->tsan_release();
events[i].data.ptr = events[i].data.ptr =
conn.release(); // epoll now owns the connection conn.release(); // epoll now owns the connection
int e = epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &events[i]); int e = epoll_ctl(network_epollfd, EPOLL_CTL_MOD, fd, &events[i]);
if (e == -1) { if (e == -1) {
perror("epoll_ctl"); perror("epoll_ctl");
abort(); // Process termination - OS cleans up leaked connection abort(); // Process termination - OS cleans up leaked connection
@@ -377,42 +360,37 @@ int main(int argc, char *argv[]) {
// Accept threads from configuration // Accept threads from configuration
int acceptThreads = config->server.accept_threads; int acceptThreads = config->server.accept_threads;
for (int i = 0; i < acceptThreads; ++i) { // epoll instance for accept threads
threads.emplace_back([epollfd, i, sockfd, &connectionId,
max_connections = config->server.max_connections]() {
pthread_setname_np(pthread_self(),
("accept-" + std::to_string(i)).c_str());
// Create dedicated epoll instance for accept thread
int accept_epollfd = epoll_create1(EPOLL_CLOEXEC); int accept_epollfd = epoll_create1(EPOLL_CLOEXEC);
if (accept_epollfd == -1) { if (accept_epollfd == -1) {
perror("epoll_create1"); perror("epoll_create1");
return; abort();
}
// Add shutdown eventfd to accept epoll
if (epoll_ctl(accept_epollfd, EPOLL_CTL_ADD, shutdown_eventfd,
&shutdown_event) == -1) {
perror("epoll_ctl shutdown eventfd");
abort();
} }
// Add listen socket to accept epoll // Add listen socket to accept epoll
struct epoll_event listen_event; struct epoll_event listen_event;
listen_event.events = EPOLLIN; listen_event.events = EPOLLIN;
listen_event.data.fd = sockfd; listen_event.data.fd = sockfd;
if (epoll_ctl(accept_epollfd, EPOLL_CTL_ADD, sockfd, &listen_event) == if (epoll_ctl(accept_epollfd, EPOLL_CTL_ADD, sockfd, &listen_event) == -1) {
-1) {
perror("epoll_ctl listen socket"); perror("epoll_ctl listen socket");
close(accept_epollfd); abort();
return;
} }
// Add shutdown eventfd to accept epoll for (int i = 0; i < acceptThreads; ++i) {
struct epoll_event shutdown_event; threads.emplace_back([network_epollfd, i, sockfd, &connectionId,
shutdown_event.events = EPOLLIN; max_connections = config->server.max_connections,
shutdown_event.data.fd = shutdown_eventfd; accept_epollfd]() {
if (epoll_ctl(accept_epollfd, EPOLL_CTL_ADD, shutdown_eventfd, pthread_setname_np(pthread_self(),
&shutdown_event) == -1) { ("accept-" + std::to_string(i)).c_str());
perror("epoll_ctl shutdown eventfd");
close(accept_epollfd);
return;
}
while (true) { for (;;) {
struct epoll_event events[2]; // listen socket + shutdown eventfd struct epoll_event events[2]; // listen socket + shutdown eventfd
int ready = epoll_wait(accept_epollfd, events, 2, -1 /* no timeout */); int ready = epoll_wait(accept_epollfd, events, 2, -1 /* no timeout */);
@@ -420,22 +398,18 @@ int main(int argc, char *argv[]) {
if (errno == EINTR) if (errno == EINTR)
continue; continue;
perror("epoll_wait"); perror("epoll_wait");
break; abort();
} }
if (ready == 0)
continue; // Timeout - check shutdown flag
for (int j = 0; j < ready; ++j) { for (int j = 0; j < ready; ++j) {
if (events[j].data.fd == shutdown_eventfd) { if (events[j].data.fd == shutdown_eventfd) {
// Don't read - let other threads see it too // Don't read - let other threads see it too
close(accept_epollfd);
return; return;
} }
if (events[j].data.fd == sockfd) { if (events[j].data.fd == sockfd) {
// Listen socket ready - accept connections // Listen socket ready - accept connections
while (true) { for (;;) {
struct sockaddr_storage addr; struct sockaddr_storage addr;
socklen_t addrlen = sizeof(addr); socklen_t addrlen = sizeof(addr);
int fd = accept4(sockfd, (struct sockaddr *)&addr, &addrlen, int fd = accept4(sockfd, (struct sockaddr *)&addr, &addrlen,
@@ -447,10 +421,11 @@ int main(int argc, char *argv[]) {
if (errno == EINTR) if (errno == EINTR)
continue; continue;
perror("accept4"); perror("accept4");
break; abort();
} }
// Check connection limit (0 means unlimited) // Check connection limit (0 means unlimited). Limiting
// connections is best effort but it's the best we can do.
if (max_connections > 0 && if (max_connections > 0 &&
activeConnections.load(std::memory_order_relaxed) >= activeConnections.load(std::memory_order_relaxed) >=
max_connections) { max_connections) {
@@ -469,10 +444,10 @@ int main(int argc, char *argv[]) {
conn->tsan_release(); conn->tsan_release();
event.data.ptr = event.data.ptr =
conn.release(); // network epoll now owns the connection conn.release(); // network epoll now owns the connection
int e = epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event); int e = epoll_ctl(network_epollfd, EPOLL_CTL_ADD, fd, &event);
if (e == -1) { if (e == -1) {
perror("epoll_ctl"); perror("epoll_ctl");
// TODO: Better error handling - connection will be leaked abort();
} }
} }
} }
@@ -489,7 +464,8 @@ int main(int argc, char *argv[]) {
// Cleanup // Cleanup
close(shutdown_eventfd); close(shutdown_eventfd);
close(epollfd); close(accept_epollfd);
close(network_epollfd);
close(sockfd); close(sockfd);
return 0; return 0;