Compare commits

..

17 Commits

Author SHA1 Message Date
18b0a642bf Round out process collector 2025-09-03 15:34:55 -04:00
f0916d8269 Add process collector 2025-09-03 14:38:10 -04:00
2fa5b3e960 Instrument connections 2025-09-03 13:57:23 -04:00
6d480487da Use temp_arena for formatting instead of cached plan arena 2025-09-03 13:23:58 -04:00
54d06c654f Move PerThreadState to per-thread arenas 2025-09-03 13:16:35 -04:00
f067f4e85b Add weaseldb_metrics_memory_bytes 2025-09-03 13:06:34 -04:00
0ac4c31a53 Measure per metric in render scale bench 2025-09-03 12:51:49 -04:00
52b0cb3e6e Remove background thread from callback bench 2025-09-03 12:19:10 -04:00
76193f772c Tinker with benchmarks. Looking at render performance 2025-09-03 12:16:56 -04:00
0e4c526094 Fix realloc bug in static_format 2025-09-03 12:16:02 -04:00
f16cff9126 Don't copy static_text in render 2025-09-03 11:54:35 -04:00
13e4039ed6 Add performance note to header
Also improve implementation comments
2025-09-03 11:18:03 -04:00
17efcf318e Fix potential alignment issue and add more implementation comments 2025-09-03 11:12:01 -04:00
b3e48b904a Add some clarifying implementation comments 2025-09-03 11:01:01 -04:00
721f814785 Cache RenderPlan 2025-09-03 10:53:11 -04:00
8763daca8e Add arena to RenderPlan 2025-09-03 10:43:11 -04:00
a30020e960 Add Metric::registration_version
For cache invalidation
2025-09-03 10:18:19 -04:00
12 changed files with 773 additions and 644 deletions

View File

@@ -114,6 +114,7 @@ set(SOURCES
src/arena_allocator.cpp src/arena_allocator.cpp
src/format.cpp src/format.cpp
src/metric.cpp src/metric.cpp
src/process_collector.cpp
${CMAKE_BINARY_DIR}/json_tokens.cpp) ${CMAKE_BINARY_DIR}/json_tokens.cpp)
add_executable(weaseldb ${SOURCES}) add_executable(weaseldb ${SOURCES})

View File

