From 5dc800a088778aaf3067f93cd2fdec2c2fb0a93d Mon Sep 17 00:00:00 2001 From: Andrew Noyes Date: Tue, 19 Aug 2025 11:51:44 -0400 Subject: [PATCH] Add Connection::arena --- src/main.cpp | 58 +++++++++++++++++++++++++++++++++------------------- 1 file changed, 37 insertions(+), 21 deletions(-) diff --git a/src/main.cpp b/src/main.cpp index 785a680..3c195ee 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -1,3 +1,4 @@ +#include "arena_allocator.hpp" #include "config.hpp" #include #include @@ -130,6 +131,7 @@ struct Connection { const int fd; const int64_t id; struct sockaddr_storage addr; // sockaddr_storage handles IPv4/IPv6 + ArenaAllocator arena; Connection(struct sockaddr_storage addr, int fd, int64_t id) : fd(fd), id(id), addr(addr) { @@ -145,15 +147,24 @@ struct Connection { } } - struct Task { - std::string s; - bool closeConnection{false}; + struct Message { + // Owned by Connection::arena + std::string_view s; int written = 0; }; + std::deque> messages{ + ArenaStlAllocator{&arena}}; + // Copies s into arena, and appends to messages + void appendMessage(std::string_view s) { + char *arena_str = arena.allocate(s.size()); + std::memcpy(arena_str, s.data(), s.size()); + messages.emplace_back(std::string_view(arena_str, s.size()), 0); + } + // Whether or not to close the connection after completing writing the + // response + bool closeConnection{false}; - std::deque tasks; - - void readBytes(size_t max_request_size) { + bool readBytes(size_t max_request_size) { // Use smaller buffer size but respect max request size // TODO revisit size_t buf_size = std::min(size_t(4096), max_request_size); @@ -165,28 +176,23 @@ struct Connection { continue; } if (errno == EAGAIN) { - return; + return false; } perror("read"); - goto close_connection; + return true; } if (r == 0) { - goto close_connection; + return true; } // "pump parser" // TODO revisit - tasks.emplace_back(std::string{buf.data(), size_t(r)}); + appendMessage({buf.data(), size_t(r)}); } - close_connection: - tasks.emplace_back(std::string{}, true); } bool writeBytes() { - while (!tasks.empty()) { - auto &front = tasks.front(); - if (front.closeConnection) { - return true; - } + while (!messages.empty()) { + auto &front = messages.front(); int w; for (;;) { w = write(fd, front.s.data() + front.written, @@ -206,10 +212,12 @@ struct Connection { assert(w != 0); front.written += w; if (front.written == int(front.s.size())) { - tasks.pop_front(); + messages.pop_front(); } } - return false; + assert(messages.empty()); + arena.reset(); + return closeConnection; } // This is necessary because tsan doesn't (yet?) understand that there's a @@ -346,8 +354,16 @@ int main(int argc, char *argv[]) { continue; } + // When we register our epoll interest, if we have something to + // write, we write. Otherwise we read. + assert(!((events[i].events & EPOLLIN) && + (events[i].events & EPOLLOUT))); + if (events[i].events & EPOLLIN) { - conn->readBytes(max_request_size); + bool done = conn->readBytes(max_request_size); + if (done) { + continue; + } } if (events[i].events & EPOLLOUT) { @@ -357,7 +373,7 @@ int main(int argc, char *argv[]) { } } - if (conn->tasks.empty()) { + if (conn->messages.empty()) { events[i].events = EPOLLIN | EPOLLONESHOT | EPOLLRDHUP; } else { events[i].events = EPOLLOUT | EPOLLONESHOT | EPOLLRDHUP;