Use a pipe instead of an eventfd
This commit is contained in:
45
src/main.cpp
45
src/main.cpp
@@ -12,7 +12,6 @@
|
|||||||
#include <netdb.h>
|
#include <netdb.h>
|
||||||
#include <netinet/tcp.h>
|
#include <netinet/tcp.h>
|
||||||
#include <sys/epoll.h>
|
#include <sys/epoll.h>
|
||||||
#include <sys/eventfd.h>
|
|
||||||
#include <sys/socket.h>
|
#include <sys/socket.h>
|
||||||
#include <sys/types.h>
|
#include <sys/types.h>
|
||||||
#include <sys/un.h>
|
#include <sys/un.h>
|
||||||
@@ -21,7 +20,7 @@
|
|||||||
#include <vector>
|
#include <vector>
|
||||||
|
|
||||||
std::atomic<int> activeConnections{0};
|
std::atomic<int> activeConnections{0};
|
||||||
int shutdown_eventfd = -1;
|
int shutdown_pipe[2] = {-1, -1};
|
||||||
|
|
||||||
#ifndef __has_feature
|
#ifndef __has_feature
|
||||||
#define __has_feature(x) 0
|
#define __has_feature(x) 0
|
||||||
@@ -29,11 +28,11 @@ int shutdown_eventfd = -1;
|
|||||||
|
|
||||||
void signal_handler(int sig) {
|
void signal_handler(int sig) {
|
||||||
if (sig == SIGTERM || sig == SIGINT) {
|
if (sig == SIGTERM || sig == SIGINT) {
|
||||||
if (shutdown_eventfd != -1) {
|
if (shutdown_pipe[1] != -1) {
|
||||||
char val = 1;
|
char val = 1;
|
||||||
// write() is async-signal-safe per POSIX - safe to use in signal handler
|
// write() is async-signal-safe per POSIX - safe to use in signal handler
|
||||||
// Write single byte to avoid partial write complexity
|
// Write single byte to avoid partial write complexity
|
||||||
while (write(shutdown_eventfd, &val, 1) == -1) {
|
while (write(shutdown_pipe[1], &val, 1) == -1) {
|
||||||
if (errno != EINTR) {
|
if (errno != EINTR) {
|
||||||
abort(); // graceful shutdown didn't work. Let's go ungraceful.
|
abort(); // graceful shutdown didn't work. Let's go ungraceful.
|
||||||
}
|
}
|
||||||
@@ -266,10 +265,15 @@ int main(int argc, char *argv[]) {
|
|||||||
<< config->subscription.keepalive_interval.count() << " seconds"
|
<< config->subscription.keepalive_interval.count() << " seconds"
|
||||||
<< std::endl;
|
<< std::endl;
|
||||||
|
|
||||||
// Create shutdown eventfd for graceful shutdown
|
// Create shutdown pipe for graceful shutdown
|
||||||
shutdown_eventfd = eventfd(0, EFD_CLOEXEC);
|
if (pipe(shutdown_pipe) == -1) {
|
||||||
if (shutdown_eventfd == -1) {
|
perror("pipe");
|
||||||
perror("eventfd");
|
abort();
|
||||||
|
}
|
||||||
|
// Set both ends to close-on-exec
|
||||||
|
if (fcntl(shutdown_pipe[0], F_SETFD, FD_CLOEXEC) == -1 ||
|
||||||
|
fcntl(shutdown_pipe[1], F_SETFD, FD_CLOEXEC) == -1) {
|
||||||
|
perror("fcntl FD_CLOEXEC");
|
||||||
abort();
|
abort();
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -285,11 +289,11 @@ int main(int argc, char *argv[]) {
|
|||||||
perror("epoll_create");
|
perror("epoll_create");
|
||||||
abort();
|
abort();
|
||||||
}
|
}
|
||||||
// Add shutdown eventfd to network thread epoll
|
// Add shutdown pipe read end to network thread epoll
|
||||||
struct epoll_event shutdown_event;
|
struct epoll_event shutdown_event;
|
||||||
shutdown_event.events = EPOLLIN;
|
shutdown_event.events = EPOLLIN;
|
||||||
shutdown_event.data.fd = shutdown_eventfd;
|
shutdown_event.data.fd = shutdown_pipe[0];
|
||||||
if (epoll_ctl(network_epollfd, EPOLL_CTL_ADD, shutdown_eventfd,
|
if (epoll_ctl(network_epollfd, EPOLL_CTL_ADD, shutdown_pipe[0],
|
||||||
&shutdown_event) == -1) {
|
&shutdown_event) == -1) {
|
||||||
perror("epoll_ctl add shutdown event");
|
perror("epoll_ctl add shutdown event");
|
||||||
abort();
|
abort();
|
||||||
@@ -323,8 +327,8 @@ int main(int argc, char *argv[]) {
|
|||||||
|
|
||||||
for (int i = 0; i < eventCount; ++i) {
|
for (int i = 0; i < eventCount; ++i) {
|
||||||
// Check for shutdown event
|
// Check for shutdown event
|
||||||
if (events[i].data.fd == shutdown_eventfd) {
|
if (events[i].data.fd == shutdown_pipe[0]) {
|
||||||
// Don't read eventfd - all threads need to see shutdown signal
|
// Don't read pipe - all threads need to see shutdown signal
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -383,10 +387,10 @@ int main(int argc, char *argv[]) {
|
|||||||
abort();
|
abort();
|
||||||
}
|
}
|
||||||
|
|
||||||
// Add shutdown eventfd to accept epoll
|
// Add shutdown pipe read end to accept epoll
|
||||||
if (epoll_ctl(accept_epollfd, EPOLL_CTL_ADD, shutdown_eventfd,
|
if (epoll_ctl(accept_epollfd, EPOLL_CTL_ADD, shutdown_pipe[0],
|
||||||
&shutdown_event) == -1) {
|
&shutdown_event) == -1) {
|
||||||
perror("epoll_ctl shutdown eventfd");
|
perror("epoll_ctl shutdown pipe");
|
||||||
abort();
|
abort();
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -409,7 +413,7 @@ int main(int argc, char *argv[]) {
|
|||||||
("accept-" + std::to_string(acceptThreadId)).c_str());
|
("accept-" + std::to_string(acceptThreadId)).c_str());
|
||||||
|
|
||||||
for (;;) {
|
for (;;) {
|
||||||
struct epoll_event events[2]; // listen socket + shutdown eventfd
|
struct epoll_event events[2]; // listen socket + shutdown pipe
|
||||||
int ready = epoll_wait(accept_epollfd, events, 2, -1 /* no timeout */);
|
int ready = epoll_wait(accept_epollfd, events, 2, -1 /* no timeout */);
|
||||||
|
|
||||||
if (ready == -1) {
|
if (ready == -1) {
|
||||||
@@ -420,8 +424,8 @@ int main(int argc, char *argv[]) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for (int i = 0; i < ready; ++i) {
|
for (int i = 0; i < ready; ++i) {
|
||||||
if (events[i].data.fd == shutdown_eventfd) {
|
if (events[i].data.fd == shutdown_pipe[0]) {
|
||||||
// Don't read eventfd - all threads need to see shutdown signal
|
// Don't read pipe - all threads need to see shutdown signal
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -483,7 +487,8 @@ int main(int argc, char *argv[]) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Cleanup
|
// Cleanup
|
||||||
close(shutdown_eventfd);
|
close(shutdown_pipe[0]);
|
||||||
|
close(shutdown_pipe[1]);
|
||||||
close(accept_epollfd);
|
close(accept_epollfd);
|
||||||
close(network_epollfd);
|
close(network_epollfd);
|
||||||
close(sockfd);
|
close(sockfd);
|
||||||
|
|||||||
Reference in New Issue
Block a user