Graceful shutdown and config

This commit is contained in:
2025-08-18 14:03:51 -04:00
parent 224d2cf708
commit 78e3845130
4 changed files with 128 additions and 68 deletions

View File

@@ -5,6 +5,12 @@ bind_address = "127.0.0.1"
port = 8080 port = 8080
# Maximum request size in bytes (for 413 Content Too Large responses) # Maximum request size in bytes (for 413 Content Too Large responses)
max_request_size_bytes = 1048576 # 1MB max_request_size_bytes = 1048576 # 1MB
# Number of accept threads for handling incoming connections
accept_threads = 1
# Number of network I/O threads for epoll processing (0 = use hardware concurrency)
network_threads = 0
# Event batch size for epoll processing
event_batch_size = 32
[commit] [commit]
# Minimum length for request_id to ensure sufficient entropy # Minimum length for request_id to ensure sufficient entropy

View File

@@ -74,6 +74,9 @@ void ConfigParser::parse_server_config(const auto &toml_data,
parse_field(srv, "bind_address", config.bind_address); parse_field(srv, "bind_address", config.bind_address);
parse_field(srv, "port", config.port); parse_field(srv, "port", config.port);
parse_field(srv, "max_request_size_bytes", config.max_request_size_bytes); parse_field(srv, "max_request_size_bytes", config.max_request_size_bytes);
parse_field(srv, "accept_threads", config.accept_threads);
parse_field(srv, "network_threads", config.network_threads);
parse_field(srv, "event_batch_size", config.event_batch_size);
}); });
} }

View File

