diff --git a/src/main.cpp b/src/main.cpp index 2f17ac3..20b372d 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -80,9 +80,10 @@ int getListenFd(const char *node, const char *service) { return sfd; } -int getAcceptFd(int listenFd, struct sockaddr *addr) { - socklen_t addrlen = sizeof(sockaddr); - int fd = accept4(listenFd, addr, &addrlen, SOCK_NONBLOCK); +int getAcceptFd(int listenFd, struct sockaddr_storage *addr) { + // Use sockaddr_storage (not sockaddr) to handle both IPv4 and IPv6 + socklen_t addrlen = sizeof(sockaddr_storage); + int fd = accept4(listenFd, (struct sockaddr *)addr, &addrlen, SOCK_NONBLOCK); return fd; } @@ -95,12 +96,18 @@ int getAcceptFd(int listenFd, struct sockaddr *addr) { // // Since only one thread owns a connection at a time, no synchronization is // necessary +// Connection ownership model: +// - Created by accept thread, transferred to epoll via raw pointer +// - Network threads claim ownership by wrapping raw pointer in unique_ptr +// - Network threads transfer back to epoll by releasing unique_ptr to raw +// pointer +// - RAII cleanup happens if network thread doesn't transfer back struct Connection { const int fd; const int64_t id; - struct sockaddr addr; + struct sockaddr_storage addr; // sockaddr_storage handles IPv4/IPv6 - Connection(struct sockaddr addr, int fd, int64_t id) + Connection(struct sockaddr_storage addr, int fd, int64_t id) : fd(fd), id(id), addr(addr) {} ~Connection() { @@ -156,7 +163,7 @@ struct Connection { front.s.size() - front.written); if (w == -1) { if (errno == EINTR) { - continue; + continue; // Standard practice: retry on signal interruption } if (errno == EAGAIN) { return false; @@ -255,24 +262,18 @@ int main(int argc, char *argv[]) { } for (int i = 0; i < eventCount; ++i) { + // Take ownership from epoll: raw pointer -> unique_ptr std::unique_ptr conn{ static_cast(events[i].data.ptr)}; conn->tsan_acquire(); - events[i].data.ptr = nullptr; + events[i].data.ptr = nullptr; // Clear epoll pointer (we own it now) const int fd = conn->fd; - if (events[i].events & EPOLLERR) { - // Done with connection + if (events[i].events & (EPOLLERR | EPOLLHUP | EPOLLRDHUP)) { + // Connection closed or error occurred - unique_ptr destructor + // cleans up continue; } - if (events[i].events & EPOLLOUT) { - // Write bytes, maybe close connection - bool finished = conn->writeBytes(); - if (finished) { - // Done with connection - continue; - } - } if (events[i].events & EPOLLIN) { conn->readBytes(); @@ -288,16 +289,17 @@ int main(int argc, char *argv[]) { if (conn->tasks.empty()) { // Transfer back to epoll instance. This thread or another thread // will wake when fd is ready - events[i].events = EPOLLIN | EPOLLONESHOT; + events[i].events = EPOLLIN | EPOLLONESHOT | EPOLLRDHUP; } else { - events[i].events = EPOLLOUT | EPOLLONESHOT; + events[i].events = EPOLLOUT | EPOLLONESHOT | EPOLLRDHUP; } + // Transfer ownership back to epoll: unique_ptr -> raw pointer conn->tsan_release(); - events[i].data.ptr = conn.release(); + events[i].data.ptr = conn.release(); // epoll now owns the connection int e = epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &events[i]); if (e == -1) { perror("epoll_ctl"); - abort(); + abort(); // Process termination - OS cleans up leaked connection } } } @@ -314,7 +316,7 @@ int main(int argc, char *argv[]) { ("accept-" + std::to_string(i)).c_str()); // Call accept in a loop for (;;) { - struct sockaddr addr; + struct sockaddr_storage addr; int fd = getAcceptFd(sockfd, &addr); if (fd == -1) { perror("accept4"); @@ -322,15 +324,16 @@ int main(int argc, char *argv[]) { } auto conn = std::make_unique( addr, fd, connectionId.fetch_add(1, std::memory_order_relaxed)); - // Post to epoll instance + // Transfer new connection to epoll ownership struct epoll_event event{}; - event.events = EPOLLIN | EPOLLONESHOT; + event.events = EPOLLIN | EPOLLONESHOT | + EPOLLRDHUP; // Listen for reads and disconnects conn->tsan_release(); - event.data.ptr = conn.release(); + event.data.ptr = conn.release(); // epoll now owns the connection int e = epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event); if (e == -1) { perror("epoll_ctl"); - abort(); + abort(); // Process termination - OS cleans up leaked connection } } });