From ba59a992ddfb5f35ada5320a7073957d29ffc2f0 Mon Sep 17 00:00:00 2001 From: Andrew Noyes Date: Mon, 15 Sep 2025 22:33:52 -0400 Subject: [PATCH] Document per-connection locking strategy --- src/connection.cpp | 11 +------- src/connection.hpp | 70 +++++----------------------------------------- test_config.toml | 2 +- 3 files changed, 9 insertions(+), 74 deletions(-) diff --git a/src/connection.cpp b/src/connection.cpp index 31bd198..1d452a0 100644 --- a/src/connection.cpp +++ b/src/connection.cpp @@ -90,14 +90,6 @@ void Connection::close() { // Called from I/O thread only void Connection::append_bytes(std::span data_parts, Arena arena, ConnectionShutdown shutdown_mode) { - // Calculate total bytes for this message. Don't need to hold the lock yet. - size_t total_bytes = 0; - for (const auto &part : data_parts) { - total_bytes += part.size(); - } - - std::unique_lock lock(mutex_); - // Prevent queueing messages after shutdown has been requested if (shutdown_requested_ != ConnectionShutdown::None) { return; @@ -114,7 +106,6 @@ void Connection::append_bytes(std::span data_parts, // Add message to queue // TODO this allocates while holding the connection lock message_queue_.emplace_back(Message{std::move(arena), data_parts}); - outgoing_bytes_queued_ += total_bytes; // If queue was empty, we need to add EPOLLOUT interest. if (was_empty) { @@ -133,6 +124,7 @@ void Connection::append_bytes(std::span data_parts, } } +// May be called from a foreign thread! void Connection::send_response(void *protocol_context, std::string_view response_json, Arena arena) { std::unique_lock lock(mutex_); @@ -259,7 +251,6 @@ uint32_t Connection::write_bytes() { // Handle partial writes by updating message data_parts { std::lock_guard lock(mutex_); - outgoing_bytes_queued_ -= w; size_t bytes_remaining = static_cast(w); while (bytes_remaining > 0 && !message_queue_.empty()) { diff --git a/src/connection.hpp b/src/connection.hpp index dd85ea7..381acd4 100644 --- a/src/connection.hpp +++ b/src/connection.hpp @@ -201,57 +201,6 @@ struct Connection : MessageSender { */ int64_t get_id() const { return id_; } - /** - * @brief Get the number of bytes queued for transmission. - * - * Returns the total number of bytes in all messages currently - * queued for transmission to the client. This includes all data added via - * append_message() that has not yet been sent over the network. - * - * @return Total bytes queued for transmission - * - * @warning Thread Safety: Only call from the thread that currently owns this - * connection. Concurrent access to the message queue is not thread-safe. - * - * @note Performance: This method uses an O(1) counter for fast retrieval - * in release builds. In debug builds, validates counter accuracy. - * - * @note The count decreases as the server sends data via writeBytes() and - * removes completed messages from the queue. - * - * Use cases: - * ```cpp - * // Check if all data has been sent - * if (conn->outgoing_bytes_queued() == 0) { - * conn->reset(); // Safe to reset arena - * } - * - * // Implement backpressure - * if (conn->outgoing_bytes_queued() > MAX_BUFFER_SIZE) { - * // Stop adding more data until queue drains - * } - * - * // Logging/monitoring - * metrics.recordQueueDepth(conn->get_id(), conn->outgoing_bytes_queued()); - * ``` - */ - int64_t outgoing_bytes_queued() const { - std::lock_guard lock(mutex_); -#ifndef NDEBUG - // Debug build: validate counter accuracy - int64_t computed_total = 0; - for (const auto &message : message_queue_) { - for (const auto &part : message.data_parts) { - computed_total += part.size(); - } - } - assert( - outgoing_bytes_queued_ == computed_total && - "outgoing_bytes_queued_ counter is out of sync with actual queue size"); -#endif - return outgoing_bytes_queued_; - } - /** * @brief Protocol-specific data pointer for handler use. * @@ -347,19 +296,14 @@ private: WeakRef server_; // Weak reference to server for safe epoll_ctl calls WeakRef self_ref_; // WeakRef to self for get_weak_ref() - // state shared with pipeline threads (protected by mutex_) - mutable std::mutex mutex_; // Protects all mutable state - std::deque - message_queue_; // Queue of messages to send. Protected by - // mutex_, but if non-empty mutex_ can be - // dropped while server accesses existing elements. - int64_t outgoing_bytes_queued_{0}; // Counter of queued bytes - std::deque - pending_response_queue_; // Responses awaiting protocol processing + // Only accessed from io thread + std::deque message_queue_; + + mutable std::mutex mutex_; ConnectionShutdown shutdown_requested_{ - ConnectionShutdown::None}; // Shutdown mode requested - // Set to a negative number in `close` - int fd_; + ConnectionShutdown::None}; // Protected by mutex_ + std::deque pending_response_queue_; // Protected by mutex_ + int fd_; // Protected by mutex_ #if __has_feature(thread_sanitizer) void tsan_acquire() { tsan_sync.load(std::memory_order_acquire); } diff --git a/test_config.toml b/test_config.toml index a216a35..1da241e 100644 --- a/test_config.toml +++ b/test_config.toml @@ -29,4 +29,4 @@ max_buffer_size_bytes = 10485760 # 10MB keepalive_interval_seconds = 30 [benchmark] -ok_resolve_iterations = 0 +ok_resolve_iterations = 4000