Document per-connection locking strategy

This commit is contained in:
2025-09-15 22:33:52 -04:00
parent 0d76c73077
commit ba59a992dd
3 changed files with 9 additions and 74 deletions

View File

@@ -90,14 +90,6 @@ void Connection::close() {
// Called from I/O thread only // Called from I/O thread only
void Connection::append_bytes(std::span<std::string_view> data_parts, void Connection::append_bytes(std::span<std::string_view> data_parts,
Arena arena, ConnectionShutdown shutdown_mode) { Arena arena, ConnectionShutdown shutdown_mode) {
// Calculate total bytes for this message. Don't need to hold the lock yet.
size_t total_bytes = 0;
for (const auto &part : data_parts) {
total_bytes += part.size();
}
std::unique_lock lock(mutex_);
// Prevent queueing messages after shutdown has been requested // Prevent queueing messages after shutdown has been requested
if (shutdown_requested_ != ConnectionShutdown::None) { if (shutdown_requested_ != ConnectionShutdown::None) {
return; return;
@@ -114,7 +106,6 @@ void Connection::append_bytes(std::span<std::string_view> data_parts,
// Add message to queue // Add message to queue
// TODO this allocates while holding the connection lock // TODO this allocates while holding the connection lock
message_queue_.emplace_back(Message{std::move(arena), data_parts}); message_queue_.emplace_back(Message{std::move(arena), data_parts});
outgoing_bytes_queued_ += total_bytes;
// If queue was empty, we need to add EPOLLOUT interest. // If queue was empty, we need to add EPOLLOUT interest.
if (was_empty) { if (was_empty) {
@@ -133,6 +124,7 @@ void Connection::append_bytes(std::span<std::string_view> data_parts,
} }
} }
// May be called from a foreign thread!
void Connection::send_response(void *protocol_context, void Connection::send_response(void *protocol_context,
std::string_view response_json, Arena arena) { std::string_view response_json, Arena arena) {
std::unique_lock lock(mutex_); std::unique_lock lock(mutex_);
@@ -259,7 +251,6 @@ uint32_t Connection::write_bytes() {
// Handle partial writes by updating message data_parts // Handle partial writes by updating message data_parts
{ {
std::lock_guard lock(mutex_); std::lock_guard lock(mutex_);
outgoing_bytes_queued_ -= w;
size_t bytes_remaining = static_cast<size_t>(w); size_t bytes_remaining = static_cast<size_t>(w);
while (bytes_remaining > 0 && !message_queue_.empty()) { while (bytes_remaining > 0 && !message_queue_.empty()) {

View File

@@ -201,57 +201,6 @@ struct Connection : MessageSender {
*/ */
int64_t get_id() const { return id_; } int64_t get_id() const { return id_; }
/**
* @brief Get the number of bytes queued for transmission.
*
* Returns the total number of bytes in all messages currently
* queued for transmission to the client. This includes all data added via
* append_message() that has not yet been sent over the network.
*
* @return Total bytes queued for transmission
*
* @warning Thread Safety: Only call from the thread that currently owns this
* connection. Concurrent access to the message queue is not thread-safe.
*
* @note Performance: This method uses an O(1) counter for fast retrieval
* in release builds. In debug builds, validates counter accuracy.
*
* @note The count decreases as the server sends data via writeBytes() and
* removes completed messages from the queue.
*
* Use cases:
* ```cpp
* // Check if all data has been sent
* if (conn->outgoing_bytes_queued() == 0) {
* conn->reset(); // Safe to reset arena
* }
*
* // Implement backpressure
* if (conn->outgoing_bytes_queued() > MAX_BUFFER_SIZE) {
* // Stop adding more data until queue drains
* }
*
* // Logging/monitoring
* metrics.recordQueueDepth(conn->get_id(), conn->outgoing_bytes_queued());
* ```
*/
int64_t outgoing_bytes_queued() const {
std::lock_guard lock(mutex_);
#ifndef NDEBUG
// Debug build: validate counter accuracy
int64_t computed_total = 0;
for (const auto &message : message_queue_) {
for (const auto &part : message.data_parts) {
computed_total += part.size();
}
}
assert(
outgoing_bytes_queued_ == computed_total &&
"outgoing_bytes_queued_ counter is out of sync with actual queue size");
#endif
return outgoing_bytes_queued_;
}
/** /**
* @brief Protocol-specific data pointer for handler use. * @brief Protocol-specific data pointer for handler use.
* *
@@ -347,19 +296,14 @@ private:
WeakRef<Server> server_; // Weak reference to server for safe epoll_ctl calls WeakRef<Server> server_; // Weak reference to server for safe epoll_ctl calls
WeakRef<Connection> self_ref_; // WeakRef to self for get_weak_ref() WeakRef<Connection> self_ref_; // WeakRef to self for get_weak_ref()
// state shared with pipeline threads (protected by mutex_) // Only accessed from io thread
mutable std::mutex mutex_; // Protects all mutable state std::deque<Message> message_queue_;
std::deque<Message>
message_queue_; // Queue of messages to send. Protected by mutable std::mutex mutex_;
// mutex_, but if non-empty mutex_ can be
// dropped while server accesses existing elements.
int64_t outgoing_bytes_queued_{0}; // Counter of queued bytes
std::deque<PendingResponse>
pending_response_queue_; // Responses awaiting protocol processing
ConnectionShutdown shutdown_requested_{ ConnectionShutdown shutdown_requested_{
ConnectionShutdown::None}; // Shutdown mode requested ConnectionShutdown::None}; // Protected by mutex_
// Set to a negative number in `close` std::deque<PendingResponse> pending_response_queue_; // Protected by mutex_
int fd_; int fd_; // Protected by mutex_
#if __has_feature(thread_sanitizer) #if __has_feature(thread_sanitizer)
void tsan_acquire() { tsan_sync.load(std::memory_order_acquire); } void tsan_acquire() { tsan_sync.load(std::memory_order_acquire); }

View File

@@ -29,4 +29,4 @@ max_buffer_size_bytes = 10485760 # 10MB
keepalive_interval_seconds = 30 keepalive_interval_seconds = 30
[benchmark] [benchmark]
ok_resolve_iterations = 0 ok_resolve_iterations = 4000