#include <netdb.h>
#include <pthread.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/un.h>
#include <unistd.h>

#include <assert.h>
#include <errno.h>
#include <inttypes.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>

#include <algorithm>
#include <atomic>
#include <deque>
#include <memory>
#include <string>
#include <string_view>
#include <thread>
#include <tuple>
#include <utility>
#include <vector>

#include <llhttp.h>

#define ENABLE_PERFETTO 1

#if ENABLE_PERFETTO
#include <perfetto.h>
#else
#define PERFETTO_DEFINE_CATEGORIES(...)
#define PERFETTO_TRACK_EVENT_STATIC_STORAGE(...)
#define TRACE_EVENT(...)
#endif

#include "ThreadPipeline.h"

#ifndef __has_feature
#define __has_feature(x) 0
#endif

PERFETTO_DEFINE_CATEGORIES(perfetto::Category("network").SetDescription(
    "Network upload and download statistics"));
PERFETTO_TRACK_EVENT_STATIC_STORAGE();

namespace {

constexpr int kConnectionQueueLgDepth = 13;
constexpr int kDefaultPipelineBatchSize = 16;
constexpr std::string_view kResponseFmt =
    "HTTP/1.1 204 No Content\r\nX-Response-Id: %" PRIu64 "\r\n\r\n";
constexpr int kAcceptThreads = 2;
constexpr int kNetworkThreads = 8;
constexpr int kEventBatchSize = 32;
constexpr int kConnectionBufSize = 1024;
constexpr uint32_t kMandatoryEpollFlags = EPOLLONESHOT;
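
// Every epoll registration in this file includes EPOLLONESHOT, so the kernel
// wakes at most one thread per ready connection.  The woken thread takes
// ownership of the Connection pointer stored in epoll_event::data and must
// re-arm the fd with EPOLL_CTL_MOD before any thread can receive events for
// it again.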

// Adapted from the getaddrinfo man page.
int getListenFd(const char *node, const char *service) {
  struct addrinfo hints;
  struct addrinfo *result, *rp;
  int sfd, s;

  memset(&hints, 0, sizeof(hints));
  hints.ai_family = AF_UNSPEC;     /* Allow IPv4 or IPv6 */
  hints.ai_socktype = SOCK_STREAM; /* Stream socket */
  hints.ai_flags = AI_PASSIVE;     /* For wildcard IP address */
  hints.ai_protocol = 0;           /* Any protocol */
  hints.ai_canonname = nullptr;
  hints.ai_addr = nullptr;
  hints.ai_next = nullptr;

  s = getaddrinfo(node, service, &hints, &result);
  if (s != 0) {
    fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(s));
    abort();
  }

  /* getaddrinfo() returns a list of address structures.  Try each address
     until we successfully bind(2).  If socket(2) (or bind(2)) fails, we
     (close the socket and) try the next address. */
  for (rp = result; rp != nullptr; rp = rp->ai_next) {
    sfd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
    if (sfd == -1) {
      continue;
    }
    int val = 1;
    setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val));
    if (bind(sfd, rp->ai_addr, rp->ai_addrlen) == 0) {
      break; /* Success */
    }
    close(sfd);
  }

  freeaddrinfo(result); /* No longer needed */

  if (rp == nullptr) { /* No address succeeded */
    fprintf(stderr, "Could not bind\n");
    abort();
  }

  int rv = listen(sfd, SOMAXCONN);
  if (rv) {
    perror("listen");
    abort();
  }
  return sfd;
}

int getUnixSocket(const char *socket_name) {
  int sfd = socket(AF_UNIX, SOCK_STREAM, 0);
  if (sfd == -1) {
    perror("socket");
    abort();
  }
  struct sockaddr_un addr;
  memset(&addr, 0, sizeof(addr));
  addr.sun_family = AF_UNIX;
  strncpy(addr.sun_path, socket_name, sizeof(addr.sun_path) - 1);
  int e = bind(sfd, (struct sockaddr *)&addr, sizeof(addr));
  if (e == -1) {
    perror("bind");
    abort();
  }
  e = listen(sfd, SOMAXCONN);
  if (e == -1) {
    perror("listen");
    abort();
  }
  return sfd;
}

int getAcceptFd(int listenFd, struct sockaddr *addr) {
  socklen_t addrlen = sizeof(sockaddr);
  int fd = accept4(listenFd, addr, &addrlen, SOCK_NONBLOCK);
  return fd;
}

double now() {
  struct timespec t;
  int e = clock_gettime(CLOCK_MONOTONIC_RAW, &t);
  if (e == -1) {
    perror("clock_gettime");
    abort();
  }
  return double(t.tv_sec) + (1e-9 * double(t.tv_nsec));
}

} // namespace

struct HttpRequest {
  bool closeConnection = false;
  int64_t id = 0;
};

struct HttpResponse {
  HttpResponse(int64_t id, bool closeConnection)
      : id(id), closeConnection(closeConnection) {
    int len = snprintf(buf, sizeof(buf), kResponseFmt.data(), id);
    if (len == -1 || len >= int(sizeof(buf))) {
      abort();
    }
    response = std::string_view{buf, size_t(len)};
  }

  int64_t const id;
  bool closeConnection = false;
  std::string_view response;

private:
  char buf[kResponseFmt.size() + 64];
};
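
// The 204 response text is rendered into HttpResponse::buf exactly once;
// `response` is a view into that buffer which writeBytes() shrinks as bytes
// are accepted by the socket, so a short write simply resumes where it left
// off on the next EPOLLOUT wakeup.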

// Connection lifecycle.  Only one of these is the case at a time:
// - Created on an accept thread from a call to accept
// - Waiting on the connection fd to be readable/writable
// - Owned by a network thread, which drains all readable and writable bytes
// - Owned by a thread in the request processing pipeline
// - Closed by a network thread according to the HTTP protocol
//
// Since only one thread owns a connection at a time, no synchronization is
// necessary.
struct Connection {
  static const llhttp_settings_t settings;

  std::deque<HttpRequest> requests;
  std::deque<HttpResponse> responses;

  Connection(struct sockaddr addr, int fd, int64_t id)
      : fd(fd), id(id), addr(addr) {
    llhttp_init(&parser, HTTP_REQUEST, &settings);
    parser.data = this;
  }

  ~Connection() {
    int e = close(fd);
    if (e == -1) {
      perror("close");
      abort();
    }
  }

  void readBytes() {
    TRACE_EVENT("network", "read", "connectionId", id);
    for (;;) {
      char buf[kConnectionBufSize];
      int r = read(fd, buf, sizeof(buf));
      if (r == -1) {
        if (errno == EINTR) {
          continue;
        }
        if (errno == EAGAIN) {
          return;
        }
        perror("read");
        goto close_connection;
      }
      if (r == 0) {
        llhttp_finish(&parser);
        goto close_connection;
      }
      auto e = llhttp_execute(&parser, buf, r);
      if (e != HPE_OK) {
        fprintf(stderr, "Parse error: %s %s\n", llhttp_errno_name(e),
                llhttp_get_error_reason(&parser));
        goto close_connection;
      }
    }
  close_connection:
    requests.emplace_back();
    requests.back().closeConnection = true;
  }

  bool writeBytes() {
    TRACE_EVENT("network", "write", "connectionId", id);
    while (!responses.empty()) {
      auto &front = responses.front();
      if (front.closeConnection) {
        return true;
      }
      int w;
      for (;;) {
        w = write(fd, front.response.data(), front.response.size());
        if (w == -1) {
          if (errno == EINTR) {
            continue;
          }
          if (errno == EAGAIN) {
            return false;
          }
          perror("write");
          return true;
        }
        break;
      }
      assert(w != 0);
      front.response = front.response.substr(w, front.response.size() - w);
      if (front.response.empty()) {
        TRACE_EVENT("network", "write response",
                    perfetto::Flow::Global(front.id));
        responses.pop_front();
      }
    }
    return false;
  }

  const int fd;
  const int64_t id;

#if __has_feature(thread_sanitizer)
  void tsan_acquire() { tsan_sync.load(std::memory_order_acquire); }
  void tsan_release() { tsan_sync.store(0, std::memory_order_release); }
  std::atomic<int> tsan_sync;
#else
  void tsan_acquire() {}
  void tsan_release() {}
#endif

private:
  template <int (Connection::*Method)()>
  static int callback(llhttp_t *parser) {
    auto &self = *static_cast<Connection *>(parser->data);
    return (self.*Method)();
  }

  template <int (Connection::*Method)(const char *, size_t)>
  static int callback(llhttp_t *parser, const char *at, size_t length) {
    auto &self = *static_cast<Connection *>(parser->data);
    return (self.*Method)(at, length);
  }

  uint64_t requestId = 0;

  int on_header_value(const char *data, size_t s) {
    for (int i = 0; i < int(s); ++i) {
      requestId = requestId * 10 + data[i] - '0';
    }
    return 0;
  }

  int on_message_complete() {
    requests.emplace_back();
    requests.back().id = requestId;
    TRACE_EVENT("network", "read request", perfetto::Flow::Global(requestId));
    requestId = 0;
    return 0;
  }

  int messages = 0;
  llhttp_t parser;
  struct sockaddr addr;
};

const llhttp_settings_t Connection::settings = []() {
  llhttp_settings_t settings;
  llhttp_settings_init(&settings);
  settings.on_message_complete = callback<&Connection::on_message_complete>;
  settings.on_header_value = callback<&Connection::on_header_value>;
  return settings;
}();

int main() {
  signal(SIGPIPE, SIG_IGN);

#if ENABLE_PERFETTO
  perfetto::TracingInitArgs args;
  args.backends |= perfetto::kSystemBackend;
  perfetto::Tracing::Initialize(args);
  perfetto::TrackEvent::Register();
#endif

  // int sockfd = getListenFd("0.0.0.0", "4569");
  int sockfd = getUnixSocket("multithread-epoll.socket");

  std::vector<std::thread> threads;

  int epollfd = epoll_create(/*ignored*/ 1);
  if (epollfd == -1) {
    perror("epoll_create");
    abort();
  }
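
  // ThreadPipeline comes from ThreadPipeline.h, which is not shown here.  As
  // inferred from the call sites below (an assumption, not the header's
  // documented contract): the constructor takes a log2 queue depth and the
  // number of worker threads per stage; push(n, block) hands up to n items to
  // stage 0; acquire(stage, worker, maxBatch, mayBlock) returns a StageGuard
  // whose `batch` range is owned by the caller until the guard is destroyed,
  // at which point the items advance to the next stage.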
  ThreadPipeline<std::unique_ptr<Connection>> pipeline{kConnectionQueueLgDepth,
                                                       {1, 1, 1}};

  // Request processing pipeline, stage 0: turn parsed requests into responses.
  threads.emplace_back([&pipeline]() {
    pthread_setname_np(pthread_self(), "pipeline-0-0");
    for (;;) {
      auto guard = pipeline.acquire(0, 0, kDefaultPipelineBatchSize);
      for (auto &conn : guard.batch) {
        assert(!conn->requests.empty());
        while (!conn->requests.empty()) {
          auto &front = conn->requests.front();
          TRACE_EVENT("network", "forward", perfetto::Flow::Global(front.id));
          conn->responses.emplace_back(front.id, front.closeConnection);
          conn->requests.pop_front();
        }
      }
    }
  });

  // Request processing pipeline, stage 1: collect batches and hold each one
  // for ~50 ms before releasing it downstream.
  threads.emplace_back([&pipeline]() {
    pthread_setname_np(pthread_self(), "pipeline-1-0");
    std::deque<std::tuple<
        ThreadPipeline<std::unique_ptr<Connection>>::StageGuard, double,
        int64_t>>
        queue;
    int64_t batchNum = 0;
    double lastBatch = now();
    for (;;) {
      if (queue.size() < 10) {
        while (now() - lastBatch < 5e-3) {
          usleep(1000);
        }
        auto guard = pipeline.acquire(1, 0, /*maxBatch=*/0,
                                      /*mayBlock*/ queue.empty());
        lastBatch = now();
        if (!guard.batch.empty()) {
          queue.emplace_back(std::move(guard), now() + 50e-3, batchNum);
          TRACE_EVENT("network", "startBatch",
                      perfetto::Flow::ProcessScoped(batchNum));
          for ([[maybe_unused]] auto &conn : std::get<0>(queue.back()).batch) {
            for (auto const &r : conn->responses) {
              TRACE_EVENT("network", "start", perfetto::Flow::Global(r.id));
            }
          }
          ++batchNum;
        }
      }
      if (queue.size() > 0 && std::get<1>(queue.front()) <= now()) {
        TRACE_EVENT("network", "finishBatch",
                    perfetto::Flow::ProcessScoped(std::get<2>(queue.front())));
        for ([[maybe_unused]] auto &conn : std::get<0>(queue.front()).batch) {
          for (auto const &r : conn->responses) {
            TRACE_EVENT("network", "finish", perfetto::Flow::Global(r.id));
          }
        }
        queue.pop_front();
      }
    }
  });

  // Request processing pipeline, stage 2: hand connections back to the epoll
  // instance so a network thread can write the responses.
  threads.emplace_back([epollfd, &pipeline]() {
    pthread_setname_np(pthread_self(), "pipeline-2-0");
    for (;;) {
      auto guard = pipeline.acquire(2, 0, kDefaultPipelineBatchSize);
      for (auto &conn : guard.batch) {
        for (auto const &r : conn->responses) {
          TRACE_EVENT("network", "forward", perfetto::Flow::Global(r.id));
        }
        struct epoll_event event{};
        assert(conn->requests.empty());
        assert(!conn->responses.empty());
        event.events = EPOLLIN | EPOLLOUT | kMandatoryEpollFlags;
        const int fd = conn->fd;
        auto *c = conn.release();
        c->tsan_release();
        event.data.ptr = c;
        int e = epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &event);
        if (e == -1) {
          perror("epoll_ctl");
          abort();
        }
      }
    }
  });

  // Network threads
  for (int i = 0; i < kNetworkThreads; ++i) {
    threads.emplace_back([epollfd, i, &pipeline]() {
      pthread_setname_np(pthread_self(),
                         ("network-" + std::to_string(i)).c_str());
      std::vector<std::unique_ptr<Connection>> batch;
      int64_t requestsDropped = 0;
      for (;;) {
        batch.clear();
        struct epoll_event events[kEventBatchSize]{};
        int eventCount;
        for (;;) {
          eventCount = epoll_wait(epollfd, events, kEventBatchSize,
                                  /*no timeout*/ -1);
          if (eventCount == -1) {
            if (errno == EINTR) {
              continue;
            }
            perror("epoll_wait");
            abort();
          }
          break;
        }
        for (int i = 0; i < eventCount; ++i) {
          std::unique_ptr<Connection> conn{
              static_cast<Connection *>(events[i].data.ptr)};
          conn->tsan_acquire();
          events[i].data.ptr = nullptr;
          const int fd = conn->fd;
          if (events[i].events & EPOLLERR) {
            // Done with connection
            continue;
          }
          if (events[i].events & EPOLLOUT) {
            // Write bytes, maybe close connection
            bool finished = conn->writeBytes();
            if (finished) {
              // Done with connection
              continue;
            }
          }
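          // Readable: drain whatever is buffered on the socket and run it
          // through the HTTP parser; completed messages land in
          // conn->requests.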
          if (events[i].events & EPOLLIN) {
            conn->readBytes();
          }
          if (conn->requests.empty()) {
            // Transfer back to epoll instance.  This thread or another thread
            // will wake when fd is ready
            events[i].events = EPOLLIN | kMandatoryEpollFlags;
            if (!conn->responses.empty()) {
              events[i].events |= EPOLLOUT;
            }
            conn->tsan_release();
            events[i].data.ptr = conn.release();
            int e = epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &events[i]);
            if (e == -1) {
              perror("epoll_ctl");
              abort();
            }
            continue;
          }
          assert(!conn->requests.empty());
          // Transfer to request processing pipeline
          batch.push_back(std::move(conn));
        }
        for (int moved = 0; moved < int(batch.size());) {
          auto guard = pipeline.push(batch.size() - moved, /*block=*/false);
          if (guard.batch.empty()) {
            requestsDropped += batch.size() - moved;
            printf("Network thread %d: Queue full. Total requests dropped: "
                   "%" PRId64 "\n",
                   i, requestsDropped);
            break;
          }
          std::move(batch.data() + moved,
                    batch.data() + moved + guard.batch.size(),
                    guard.batch.begin());
          moved += guard.batch.size();
        }
      }
    });
  }

  std::atomic<int64_t> connectionId{0};

  // Accept threads
  for (int i = 0; i < kAcceptThreads; ++i) {
    threads.emplace_back([epollfd, i, sockfd, &connectionId]() {
      pthread_setname_np(pthread_self(),
                         ("accept-" + std::to_string(i)).c_str());
      // Call accept in a loop
      for (;;) {
        struct sockaddr addr;
        int fd = getAcceptFd(sockfd, &addr);
        if (fd == -1) {
          perror("accept4");
          continue;
        }
        auto conn = std::make_unique<Connection>(
            addr, fd, connectionId.fetch_add(1, std::memory_order_relaxed));
        // Post to epoll instance
        struct epoll_event event{};
        event.events = EPOLLIN | kMandatoryEpollFlags;
        conn->tsan_release();
        event.data.ptr = conn.release();
        int e = epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event);
        if (e == -1) {
          perror("epoll_ctl");
          abort();
        }
      }
    });
  }

  for (auto &t : threads) {
    t.join();
  }
}
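
// Example exchange (illustrative only; the header name is arbitrary, since
// on_header_value folds every header value into the request id digit by
// digit, so a client should send exactly one numeric header):
//
//   -> GET / HTTP/1.1\r\nX-Request-Id: 42\r\n\r\n
//   <- HTTP/1.1 204 No Content\r\nX-Response-Id: 42\r\n\r\n
//
// The response is released once the ~50 ms batching window in pipeline
// stage 1 expires.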