From 2fa5b3e9600b80589e8e5240a3e6904244fc885d Mon Sep 17 00:00:00 2001 From: Andrew Noyes Date: Wed, 3 Sep 2025 13:57:23 -0400 Subject: [PATCH] Instrument connections --- src/connection.cpp | 47 ++++++++++++++++++++++++++++++++++++++++++++++ src/server.cpp | 1 + 2 files changed, 48 insertions(+) diff --git a/src/connection.cpp b/src/connection.cpp index ff14a7d..b2c462c 100644 --- a/src/connection.cpp +++ b/src/connection.cpp @@ -5,8 +5,29 @@ #include #include +#include "metric.hpp" #include "server.hpp" // Need this for release_back_to_server implementation +namespace { +// Thread-local metric instances +thread_local auto connections_total = + metric::create_counter("weaseldb_connections_total", + "Total number of connections accepted") + .create({}); +thread_local auto connections_active = + metric::create_gauge("weaseldb_connections_active", + "Number of currently active connections") + .create({}); +thread_local auto bytes_read = + metric::create_counter("weaseldb_bytes_read_total", + "Total number of bytes read from clients") + .create({}); +thread_local auto bytes_written = + metric::create_counter("weaseldb_bytes_written_total", + "Total number of bytes written to clients") + .create({}); +} // namespace + // Static thread-local storage for iovec buffer static thread_local std::vector g_iovec_buffer{IOV_MAX}; @@ -16,6 +37,11 @@ Connection::Connection(struct sockaddr_storage addr, int fd, int64_t id, : fd_(fd), id_(id), epoll_index_(epoll_index), addr_(addr), arena_(), handler_(handler), server_(server.weak_from_this()) { server.active_connections_.fetch_add(1, std::memory_order_relaxed); + + // Increment connection metrics using thread-local instances + connections_total.inc(); + connections_active.inc(); + assert(handler_); handler_->on_connection_established(*this); } @@ -27,6 +53,10 @@ Connection::~Connection() { if (auto server_ptr = server_.lock()) { server_ptr->active_connections_.fetch_sub(1, std::memory_order_relaxed); } + + // Decrement active connections gauge + connections_active.dec(); + int e = close(fd_); if (e == -1 && errno != EINTR) { perror("close"); @@ -63,11 +93,18 @@ int Connection::readBytes(char *buf, size_t buffer_size) { if (r == 0) { return -1; } + + // Increment bytes read metric + if (r > 0) { + bytes_read.inc(r); + } + return r; } } bool Connection::writeBytes() { + ssize_t total_bytes_written = 0; while (!messages_.empty()) { // Build iovec array up to IOV_MAX limit using thread-local vector assert(g_iovec_buffer.size() == IOV_MAX); @@ -93,6 +130,10 @@ bool Connection::writeBytes() { continue; // Standard practice: retry on signal interruption } if (errno == EAGAIN) { + // Increment bytes written metric before returning + if (total_bytes_written > 0) { + bytes_written.inc(total_bytes_written); + } return false; } perror("writev"); @@ -102,6 +143,7 @@ bool Connection::writeBytes() { } assert(w > 0); + total_bytes_written += w; // Handle partial writes by updating string_view data/size size_t bytes_written = static_cast(w); @@ -123,5 +165,10 @@ bool Connection::writeBytes() { } assert(messages_.empty()); + // Increment bytes written metric + if (total_bytes_written > 0) { + bytes_written.inc(total_bytes_written); + } + return false; } diff --git a/src/server.cpp b/src/server.cpp index 3a02a77..77fde1f 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -487,6 +487,7 @@ void Server::process_connection_writes(std::unique_ptr &conn, if ((events & EPOLLOUT) || ((events & EPOLLIN) && conn->hasMessages())) { bool had_messages = conn->hasMessages(); bool error = conn->writeBytes(); + if (error) { conn.reset(); // Connection should be closed return;