Shutdown network threads after connect threads

Fixes ctrl-c, which was previously hanging in sem_wait
This commit is contained in:
2025-08-20 14:56:36 -04:00
parent e342229743
commit 7f4c024efa

View File

@@ -23,6 +23,10 @@
#include <llhttp.h> #include <llhttp.h>
#ifndef __has_feature
#define __has_feature(x) 0
#endif
// Use shared perfetto categories // Use shared perfetto categories
#include "../src/perfetto_categories.hpp" #include "../src/perfetto_categories.hpp"
@@ -139,6 +143,7 @@ sem_t connectionLimit;
// Shutdown mechanism // Shutdown mechanism
std::atomic<bool> g_shutdown{false}; std::atomic<bool> g_shutdown{false};
std::atomic<int> g_connect_threads{0};
void signal_handler(int sig) { void signal_handler(int sig) {
if (sig == SIGTERM || sig == SIGINT) { if (sig == SIGTERM || sig == SIGINT) {
@@ -167,13 +172,44 @@ struct Connection {
uint64_t requestId = 0; uint64_t requestId = 0;
void initRequest() { void initRequest() {
requestId = nextRequestId.fetch_add(1, std::memory_order_relaxed); requestId = nextRequestId.fetch_add(1, std::memory_order_relaxed);
int len = snprintf(buf, sizeof(buf),
"GET /ok HTTP/1.1\r\nX-Request-Id: %" PRIu64 "\r\n\r\n", // Fast manual string construction - avoid snprintf overhead
requestId); const char *prefix = "GET /ok HTTP/1.1\r\nX-Request-Id: ";
if (len == -1 || len > int(sizeof(buf))) { const char *suffix = "\r\n\r\n";
// Copy prefix
char *p = buf;
const char *src = prefix;
while (*src)
*p++ = *src++;
// Convert requestId to string directly
if (requestId == 0) {
*p++ = '0';
} else {
char digits[20]; // uint64_t max is 20 digits
int digit_count = 0;
uint64_t temp = requestId;
while (temp > 0) {
digits[digit_count++] = '0' + (temp % 10);
temp /= 10;
}
// Copy digits in reverse order
for (int i = digit_count - 1; i >= 0; i--) {
*p++ = digits[i];
}
}
// Copy suffix
src = suffix;
while (*src)
*p++ = *src++;
size_t len = p - buf;
if (len >= sizeof(buf)) {
abort(); abort();
} }
request = std::string_view{buf, size_t(len)}; request = std::string_view{buf, len};
} }
Connection(int fd, int64_t id) : fd(fd), id(id) { Connection(int fd, int64_t id) : fd(fd), id(id) {
@@ -534,6 +570,7 @@ int main(int argc, char *argv[]) {
abort(); abort();
} }
std::atomic<int64_t> connectionId{0}; std::atomic<int64_t> connectionId{0};
g_connect_threads.store(g_config.connect_threads, std::memory_order_relaxed);
std::vector<std::thread> threads; std::vector<std::thread> threads;
@@ -541,7 +578,7 @@ int main(int argc, char *argv[]) {
threads.emplace_back([epollfd, i]() { threads.emplace_back([epollfd, i]() {
pthread_setname_np(pthread_self(), pthread_setname_np(pthread_self(),
("network-" + std::to_string(i)).c_str()); ("network-" + std::to_string(i)).c_str());
while (!g_shutdown.load(std::memory_order_relaxed)) { while (g_connect_threads.load() != 0) {
struct epoll_event events[256]; // Use a reasonable max size struct epoll_event events[256]; // Use a reasonable max size
int batch_size = std::min(int(sizeof(events) / sizeof(events[0])), int batch_size = std::min(int(sizeof(events) / sizeof(events[0])),
g_config.event_batch_size); g_config.event_batch_size);
@@ -660,6 +697,7 @@ int main(int argc, char *argv[]) {
continue; continue;
} }
} }
g_connect_threads.fetch_sub(1);
}); });
} }