Updates for Claude

This commit is contained in:
2025-08-18 13:45:04 -04:00
parent faaf750f87
commit 224d2cf708

View File

@@ -80,9 +80,10 @@ int getListenFd(const char *node, const char *service) {
return sfd; return sfd;
} }
int getAcceptFd(int listenFd, struct sockaddr *addr) { int getAcceptFd(int listenFd, struct sockaddr_storage *addr) {
socklen_t addrlen = sizeof(sockaddr); // Use sockaddr_storage (not sockaddr) to handle both IPv4 and IPv6
int fd = accept4(listenFd, addr, &addrlen, SOCK_NONBLOCK); socklen_t addrlen = sizeof(sockaddr_storage);
int fd = accept4(listenFd, (struct sockaddr *)addr, &addrlen, SOCK_NONBLOCK);
return fd; return fd;
} }
@@ -95,12 +96,18 @@ int getAcceptFd(int listenFd, struct sockaddr *addr) {
// //
// Since only one thread owns a connection at a time, no synchronization is // Since only one thread owns a connection at a time, no synchronization is
// necessary // necessary
// Connection ownership model:
// - Created by accept thread, transferred to epoll via raw pointer
// - Network threads claim ownership by wrapping raw pointer in unique_ptr
// - Network threads transfer back to epoll by releasing unique_ptr to raw
// pointer
// - RAII cleanup happens if network thread doesn't transfer back
struct Connection { struct Connection {
const int fd; const int fd;
const int64_t id; const int64_t id;
struct sockaddr addr; struct sockaddr_storage addr; // sockaddr_storage handles IPv4/IPv6
Connection(struct sockaddr addr, int fd, int64_t id) Connection(struct sockaddr_storage addr, int fd, int64_t id)
: fd(fd), id(id), addr(addr) {} : fd(fd), id(id), addr(addr) {}
~Connection() { ~Connection() {
@@ -156,7 +163,7 @@ struct Connection {
front.s.size() - front.written); front.s.size() - front.written);
if (w == -1) { if (w == -1) {
if (errno == EINTR) { if (errno == EINTR) {
continue; continue; // Standard practice: retry on signal interruption
} }
if (errno == EAGAIN) { if (errno == EAGAIN) {
return false; return false;
@@ -255,24 +262,18 @@ int main(int argc, char *argv[]) {
} }
for (int i = 0; i < eventCount; ++i) { for (int i = 0; i < eventCount; ++i) {
// Take ownership from epoll: raw pointer -> unique_ptr
std::unique_ptr<Connection> conn{ std::unique_ptr<Connection> conn{
static_cast<Connection *>(events[i].data.ptr)}; static_cast<Connection *>(events[i].data.ptr)};
conn->tsan_acquire(); conn->tsan_acquire();
events[i].data.ptr = nullptr; events[i].data.ptr = nullptr; // Clear epoll pointer (we own it now)
const int fd = conn->fd; const int fd = conn->fd;
if (events[i].events & EPOLLERR) { if (events[i].events & (EPOLLERR | EPOLLHUP | EPOLLRDHUP)) {
// Done with connection // Connection closed or error occurred - unique_ptr destructor
// cleans up
continue; continue;
} }
if (events[i].events & EPOLLOUT) {
// Write bytes, maybe close connection
bool finished = conn->writeBytes();
if (finished) {
// Done with connection
continue;
}
}
if (events[i].events & EPOLLIN) { if (events[i].events & EPOLLIN) {
conn->readBytes(); conn->readBytes();
@@ -288,16 +289,17 @@ int main(int argc, char *argv[]) {
if (conn->tasks.empty()) { if (conn->tasks.empty()) {
// Transfer back to epoll instance. This thread or another thread // Transfer back to epoll instance. This thread or another thread
// will wake when fd is ready // will wake when fd is ready
events[i].events = EPOLLIN | EPOLLONESHOT; events[i].events = EPOLLIN | EPOLLONESHOT | EPOLLRDHUP;
} else { } else {
events[i].events = EPOLLOUT | EPOLLONESHOT; events[i].events = EPOLLOUT | EPOLLONESHOT | EPOLLRDHUP;
} }
// Transfer ownership back to epoll: unique_ptr -> raw pointer
conn->tsan_release(); conn->tsan_release();
events[i].data.ptr = conn.release(); events[i].data.ptr = conn.release(); // epoll now owns the connection
int e = epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &events[i]); int e = epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &events[i]);
if (e == -1) { if (e == -1) {
perror("epoll_ctl"); perror("epoll_ctl");
abort(); abort(); // Process termination - OS cleans up leaked connection
} }
} }
} }
@@ -314,7 +316,7 @@ int main(int argc, char *argv[]) {
("accept-" + std::to_string(i)).c_str()); ("accept-" + std::to_string(i)).c_str());
// Call accept in a loop // Call accept in a loop
for (;;) { for (;;) {
struct sockaddr addr; struct sockaddr_storage addr;
int fd = getAcceptFd(sockfd, &addr); int fd = getAcceptFd(sockfd, &addr);
if (fd == -1) { if (fd == -1) {
perror("accept4"); perror("accept4");
@@ -322,15 +324,16 @@ int main(int argc, char *argv[]) {
} }
auto conn = std::make_unique<Connection>( auto conn = std::make_unique<Connection>(
addr, fd, connectionId.fetch_add(1, std::memory_order_relaxed)); addr, fd, connectionId.fetch_add(1, std::memory_order_relaxed));
// Post to epoll instance // Transfer new connection to epoll ownership
struct epoll_event event{}; struct epoll_event event{};
event.events = EPOLLIN | EPOLLONESHOT; event.events = EPOLLIN | EPOLLONESHOT |
EPOLLRDHUP; // Listen for reads and disconnects
conn->tsan_release(); conn->tsan_release();
event.data.ptr = conn.release(); event.data.ptr = conn.release(); // epoll now owns the connection
int e = epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event); int e = epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);
if (e == -1) { if (e == -1) {
perror("epoll_ctl"); perror("epoll_ctl");
abort(); abort(); // Process termination - OS cleans up leaked connection
} }
} }
}); });