From ecfb7f33070def4503f9ff9c52c19e06f72f10e8 Mon Sep 17 00:00:00 2001 From: Andrew Noyes Date: Tue, 19 Aug 2025 14:23:17 -0400 Subject: [PATCH] Add on_write_progress --- design.md | 11 ++++++----- src/connection.cpp | 2 +- src/connection.hpp | 7 +++++++ src/connection_handler.hpp | 12 +++++++++--- src/main.cpp | 4 ++-- src/server.cpp | 20 +++++++++++++++++--- 6 files changed, 42 insertions(+), 14 deletions(-) diff --git a/design.md b/design.md index 790d176..0f35db0 100644 --- a/design.md +++ b/design.md @@ -105,7 +105,6 @@ See `config.md` for complete configuration documentation. - **Safe shutdown mechanism** with async-signal-safe shutdown() method - **Connection ownership management** with automatic cleanup on server destruction - **Pluggable protocol handlers** via ConnectionHandler interface -- TODO design a way to stream server sent events Key features: - Multi-threaded architecture: separate accept and network thread pools @@ -135,7 +134,7 @@ Key features: - **Connection lifecycle hooks** for initialization and cleanup Key features: -- process_data() with unique_ptr& for ownership transfer +- on_data_arrived()/on_write_progress() with unique_ptr& for ownership transfer - on_connection_established/closed() hooks for protocol state management - Zero-copy data processing with arena allocator integration - Thread-safe ownership transfer via Server::releaseBackToServer() @@ -282,7 +281,7 @@ The modular design allows each component to be optimized independently while mai ### Adding New Protocol Handlers - Inherit from `ConnectionHandler` in `src/connection_handler.hpp` -- Implement `process_data()` with proper ownership semantics +- Implement `on_data_arrived()` with proper ownership semantics - Use connection's arena allocator for temporary allocations: `conn->getArena()` - Handle partial messages and streaming protocols appropriately - Use `Server::releaseBackToServer()` if taking ownership for async processing @@ -338,7 +337,7 @@ auto server = Server::create(config, handler); ```cpp class HttpHandler : public ConnectionHandler { public: - void process_data(std::string_view data, std::unique_ptr& conn_ptr) override { + void on_data_arrived(std::string_view data, std::unique_ptr& conn_ptr) override { // Parse HTTP request using connection's arena ArenaAllocator& arena = conn_ptr->getArena(); @@ -354,7 +353,7 @@ public: ```cpp class AsyncHandler : public ConnectionHandler { public: - void process_data(std::string_view data, std::unique_ptr& conn_ptr) override { + void on_data_arrived(std::string_view data, std::unique_ptr& conn_ptr) override { // Take ownership for async processing auto connection = std::move(conn_ptr); // conn_ptr is now null @@ -369,6 +368,8 @@ public: }; ``` +TODO add example of streaming data + ### Arena-Based String Handling ```cpp // Preferred: Zero-copy string view with arena allocation diff --git a/src/connection.cpp b/src/connection.cpp index de3312f..7d8ce4d 100644 --- a/src/connection.cpp +++ b/src/connection.cpp @@ -122,7 +122,7 @@ bool Connection::writeBytes() { // This reclaims memory from request parsing and response generation arena_.reset(); - return closeConnection_; + return false; } void Connection::tsan_acquire() { diff --git a/src/connection.hpp b/src/connection.hpp index a37e58c..07ee1d4 100644 --- a/src/connection.hpp +++ b/src/connection.hpp @@ -44,6 +44,13 @@ struct Connection { void closeAfterSend() { closeConnection_ = true; } ArenaAllocator &getArena() { return arena_; } int64_t getId() const { return id_; } + int64_t outgoingBytesQueued() const { + int64_t result = 0; + for (auto s : messages_) { + result += s.size(); + } + return result; + } // Note: To release connection back to server, use // Server::releaseBackToServer(std::move(connection_ptr)) diff --git a/src/connection_handler.hpp b/src/connection_handler.hpp index 17c816e..6e0ec8c 100644 --- a/src/connection_handler.hpp +++ b/src/connection_handler.hpp @@ -35,11 +35,17 @@ public: * - If ownership is taken, handler must call Server::releaseBackToServer() * when done * @note `data` is *not* owned by the connection arena, and its lifetime ends - * after the call to process_data. + * after the call to on_data_arrived. * @note May be called from an arbitrary network thread. */ - virtual void process_data(std::string_view data, - std::unique_ptr &conn_ptr) = 0; + virtual void on_data_arrived(std::string_view data, + std::unique_ptr &conn_ptr) = 0; + + /** + * Successfully wrote data on the connection. + * @note May be called from an arbitrary network thread. + */ + virtual void on_write_progress(std::unique_ptr &) {} /** * Called when a new connection is established. diff --git a/src/main.cpp b/src/main.cpp index 51b729a..6c3c90f 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -29,8 +29,8 @@ void signal_handler(int sig) { */ class EchoHandler : public ConnectionHandler { public: - void process_data(std::string_view data, - std::unique_ptr &conn_ptr) override { + void on_data_arrived(std::string_view data, + std::unique_ptr &conn_ptr) override { // Echo the received data back to the client conn_ptr->appendMessage(data); } diff --git a/src/server.cpp b/src/server.cpp index bca080f..678ff79 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -270,7 +270,7 @@ void Server::start_network_threads() { // Call handler with unique_ptr - handler can take ownership if // needed - handler_.process_data(data, conn); + handler_.on_data_arrived(data, conn); // If handler took ownership (conn is now null), don't continue // processing @@ -280,8 +280,22 @@ void Server::start_network_threads() { } if (events[i].events & EPOLLOUT) { - bool done = conn->writeBytes(); - if (done) { + bool error = conn->writeBytes(); + if (error) { + continue; + } + + // Call handler with unique_ptr - handler can take ownership if + // needed + handler_.on_write_progress(conn); + // If handler took ownership (conn is now null), don't continue + // processing + if (!conn) { + continue; + } + + // Check if we should close the connection according to application + if (!conn->hasMessages() && conn->closeConnection_) { continue; } }