diff --git a/src/connection.cpp b/src/connection.cpp index 4400298..2004357 100644 --- a/src/connection.cpp +++ b/src/connection.cpp @@ -98,13 +98,22 @@ void Connection::append_bytes(std::span data_parts, std::unique_lock lock(mutex_); + // Prevent queueing messages after close has been requested + if (close_requested_) { + return; + } + // Check if queue was empty to determine if we need to enable EPOLLOUT bool was_empty = message_queue_.empty(); + // Set close_requested flag if this message requests close + if (close_after_send) { + close_requested_ = true; + } + // Add message to queue // TODO this allocates while holding the connection lock - message_queue_.emplace_back( - Message{std::move(arena), data_parts, close_after_send}); + message_queue_.emplace_back(Message{std::move(arena), data_parts}); outgoing_bytes_queued_ += total_bytes; // If queue was empty, we need to add EPOLLOUT interest. @@ -128,6 +137,11 @@ void Connection::send_response(void *protocol_context, std::string_view response_json, Arena arena) { std::unique_lock lock(mutex_); + // Prevent queueing responses after close has been requested + if (close_requested_) { + return; + } + // Store response in queue for protocol handler processing pending_response_queue_.emplace_back( PendingResponse{protocol_context, response_json, std::move(arena)}); @@ -206,10 +220,6 @@ uint32_t Connection::write_bytes() { part.size()}; iov_count++; } - - if (message.close_after_send) { - break; - } } if (iov_count == 0) @@ -272,9 +282,6 @@ uint32_t Connection::write_bytes() { } } - if (front_message.close_after_send) { - result |= Close; - } // Move arena to thread-local vector for deferred cleanup g_arenas_to_free.emplace_back(std::move(front_message.arena)); message_queue_.pop_front(); @@ -300,6 +307,9 @@ uint32_t Connection::write_bytes() { // sets it and we would hang. epoll_ctl(server->epoll_fds_[epoll_index_], EPOLL_CTL_MOD, fd_, &event); } + if (close_requested_) { + result |= Close; + } } } diff --git a/src/connection.hpp b/src/connection.hpp index 3fb9308..d57acbd 100644 --- a/src/connection.hpp +++ b/src/connection.hpp @@ -109,11 +109,15 @@ struct Connection : MessageSender { * * @param data_parts Span of string_views pointing to arena-allocated data * @param arena Arena that owns all the memory referenced by data_parts - * @param close_after_send Whether to close connection after sending + * @param close_after_send Whether to close connection after sending all + * queued data * * @note Thread Safety: Must be called from I/O thread only. * @note Ordering: Bytes are sent in the order calls are made. * @note The memory referenced by the data_parts span, must outlive @p arena. + * @note Close Request: To request connection close without sending data, + * pass empty data_parts span with close_after_send=true. This ensures + * all previously queued messages are sent before closing. * * Example usage (from ConnectionHandler::on_preprocess_writes): * ```cpp @@ -289,7 +293,6 @@ private: Arena arena; // Owns all the memory (movable) std::span data_parts; // Points to arena-allocated memory // (mutable for partial writes) - bool close_after_send = false; // Close connection after sending }; // Server is a friend and can access all networking internals @@ -343,7 +346,8 @@ private: // dropped while server accesses existing elements. int64_t outgoing_bytes_queued_{0}; // Counter of queued bytes std::deque - pending_response_queue_; // Responses awaiting protocol processing + pending_response_queue_; // Responses awaiting protocol processing + bool close_requested_{false}; // True if close_after_send has been requested // Set to a negative number in `close` int fd_;