diff --git a/config.toml b/config.toml index 00aaef6..b61344f 100644 --- a/config.toml +++ b/config.toml @@ -5,6 +5,12 @@ bind_address = "127.0.0.1" port = 8080 # Maximum request size in bytes (for 413 Content Too Large responses) max_request_size_bytes = 1048576 # 1MB +# Number of accept threads for handling incoming connections +accept_threads = 1 +# Number of network I/O threads for epoll processing (0 = use hardware concurrency) +network_threads = 0 +# Event batch size for epoll processing +event_batch_size = 32 [commit] # Minimum length for request_id to ensure sufficient entropy diff --git a/src/config.cpp b/src/config.cpp index 940eb0e..82f7b92 100644 --- a/src/config.cpp +++ b/src/config.cpp @@ -74,6 +74,9 @@ void ConfigParser::parse_server_config(const auto &toml_data, parse_field(srv, "bind_address", config.bind_address); parse_field(srv, "port", config.port); parse_field(srv, "max_request_size_bytes", config.max_request_size_bytes); + parse_field(srv, "accept_threads", config.accept_threads); + parse_field(srv, "network_threads", config.network_threads); + parse_field(srv, "event_batch_size", config.event_batch_size); }); } diff --git a/src/config.hpp b/src/config.hpp index c0bf44b..15c0085 100644 --- a/src/config.hpp +++ b/src/config.hpp @@ -16,6 +16,12 @@ struct ServerConfig { int port = 8080; /// Maximum size in bytes for incoming HTTP requests (default: 1MB) size_t max_request_size_bytes = 1024 * 1024; + /// Number of accept threads for handling incoming connections + int accept_threads = 1; + /// Number of network I/O threads for epoll processing + int network_threads = 0; // 0 means use hardware_concurrency + /// Event batch size for epoll processing + int event_batch_size = 32; }; /** diff --git a/src/main.cpp b/src/main.cpp index 20b372d..57fc10b 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -6,6 +6,7 @@ #include #include #include +#include #include #include #include @@ -16,11 +17,20 @@ #include #include #include +#include + +std::atomic shutdown_requested{false}; #ifndef __has_feature #define __has_feature(x) 0 #endif +void signal_handler(int sig) { + if (sig == SIGTERM || sig == SIGINT) { + shutdown_requested.store(true, std::memory_order_relaxed); + } +} + // Adapted from getaddrinfo man page int getListenFd(const char *node, const char *service) { @@ -57,6 +67,12 @@ int getListenFd(const char *node, const char *service) { int val = 1; setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)); + // Set socket to non-blocking for graceful shutdown + int flags = fcntl(sfd, F_GETFL, 0); + if (flags != -1) { + fcntl(sfd, F_SETFL, flags | O_NONBLOCK); + } + if (bind(sfd, rp->ai_addr, rp->ai_addrlen) == 0) { break; /* Success */ } @@ -126,11 +142,13 @@ struct Connection { std::deque tasks; - void readBytes() { + void readBytes(size_t max_request_size) { + // Use smaller buffer size but respect max request size + // TODO revisit + size_t buf_size = std::min(size_t(4096), max_request_size); + std::vector buf(buf_size); for (;;) { - // TODO make size configurable - char buf[1024]; - int r = read(fd, buf, sizeof(buf)); + int r = read(fd, buf.data(), buf.size()); if (r == -1) { if (errno == EINTR) { continue; @@ -144,8 +162,9 @@ struct Connection { if (r == 0) { goto close_connection; } - // pump parser - tasks.emplace_back(std::string{buf, size_t(r)}); + // "pump parser" + // TODO revisit + tasks.emplace_back(std::string{buf.data(), size_t(r)}); } close_connection: tasks.emplace_back(std::string{}, true); @@ -213,6 +232,11 @@ int main(int argc, char *argv[]) { std::cout << "Server port: " << config->server.port << std::endl; std::cout << "Max request size: " << config->server.max_request_size_bytes << " bytes" << std::endl; + std::cout << "Accept threads: " << config->server.accept_threads << std::endl; + std::cout << "Network threads: " << config->server.network_threads + << " (0 = auto)" << std::endl; + std::cout << "Event batch size: " << config->server.event_batch_size + << std::endl; std::cout << "Min request ID length: " << config->commit.min_request_id_length << std::endl; std::cout << "Request ID retention: " @@ -226,6 +250,8 @@ int main(int argc, char *argv[]) { << std::endl; signal(SIGPIPE, SIG_IGN); + signal(SIGTERM, signal_handler); + signal(SIGINT, signal_handler); int sockfd = getListenFd(config->server.bind_address.c_str(), std::to_string(config->server.port).c_str()); @@ -236,89 +262,108 @@ int main(int argc, char *argv[]) { abort(); } - // Network threads - // TODO make configurable - int networkThreads = 1; - // TODO make configurable - constexpr int kEventBatchSize = 10; + // Network threads - use config value, fallback to hardware concurrency + int networkThreads = config->server.network_threads; + if (networkThreads == 0) { + // TODO revisit + networkThreads = std::thread::hardware_concurrency(); + if (networkThreads == 0) + networkThreads = 1; // ultimate fallback + } + + // Event batch size from configuration for (int i = 0; i < networkThreads; ++i) { - threads.emplace_back([epollfd, i]() { - pthread_setname_np(pthread_self(), - ("network-" + std::to_string(i)).c_str()); - for (;;) { - struct epoll_event events[kEventBatchSize]{}; - int eventCount; - for (;;) { - eventCount = - epoll_wait(epollfd, events, kEventBatchSize, /*no timeout*/ -1); - if (eventCount == -1) { - if (errno == EINTR) { + threads.emplace_back( + [epollfd, i, max_request_size = config->server.max_request_size_bytes, + event_batch_size = config->server.event_batch_size]() { + pthread_setname_np(pthread_self(), + ("network-" + std::to_string(i)).c_str()); + while (!shutdown_requested.load(std::memory_order_relaxed)) { + std::vector events(event_batch_size); + int eventCount; + for (;;) { + eventCount = epoll_wait(epollfd, events.data(), event_batch_size, + 1000 /* 1 second timeout */); + if (eventCount == -1) { + if (errno == EINTR) { + continue; + } + perror("epoll_wait"); + abort(); + } + break; + } + + if (eventCount == 0) { + // Timeout occurred, check shutdown flag again continue; } - perror("epoll_wait"); - abort(); - } - break; - } - for (int i = 0; i < eventCount; ++i) { - // Take ownership from epoll: raw pointer -> unique_ptr - std::unique_ptr conn{ - static_cast(events[i].data.ptr)}; - conn->tsan_acquire(); - events[i].data.ptr = nullptr; // Clear epoll pointer (we own it now) - const int fd = conn->fd; + for (int i = 0; i < eventCount; ++i) { + // Take ownership from epoll: raw pointer -> unique_ptr + std::unique_ptr conn{ + static_cast(events[i].data.ptr)}; + conn->tsan_acquire(); + events[i].data.ptr = + nullptr; // Clear epoll pointer (we own it now) + const int fd = conn->fd; - if (events[i].events & (EPOLLERR | EPOLLHUP | EPOLLRDHUP)) { - // Connection closed or error occurred - unique_ptr destructor - // cleans up - continue; - } + if (events[i].events & (EPOLLERR | EPOLLHUP | EPOLLRDHUP)) { + // Connection closed or error occurred - unique_ptr destructor + // cleans up + continue; + } - if (events[i].events & EPOLLIN) { - conn->readBytes(); - } + if (events[i].events & EPOLLIN) { + conn->readBytes(max_request_size); + } - if (events[i].events & EPOLLOUT) { - bool done = conn->writeBytes(); - if (done) { - continue; + if (events[i].events & EPOLLOUT) { + bool done = conn->writeBytes(); + if (done) { + continue; + } + } + + if (conn->tasks.empty()) { + // Transfer back to epoll instance. This thread or another + // thread will wake when fd is ready + events[i].events = EPOLLIN | EPOLLONESHOT | EPOLLRDHUP; + } else { + events[i].events = EPOLLOUT | EPOLLONESHOT | EPOLLRDHUP; + } + // Transfer ownership back to epoll: unique_ptr -> raw pointer + conn->tsan_release(); + events[i].data.ptr = + conn.release(); // epoll now owns the connection + int e = epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &events[i]); + if (e == -1) { + perror("epoll_ctl"); + abort(); // Process termination - OS cleans up leaked connection + } } } - - if (conn->tasks.empty()) { - // Transfer back to epoll instance. This thread or another thread - // will wake when fd is ready - events[i].events = EPOLLIN | EPOLLONESHOT | EPOLLRDHUP; - } else { - events[i].events = EPOLLOUT | EPOLLONESHOT | EPOLLRDHUP; - } - // Transfer ownership back to epoll: unique_ptr -> raw pointer - conn->tsan_release(); - events[i].data.ptr = conn.release(); // epoll now owns the connection - int e = epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &events[i]); - if (e == -1) { - perror("epoll_ctl"); - abort(); // Process termination - OS cleans up leaked connection - } - } - } - }); + }); } std::atomic connectionId{0}; - // TODO make configurable - int acceptThreads = 1; + // Accept threads from configuration + int acceptThreads = config->server.accept_threads; for (int i = 0; i < acceptThreads; ++i) { threads.emplace_back([epollfd, i, sockfd, &connectionId]() { pthread_setname_np(pthread_self(), ("accept-" + std::to_string(i)).c_str()); // Call accept in a loop - for (;;) { + while (!shutdown_requested.load(std::memory_order_relaxed)) { struct sockaddr_storage addr; int fd = getAcceptFd(sockfd, &addr); if (fd == -1) { + if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) { + // TODO revisit + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + continue; + } perror("accept4"); continue; }