@@ -15,7 +15,8 @@
// Test data for consistent benchmarks // Test data for consistent benchmarks
constexpr int TEST_INT = 42; constexpr int TEST_INT = 42;
constexpr double TEST_DOUBLE = 3.14159; constexpr double TEST_DOUBLE =
3.141592653589793; // Exact IEEE 754 representation of π
const std::string TEST_STRING = "Hello World"; const std::string TEST_STRING = "Hello World";
// Benchmark simple string concatenation: "Hello " + "World" + "!" // Benchmark simple string concatenation: "Hello " + "World" + "!"
@@ -23,20 +24,21 @@ void benchmark_simple_concatenation() {
std::cout << "\n=== Simple String Concatenation: 'Hello World!' ===\n"; std::cout << "\n=== Simple String Concatenation: 'Hello World!' ===\n";
ankerl::nanobench::Bench bench; ankerl::nanobench::Bench bench;
bench.title("Simple Concatenation").unit("op").warmup(100).epochs(1000); bench.title("Simple Concatenation").unit("op").warmup(100);
ArenaAllocator arena(64);
// Arena-based static_format // Arena-based static_format
bench.run("static_format", [&] { bench.run("static_format", [&] {
ArenaAllocator arena(64);
auto result = static_format(arena, "Hello ", "World", "!"); auto result = static_format(arena, "Hello ", "World", "!");
ankerl::nanobench::doNotOptimizeAway(result); ankerl::nanobench::doNotOptimizeAway(result);
arena.reset();
}); });
// Arena-based format // Arena-based format
bench.run("format", [&] { bench.run("format", [&] {
ArenaAllocator arena(64);
auto result = format(arena, "Hello %s!", "World"); auto result = format(arena, "Hello %s!", "World");
ankerl::nanobench::doNotOptimizeAway(result); ankerl::nanobench::doNotOptimizeAway(result);
arena.reset();
}); });
// std::stringstream // std::stringstream
@@ -54,16 +56,6 @@ void benchmark_simple_concatenation() {
ankerl::nanobench::doNotOptimizeAway(result); ankerl::nanobench::doNotOptimizeAway(result);
}); });
#endif #endif
// Raw snprintf with malloc
bench.run("snprintf + malloc", [&] {
char buffer[64];
int len = std::snprintf(buffer, sizeof(buffer), "Hello %s!", "World");
char *result = static_cast<char *>(std::malloc(len + 1));
std::memcpy(result, buffer, len + 1);
ankerl::nanobench::doNotOptimizeAway(result);
std::free(result);
});
} }
// Benchmark mixed type formatting: "Count: 42, Rate: 3.14159" // Benchmark mixed type formatting: "Count: 42, Rate: 3.14159"
@@ -71,21 +63,22 @@ void benchmark_mixed_types() {
std::cout << "\n=== Mixed Type Formatting: 'Count: 42, Rate: 3.14159' ===\n"; std::cout << "\n=== Mixed Type Formatting: 'Count: 42, Rate: 3.14159' ===\n";
ankerl::nanobench::Bench bench; ankerl::nanobench::Bench bench;
bench.title("Mixed Types").unit("op").warmup(100).epochs(1000); bench.title("Mixed Types").unit("op").warmup(100);
ArenaAllocator arena(128);
// Arena-based static_format // Arena-based static_format
bench.run("static_format", [&] { bench.run("static_format", [&] {
ArenaAllocator arena(128);
auto result = auto result =
static_format(arena, "Count: ", TEST_INT, ", Rate: ", TEST_DOUBLE); static_format(arena, "Count: ", TEST_INT, ", Rate: ", TEST_DOUBLE);
ankerl::nanobench::doNotOptimizeAway(result); ankerl::nanobench::doNotOptimizeAway(result);
arena.reset();
}); });
// Arena-based format // Arena-based format
bench.run("format", [&] { bench.run("format", [&] {
ArenaAllocator arena(128);
auto result = format(arena, "Count: %d, Rate: %.5f", TEST_INT, TEST_DOUBLE); auto result = format(arena, "Count: %d, Rate: %.5f", TEST_INT, TEST_DOUBLE);
ankerl::nanobench::doNotOptimizeAway(result); ankerl::nanobench::doNotOptimizeAway(result);
arena.reset();
}); });
// std::stringstream // std::stringstream
@@ -104,17 +97,6 @@ void benchmark_mixed_types() {
ankerl::nanobench::doNotOptimizeAway(result); ankerl::nanobench::doNotOptimizeAway(result);
}); });
#endif #endif
// Raw snprintf with malloc
bench.run("snprintf + malloc", [&] {
char buffer[128];
int len = std::snprintf(buffer, sizeof(buffer), "Count: %d, Rate: %.5f",
TEST_INT, TEST_DOUBLE);
char *result = static_cast<char *>(std::malloc(len + 1));
std::memcpy(result, buffer, len + 1);
ankerl::nanobench::doNotOptimizeAway(result);
std::free(result);
});
} }
// Benchmark complex formatting with precision and alignment // Benchmark complex formatting with precision and alignment
@@ -122,14 +104,15 @@ void benchmark_complex_formatting() {
std::cout << "\n=== Complex Formatting: '%-10s %5d %8.2f' ===\n"; std::cout << "\n=== Complex Formatting: '%-10s %5d %8.2f' ===\n";
ankerl::nanobench::Bench bench; ankerl::nanobench::Bench bench;
bench.title("Complex Formatting").unit("op").warmup(100).epochs(1000); bench.title("Complex Formatting").unit("op").warmup(100);
ArenaAllocator arena(128);
// Arena-based format (static_format doesn't support printf specifiers) // Arena-based format (static_format doesn't support printf specifiers)
bench.run("format", [&] { bench.run("format", [&] {
ArenaAllocator arena(128);
auto result = format(arena, "%-10s %5d %8.2f", TEST_STRING.c_str(), auto result = format(arena, "%-10s %5d %8.2f", TEST_STRING.c_str(),
TEST_INT, TEST_DOUBLE); TEST_INT, TEST_DOUBLE);
ankerl::nanobench::doNotOptimizeAway(result); ankerl::nanobench::doNotOptimizeAway(result);
arena.reset();
}); });
// std::stringstream // std::stringstream
@@ -150,17 +133,6 @@ void benchmark_complex_formatting() {
ankerl::nanobench::doNotOptimizeAway(result); ankerl::nanobench::doNotOptimizeAway(result);
}); });
#endif #endif
// Raw snprintf with malloc
bench.run("snprintf + malloc", [&] {
char buffer[128];
int len = std::snprintf(buffer, sizeof(buffer), "%-10s %5d %8.2f",
TEST_STRING.c_str(), TEST_INT, TEST_DOUBLE);
char *result = static_cast<char *>(std::malloc(len + 1));
std::memcpy(result, buffer, len + 1);
ankerl::nanobench::doNotOptimizeAway(result);
std::free(result);
});
} }
// Benchmark error message formatting (common use case) // Benchmark error message formatting (common use case)
@@ -169,26 +141,27 @@ void benchmark_error_messages() {
"(line 123)' ===\n"; "(line 123)' ===\n";
ankerl::nanobench::Bench bench; ankerl::nanobench::Bench bench;
bench.title("Error Messages").unit("op").warmup(100).epochs(1000); bench.title("Error Messages").unit("op").warmup(100);
constexpr int error_code = 404; constexpr int error_code = 404;
constexpr int line_number = 123; constexpr int line_number = 123;
const std::string error_msg = "File not found"; const std::string error_msg = "File not found";
ArenaAllocator arena(128);
// Arena-based static_format (using string literals only) // Arena-based static_format (using string literals only)
bench.run("static_format", [&] { bench.run("static_format", [&] {
ArenaAllocator arena(128);
auto result = static_format(arena, "Error ", error_code, ": ", auto result = static_format(arena, "Error ", error_code, ": ",
"File not found", " (line ", line_number, ")"); "File not found", " (line ", line_number, ")");
ankerl::nanobench::doNotOptimizeAway(result); ankerl::nanobench::doNotOptimizeAway(result);
arena.reset();
}); });
// Arena-based format // Arena-based format
bench.run("format", [&] { bench.run("format", [&] {
ArenaAllocator arena(128);
auto result = format(arena, "Error %d: %s (line %d)", error_code, auto result = format(arena, "Error %d: %s (line %d)", error_code,
error_msg.c_str(), line_number); error_msg.c_str(), line_number);
ankerl::nanobench::doNotOptimizeAway(result); ankerl::nanobench::doNotOptimizeAway(result);
arena.reset();
}); });
// std::stringstream // std::stringstream
@@ -210,37 +183,73 @@ void benchmark_error_messages() {
#endif #endif
} }
// Benchmark memory allocation overhead by testing arena reuse // Benchmark simple double formatting (common in metrics)
void benchmark_memory_reuse() { void benchmark_double_formatting() {
std::cout << "\n=== Memory Allocation Patterns ===\n"; std::cout << "\n=== Simple Double Formatting ===\n";
// Validate that all formatters produce identical output
ArenaAllocator arena(128);
auto static_result = static_format(arena, TEST_DOUBLE);
auto format_result = format(arena, "%.17g", TEST_DOUBLE);
std::stringstream ss;
ss << std::setprecision(17) << TEST_DOUBLE;
auto stringstream_result = ss.str();
#if HAS_STD_FORMAT
auto std_format_result = std::format("{}", TEST_DOUBLE);
#endif
std::cout << "Validation (note: precision algorithms may differ):\n";
std::cout << " static_format: '" << static_result
<< "' (length: " << static_result.length() << ")\n";
std::cout << " format(%.17g): '" << format_result
<< "' (length: " << format_result.length() << ")\n";
std::cout << " std::stringstream: '" << stringstream_result
<< "' (length: " << stringstream_result.length() << ")\n";
#if HAS_STD_FORMAT
std::cout << " std::format: '" << std_format_result
<< "' (length: " << std_format_result.length() << ")\n";
#endif
std::cout
<< "Note: Different formatters may use different precision algorithms\n";
std::cout << "Proceeding with performance comparison...\n";
ankerl::nanobench::Bench bench; ankerl::nanobench::Bench bench;
bench.title("Memory Patterns").unit("op").warmup(100).epochs(100); bench.title("Double Formatting").unit("op").warmup(100);
// Arena with fresh allocation each time (realistic usage) // Arena-based static_format (double only)
bench.run("fresh arena", [&] { bench.run("static_format(double)", [&] {
ArenaAllocator arena(128); auto result = static_format(arena, TEST_DOUBLE);
auto result = format(arena, "Test %d: %s %.2f", TEST_INT,
TEST_STRING.c_str(), TEST_DOUBLE);
ankerl::nanobench::doNotOptimizeAway(result); ankerl::nanobench::doNotOptimizeAway(result);
arena.reset();
}); });
// Pre-allocated arena (reuse scenario) // Arena-based format with equivalent precision
ArenaAllocator shared_arena(1024); bench.run("format(%.17g)", [&] {
bench.run("reused arena", [&] { // Use %.17g to match static_format's full precision behavior
auto result = format(shared_arena, "Test %d: %s %.2f", TEST_INT, auto result = format(arena, "%.17g", TEST_DOUBLE);
TEST_STRING.c_str(), TEST_DOUBLE);
ankerl::nanobench::doNotOptimizeAway(result); ankerl::nanobench::doNotOptimizeAway(result);
arena.reset();
}); });
// Fresh std::string allocations // std::stringstream (full precision)
bench.run("std::stringstream", [&] { bench.run("std::stringstream", [&] {
std::stringstream ss; std::stringstream ss;
ss << "Test " << TEST_INT << ": " << TEST_STRING << " " << std::fixed ss << std::setprecision(17) << TEST_DOUBLE;
<< std::setprecision(2) << TEST_DOUBLE;
auto result = ss.str(); auto result = ss.str();
ankerl::nanobench::doNotOptimizeAway(result); ankerl::nanobench::doNotOptimizeAway(result);
}); });
#if HAS_STD_FORMAT
// std::format (C++20) - default formatting
bench.run("std::format", [&] {
auto result = std::format("{}", TEST_DOUBLE);
ankerl::nanobench::doNotOptimizeAway(result);
});
#endif
} }
int main() { int main() {
@@ -257,7 +266,7 @@ int main() {
benchmark_mixed_types(); benchmark_mixed_types();
benchmark_complex_formatting(); benchmark_complex_formatting();
benchmark_error_messages(); benchmark_error_messages();
benchmark_memory_reuse(); benchmark_double_formatting();
std::cout << "\n=== Summary ===\n"; std::cout << "\n=== Summary ===\n";
std::cout std::cout
@@ -267,8 +276,6 @@ int main() {
std::cout std::cout
<< "* std::stringstream: Flexible but slower due to heap allocation\n"; << "* std::stringstream: Flexible but slower due to heap allocation\n";
std::cout << "* std::format: Modern C++20 alternative (if available)\n"; std::cout << "* std::format: Modern C++20 alternative (if available)\n";
std::cout << "* snprintf + malloc: Low-level but requires manual memory "
"management\n";
return 0; return 0;
} }

View File

@@ -158,7 +158,13 @@ int main() {
// Render performance scaling // Render performance scaling
{ {
bench.unit("metric");
bench.title("render performance");
// Test render performance as number of metrics increases // Test render performance as number of metrics increases
// Create varying numbers of metrics
for (int scale : {10, 100, 1000}) {
metric::reset_metrics_for_testing();
std::vector<metric::Counter> counters; std::vector<metric::Counter> counters;
std::vector<metric::Gauge> gauges; std::vector<metric::Gauge> gauges;
std::vector<metric::Histogram> histograms; std::vector<metric::Histogram> histograms;
@@ -166,13 +172,14 @@ int main() {
auto counter_family = auto counter_family =
metric::create_counter("scale_counter", "Scale counter"); metric::create_counter("scale_counter", "Scale counter");
auto gauge_family = metric::create_gauge("scale_gauge", "Scale gauge"); auto gauge_family = metric::create_gauge("scale_gauge", "Scale gauge");
auto buckets = std::initializer_list<double>{0.1, 0.5, 1.0, 2.5,
5.0, 10.0, 25.0, 50.0};
auto histogram_family = metric::create_histogram( auto histogram_family = metric::create_histogram(
"scale_histogram", "Scale histogram", "scale_histogram", "Scale histogram", buckets);
std::initializer_list<double>{0.1, 0.5, 1.0, 2.5, 5.0, 10.0, 25.0,
50.0});
// Create varying numbers of metrics std::atomic<double> counter_value{3.1415924654};
for (int scale : {10, 100, 1000}) { bench.batch(scale * (/*counter*/ 1 + /*gauge*/ 1 + /*callback*/ 1 +
/*histogram*/ (buckets.size() * 2 + 2)));
// Clear previous metrics by creating new families // Clear previous metrics by creating new families
// (Note: In real usage, metrics persist for application lifetime) // (Note: In real usage, metrics persist for application lifetime)
for (int i = 0; i < scale; ++i) { for (int i = 0; i < scale; ++i) {
@@ -186,6 +193,12 @@ int main() {
counters.back().inc(static_cast<double>(i)); counters.back().inc(static_cast<double>(i));
gauges.back().set(static_cast<double>(i * 2)); gauges.back().set(static_cast<double>(i * 2));
histograms.back().observe(static_cast<double>(i) * 0.1); histograms.back().observe(static_cast<double>(i) * 0.1);
// Register callbacks
counter_family.register_callback(
{{"type", "callback"}, {"id", std::to_string(i)}},
[&counter_value]() {
return counter_value.load(std::memory_order_relaxed);
});
} }
ArenaAllocator arena; ArenaAllocator arena;
@@ -200,50 +213,5 @@ int main() {
} }
} }
// Callback metrics performance
{
auto counter_family =
metric::create_counter("callback_counter", "Callback counter");
auto gauge_family =
metric::create_gauge("callback_gauge", "Callback gauge");
std::atomic<double> counter_value{0};
std::atomic<double> gauge_value{100};
// Register callbacks
counter_family.register_callback(
{{"type", "callback"}}, [&counter_value]() {
return counter_value.load(std::memory_order_relaxed);
});
gauge_family.register_callback({{"type", "callback"}}, [&gauge_value]() {
return gauge_value.load(std::memory_order_relaxed);
});
// Background thread updating callback values
std::atomic<bool> stop_callback{false};
std::latch start_latch{2}; // Background thread + benchmark thread
std::thread callback_updater([&]() {
start_latch.arrive_and_wait(); // Wait for benchmark to start
while (!stop_callback.load()) {
counter_value.fetch_add(1);
gauge_value.store(gauge_value.load() + 1);
}
});
ArenaAllocator arena;
start_latch.arrive_and_wait(); // Wait for background thread to be ready
bench.run("render() - with callback metrics", [&]() {
auto output = metric::render(arena);
ankerl::nanobench::doNotOptimizeAway(output);
arena.reset();
});
stop_callback.store(true);
callback_updater.join();
}
return 0; return 0;
} }

View File

@@ -5,8 +5,29 @@
#include <cstdio> #include <cstdio>
#include <cstdlib> #include <cstdlib>
#include "metric.hpp"
#include "server.hpp" // Need this for release_back_to_server implementation #include "server.hpp" // Need this for release_back_to_server implementation
namespace {
// Thread-local metric instances
thread_local auto connections_total =
metric::create_counter("weaseldb_connections_total",
"Total number of connections accepted")
.create({});
thread_local auto connections_active =
metric::create_gauge("weaseldb_connections_active",
"Number of currently active connections")
.create({});
thread_local auto bytes_read =
metric::create_counter("weaseldb_bytes_read_total",
"Total number of bytes read from clients")
.create({});
thread_local auto bytes_written =
metric::create_counter("weaseldb_bytes_written_total",
"Total number of bytes written to clients")
.create({});
} // namespace
// Static thread-local storage for iovec buffer // Static thread-local storage for iovec buffer
static thread_local std::vector<struct iovec> g_iovec_buffer{IOV_MAX}; static thread_local std::vector<struct iovec> g_iovec_buffer{IOV_MAX};
@@ -16,6 +37,11 @@ Connection::Connection(struct sockaddr_storage addr, int fd, int64_t id,
: fd_(fd), id_(id), epoll_index_(epoll_index), addr_(addr), arena_(), : fd_(fd), id_(id), epoll_index_(epoll_index), addr_(addr), arena_(),
handler_(handler), server_(server.weak_from_this()) { handler_(handler), server_(server.weak_from_this()) {
server.active_connections_.fetch_add(1, std::memory_order_relaxed); server.active_connections_.fetch_add(1, std::memory_order_relaxed);
// Increment connection metrics using thread-local instances
connections_total.inc();
connections_active.inc();
assert(handler_); assert(handler_);
handler_->on_connection_established(*this); handler_->on_connection_established(*this);
} }
@@ -27,6 +53,10 @@ Connection::~Connection() {
if (auto server_ptr = server_.lock()) { if (auto server_ptr = server_.lock()) {
server_ptr->active_connections_.fetch_sub(1, std::memory_order_relaxed); server_ptr->active_connections_.fetch_sub(1, std::memory_order_relaxed);
} }
// Decrement active connections gauge
connections_active.dec();
int e = close(fd_); int e = close(fd_);
if (e == -1 && errno != EINTR) { if (e == -1 && errno != EINTR) {
perror("close"); perror("close");
@@ -63,11 +93,18 @@ int Connection::readBytes(char *buf, size_t buffer_size) {
if (r == 0) { if (r == 0) {
return -1; return -1;
} }
// Increment bytes read metric
if (r > 0) {
bytes_read.inc(r);
}
return r; return r;
} }
} }
bool Connection::writeBytes() { bool Connection::writeBytes() {
ssize_t total_bytes_written = 0;
while (!messages_.empty()) { while (!messages_.empty()) {
// Build iovec array up to IOV_MAX limit using thread-local vector // Build iovec array up to IOV_MAX limit using thread-local vector
assert(g_iovec_buffer.size() == IOV_MAX); assert(g_iovec_buffer.size() == IOV_MAX);
@@ -93,6 +130,10 @@ bool Connection::writeBytes() {
continue; // Standard practice: retry on signal interruption continue; // Standard practice: retry on signal interruption
} }
if (errno == EAGAIN) { if (errno == EAGAIN) {
// Increment bytes written metric before returning
if (total_bytes_written > 0) {
bytes_written.inc(total_bytes_written);
}
return false; return false;
} }
perror("writev"); perror("writev");
@@ -102,6 +143,7 @@ bool Connection::writeBytes() {
} }
assert(w > 0); assert(w > 0);
total_bytes_written += w;
// Handle partial writes by updating string_view data/size // Handle partial writes by updating string_view data/size
size_t bytes_written = static_cast<size_t>(w); size_t bytes_written = static_cast<size_t>(w);
@@ -123,5 +165,10 @@ bool Connection::writeBytes() {
} }
assert(messages_.empty()); assert(messages_.empty());
// Increment bytes written metric
if (total_bytes_written > 0) {
bytes_written.inc(total_bytes_written);
}
return false; return false;
} }

View File

@@ -948,13 +948,7 @@ char *to_chars(char *first, const char *last, double value) {
} // namespace } // namespace
namespace detail { namespace detail {
void DoubleTerm::write(char *&buf) const { void DoubleTerm::write(char *&buf) const { buf = to_chars(buf, nullptr, s); }
char scratch[kMaxLength];
char *end = to_chars(scratch, nullptr, s);
const auto len = end - scratch;
std::memcpy(buf, scratch, static_cast<std::size_t>(len));
buf += len;
}
} // namespace detail } // namespace detail

View File

@@ -270,7 +270,6 @@ std::string_view static_format(ArenaAllocator &arena, Ts &&...ts) {
char *buf = result; char *buf = result;
(detail::term(ts).write(buf), ...); (detail::term(ts).write(buf), ...);
const int size = static_cast<int>(buf - result); const int size = static_cast<int>(buf - result);
return std::string_view( return std::string_view(arena.realloc(result, upper_bound, size),
arena.realloc(result, upper_bound, upper_bound - size),
static_cast<std::size_t>(size)); static_cast<std::size_t>(size));
} }

View File

@@ -1,10 +1,10 @@
#include "config.hpp" #include "config.hpp"
#include "connection.hpp" #include "connection.hpp"
#include "connection_handler.hpp"
#include "http_handler.hpp" #include "http_handler.hpp"
#include "metric.hpp"
#include "perfetto_categories.hpp" #include "perfetto_categories.hpp"
#include "process_collector.hpp"
#include "server.hpp" #include "server.hpp"
#include <atomic>
#include <csignal> #include <csignal>
#include <cstring> #include <cstring>
#include <fcntl.h> #include <fcntl.h>
@@ -176,6 +176,9 @@ int main(int argc, char *argv[]) {
perfetto::TrackEvent::Register(); perfetto::TrackEvent::Register();
#endif #endif
// Register the process collector for default metrics.
metric::register_collector(std::make_shared<ProcessCollector>());
std::string config_file = "config.toml"; std::string config_file = "config.toml";
// Parse command line arguments // Parse command line arguments

File diff suppressed because it is too large Load Diff

View File

@@ -20,6 +20,14 @@
// typical Prometheus client libraries that support multiple registries. // typical Prometheus client libraries that support multiple registries.
// This design choice prioritizes simplicity and performance over flexibility. // This design choice prioritizes simplicity and performance over flexibility.
// //
// PERFORMANCE NOTE:
// Family registration operations (create_counter/gauge/histogram), metric
// instance creation (.create()), and render() use a global mutex for thread
// safety. Registration operations should be performed during application
// initialization, not in performance-critical paths. Metric update operations
// (inc/dec/set/observe) are designed for high-frequency use and do not contend
// on the global mutex.
//
// METRIC LIFECYCLE: // METRIC LIFECYCLE:
// Metrics are created once and persist for the application lifetime. There is // Metrics are created once and persist for the application lifetime. There is
// no unregistration mechanism - this prevents accidental metric loss and // no unregistration mechanism - this prevents accidental metric loss and
@@ -37,6 +45,7 @@
#include <functional> #include <functional>
#include <initializer_list> #include <initializer_list>
#include <memory>
#include <span> #include <span>
#include <type_traits> #include <type_traits>
#include <vector> #include <vector>
@@ -223,6 +232,36 @@ bool is_valid_label_value(std::string_view value);
// when no metric objects are in use and no concurrent render() calls. // when no metric objects are in use and no concurrent render() calls.
void reset_metrics_for_testing(); void reset_metrics_for_testing();
/**
* @brief Interface for a custom collector that can be registered with the
* metrics system.
*
* This is used for complex metric gathering, such as reading from /proc, where
* multiple metrics need to be updated from a single data source.
*/
struct Collector {
/**
* @brief Virtual destructor.
*/
virtual ~Collector() = default;
/**
* @brief Called by the metrics system to update the metrics this collector is
* responsible for.
*/
virtual void collect() = 0;
};
/**
* @brief Register a collector with the metrics system.
*
* The system will hold a shared_ptr to the collector and call its collect()
* method during each metric rendering.
*
* @param collector A shared_ptr to the collector to be registered.
*/
void register_collector(std::shared_ptr<Collector> collector);
// Note: Histograms do not support callbacks due to their multi-value nature // Note: Histograms do not support callbacks due to their multi-value nature
// (buckets + sum + count). Use static histogram metrics only. // (buckets + sum + count). Use static histogram metrics only.

239
src/process_collector.cpp Normal file
View File

@@ -0,0 +1,239 @@
#include "process_collector.hpp"
#include <cstdio>
#include <cstring>
#include <dirent.h>
#include <sys/resource.h>
#include <unistd.h>
#include <vector>
namespace {
// Helper function to read the system boot time from /proc/stat.
// Returns boot time in seconds since epoch, or 0 on error.
double get_boot_time() {
FILE *fp = std::fopen("/proc/stat", "r");
if (!fp) {
return 0;
}
char line[256];
double boot_time = 0;
while (std::fgets(line, sizeof(line), fp)) {
if (std::strncmp(line, "btime ", 6) == 0) {
if (std::sscanf(line + 6, "%lf", &boot_time) != 1) {
boot_time = 0;
}
break;
}
}
std::fclose(fp);
return boot_time;
}
} // namespace
ProcessCollector::ProcessCollector()
: cpu_seconds_total_(metric::create_counter(
"process_cpu_seconds_total",
"Total user and system CPU time spent in seconds")
.create({})),
resident_memory_bytes_(
metric::create_gauge("process_resident_memory_bytes",
"Resident memory size in bytes")
.create({})),
virtual_memory_bytes_(metric::create_gauge("process_virtual_memory_bytes",
"Virtual memory size in bytes")
.create({})),
open_fds_(metric::create_gauge("process_open_fds",
"Number of open file descriptors")
.create({})),
max_fds_(metric::create_gauge("process_max_fds",
"Maximum number of open file descriptors")
.create({})),
start_time_seconds_(
metric::create_gauge(
"process_start_time_seconds",
"Start time of the process since unix epoch in seconds")
.create({})),
threads_(metric::create_gauge("process_threads",
"Number of OS threads in this process")
.create({})),
context_switches_total_voluntary_(
metric::create_counter("process_context_switches_total",
"Total number of context switches")
.create({{"type", "voluntary"}})),
context_switches_total_nonvoluntary_(
metric::create_counter("process_context_switches_total",
"Total number of context switches")
.create({{"type", "nonvoluntary"}})),
page_faults_total_minor_(
metric::create_counter("process_page_faults_total",
"Total number of page faults")
.create({{"type", "minor"}})),
page_faults_total_major_(
metric::create_counter("process_page_faults_total",
"Total number of page faults")
.create({{"type", "major"}})) {
// Set the constant max_fds metric.
struct rlimit rlim;
if (getrlimit(RLIMIT_NOFILE, &rlim) == 0) {
max_fds_.set(rlim.rlim_cur);
}
// Perform an initial collection to populate the other metrics and set the
// initial counter values.
collect();
}
void ProcessCollector::collect() {
// --- CPU Time, Memory, and Start Time from /proc/self/stat ---
FILE *fp = std::fopen("/proc/self/stat", "r");
if (!fp) {
return;
}
char buf[2048];
if (std::fgets(buf, sizeof(buf), fp) == nullptr) {
std::fclose(fp);
return;
}
std::fclose(fp);
// Find the end of the command name, which is in parentheses
const char *stats_start = std::strrchr(buf, ')');
if (!stats_start) {
return;
}
stats_start += 2; // Skip the ')' and the space
// Tokenize the rest of the string
std::vector<const char *> stats;
char *p = const_cast<char *>(stats_start);
while (*p) {
stats.push_back(p);
while (*p && *p != ' ') {
p++;
}
if (*p) {
*p = '\0';
p++;
}
}
// We need at least 24 fields for rss, and also fields 9,11 for page faults
if (stats.size() < 24) {
return;
}
long clk_tck = sysconf(_SC_CLK_TCK);
// --- Page Faults ---
unsigned long long minor_faults = std::strtoull(stats[7], nullptr, 10);
unsigned long long major_faults = std::strtoull(stats[9], nullptr, 10);
if (last_minor_faults_ > 0) {
if (minor_faults > last_minor_faults_) {
page_faults_total_minor_.inc(minor_faults - last_minor_faults_);
}
} else {
page_faults_total_minor_.inc(minor_faults);
}
last_minor_faults_ = minor_faults;
if (last_major_faults_ > 0) {
if (major_faults > last_major_faults_) {
page_faults_total_major_.inc(major_faults - last_major_faults_);
}
} else {
page_faults_total_major_.inc(major_faults);
}
last_major_faults_ = major_faults;
// --- CPU Time ---
unsigned long long utime_ticks = std::strtoull(stats[11], nullptr, 10);
unsigned long long stime_ticks = std::strtoull(stats[12], nullptr, 10);
unsigned long long current_total_ticks = utime_ticks + stime_ticks;
if (last_total_ticks_ > 0) { // If we have a previous value
if (current_total_ticks > last_total_ticks_) {
double delta_seconds =
(double)(current_total_ticks - last_total_ticks_) / clk_tck;
cpu_seconds_total_.inc(delta_seconds);
}
} else { // First run, initialize the counter
cpu_seconds_total_.inc((double)current_total_ticks / clk_tck);
}
last_total_ticks_ = current_total_ticks;
// --- Memory ---
unsigned long long vsize = std::strtoull(stats[20], nullptr, 10);
long rss_pages = std::strtol(stats[21], nullptr, 10);
virtual_memory_bytes_.set(vsize);
resident_memory_bytes_.set(rss_pages * sysconf(_SC_PAGESIZE));
// --- Start Time (only needs to be set once) ---
if (!start_time_set_) {
long long start_time_ticks = std::strtoll(stats[19], nullptr, 10);
double boot_time = get_boot_time();
if (boot_time > 0) {
start_time_seconds_.set(boot_time + (double)start_time_ticks / clk_tck);
start_time_set_ = true;
}
}
// --- File Descriptors ---
int fd_count = 0;
DIR *dp = opendir("/proc/self/fd");
if (dp) {
while (readdir(dp) != nullptr) {
fd_count++;
}
closedir(dp);
// Subtract 3 for '.', '..', and the opendir handle itself
open_fds_.set(fd_count > 3 ? fd_count - 3 : 0);
}
// --- Parse /proc/self/status for additional metrics ---
FILE *status_fp = std::fopen("/proc/self/status", "r");
if (status_fp) {
char status_line[256];
while (std::fgets(status_line, sizeof(status_line), status_fp)) {
if (std::strncmp(status_line, "Threads:\t", 9) == 0) {
int thread_count;
if (std::sscanf(status_line + 9, "%d", &thread_count) == 1) {
threads_.set(thread_count);
}
} else if (std::strncmp(status_line, "voluntary_ctxt_switches:\t", 25) ==
0) {
unsigned long long voluntary_switches;
if (std::sscanf(status_line + 25, "%llu", &voluntary_switches) == 1) {
if (last_voluntary_context_switches_ > 0) {
if (voluntary_switches > last_voluntary_context_switches_) {
context_switches_total_voluntary_.inc(
voluntary_switches - last_voluntary_context_switches_);
}
} else {
context_switches_total_voluntary_.inc(voluntary_switches);
}
last_voluntary_context_switches_ = voluntary_switches;
}
} else if (std::strncmp(status_line, "nonvoluntary_ctxt_switches:\t",
29) == 0) {
unsigned long long nonvoluntary_switches;
if (std::sscanf(status_line + 29, "%llu", &nonvoluntary_switches) ==
1) {
if (last_nonvoluntary_context_switches_ > 0) {
if (nonvoluntary_switches > last_nonvoluntary_context_switches_) {
context_switches_total_nonvoluntary_.inc(
nonvoluntary_switches - last_nonvoluntary_context_switches_);
}
} else {
context_switches_total_nonvoluntary_.inc(nonvoluntary_switches);
}
last_nonvoluntary_context_switches_ = nonvoluntary_switches;
}
}
}
std::fclose(status_fp);
}
}

47
src/process_collector.hpp Normal file
View File

@@ -0,0 +1,47 @@
#pragma once
#include "metric.hpp"
/**
* @brief A metric collector for standard process-level statistics.
*
* Gathers metrics like CPU usage, memory, and file descriptors by reading
* files from the /proc filesystem.
*/
struct ProcessCollector : public metric::Collector {
/**
* @brief Constructs the collector and initializes the process metrics.
*/
ProcessCollector();
/**
* @brief Called by the metrics system to update the process metrics.
*/
void collect() override;
private:
// Metrics for process statistics
metric::Counter cpu_seconds_total_;
metric::Gauge resident_memory_bytes_;
metric::Gauge virtual_memory_bytes_;
metric::Gauge open_fds_;
metric::Gauge max_fds_;
metric::Gauge start_time_seconds_;
// Additional process metrics from /proc/self/status
metric::Gauge threads_;
metric::Counter context_switches_total_voluntary_;
metric::Counter context_switches_total_nonvoluntary_;
// Page fault metrics from /proc/self/stat
metric::Counter page_faults_total_minor_;
metric::Counter page_faults_total_major_;
// Last observed values for calculating counter increments
unsigned long long last_total_ticks_ = 0;
unsigned long long last_minor_faults_ = 0;
unsigned long long last_major_faults_ = 0;
unsigned long long last_voluntary_context_switches_ = 0;
unsigned long long last_nonvoluntary_context_switches_ = 0;
bool start_time_set_ = false;
};

View File

@@ -487,6 +487,7 @@ void Server::process_connection_writes(std::unique_ptr<Connection> &conn,
if ((events & EPOLLOUT) || ((events & EPOLLIN) && conn->hasMessages())) { if ((events & EPOLLOUT) || ((events & EPOLLIN) && conn->hasMessages())) {
bool had_messages = conn->hasMessages(); bool had_messages = conn->hasMessages();
bool error = conn->writeBytes(); bool error = conn->writeBytes();
if (error) { if (error) {
conn.reset(); // Connection should be closed conn.reset(); // Connection should be closed
return; return;