Instrument connections

This commit is contained in:
2025-09-03 13:57:23 -04:00
parent 6d480487da
commit 2fa5b3e960
2 changed files with 48 additions and 0 deletions

View File

@@ -5,8 +5,29 @@
#include <cstdio> #include <cstdio>
#include <cstdlib> #include <cstdlib>
#include "metric.hpp"
#include "server.hpp" // Need this for release_back_to_server implementation #include "server.hpp" // Need this for release_back_to_server implementation
namespace {
// Thread-local metric instances
thread_local auto connections_total =
metric::create_counter("weaseldb_connections_total",
"Total number of connections accepted")
.create({});
thread_local auto connections_active =
metric::create_gauge("weaseldb_connections_active",
"Number of currently active connections")
.create({});
thread_local auto bytes_read =
metric::create_counter("weaseldb_bytes_read_total",
"Total number of bytes read from clients")
.create({});
thread_local auto bytes_written =
metric::create_counter("weaseldb_bytes_written_total",
"Total number of bytes written to clients")
.create({});
} // namespace
// Static thread-local storage for iovec buffer // Static thread-local storage for iovec buffer
static thread_local std::vector<struct iovec> g_iovec_buffer{IOV_MAX}; static thread_local std::vector<struct iovec> g_iovec_buffer{IOV_MAX};
@@ -16,6 +37,11 @@ Connection::Connection(struct sockaddr_storage addr, int fd, int64_t id,
: fd_(fd), id_(id), epoll_index_(epoll_index), addr_(addr), arena_(), : fd_(fd), id_(id), epoll_index_(epoll_index), addr_(addr), arena_(),
handler_(handler), server_(server.weak_from_this()) { handler_(handler), server_(server.weak_from_this()) {
server.active_connections_.fetch_add(1, std::memory_order_relaxed); server.active_connections_.fetch_add(1, std::memory_order_relaxed);
// Increment connection metrics using thread-local instances
connections_total.inc();
connections_active.inc();
assert(handler_); assert(handler_);
handler_->on_connection_established(*this); handler_->on_connection_established(*this);
} }
@@ -27,6 +53,10 @@ Connection::~Connection() {
if (auto server_ptr = server_.lock()) { if (auto server_ptr = server_.lock()) {
server_ptr->active_connections_.fetch_sub(1, std::memory_order_relaxed); server_ptr->active_connections_.fetch_sub(1, std::memory_order_relaxed);
} }
// Decrement active connections gauge
connections_active.dec();
int e = close(fd_); int e = close(fd_);
if (e == -1 && errno != EINTR) { if (e == -1 && errno != EINTR) {
perror("close"); perror("close");
@@ -63,11 +93,18 @@ int Connection::readBytes(char *buf, size_t buffer_size) {
if (r == 0) { if (r == 0) {
return -1; return -1;
} }
// Increment bytes read metric
if (r > 0) {
bytes_read.inc(r);
}
return r; return r;
} }
} }
bool Connection::writeBytes() { bool Connection::writeBytes() {
ssize_t total_bytes_written = 0;
while (!messages_.empty()) { while (!messages_.empty()) {
// Build iovec array up to IOV_MAX limit using thread-local vector // Build iovec array up to IOV_MAX limit using thread-local vector
assert(g_iovec_buffer.size() == IOV_MAX); assert(g_iovec_buffer.size() == IOV_MAX);
@@ -93,6 +130,10 @@ bool Connection::writeBytes() {
continue; // Standard practice: retry on signal interruption continue; // Standard practice: retry on signal interruption
} }
if (errno == EAGAIN) { if (errno == EAGAIN) {
// Increment bytes written metric before returning
if (total_bytes_written > 0) {
bytes_written.inc(total_bytes_written);
}
return false; return false;
} }
perror("writev"); perror("writev");
@@ -102,6 +143,7 @@ bool Connection::writeBytes() {
} }
assert(w > 0); assert(w > 0);
total_bytes_written += w;
// Handle partial writes by updating string_view data/size // Handle partial writes by updating string_view data/size
size_t bytes_written = static_cast<size_t>(w); size_t bytes_written = static_cast<size_t>(w);
@@ -123,5 +165,10 @@ bool Connection::writeBytes() {
} }
assert(messages_.empty()); assert(messages_.empty());
// Increment bytes written metric
if (total_bytes_written > 0) {
bytes_written.inc(total_bytes_written);
}
return false; return false;
} }

View File

@@ -487,6 +487,7 @@ void Server::process_connection_writes(std::unique_ptr<Connection> &conn,
if ((events & EPOLLOUT) || ((events & EPOLLIN) && conn->hasMessages())) { if ((events & EPOLLOUT) || ((events & EPOLLIN) && conn->hasMessages())) {
bool had_messages = conn->hasMessages(); bool had_messages = conn->hasMessages();
bool error = conn->writeBytes(); bool error = conn->writeBytes();
if (error) { if (error) {
conn.reset(); // Connection should be closed conn.reset(); // Connection should be closed
return; return;