@@ -16,6 +16,12 @@ struct ServerConfig {
int port = 8080; int port = 8080;
/// Maximum size in bytes for incoming HTTP requests (default: 1MB) /// Maximum size in bytes for incoming HTTP requests (default: 1MB)
size_t max_request_size_bytes = 1024 * 1024; size_t max_request_size_bytes = 1024 * 1024;
/// Number of accept threads for handling incoming connections
int accept_threads = 1;
/// Number of network I/O threads for epoll processing
int network_threads = 0; // 0 means use hardware_concurrency
/// Event batch size for epoll processing
int event_batch_size = 32;
}; };
/** /**

View File

@@ -6,6 +6,7 @@
#include <cstdlib> #include <cstdlib>
#include <cstring> #include <cstring>
#include <deque> #include <deque>
#include <fcntl.h>
#include <inttypes.h> #include <inttypes.h>
#include <iostream> #include <iostream>
#include <netdb.h> #include <netdb.h>
@@ -16,11 +17,20 @@
#include <sys/un.h> #include <sys/un.h>
#include <thread> #include <thread>
#include <unistd.h> #include <unistd.h>
#include <vector>
std::atomic<bool> shutdown_requested{false};
#ifndef __has_feature #ifndef __has_feature
#define __has_feature(x) 0 #define __has_feature(x) 0
#endif #endif
void signal_handler(int sig) {
if (sig == SIGTERM || sig == SIGINT) {
shutdown_requested.store(true, std::memory_order_relaxed);
}
}
// Adapted from getaddrinfo man page // Adapted from getaddrinfo man page
int getListenFd(const char *node, const char *service) { int getListenFd(const char *node, const char *service) {
@@ -57,6 +67,12 @@ int getListenFd(const char *node, const char *service) {
int val = 1; int val = 1;
setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)); setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val));
// Set socket to non-blocking for graceful shutdown
int flags = fcntl(sfd, F_GETFL, 0);
if (flags != -1) {
fcntl(sfd, F_SETFL, flags | O_NONBLOCK);
}
if (bind(sfd, rp->ai_addr, rp->ai_addrlen) == 0) { if (bind(sfd, rp->ai_addr, rp->ai_addrlen) == 0) {
break; /* Success */ break; /* Success */
} }
@@ -126,11 +142,13 @@ struct Connection {
std::deque<Task> tasks; std::deque<Task> tasks;
void readBytes() { void readBytes(size_t max_request_size) {
// Use smaller buffer size but respect max request size
// TODO revisit
size_t buf_size = std::min(size_t(4096), max_request_size);
std::vector<char> buf(buf_size);
for (;;) { for (;;) {
// TODO make size configurable int r = read(fd, buf.data(), buf.size());
char buf[1024];
int r = read(fd, buf, sizeof(buf));
if (r == -1) { if (r == -1) {
if (errno == EINTR) { if (errno == EINTR) {
continue; continue;
@@ -144,8 +162,9 @@ struct Connection {
if (r == 0) { if (r == 0) {
goto close_connection; goto close_connection;
} }
// pump parser // "pump parser"
tasks.emplace_back(std::string{buf, size_t(r)}); // TODO revisit
tasks.emplace_back(std::string{buf.data(), size_t(r)});
} }
close_connection: close_connection:
tasks.emplace_back(std::string{}, true); tasks.emplace_back(std::string{}, true);
@@ -213,6 +232,11 @@ int main(int argc, char *argv[]) {
std::cout << "Server port: " << config->server.port << std::endl; std::cout << "Server port: " << config->server.port << std::endl;
std::cout << "Max request size: " << config->server.max_request_size_bytes std::cout << "Max request size: " << config->server.max_request_size_bytes
<< " bytes" << std::endl; << " bytes" << std::endl;
std::cout << "Accept threads: " << config->server.accept_threads << std::endl;
std::cout << "Network threads: " << config->server.network_threads
<< " (0 = auto)" << std::endl;
std::cout << "Event batch size: " << config->server.event_batch_size
<< std::endl;
std::cout << "Min request ID length: " << config->commit.min_request_id_length std::cout << "Min request ID length: " << config->commit.min_request_id_length
<< std::endl; << std::endl;
std::cout << "Request ID retention: " std::cout << "Request ID retention: "
@@ -226,6 +250,8 @@ int main(int argc, char *argv[]) {
<< std::endl; << std::endl;
signal(SIGPIPE, SIG_IGN); signal(SIGPIPE, SIG_IGN);
signal(SIGTERM, signal_handler);
signal(SIGINT, signal_handler);
int sockfd = getListenFd(config->server.bind_address.c_str(), int sockfd = getListenFd(config->server.bind_address.c_str(),
std::to_string(config->server.port).c_str()); std::to_string(config->server.port).c_str());
@@ -236,89 +262,108 @@ int main(int argc, char *argv[]) {
abort(); abort();
} }
// Network threads // Network threads - use config value, fallback to hardware concurrency
// TODO make configurable int networkThreads = config->server.network_threads;
int networkThreads = 1; if (networkThreads == 0) {
// TODO make configurable // TODO revisit
constexpr int kEventBatchSize = 10; networkThreads = std::thread::hardware_concurrency();
if (networkThreads == 0)
networkThreads = 1; // ultimate fallback
}
// Event batch size from configuration
for (int i = 0; i < networkThreads; ++i) { for (int i = 0; i < networkThreads; ++i) {
threads.emplace_back([epollfd, i]() { threads.emplace_back(
pthread_setname_np(pthread_self(), [epollfd, i, max_request_size = config->server.max_request_size_bytes,
("network-" + std::to_string(i)).c_str()); event_batch_size = config->server.event_batch_size]() {
for (;;) { pthread_setname_np(pthread_self(),
struct epoll_event events[kEventBatchSize]{}; ("network-" + std::to_string(i)).c_str());
int eventCount; while (!shutdown_requested.load(std::memory_order_relaxed)) {
for (;;) { std::vector<struct epoll_event> events(event_batch_size);
eventCount = int eventCount;
epoll_wait(epollfd, events, kEventBatchSize, /*no timeout*/ -1); for (;;) {
if (eventCount == -1) { eventCount = epoll_wait(epollfd, events.data(), event_batch_size,
if (errno == EINTR) { 1000 /* 1 second timeout */);
if (eventCount == -1) {
if (errno == EINTR) {
continue;
}
perror("epoll_wait");
abort();
}
break;
}
if (eventCount == 0) {
// Timeout occurred, check shutdown flag again
continue; continue;
} }
perror("epoll_wait");
abort();
}
break;
}
for (int i = 0; i < eventCount; ++i) { for (int i = 0; i < eventCount; ++i) {
// Take ownership from epoll: raw pointer -> unique_ptr // Take ownership from epoll: raw pointer -> unique_ptr
std::unique_ptr<Connection> conn{ std::unique_ptr<Connection> conn{
static_cast<Connection *>(events[i].data.ptr)}; static_cast<Connection *>(events[i].data.ptr)};
conn->tsan_acquire(); conn->tsan_acquire();
events[i].data.ptr = nullptr; // Clear epoll pointer (we own it now) events[i].data.ptr =
const int fd = conn->fd; nullptr; // Clear epoll pointer (we own it now)
const int fd = conn->fd;
if (events[i].events & (EPOLLERR | EPOLLHUP | EPOLLRDHUP)) { if (events[i].events & (EPOLLERR | EPOLLHUP | EPOLLRDHUP)) {
// Connection closed or error occurred - unique_ptr destructor // Connection closed or error occurred - unique_ptr destructor
// cleans up // cleans up
continue; continue;
} }
if (events[i].events & EPOLLIN) { if (events[i].events & EPOLLIN) {
conn->readBytes(); conn->readBytes(max_request_size);
} }
if (events[i].events & EPOLLOUT) { if (events[i].events & EPOLLOUT) {
bool done = conn->writeBytes(); bool done = conn->writeBytes();
if (done) { if (done) {
continue; continue;
}
}
if (conn->tasks.empty()) {
// Transfer back to epoll instance. This thread or another
// thread will wake when fd is ready
events[i].events = EPOLLIN | EPOLLONESHOT | EPOLLRDHUP;
} else {
events[i].events = EPOLLOUT | EPOLLONESHOT | EPOLLRDHUP;
}
// Transfer ownership back to epoll: unique_ptr -> raw pointer
conn->tsan_release();
events[i].data.ptr =
conn.release(); // epoll now owns the connection
int e = epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &events[i]);
if (e == -1) {
perror("epoll_ctl");
abort(); // Process termination - OS cleans up leaked connection
}
} }
} }
});
if (conn->tasks.empty()) {
// Transfer back to epoll instance. This thread or another thread
// will wake when fd is ready
events[i].events = EPOLLIN | EPOLLONESHOT | EPOLLRDHUP;
} else {
events[i].events = EPOLLOUT | EPOLLONESHOT | EPOLLRDHUP;
}
// Transfer ownership back to epoll: unique_ptr -> raw pointer
conn->tsan_release();
events[i].data.ptr = conn.release(); // epoll now owns the connection
int e = epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &events[i]);
if (e == -1) {
perror("epoll_ctl");
abort(); // Process termination - OS cleans up leaked connection
}
}
}
});
} }
std::atomic<int64_t> connectionId{0}; std::atomic<int64_t> connectionId{0};
// TODO make configurable // Accept threads from configuration
int acceptThreads = 1; int acceptThreads = config->server.accept_threads;
for (int i = 0; i < acceptThreads; ++i) { for (int i = 0; i < acceptThreads; ++i) {
threads.emplace_back([epollfd, i, sockfd, &connectionId]() { threads.emplace_back([epollfd, i, sockfd, &connectionId]() {
pthread_setname_np(pthread_self(), pthread_setname_np(pthread_self(),
("accept-" + std::to_string(i)).c_str()); ("accept-" + std::to_string(i)).c_str());
// Call accept in a loop // Call accept in a loop
for (;;) { while (!shutdown_requested.load(std::memory_order_relaxed)) {
struct sockaddr_storage addr; struct sockaddr_storage addr;
int fd = getAcceptFd(sockfd, &addr); int fd = getAcceptFd(sockfd, &addr);
if (fd == -1) { if (fd == -1) {
if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
// TODO revisit
std::this_thread::sleep_for(std::chrono::milliseconds(10));
continue;
}
perror("accept4"); perror("accept4");
continue; continue;
} }