#include "connection.hpp" #include #include #include #include #include #include "metric.hpp" #include "server.hpp" // Need this for server reference namespace { // Thread-local metric instances thread_local auto connections_total = metric::create_counter("weaseldb_connections_total", "Total number of connections accepted") .create({}); thread_local auto connections_active = metric::create_gauge("weaseldb_connections_active", "Number of currently active connections") .create({}); thread_local auto bytes_read = metric::create_counter("weaseldb_bytes_read_total", "Total number of bytes read from clients") .create({}); thread_local auto bytes_written = metric::create_counter("weaseldb_bytes_written_total", "Total number of bytes written to clients") .create({}); thread_local auto write_eagain_failures = metric::create_counter( "weaseldb_write_eagain_failures_total", "Total number of write operations that failed with EAGAIN") .create({}); } // namespace // Static thread-local storage for iovec buffer static thread_local std::vector g_iovec_buffer{IOV_MAX}; // Thread-local storage for arenas to be freed after unlocking static thread_local std::vector g_arenas_to_free; Connection::Connection(struct sockaddr_storage addr, int fd, int64_t id, size_t epoll_index, ConnectionHandler *handler, WeakRef server) : id_(id), epoll_index_(epoll_index), addr_(addr), handler_(handler), server_(std::move(server)), fd_(fd) { auto server_ref = server_.lock(); // Should only be called from the io thread assert(server_ref); server_ref->active_connections_.fetch_add(1, std::memory_order_relaxed); // Increment connection metrics using thread-local instances connections_total.inc(); connections_active.inc(); assert(handler_); handler_->on_connection_established(*this); } Connection::~Connection() { handler_->on_connection_closed(*this); if (fd_ >= 0) { int e = ::close(fd_); if (e == -1 && errno != EINTR) { perror("close"); std::abort(); } // EINTR ignored - fd is guaranteed closed on Linux } } void Connection::close() { std::lock_guard lock{mutex_}; auto server_ptr = server_.lock(); // Should only be called from the io thread assert(server_ptr); server_ptr->active_connections_.fetch_sub(1, std::memory_order_relaxed); assert(fd_ >= 0); int e = ::close(fd_); if (e == -1 && errno != EINTR) { perror("close"); std::abort(); } // EINTR ignored - fd is guaranteed closed on Linux fd_ = -1; // Decrement active connections gauge connections_active.dec(); } // Called from I/O thread only void Connection::append_bytes(std::span data_parts, Arena arena, ConnectionShutdown shutdown_mode) { // Calculate total bytes for this message. Don't need to hold the lock yet. size_t total_bytes = 0; for (const auto &part : data_parts) { total_bytes += part.size(); } std::unique_lock lock(mutex_); // Prevent queueing messages after shutdown has been requested if (shutdown_requested_ != ConnectionShutdown::None) { return; } // Check if queue was empty to determine if we need to enable EPOLLOUT bool was_empty = message_queue_.empty(); // Set shutdown mode if requested if (shutdown_mode != ConnectionShutdown::None) { shutdown_requested_ = shutdown_mode; } // Add message to queue // TODO this allocates while holding the connection lock message_queue_.emplace_back(Message{std::move(arena), data_parts}); outgoing_bytes_queued_ += total_bytes; // If queue was empty, we need to add EPOLLOUT interest. 
  if (was_empty) {
    auto server = server_.lock();
    if (fd_ >= 0 && server) {
      // Add EPOLLOUT interest - pipeline thread manages epoll
      struct epoll_event event;
      event.data.fd = fd_;
      event.events = EPOLLIN | EPOLLOUT;
      tsan_release();
      // I think we have to call epoll_ctl while holding mutex_. Otherwise a
      // call that clears the write interest could get reordered with one that
      // sets it and we would hang.
      epoll_ctl(server->epoll_fds_[epoll_index_], EPOLL_CTL_MOD, fd_, &event);
    }
  }
}

void Connection::send_response(void *protocol_context,
                               std::string_view response_json, Arena arena) {
  std::unique_lock lock(mutex_);

  // Prevent queueing responses after shutdown has been requested
  if (shutdown_requested_ != ConnectionShutdown::None) {
    return;
  }

  // Store response in queue for protocol handler processing
  pending_response_queue_.emplace_back(
      PendingResponse{protocol_context, response_json, std::move(arena)});

  // Trigger epoll interest if this is the first pending response
  if (pending_response_queue_.size() == 1) {
    auto server = server_.lock();
    if (fd_ >= 0 && server) {
      // Add EPOLLOUT interest to trigger on_preprocess_writes
      struct epoll_event event;
      event.data.fd = fd_;
      event.events = EPOLLIN | EPOLLOUT;
      tsan_release();
      epoll_ctl(server->epoll_fds_[epoll_index_], EPOLL_CTL_MOD, fd_, &event);
    }
  }
}

int Connection::readBytes(char *buf, size_t buffer_size) {
  int r;
  for (;;) {
    r = read(fd_, buf, buffer_size);
    if (r == -1) {
      if (errno == EINTR) {
        continue;
      }
      if (errno == EAGAIN) {
        return 0;
      }
      perror("read");
      return -1;
    }
    if (r == 0) {
      return -1;
    }
    // Increment bytes read metric
    assert(r > 0);
    bytes_read.inc(r);
    return r;
  }
}

uint32_t Connection::write_bytes() {
  ssize_t total_bytes_written = 0;
  uint32_t result = 0;

  while (true) {
    // Build iovec array while holding mutex using thread-local buffer
    int iov_count = 0;
    {
      std::lock_guard lock(mutex_);
      if (message_queue_.empty()) {
        break;
      }

      // Build iovec array up to IOV_MAX limit using thread-local vector
      assert(g_iovec_buffer.size() == IOV_MAX);
      struct iovec *iov = g_iovec_buffer.data();

      for (auto &message : message_queue_) {
        if (iov_count >= IOV_MAX)
          break;
        for (const auto &part : message.data_parts) {
          if (iov_count >= IOV_MAX)
            break;
          if (part.empty())
            continue;
          iov[iov_count] = {
              const_cast<void *>(static_cast<const void *>(part.data())),
              part.size()};
          iov_count++;
        }
      }

      if (iov_count == 0)
        break;
    } // Release mutex during I/O

    // Perform I/O without holding mutex
    ssize_t w;
    for (;;) {
      struct msghdr msg = {};
      msg.msg_iov = g_iovec_buffer.data();
      msg.msg_iovlen = iov_count;
      w = sendmsg(fd_, &msg, MSG_NOSIGNAL);
      if (w == -1) {
        if (errno == EINTR) {
          continue; // Standard practice: retry on signal interruption
        }
        if (errno == EAGAIN) {
          // Increment EAGAIN failure metric
          write_eagain_failures.inc();
          bytes_written.inc(total_bytes_written);
          return result;
        }
        perror("sendmsg");
        result |= Error;
        return result;
      }
      break;
    }

    result |= Progress;
    assert(w > 0);
    total_bytes_written += w;

    // Handle partial writes by updating message data_parts
    {
      std::lock_guard lock(mutex_);
      outgoing_bytes_queued_ -= w;
      size_t bytes_remaining = static_cast<size_t>(w);

      while (bytes_remaining > 0 && !message_queue_.empty()) {
        auto &front_message = message_queue_.front();
        // Track whether every part of the front message was consumed; a
        // partially written message must stay queued with its arena alive.
        bool message_fully_written = true;
        for (auto &part : front_message.data_parts) {
          if (part.empty())
            continue;
          if (bytes_remaining >= part.size()) {
            // This part is completely written
            bytes_remaining -= part.size();
            part = std::string_view(); // Mark as consumed
          } else {
            // Partial write of this part
            part = std::string_view(part.data() + bytes_remaining,
                                    part.size() - bytes_remaining);
            bytes_remaining = 0;
            message_fully_written = false;
            break;
          }
        }

        if (!message_fully_written) {
          // Leave the remainder of this message (and its arena) queued; it
          // goes out on the next EPOLLOUT wakeup.
          break;
        }

        // Move arena to thread-local vector for deferred cleanup
        g_arenas_to_free.emplace_back(std::move(front_message.arena));
        message_queue_.pop_front();
        if (result & Close) {
          break;
        }
      }
    }
  }

  // Check if queue is empty and remove EPOLLOUT interest
  {
    std::lock_guard lock(mutex_);
    if (message_queue_.empty() && pending_response_queue_.empty()) {
      auto server = server_.lock();
      if (server) {
        struct epoll_event event;
        event.data.fd = fd_;
        event.events = EPOLLIN; // Remove EPOLLOUT
        tsan_release();
        // I think we have to call epoll_ctl while holding mutex_. Otherwise a
        // call that clears the write interest could get reordered with one
        // that sets it and we would hang.
        epoll_ctl(server->epoll_fds_[epoll_index_], EPOLL_CTL_MOD, fd_, &event);
      }

      // Handle shutdown modes after all messages are sent
      if (shutdown_requested_ == ConnectionShutdown::WriteOnly) {
        // Shutdown write side but keep connection alive for reading
        shutdown(fd_, SHUT_WR);
      } else if (shutdown_requested_ == ConnectionShutdown::Full) {
        result |= Close;
      }
    }
  }

  // Increment bytes written metric
  bytes_written.inc(total_bytes_written);

  // Clean up arenas after all mutex operations are complete
  // This avoids holding the connection mutex while calling free()
  g_arenas_to_free.clear();

  return result;
}
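// A minimal, hypothetical sketch of how the I/O thread might consume the
// flags returned by write_bytes() after an EPOLLOUT wakeup. The event-loop
// plumbing (epoll_wait, fd -> Connection lookup) and the exact visibility of
// the Error/Progress/Close flags are assumptions, not part of this file:
//
//   uint32_t flags = conn->write_bytes();
//   if (flags & (Connection::Error | Connection::Close)) {
//     conn->close(); // sendmsg failed, or a Full shutdown finished draining
//   }
//   // On EAGAIN or a drained queue there is nothing more to do here:
//   // write_bytes() has already updated metrics and EPOLLOUT interest.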