Prevent queueing of messages on connection after it will be closed
This commit is contained in:
@@ -98,13 +98,22 @@ void Connection::append_bytes(std::span<std::string_view> data_parts,
|
|||||||
|
|
||||||
std::unique_lock lock(mutex_);
|
std::unique_lock lock(mutex_);
|
||||||
|
|
||||||
|
// Prevent queueing messages after close has been requested
|
||||||
|
if (close_requested_) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
// Check if queue was empty to determine if we need to enable EPOLLOUT
|
// Check if queue was empty to determine if we need to enable EPOLLOUT
|
||||||
bool was_empty = message_queue_.empty();
|
bool was_empty = message_queue_.empty();
|
||||||
|
|
||||||
|
// Set close_requested flag if this message requests close
|
||||||
|
if (close_after_send) {
|
||||||
|
close_requested_ = true;
|
||||||
|
}
|
||||||
|
|
||||||
// Add message to queue
|
// Add message to queue
|
||||||
// TODO this allocates while holding the connection lock
|
// TODO this allocates while holding the connection lock
|
||||||
message_queue_.emplace_back(
|
message_queue_.emplace_back(Message{std::move(arena), data_parts});
|
||||||
Message{std::move(arena), data_parts, close_after_send});
|
|
||||||
outgoing_bytes_queued_ += total_bytes;
|
outgoing_bytes_queued_ += total_bytes;
|
||||||
|
|
||||||
// If queue was empty, we need to add EPOLLOUT interest.
|
// If queue was empty, we need to add EPOLLOUT interest.
|
||||||
@@ -128,6 +137,11 @@ void Connection::send_response(void *protocol_context,
|
|||||||
std::string_view response_json, Arena arena) {
|
std::string_view response_json, Arena arena) {
|
||||||
std::unique_lock lock(mutex_);
|
std::unique_lock lock(mutex_);
|
||||||
|
|
||||||
|
// Prevent queueing responses after close has been requested
|
||||||
|
if (close_requested_) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
// Store response in queue for protocol handler processing
|
// Store response in queue for protocol handler processing
|
||||||
pending_response_queue_.emplace_back(
|
pending_response_queue_.emplace_back(
|
||||||
PendingResponse{protocol_context, response_json, std::move(arena)});
|
PendingResponse{protocol_context, response_json, std::move(arena)});
|
||||||
@@ -206,10 +220,6 @@ uint32_t Connection::write_bytes() {
|
|||||||
part.size()};
|
part.size()};
|
||||||
iov_count++;
|
iov_count++;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (message.close_after_send) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (iov_count == 0)
|
if (iov_count == 0)
|
||||||
@@ -272,9 +282,6 @@ uint32_t Connection::write_bytes() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (front_message.close_after_send) {
|
|
||||||
result |= Close;
|
|
||||||
}
|
|
||||||
// Move arena to thread-local vector for deferred cleanup
|
// Move arena to thread-local vector for deferred cleanup
|
||||||
g_arenas_to_free.emplace_back(std::move(front_message.arena));
|
g_arenas_to_free.emplace_back(std::move(front_message.arena));
|
||||||
message_queue_.pop_front();
|
message_queue_.pop_front();
|
||||||
@@ -300,6 +307,9 @@ uint32_t Connection::write_bytes() {
|
|||||||
// sets it and we would hang.
|
// sets it and we would hang.
|
||||||
epoll_ctl(server->epoll_fds_[epoll_index_], EPOLL_CTL_MOD, fd_, &event);
|
epoll_ctl(server->epoll_fds_[epoll_index_], EPOLL_CTL_MOD, fd_, &event);
|
||||||
}
|
}
|
||||||
|
if (close_requested_) {
|
||||||
|
result |= Close;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -109,11 +109,15 @@ struct Connection : MessageSender {
|
|||||||
*
|
*
|
||||||
* @param data_parts Span of string_views pointing to arena-allocated data
|
* @param data_parts Span of string_views pointing to arena-allocated data
|
||||||
* @param arena Arena that owns all the memory referenced by data_parts
|
* @param arena Arena that owns all the memory referenced by data_parts
|
||||||
* @param close_after_send Whether to close connection after sending
|
* @param close_after_send Whether to close connection after sending all
|
||||||
|
* queued data
|
||||||
*
|
*
|
||||||
* @note Thread Safety: Must be called from I/O thread only.
|
* @note Thread Safety: Must be called from I/O thread only.
|
||||||
* @note Ordering: Bytes are sent in the order calls are made.
|
* @note Ordering: Bytes are sent in the order calls are made.
|
||||||
* @note The memory referenced by the data_parts span, must outlive @p arena.
|
* @note The memory referenced by the data_parts span, must outlive @p arena.
|
||||||
|
* @note Close Request: To request connection close without sending data,
|
||||||
|
* pass empty data_parts span with close_after_send=true. This ensures
|
||||||
|
* all previously queued messages are sent before closing.
|
||||||
*
|
*
|
||||||
* Example usage (from ConnectionHandler::on_preprocess_writes):
|
* Example usage (from ConnectionHandler::on_preprocess_writes):
|
||||||
* ```cpp
|
* ```cpp
|
||||||
@@ -289,7 +293,6 @@ private:
|
|||||||
Arena arena; // Owns all the memory (movable)
|
Arena arena; // Owns all the memory (movable)
|
||||||
std::span<std::string_view> data_parts; // Points to arena-allocated memory
|
std::span<std::string_view> data_parts; // Points to arena-allocated memory
|
||||||
// (mutable for partial writes)
|
// (mutable for partial writes)
|
||||||
bool close_after_send = false; // Close connection after sending
|
|
||||||
};
|
};
|
||||||
|
|
||||||
// Server is a friend and can access all networking internals
|
// Server is a friend and can access all networking internals
|
||||||
@@ -343,7 +346,8 @@ private:
|
|||||||
// dropped while server accesses existing elements.
|
// dropped while server accesses existing elements.
|
||||||
int64_t outgoing_bytes_queued_{0}; // Counter of queued bytes
|
int64_t outgoing_bytes_queued_{0}; // Counter of queued bytes
|
||||||
std::deque<PendingResponse>
|
std::deque<PendingResponse>
|
||||||
pending_response_queue_; // Responses awaiting protocol processing
|
pending_response_queue_; // Responses awaiting protocol processing
|
||||||
|
bool close_requested_{false}; // True if close_after_send has been requested
|
||||||
// Set to a negative number in `close`
|
// Set to a negative number in `close`
|
||||||
int fd_;
|
int fd_;
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user