// weaseldb/src/metric.cpp
#include "metric.hpp"
#include <algorithm>
#include <atomic>
#include <bit>
#include <cassert>
#include <cctype>
#include <cmath>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <functional>
#include <map>
#include <mutex>
#include <string>
#include <thread>
#include <type_traits>
#include <unordered_map>
#include <unordered_set>
#include <vector>
#include <immintrin.h>
#include <simdutf.h>
#include "arena_allocator.hpp"
#include "format.hpp"
// WeaselDB Metrics System Design:
//
// THREADING MODEL:
// - Counters: Per-thread storage, single writer, atomic write/read coordination
// with render thread
// - Histograms: Per-thread storage, single writer, mutex protection for all
// access (both observe and render)
// - Gauges: Global storage with atomic CAS operations (multi-writer, no mutex
// needed)
//
// SYNCHRONIZATION STRATEGY:
// - Counters: Atomic store in Counter::inc(), atomic load in render thread
// - Histograms: Mutex serializes all access - updates in observe(), reads in
// render
// - Gauges: Lock-free atomic operations for all updates and reads
//
// PRECISION STRATEGY:
// - Use atomic<uint64_t> for lock-free storage
// - Store doubles using std::bit_cast to uint64_t (preserves full IEEE 754
// precision)
// - Single writer for counters enables simple atomic store/load
//
// MEMORY MODEL:
// - Thread-local metrics auto-cleanup on thread destruction
// - Global metrics (gauges) persist for application lifetime
// - Histogram buckets are sorted, deduplicated, sizes never change after
// creation
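//
// Intended usage (a hedged sketch, not compiled as part of this file; it only
// exercises the public API declared in metric.hpp). Metric/label names and the
// explicit inc(1.0) argument are illustrative assumptions:
//
//   auto requests = metric::create_counter("http_requests_total",
//                                          "Total HTTP requests");
//   auto inflight = metric::create_gauge("http_requests_in_flight",
//                                        "Requests currently being served");
//   auto latency = metric::create_histogram(
//       "http_request_duration_seconds", "Request latency in seconds",
//       metric::exponential_buckets(0.001, 2.0, 12));
//
//   std::pair<std::string_view, std::string_view> labels[] = {{"method", "GET"}};
//   requests.create(labels).inc(1.0);       // per-thread counter, atomic store
//   inflight.create(labels).inc(1.0);       // global gauge, lock-free CAS
//   latency.create(labels).observe(0.042);  // per-thread histogram, mutex-guarded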
namespace metric {
// ARENA OWNERSHIP AND MEMORY MANAGEMENT DOCUMENTATION
//
// The metrics system uses multiple arena allocators with distinct ownership
// patterns:
//
// 1. GLOBAL ARENA (get_global_arena()):
// - Lifetime: Application lifetime (never destroyed)
// - Purpose: Persistent storage for metric families, interned labels, and
// global state
// - Owner: Static storage - automatically managed
// - Content: Family names, help text, LabelsKey instances, global
// accumulated values
//
// 2. THREAD-LOCAL ARENA (get_thread_local_arena()):
// - Lifetime: Per-thread lifetime (destroyed on thread exit)
// - Purpose: Storage for per-thread metric instances (Counter::State,
// Histogram::State)
// - Owner: thread_local ThreadInit instance
// - Content: Thread-specific metric instance state
//
// 3. TEMPORARY ARENAS:
// a) Caller-Provided Arenas (ArenaAllocator& parameters):
// - Lifetime: Controlled by caller (function parameter)
// - Purpose: Output formatting where caller controls result lifetime
// - Owner: Caller owns arena and controls string lifetime
// - Example: render(ArenaAllocator& arena) - caller manages arena
// lifecycle
//
// b) Stack-Owned Temporary Arenas:
// - Lifetime: Function/scope lifetime (automatic destruction)
// - Purpose: Internal temporary allocations for lookups and processing
// - Owner: Function owns arena on stack, destroyed at scope exit
// - Example: intern_labels() creates ArenaAllocator lookup_arena(1024)
//
// CRITICAL OWNERSHIP RULES:
//
// - LabelsKey Arena Dependency: LabelsKey instances store string_views pointing
// to arena-allocated memory. The arena MUST outlive any LabelsKey that
// references its memory. LabelsKey constructor copies input strings into the
// provided arena.
//
// - Render Function: render(arena) allocates ALL output strings in the provided
// arena. Callers own the arena and control string lifetime. String_views
// become invalid after arena.reset() or arena destruction.
//
// - Thread Cleanup: ThreadInit destructor accumulates thread-local
// counter/histogram
// values into global storage before thread exit. This copies VALUES (not
// ownership) to prevent metric data loss when threads are destroyed.
//
// - Family Creation: Uses placement new in global arena without explicit
// destructors.
// This is acceptable because Family::State instances persist for application
// lifetime and the global arena is never destroyed.
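//
// Render ownership sketch (illustrative; `response` is a hypothetical output
// buffer, and the size-taking ArenaAllocator constructor matches its use
// elsewhere in this file):
//
//   ArenaAllocator render_arena(64 * 1024);          // caller owns the arena
//   for (std::string_view line : metric::render(render_arena)) {
//     response.append(line.data(), line.size());     // copy out before reset
//   }
//   render_arena.reset();  // all string_views from render() are now invalid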
// Validation helper that works in both debug and release builds
static void validate_or_abort(bool condition, const char *message,
std::string_view value) {
if (!condition) {
std::fprintf(stderr, "WeaselDB metric validation failed: %s: '%.*s'\n",
message, static_cast<int>(value.size()), value.data());
std::abort();
}
}
// Helper to copy a string into arena memory
static std::string_view arena_copy_string(std::string_view str,
ArenaAllocator &arena) {
if (str.empty()) {
return std::string_view{};
}
char *copied = arena.allocate<char>(str.size() + 1);
std::memcpy(copied, str.data(), str.size());
copied[str.size()] = '\0';
return std::string_view(copied, str.size());
}
// Arena-based labels key for second level of map
// Uses string_view to point to arena-allocated strings
struct LabelsKey {
ArenaVector<std::pair<std::string_view, std::string_view>> labels;
// Arena-owning constructor (copies strings into arena)
LabelsKey(std::span<const std::pair<std::string_view, std::string_view>> l,
ArenaAllocator &arena)
: labels(&arena) {
// Copy and validate all label keys and values into arena
for (const auto &[key, value] : l) {
validate_or_abort(is_valid_label_key(key), "invalid label key", key);
validate_or_abort(is_valid_label_value(value), "invalid label value",
value);
auto key_view = arena_copy_string(key, arena);
auto value_view = arena_copy_string(value, arena);
labels.push_back({key_view, value_view});
}
// Sort labels by key for Prometheus compatibility
std::sort(labels.data(), labels.data() + labels.size(),
[](const auto &a, const auto &b) { return a.first < b.first; });
}
bool operator==(const LabelsKey &other) const {
if (labels.size() != other.labels.size())
return false;
for (size_t i = 0; i < labels.size(); ++i) {
if (labels[i].first != other.labels[i].first ||
labels[i].second != other.labels[i].second) {
return false;
}
}
return true;
}
bool operator<(const LabelsKey &other) const {
if (labels.size() != other.labels.size()) {
return labels.size() < other.labels.size();
}
for (size_t i = 0; i < labels.size(); ++i) {
if (labels[i].first != other.labels[i].first) {
return labels[i].first < other.labels[i].first;
}
if (labels[i].second != other.labels[i].second) {
return labels[i].second < other.labels[i].second;
}
}
return false; // They are equal
}
};
} // namespace metric
namespace std {
template <> struct hash<metric::LabelsKey> {
std::size_t operator()(const metric::LabelsKey &k) const {
std::size_t hash_value = 0;
for (size_t i = 0; i < k.labels.size(); ++i) {
const auto &[key, value] = k.labels[i];
// Combine hashes using a simple but effective method
hash_value ^= std::hash<std::string_view>{}(key) + 0x9e3779b9 +
(hash_value << 6) + (hash_value >> 2);
hash_value ^= std::hash<std::string_view>{}(value) + 0x9e3779b9 +
(hash_value << 6) + (hash_value >> 2);
}
return hash_value;
}
};
} // namespace std
namespace metric {
// DESIGN: Store doubles in atomic<uint64_t> for lock-free operations
// - Preserves full IEEE 754 double precision (no truncation)
// - Allows atomic load/store without locks
// - Use std::bit_cast for safe conversion between double and uint64_t
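//
// For example (sketch), the Gauge path round-trips a double through the atomic
// cell without losing precision:
//
//   std::atomic<uint64_t> cell{std::bit_cast<uint64_t>(0.0)};
//   cell.store(std::bit_cast<uint64_t>(3.5), std::memory_order_relaxed);
//   double v = std::bit_cast<double>(cell.load(std::memory_order_relaxed));
//   // v == 3.5 exactly: bit_cast copies the IEEE 754 bit pattern verbatim.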
// Family::State structures own the second-level maps (labels -> instances)
template <> struct Family<Counter>::State {
std::string_view name;
std::string_view help;
struct PerThreadState {
std::unordered_map<LabelsKey, Counter::State *> instances;
};
std::unordered_map<std::thread::id, PerThreadState> per_thread_state;
// Global accumulation state for destroyed threads
std::unordered_map<
LabelsKey, Counter::State *, std::hash<LabelsKey>,
std::equal_to<LabelsKey>,
ArenaStlAllocator<std::pair<const LabelsKey, Counter::State *>>>
global_accumulated_values;
// Callback-based metrics (global, not per-thread)
std::map<
LabelsKey, MetricCallback<Counter>, std::less<LabelsKey>,
ArenaStlAllocator<std::pair<const LabelsKey, MetricCallback<Counter>>>>
callbacks;
State(ArenaAllocator &arena)
: global_accumulated_values(
ArenaStlAllocator<std::pair<const LabelsKey, Counter::State *>>(
&arena)),
callbacks(
ArenaStlAllocator<
std::pair<const LabelsKey, MetricCallback<Counter>>>(&arena)) {}
};
template <> struct Family<Gauge>::State {
std::string_view name;
std::string_view help;
std::unordered_map<
LabelsKey, Gauge::State *, std::hash<LabelsKey>, std::equal_to<LabelsKey>,
ArenaStlAllocator<std::pair<const LabelsKey, Gauge::State *>>>
instances;
// Callback-based metrics
std::map<LabelsKey, MetricCallback<Gauge>, std::less<LabelsKey>,
ArenaStlAllocator<std::pair<const LabelsKey, MetricCallback<Gauge>>>>
callbacks;
State(ArenaAllocator &arena)
: instances(ArenaStlAllocator<std::pair<const LabelsKey, Gauge::State *>>(
&arena)),
callbacks(ArenaStlAllocator<
std::pair<const LabelsKey, MetricCallback<Gauge>>>(&arena)) {}
};
template <> struct Family<Histogram>::State {
std::string_view name;
std::string_view help;
ArenaVector<double> buckets;
struct PerThreadState {
std::unordered_map<LabelsKey, Histogram::State *> instances;
};
std::unordered_map<std::thread::id, PerThreadState> per_thread_state;
// Global accumulation state for destroyed threads
std::unordered_map<
LabelsKey, Histogram::State *, std::hash<LabelsKey>,
std::equal_to<LabelsKey>,
ArenaStlAllocator<std::pair<const LabelsKey, Histogram::State *>>>
global_accumulated_values;
State(ArenaAllocator &arena)
: buckets(&arena),
global_accumulated_values(
ArenaStlAllocator<std::pair<const LabelsKey, Histogram::State *>>(
&arena)) {}
// Note: No callbacks map - histograms don't support callback-based metrics
};
// Counter: Thread-local, monotonically increasing, single writer
struct Counter::State {
double value; // Single writer, atomic coordination with render thread
friend struct Metric;
};
// Gauge: Global, can increase/decrease, multiple writers (uses atomic CAS)
struct Gauge::State {
std::atomic<uint64_t>
value; // Stores double as uint64_t bits, lock-free CAS operations
friend struct Metric;
};
// Histogram: Thread-local buckets, single writer, mutex protection per thread,
// per histogram
struct Histogram::State {
ArenaVector<double> thresholds; // Bucket boundaries (sorted, deduplicated,
// sizes never change)
ArenaVector<uint64_t> counts; // Count per bucket
double sum; // Sum of observations
uint64_t observations; // Total observation count
std::mutex
mutex; // Per-thread, per-histogram mutex for consistent reads/writes
State(ArenaAllocator &arena)
: thresholds(&arena), counts(&arena), sum(0.0), observations(0) {}
friend struct Metric;
};
struct Metric {
// We use a raw pointer to these in a map, so we don't call their destructors
static_assert(std::is_trivially_destructible_v<Counter::State>);
static_assert(std::is_trivially_destructible_v<Gauge::State>);
static_assert(std::is_trivially_destructible_v<Histogram::State>);
static std::mutex mutex;
// Global arena allocator for metric families and persistent global state
static ArenaAllocator &get_global_arena() {
static ArenaAllocator global_arena(64 * 1024); // 64KB initial size
return global_arena;
}
// Function-local statics to avoid static initialization order fiasco
static auto &get_counter_families() {
using FamilyMap = std::map<
std::string_view, Family<Counter>::State *, std::less<std::string_view>,
ArenaStlAllocator<
std::pair<const std::string_view, Family<Counter>::State *>>>;
static FamilyMap *counterFamilies = new FamilyMap(
ArenaStlAllocator<
std::pair<const std::string_view, Family<Counter>::State *>>(
&get_global_arena()));
return *counterFamilies;
}
static auto &get_gauge_families() {
using FamilyMap = std::map<
std::string_view, Family<Gauge>::State *, std::less<std::string_view>,
ArenaStlAllocator<
std::pair<const std::string_view, Family<Gauge>::State *>>>;
static FamilyMap *gaugeFamilies = new FamilyMap(
ArenaStlAllocator<
std::pair<const std::string_view, Family<Gauge>::State *>>(
&get_global_arena()));
return *gaugeFamilies;
}
static auto &get_histogram_families() {
using FamilyMap =
std::map<std::string_view, Family<Histogram>::State *,
std::less<std::string_view>,
ArenaStlAllocator<std::pair<const std::string_view,
Family<Histogram>::State *>>>;
static FamilyMap *histogramFamilies = new FamilyMap(
ArenaStlAllocator<
std::pair<const std::string_view, Family<Histogram>::State *>>(
&get_global_arena()));
return *histogramFamilies;
}
// Global label interning set to avoid duplicate LabelsKey allocations
static auto &get_interned_labels() {
using InternSet = std::unordered_set<LabelsKey, std::hash<LabelsKey>,
std::equal_to<LabelsKey>,
ArenaStlAllocator<LabelsKey>>;
static InternSet *internedLabels =
new InternSet(ArenaStlAllocator<LabelsKey>(&get_global_arena()));
return *internedLabels;
}
// Thread cleanup for per-family thread-local storage
struct ThreadInit {
ArenaAllocator arena;
ThreadInit() {}
~ThreadInit() {
// Accumulate thread-local state into global state before cleanup
std::unique_lock<std::mutex> _{mutex};
auto thread_id = std::this_thread::get_id();
// Accumulate counter families
for (auto &[name, family] : Metric::get_counter_families()) {
auto thread_it = family->per_thread_state.find(thread_id);
if (thread_it != family->per_thread_state.end()) {
for (auto &[labels_key, instance] : thread_it->second.instances) {
// Get current thread-local value
double current_value = instance->value;
// Ensure global accumulator exists
auto &global_state = family->global_accumulated_values[labels_key];
if (!global_state) {
global_state = get_global_arena().construct<Counter::State>();
global_state->value = 0.0;
}
// Add thread-local value to global accumulator (mutex already held)
global_state->value += current_value;
}
family->per_thread_state.erase(thread_it);
}
}
// Accumulate histogram families
for (auto &[name, family] : Metric::get_histogram_families()) {
auto thread_it = family->per_thread_state.find(thread_id);
if (thread_it != family->per_thread_state.end()) {
for (auto &[labels_key, instance] : thread_it->second.instances) {
// Acquire lock to get consistent snapshot
std::lock_guard<std::mutex> lock(instance->mutex);
// Ensure global accumulator exists
auto &global_state = family->global_accumulated_values[labels_key];
if (!global_state) {
global_state = get_global_arena().construct<Histogram::State>(
get_global_arena());
// Copy thresholds from instance
for (size_t i = 0; i < instance->thresholds.size(); ++i) {
global_state->thresholds.push_back(instance->thresholds[i]);
}
// Initialize counts with zeros
for (size_t i = 0; i < instance->counts.size(); ++i) {
global_state->counts.push_back(0);
}
}
// Accumulate bucket counts (mutex already held)
for (size_t i = 0; i < instance->counts.size(); ++i) {
global_state->counts[i] += instance->counts[i];
}
// Accumulate sum and observations
global_state->sum += instance->sum;
global_state->observations += instance->observations;
}
family->per_thread_state.erase(thread_it);
}
}
// Gauges are global, no per-thread cleanup needed
}
};
static thread_local ThreadInit thread_init;
// Thread-local arena allocator for metric instances
static ArenaAllocator &get_thread_local_arena() { return thread_init.arena; }
// Thread cleanup now handled by ThreadInit RAII
// Intern labels to avoid duplicate arena allocations
static const LabelsKey &intern_labels(
std::span<const std::pair<std::string_view, std::string_view>> labels) {
auto &interned_set = get_interned_labels();
// Create temporary lookup key using stack-allocated arena
ArenaAllocator lookup_arena(1024); // Small arena for lookups
LabelsKey lookup_key{labels, lookup_arena};
// Use standard hash set lookup
auto it = interned_set.find(lookup_key);
if (it != interned_set.end()) {
return *it;
}
// Not found - create and intern new key
LabelsKey new_key{labels, get_global_arena()};
auto result = interned_set.emplace(std::move(new_key));
return *result.first;
}
static Counter create_counter_instance(
Family<Counter> *family,
std::span<const std::pair<std::string_view, std::string_view>> labels) {
// Force thread_local initialization
(void)thread_init;
std::unique_lock<std::mutex> _{mutex};
const LabelsKey &key = intern_labels(labels);
// Validate that labels aren't already registered as callback
validate_or_abort(family->p->callbacks.find(key) ==
family->p->callbacks.end(),
"labels already registered as callback",
key.labels.empty() ? "(no labels)" : key.labels[0].first);
// Ensure thread state exists
auto thread_id = std::this_thread::get_id();
// Thread state is automatically created by operator[]
auto &ptr = family->p->per_thread_state[thread_id].instances[key];
if (!ptr) {
ptr = get_thread_local_arena().construct<Counter::State>();
ptr->value = 0.0;
// Ensure global accumulator exists for this label set
auto &global_state = family->p->global_accumulated_values[key];
if (!global_state) {
global_state = get_global_arena().construct<Counter::State>();
global_state->value = 0.0;
}
}
Counter result;
result.p = ptr;
return result;
}
static Gauge create_gauge_instance(
Family<Gauge> *family,
std::span<const std::pair<std::string_view, std::string_view>> labels) {
std::unique_lock<std::mutex> _{mutex};
const LabelsKey &key = intern_labels(labels);
// Validate that labels aren't already registered as callback
validate_or_abort(family->p->callbacks.find(key) ==
family->p->callbacks.end(),
"labels already registered as callback",
key.labels.empty() ? "(no labels)" : key.labels[0].first);
auto &ptr = family->p->instances[key];
if (!ptr) {
ptr = get_global_arena().construct<Gauge::State>();
ptr->value.store(0, std::memory_order_relaxed);
}
Gauge result;
result.p = ptr;
return result;
}
static Histogram create_histogram_instance(
Family<Histogram> *family,
std::span<const std::pair<std::string_view, std::string_view>> labels) {
// Force thread_local initialization
(void)thread_init;
std::unique_lock<std::mutex> _{mutex};
const LabelsKey &key = intern_labels(labels);
// Ensure thread state exists
auto thread_id = std::this_thread::get_id();
// Thread state is automatically created by operator[]
auto &ptr = family->p->per_thread_state[thread_id].instances[key];
if (!ptr) {
ptr = get_thread_local_arena().construct<Histogram::State>(
get_thread_local_arena());
// DESIGN: Prometheus-compatible histogram buckets
// Use buckets from family configuration
for (size_t i = 0; i < family->p->buckets.size(); ++i) {
ptr->thresholds.push_back(family->p->buckets[i]);
}
// Initialize with zero values, mutex protects all operations
for (size_t i = 0; i < ptr->thresholds.size(); ++i) {
ptr->counts.push_back(0);
}
// Ensure global accumulator exists for this label set
auto &global_state = family->p->global_accumulated_values[key];
if (!global_state) {
global_state =
get_global_arena().construct<Histogram::State>(get_global_arena());
// Copy thresholds
for (size_t i = 0; i < ptr->thresholds.size(); ++i) {
global_state->thresholds.push_back(ptr->thresholds[i]);
}
// Initialize counts with zeros
for (size_t i = 0; i < ptr->thresholds.size(); ++i) {
global_state->counts.push_back(0);
}
}
}
Histogram result;
result.p = ptr;
return result;
}
};
Counter::Counter() = default;
void Counter::inc(double x) {
// THREAD-SAFETY: This method mixes a non-atomic read and an atomic store
// which is safe ONLY under the following conditions, which this system meets:
//
// 1. Single-Writer Guarantee: The underlying Counter::State is thread-local.
// Only one thread will ever call inc() on a given instance. All writes
// and the non-atomic read below are therefore *sequenced*, not concurrent,
// preventing torn reads within this thread.
//
// 2. Atomic Visibility: The render thread is the only other thread that
// accesses this value, and it does so via an atomic load. The writer
// publishes updates with the atomic store below, so the store/load pair is
// race-free, and the writer's non-atomic read cannot conflict with the
// render thread, which never writes.
//
// This contrasts with Gauges, whose state can be shared by multiple threads
// and thus requires a fully atomic read-modify-write cycle (CAS loop).
auto new_value = p->value + x;
// Validate monotonic property (counter never decreases)
if (new_value < p->value) [[unlikely]] {
validate_or_abort(false, "counter value overflow/wraparound detected",
std::to_string(new_value));
}
__atomic_store(&p->value, &new_value, __ATOMIC_RELAXED);
}
Gauge::Gauge() = default;
void Gauge::inc(double x) {
// Lock-free increment using CAS loop
uint64_t expected = p->value.load(std::memory_order_relaxed);
uint64_t desired;
do {
double current_value = std::bit_cast<double>(expected);
double new_value = current_value + x;
desired = std::bit_cast<uint64_t>(new_value);
} while (!p->value.compare_exchange_weak(expected, desired,
std::memory_order_relaxed));
}
void Gauge::dec(double x) {
// Lock-free decrement using CAS loop
uint64_t expected = p->value.load(std::memory_order_relaxed);
uint64_t desired;
do {
double current_value = std::bit_cast<double>(expected);
double new_value = current_value - x;
desired = std::bit_cast<uint64_t>(new_value);
} while (!p->value.compare_exchange_weak(expected, desired,
std::memory_order_relaxed));
}
void Gauge::set(double x) {
// Simple atomic store for set operation
p->value.store(std::bit_cast<uint64_t>(x), std::memory_order_relaxed);
}
Histogram::Histogram() = default;
// Vectorized histogram bucket updates with mutex protection for consistency
// AVX-optimized implementation for high performance
__attribute__((target("avx"))) static void
update_histogram_buckets_simd(const ArenaVector<double> &thresholds,
ArenaVector<uint64_t> &counts, double x,
size_t start_idx) {
const size_t size = thresholds.size();
size_t i = start_idx;
// Process 2 buckets at a time with 128-bit vectors
const __m128d x_vec = _mm_set1_pd(x);
for (; i + 2 <= size; i += 2) {
// 128-bit vectorized comparison and arithmetic
__m128d thresholds_vec = _mm_loadu_pd(&thresholds[i]);
__m128d cmp_result = _mm_cmp_pd(x_vec, thresholds_vec, _CMP_LE_OQ);
__m128i cmp_as_int = _mm_castpd_si128(cmp_result);
__m128i ones = _mm_set1_epi64x(1);
__m128i increments = _mm_and_si128(cmp_as_int, ones);
// Load current counts and add increments
__m128i current_counts = _mm_loadu_si128((__m128i *)&counts[i]);
__m128i updated_counts = _mm_add_epi64(current_counts, increments);
// Store updated counts
_mm_storeu_si128((__m128i *)&counts[i], updated_counts);
}
// Handle remainder with scalar operations
for (; i < size; ++i) {
if (x <= thresholds[i]) {
counts[i]++;
}
}
}
void Histogram::observe(double x) {
assert(p->thresholds.size() == p->counts.size());
std::lock_guard<std::mutex> lock(p->mutex);
// Update bucket counts using SIMD
update_histogram_buckets_simd(p->thresholds, p->counts, x, 0);
// Update sum and observation count
p->sum += x;
p->observations++;
}
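// Worked example (illustrative): with thresholds {0.1, 1.0, 10.0}, observe(0.5)
// increments the counts for the 1.0 and 10.0 buckets (cumulative "x <= le"
// semantics via _CMP_LE_OQ above), adds 0.5 to sum, and bumps observations,
// which doubles as the implicit +Inf bucket at render time.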
template <> Family<Counter>::Family() = default;
template <> Family<Gauge>::Family() = default;
template <> Family<Histogram>::Family() = default;
template <>
Counter Family<Counter>::create(
std::span<const std::pair<std::string_view, std::string_view>> labels) {
return Metric::create_counter_instance(this, labels);
}
template <>
Gauge Family<Gauge>::create(
std::span<const std::pair<std::string_view, std::string_view>> labels) {
return Metric::create_gauge_instance(this, labels);
}
template <>
Histogram Family<Histogram>::create(
std::span<const std::pair<std::string_view, std::string_view>> labels) {
return Metric::create_histogram_instance(this, labels);
}
Family<Counter> create_counter(std::string_view name, std::string_view help) {
validate_or_abort(is_valid_metric_name(name), "invalid counter name", name);
std::unique_lock<std::mutex> _{Metric::mutex};
auto &global_arena = Metric::get_global_arena();
auto name_view = arena_copy_string(name, global_arena);
auto &familyPtr = Metric::get_counter_families()[name_view];
if (!familyPtr) {
// NOTE: Family<T>::State instances are never destroyed - this is fine
// because the number of metric families is bounded by application design
familyPtr = new (global_arena.allocate_raw(sizeof(Family<Counter>::State),
alignof(Family<Counter>::State)))
Family<Counter>::State(global_arena);
familyPtr->name = name_view;
familyPtr->help = arena_copy_string(help, global_arena);
} else {
validate_or_abort(
familyPtr->help == help,
"metric family already registered with different help text", name);
}
Family<Counter> family;
family.p = familyPtr;
return family;
}
Family<Gauge> create_gauge(std::string_view name, std::string_view help) {
validate_or_abort(is_valid_metric_name(name), "invalid gauge name", name);
std::unique_lock<std::mutex> _{Metric::mutex};
auto &global_arena = Metric::get_global_arena();
auto name_view = arena_copy_string(name, global_arena);
auto &familyPtr = Metric::get_gauge_families()[name_view];
if (!familyPtr) {
// NOTE: Family<T>::State instances are never destroyed - this is fine
// because the number of metric families is bounded by application design
familyPtr = new (global_arena.allocate_raw(sizeof(Family<Gauge>::State),
alignof(Family<Gauge>::State)))
Family<Gauge>::State(global_arena);
familyPtr->name = name_view;
familyPtr->help = arena_copy_string(help, global_arena);
} else {
validate_or_abort(
familyPtr->help == help,
"metric family already registered with different help text", name);
}
Family<Gauge> family;
family.p = familyPtr;
return family;
}
Family<Histogram> create_histogram(std::string_view name, std::string_view help,
std::span<const double> buckets) {
validate_or_abort(is_valid_metric_name(name), "invalid histogram name", name);
std::unique_lock<std::mutex> _{Metric::mutex};
auto &global_arena = Metric::get_global_arena();
auto name_view = arena_copy_string(name, global_arena);
auto &family_ptr = Metric::get_histogram_families()[name_view];
if (!family_ptr) {
// NOTE: Family<T>::State instances are never destroyed - this is fine
// because the number of metric families is bounded by application design
family_ptr = new (global_arena.allocate_raw(
sizeof(Family<Histogram>::State), alignof(Family<Histogram>::State)))
Family<Histogram>::State(global_arena);
family_ptr->name = name_view;
family_ptr->help = arena_copy_string(help, global_arena);
// DESIGN: Prometheus-compatible histogram buckets
// Convert to vector for sorting
std::vector<double> temp_buckets(buckets.begin(), buckets.end());
std::sort(temp_buckets.begin(), temp_buckets.end());
temp_buckets.erase(std::unique(temp_buckets.begin(), temp_buckets.end()),
temp_buckets.end());
// Copy sorted buckets to arena vector
for (double bucket : temp_buckets) {
family_ptr->buckets.push_back(bucket);
}
// Note: +Inf bucket is not stored explicitly - we use total observations
// count
} else {
validate_or_abort(
family_ptr->help == help,
"metric family already registered with different help text", name);
std::vector<double> new_buckets_vec(buckets.begin(), buckets.end());
std::sort(new_buckets_vec.begin(), new_buckets_vec.end());
new_buckets_vec.erase(
std::unique(new_buckets_vec.begin(), new_buckets_vec.end()),
new_buckets_vec.end());
// Note: +Inf bucket is not stored explicitly - we use total observations
// count
// Compare with existing buckets
bool buckets_match = (family_ptr->buckets.size() == new_buckets_vec.size());
if (buckets_match) {
for (size_t i = 0; i < family_ptr->buckets.size(); ++i) {
if (family_ptr->buckets[i] != new_buckets_vec[i]) {
buckets_match = false;
break;
}
}
}
validate_or_abort(buckets_match,
"metric family already registered with different buckets",
name);
}
Family<Histogram> family;
family.p = family_ptr;
return family;
}
std::vector<double> linear_buckets(double start, double width, int count) {
validate_or_abort(width > 0, "linear bucket width must be positive",
std::to_string(width));
validate_or_abort(count >= 0, "linear bucket count must be non-negative",
std::to_string(count));
std::vector<double> buckets;
buckets.reserve(count);
for (int i = 0; i < count; ++i) {
buckets.push_back(start + i * width);
}
return buckets;
}
std::vector<double> exponential_buckets(double start, double factor,
int count) {
validate_or_abort(start > 0, "exponential bucket start must be positive",
std::to_string(start));
validate_or_abort(factor > 1, "exponential bucket factor must be > 1",
std::to_string(factor));
validate_or_abort(count >= 0, "exponential bucket count must be non-negative",
std::to_string(count));
std::vector<double> buckets;
buckets.reserve(count);
double current = start;
for (int i = 0; i < count; ++i) {
buckets.push_back(current);
current *= factor;
}
return buckets;
}
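// Examples (illustrative):
//   linear_buckets(0.0, 0.5, 4)      -> {0.0, 0.5, 1.0, 1.5}
//   exponential_buckets(1.0, 2.0, 4) -> {1.0, 2.0, 4.0, 8.0}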
// Prometheus validation functions
// Metric names must match [a-zA-Z_:][a-zA-Z0-9_:]*
bool is_valid_metric_name(std::string_view name) {
if (name.empty())
return false;
// First character must be letter, underscore, or colon
char first = name[0];
if (!std::isalpha(static_cast<unsigned char>(first)) && first != '_' &&
first != ':') {
return false;
}
// Remaining characters must be alphanumeric, underscore, or colon
for (size_t i = 1; i < name.size(); ++i) {
char c = name[i];
if (!std::isalnum(static_cast<unsigned char>(c)) && c != '_' && c != ':') {
return false;
}
}
return true;
}
// Label keys must match [a-zA-Z_][a-zA-Z0-9_]*
bool is_valid_label_key(std::string_view key) {
if (key.empty())
return false;
// First character must be letter or underscore
char first = key[0];
if (!std::isalpha(static_cast<unsigned char>(first)) && first != '_') {
return false;
}
// Remaining characters must be alphanumeric or underscore
for (size_t i = 1; i < key.size(); ++i) {
char c = key[i];
if (!std::isalnum(static_cast<unsigned char>(c)) && c != '_') {
return false;
}
}
// Label keys starting with __ are reserved for internal use
if (key.size() >= 2 && key[0] == '_' && key[1] == '_') {
return false;
}
return true;
}
// Label values can contain any UTF-8 characters (no specific restrictions)
bool is_valid_label_value(std::string_view value) {
// Prometheus allows any UTF-8 string as label value
// Validate UTF-8 encoding for correctness using simdutf
return simdutf::validate_utf8(value.data(), value.size());
}
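// Examples (illustrative): "http_requests_total" and "db:txn_commits" are valid
// metric names, while "2xx_responses" is rejected (leading digit). "method" is a
// valid label key; "__internal" is rejected (reserved "__" prefix).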
std::span<std::string_view> render(ArenaAllocator &arena) {
std::unique_lock<std::mutex> _{Metric::mutex};
ArenaVector<std::string_view> output(&arena);
auto format_labels =
[&](const ArenaVector<std::pair<std::string_view, std::string_view>>
&labels) -> std::string_view {
if (labels.empty()) {
return "";
}
size_t required_size = 2; // {}
for (const auto &[key, value] : labels) {
required_size += key.length() + 3 + value.length(); // key="value"
for (char c : value) {
if (c == '\\' || c == '"' || c == '\n') {
required_size++;
}
}
}
required_size += labels.size() - 1; // commas (labels is non-empty here)
char *buf = arena.allocate<char>(required_size);
char *p = buf;
*p++ = '{';
for (size_t i = 0; i < labels.size(); ++i) {
if (i > 0)
*p++ = ',';
std::memcpy(p, labels[i].first.data(), labels[i].first.length());
p += labels[i].first.length();
*p++ = '=';
*p++ = '"';
for (char c : labels[i].second) {
switch (c) {
case '\\':
*p++ = '\\';
*p++ = '\\';
break;
case '"':
*p++ = '\\';
*p++ = '"';
break;
case '\n':
*p++ = '\\';
*p++ = 'n';
break;
default:
*p++ = c;
break;
}
}
*p++ = '"';
}
*p++ = '}';
return std::string_view(buf, p - buf);
};
// Render counters
for (const auto &[name, family] : Metric::get_counter_families()) {
output.push_back(format(arena, "# HELP %.*s %.*s\n",
static_cast<int>(name.length()), name.data(),
static_cast<int>(family->help.length()),
family->help.data()));
output.push_back(format(arena, "# TYPE %.*s counter\n",
static_cast<int>(name.length()), name.data()));
ArenaVector<std::pair<std::string_view, std::string_view>> labels_sv(
&arena);
for (const auto &[labels_key, callback] : family->callbacks) {
auto value = callback();
labels_sv.clear();
for (size_t i = 0; i < labels_key.labels.size(); ++i) {
labels_sv.push_back(labels_key.labels[i]);
}
auto labels = format_labels(labels_sv);
output.push_back(format(arena, "%.*s%.*s %.17g\n",
static_cast<int>(name.length()), name.data(),
static_cast<int>(labels.length()), labels.data(),
value));
}
// Aggregate all counter values (thread-local + global accumulated)
std::map<LabelsKey, double, std::less<LabelsKey>,
ArenaStlAllocator<std::pair<const LabelsKey, double>>>
aggregated_values{
ArenaStlAllocator<std::pair<const LabelsKey, double>>(&arena)};
// First, add thread-local values
for (const auto &[thread_id, per_thread] : family->per_thread_state) {
for (const auto &[labels_key, instance] : per_thread.instances) {
// Atomic read to match atomic store in Counter::inc()
double value;
__atomic_load(&instance->value, &value, __ATOMIC_RELAXED);
aggregated_values[labels_key] += value;
}
}
// Then, add globally accumulated values from destroyed threads
for (const auto &[labels_key, global_state] :
family->global_accumulated_values) {
if (global_state) {
aggregated_values[labels_key] += global_state->value;
}
}
// Render aggregated counter values
for (const auto &[labels_key, total_value] : aggregated_values) {
labels_sv.clear();
for (size_t i = 0; i < labels_key.labels.size(); ++i) {
labels_sv.push_back(labels_key.labels[i]);
}
auto labels = format_labels(labels_sv);
output.push_back(format(arena, "%.*s%.*s %.17g\n",
static_cast<int>(name.length()), name.data(),
static_cast<int>(labels.length()), labels.data(),
total_value));
}
}
// Render gauges
for (const auto &[name, family] : Metric::get_gauge_families()) {
output.push_back(format(arena, "# HELP %.*s %.*s\n",
static_cast<int>(name.length()), name.data(),
static_cast<int>(family->help.length()),
family->help.data()));
output.push_back(format(arena, "# TYPE %.*s gauge\n",
static_cast<int>(name.length()), name.data()));
ArenaVector<std::pair<std::string_view, std::string_view>> labels_sv(
&arena);
for (const auto &[labels_key, callback] : family->callbacks) {
auto value = callback();
labels_sv.clear();
for (size_t i = 0; i < labels_key.labels.size(); ++i) {
labels_sv.push_back(labels_key.labels[i]);
}
auto labels = format_labels(labels_sv);
output.push_back(format(arena, "%.*s%.*s %.17g\n",
static_cast<int>(name.length()), name.data(),
static_cast<int>(labels.length()), labels.data(),
value));
}
for (const auto &[labels_key, instance] : family->instances) {
auto value = std::bit_cast<double>(
instance->value.load(std::memory_order_relaxed));
labels_sv.clear();
for (size_t i = 0; i < labels_key.labels.size(); ++i) {
labels_sv.push_back(labels_key.labels[i]);
}
auto labels = format_labels(labels_sv);
output.push_back(format(arena, "%.*s%.*s %.17g\n",
static_cast<int>(name.length()), name.data(),
static_cast<int>(labels.length()), labels.data(),
value));
}
}
// Render histograms
for (const auto &[name, family] : Metric::get_histogram_families()) {
output.push_back(format(arena, "# HELP %.*s %.*s\n",
static_cast<int>(name.length()), name.data(),
static_cast<int>(family->help.length()),
family->help.data()));
output.push_back(format(arena, "# TYPE %.*s histogram\n",
static_cast<int>(name.length()), name.data()));
// Aggregate all histogram values (thread-local + global accumulated)
// Use a simpler structure to avoid tuple constructor issues
struct AggregatedHistogram {
ArenaVector<double> thresholds;
ArenaVector<uint64_t> counts;
double sum;
uint64_t observations;
AggregatedHistogram(ArenaAllocator &arena)
: thresholds(&arena), counts(&arena), sum(0.0), observations(0) {}
};
std::map<
LabelsKey, AggregatedHistogram *, std::less<LabelsKey>,
ArenaStlAllocator<std::pair<const LabelsKey, AggregatedHistogram *>>>
aggregated_histograms{ArenaStlAllocator<
std::pair<const LabelsKey, AggregatedHistogram *>>(&arena)};
ArenaVector<std::pair<std::string_view, std::string_view>> bucket_labels_sv(
&arena);
// First, collect thread-local histogram data
for (const auto &[thread_id, per_thread] : family->per_thread_state) {
for (const auto &[labels_key, instance] : per_thread.instances) {
// Extract data under lock - minimize critical section
// Note: thresholds and counts sizes never change after histogram
// creation
ArenaVector<double> thresholds_snapshot(&arena);
ArenaVector<uint64_t> counts_snapshot(&arena);
double sum_snapshot;
uint64_t observations_snapshot;
// Copy data with minimal critical section
{
std::lock_guard<std::mutex> lock(instance->mutex);
// Copy thresholds
for (size_t i = 0; i < instance->thresholds.size(); ++i) {
thresholds_snapshot.push_back(instance->thresholds[i]);
}
// Copy counts
for (size_t i = 0; i < instance->counts.size(); ++i) {
counts_snapshot.push_back(instance->counts[i]);
}
sum_snapshot = instance->sum;
observations_snapshot = instance->observations;
}
// Initialize or aggregate into aggregated_histograms
auto it = aggregated_histograms.find(labels_key);
if (it == aggregated_histograms.end()) {
// Create new entry
auto *agg_hist = new (arena.allocate_raw(
sizeof(AggregatedHistogram), alignof(AggregatedHistogram)))
AggregatedHistogram(arena);
for (size_t i = 0; i < thresholds_snapshot.size(); ++i) {
agg_hist->thresholds.push_back(thresholds_snapshot[i]);
}
for (size_t i = 0; i < counts_snapshot.size(); ++i) {
agg_hist->counts.push_back(counts_snapshot[i]);
}
agg_hist->sum = sum_snapshot;
agg_hist->observations = observations_snapshot;
aggregated_histograms[labels_key] = agg_hist;
} else {
// Aggregate with existing entry
auto *agg_hist = it->second;
// Aggregate counts
for (size_t i = 0; i < counts_snapshot.size(); ++i) {
agg_hist->counts[i] += counts_snapshot[i];
}
agg_hist->sum += sum_snapshot;
agg_hist->observations += observations_snapshot;
}
}
}
// Then, add globally accumulated values from destroyed threads
for (const auto &[labels_key, global_state] :
family->global_accumulated_values) {
if (global_state) {
auto it = aggregated_histograms.find(labels_key);
if (it == aggregated_histograms.end()) {
// Create new entry from global state
auto *agg_hist = new (arena.allocate_raw(
sizeof(AggregatedHistogram), alignof(AggregatedHistogram)))
AggregatedHistogram(arena);
for (size_t i = 0; i < global_state->thresholds.size(); ++i) {
agg_hist->thresholds.push_back(global_state->thresholds[i]);
}
for (size_t i = 0; i < global_state->counts.size(); ++i) {
agg_hist->counts.push_back(global_state->counts[i]);
}
agg_hist->sum = global_state->sum;
agg_hist->observations = global_state->observations;
aggregated_histograms[labels_key] = agg_hist;
} else {
// Add global accumulated values to existing entry
auto *agg_hist = it->second;
for (size_t i = 0; i < global_state->counts.size(); ++i) {
agg_hist->counts[i] += global_state->counts[i];
}
agg_hist->sum += global_state->sum;
agg_hist->observations += global_state->observations;
}
}
}
// Render aggregated histogram data
for (const auto &[labels_key, agg_hist] : aggregated_histograms) {
// Render explicit bucket counts
for (size_t i = 0; i < agg_hist->thresholds.size(); ++i) {
bucket_labels_sv.clear();
for (size_t j = 0; j < labels_key.labels.size(); ++j) {
bucket_labels_sv.push_back(labels_key.labels[j]);
}
bucket_labels_sv.push_back(
{"le", static_format(arena, agg_hist->thresholds[i])});
auto labels = format_labels(bucket_labels_sv);
output.push_back(format(
arena, "%.*s_bucket%.*s %llu\n", static_cast<int>(name.length()),
name.data(), static_cast<int>(labels.length()), labels.data(),
static_cast<unsigned long long>(agg_hist->counts[i])));
}
// Render +Inf bucket using total observations count
bucket_labels_sv.clear();
for (size_t j = 0; j < labels_key.labels.size(); ++j) {
bucket_labels_sv.push_back(labels_key.labels[j]);
}
bucket_labels_sv.push_back({"le", "+Inf"});
auto inf_labels = format_labels(bucket_labels_sv);
output.push_back(format(
arena, "%.*s_bucket%.*s %llu\n", static_cast<int>(name.length()),
name.data(), static_cast<int>(inf_labels.length()), inf_labels.data(),
static_cast<unsigned long long>(agg_hist->observations)));
// Render sum
bucket_labels_sv.clear();
for (size_t j = 0; j < labels_key.labels.size(); ++j) {
bucket_labels_sv.push_back(labels_key.labels[j]);
}
auto labels = format_labels(bucket_labels_sv);
output.push_back(format(arena, "%.*s_sum%.*s %.17g\n",
static_cast<int>(name.length()), name.data(),
static_cast<int>(labels.length()), labels.data(),
agg_hist->sum));
// Render count
output.push_back(format(
arena, "%.*s_count%.*s %llu\n", static_cast<int>(name.length()),
name.data(), static_cast<int>(labels.length()), labels.data(),
static_cast<unsigned long long>(agg_hist->observations)));
}
}
auto result = arena.allocate<std::string_view>(output.size());
std::copy(output.data(), output.data() + output.size(), result);
return std::span<std::string_view>(result, output.size());
}
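// The rendered lines follow the Prometheus text exposition format. Illustrative
// output for a counter family (names, labels, and values are made up):
//
//   # HELP http_requests_total Total HTTP requests
//   # TYPE http_requests_total counter
//   http_requests_total{method="GET"} 42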
// Template specialization implementations for register_callback
template <>
void Family<Counter>::register_callback(
std::span<const std::pair<std::string_view, std::string_view>> labels,
MetricCallback<Counter> callback) {
std::unique_lock<std::mutex> _{Metric::mutex};
const LabelsKey &key = Metric::intern_labels(labels);
// Validate that labels aren't already in use by create() calls
for (const auto &[thread_id, per_thread] : p->per_thread_state) {
validate_or_abort(per_thread.instances.find(key) ==
per_thread.instances.end(),
"labels already registered as static instance",
key.labels.empty() ? "(no labels)" : key.labels[0].first);
}
// Validate that callback isn't already registered for these labels
validate_or_abort(p->callbacks.find(key) == p->callbacks.end(),
"callback already registered for labels",
key.labels.empty() ? "(no labels)" : key.labels[0].first);
p->callbacks[key] = std::move(callback);
}
template <>
void Family<Gauge>::register_callback(
std::span<const std::pair<std::string_view, std::string_view>> labels,
MetricCallback<Gauge> callback) {
std::unique_lock<std::mutex> _{Metric::mutex};
const LabelsKey &key = Metric::intern_labels(labels);
// Validate that labels aren't already in use by create() calls
validate_or_abort(p->instances.find(key) == p->instances.end(),
"labels already registered as static instance",
key.labels.empty() ? "(no labels)" : key.labels[0].first);
// Validate that callback isn't already registered for these labels
validate_or_abort(p->callbacks.find(key) == p->callbacks.end(),
"callback already registered for labels",
key.labels.empty() ? "(no labels)" : key.labels[0].first);
p->callbacks[key] = std::move(callback);
}
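// Callback registration sketch (illustrative; assumes MetricCallback<Gauge> is
// invocable with no arguments and returns the value to render, matching how
// render() invokes callbacks above; read_rss_bytes is a hypothetical helper):
//
//   auto rss = metric::create_gauge("process_resident_memory_bytes",
//                                   "Resident set size in bytes");
//   rss.register_callback({}, [] { return read_rss_bytes(); });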
// Static member definitions
std::mutex Metric::mutex;
thread_local Metric::ThreadInit Metric::thread_init;
void reset_metrics_for_testing() {
std::lock_guard _{Metric::mutex};
// WARNING: This function assumes no metric objects are in use!
// Clearing the family maps drops pointers to Family::State objects without
// running their destructors; their memory is reclaimed when the global arena
// is reset below, which is acceptable for testing.
// Get references to the maps
auto &counter_families = Metric::get_counter_families();
auto &gauge_families = Metric::get_gauge_families();
auto &histogram_families = Metric::get_histogram_families();
auto &interned_labels = Metric::get_interned_labels();
// Clear all family registrations
counter_families.clear();
gauge_families.clear();
histogram_families.clear();
interned_labels.clear();
// Reset the global arena - this will invalidate all arena-allocated strings
// but since we're clearing everything, that's OK
Metric::get_global_arena().reset();
// Note: Thread-local arenas will be cleaned up by ThreadInit destructors
// when threads exit naturally
}
} // namespace metric