Flesh out metrics architecture more
This commit is contained in:
373
src/metric.cpp
373
src/metric.cpp
@@ -1,38 +1,68 @@
|
||||
#include "metric.hpp"
|
||||
|
||||
// WeaselDB Metrics System Design:
|
||||
//
|
||||
// THREADING MODEL:
|
||||
// - Counters and Histograms: Per-thread storage, single writer per thread
|
||||
// - Gauges: Global storage with mutex protection (multi-writer)
|
||||
//
|
||||
// PRECISION STRATEGY:
|
||||
// - Use atomic<uint64_t> for lock-free storage
|
||||
// - Store doubles by reinterpreting bits as uint64_t (preserves full IEEE 754
|
||||
// precision)
|
||||
// - Single writer assumption allows simple load/store without CAS loops
|
||||
//
|
||||
// MEMORY MODEL:
|
||||
// - Thread-local metrics auto-cleanup on thread destruction
|
||||
// - Global metrics (gauges) persist for application lifetime
|
||||
// - Histogram buckets are sorted, deduplicated, and include +Inf bucket
|
||||
|
||||
#include <algorithm>
|
||||
#include <atomic>
|
||||
#include <cassert>
|
||||
#include <cstdint>
|
||||
#include <cstring>
|
||||
#include <limits>
|
||||
#include <memory>
|
||||
#include <mutex>
|
||||
#include <string_view>
|
||||
#include <string>
|
||||
#include <thread>
|
||||
#include <type_traits>
|
||||
#include <unordered_map>
|
||||
#include <vector>
|
||||
|
||||
namespace metric {
|
||||
struct MetricKey {
|
||||
std::string_view name;
|
||||
|
||||
// Labels key for second level of map
|
||||
struct LabelsKey {
|
||||
std::vector<std::pair<std::string, std::string>> labels;
|
||||
bool operator==(const MetricKey &other) const {
|
||||
return name == other.name && labels == other.labels;
|
||||
|
||||
LabelsKey(std::vector<std::pair<std::string, std::string>> l)
|
||||
: labels(std::move(l)) {
|
||||
// Sort labels by key for Prometheus compatibility
|
||||
std::sort(labels.begin(), labels.end(),
|
||||
[](const auto &a, const auto &b) { return a.first < b.first; });
|
||||
}
|
||||
|
||||
bool operator==(const LabelsKey &other) const {
|
||||
return labels == other.labels;
|
||||
}
|
||||
};
|
||||
|
||||
} // namespace metric
|
||||
|
||||
namespace std {
|
||||
template <> struct hash<metric::MetricKey> {
|
||||
std::size_t operator()(const metric::MetricKey &k) const {
|
||||
template <> struct hash<metric::LabelsKey> {
|
||||
std::size_t operator()(const metric::LabelsKey &k) const {
|
||||
thread_local std::vector<size_t> parts;
|
||||
parts.clear();
|
||||
parts.push_back(std::hash<std::string_view>{}(k.name));
|
||||
for (const auto &p : k.labels) {
|
||||
parts.push_back(std::hash<std::string>{}(p.first));
|
||||
parts.push_back(std::hash<std::string>{}(p.second));
|
||||
}
|
||||
return std::hash<std::string_view>{}(
|
||||
std::string_view{reinterpret_cast<const char *>(parts.data()),
|
||||
parts.size() * sizeof(size_t)});
|
||||
return std::hash<std::string>{}(
|
||||
std::string{reinterpret_cast<const char *>(parts.data()),
|
||||
parts.size() * sizeof(size_t)});
|
||||
}
|
||||
};
|
||||
} // namespace std
|
||||
@@ -41,77 +71,10 @@ namespace metric {
|
||||
|
||||
using AtomicWord = std::atomic<uint64_t>;
|
||||
|
||||
struct Counter::State {
|
||||
AtomicWord value;
|
||||
};
|
||||
|
||||
struct Gauge::State {
|
||||
std::mutex mutex;
|
||||
double value;
|
||||
};
|
||||
|
||||
struct Histogram::State {
|
||||
std::vector<double> thresholds;
|
||||
std::vector<AtomicWord> counts;
|
||||
AtomicWord sum;
|
||||
AtomicWord observations;
|
||||
};
|
||||
|
||||
struct Metric {
|
||||
|
||||
static std::mutex mutex;
|
||||
|
||||
struct PerThreadState {
|
||||
std::unordered_map<MetricKey, Counter::State> counters;
|
||||
std::unordered_map<MetricKey, Histogram::State> histograms;
|
||||
};
|
||||
|
||||
static std::unordered_map<std::thread::id, PerThreadState> perThreadState;
|
||||
|
||||
static std::unordered_map<MetricKey, Gauge::State> gauges;
|
||||
|
||||
struct ThreadInit {
|
||||
ThreadInit() {
|
||||
std::unique_lock<std::mutex> _{mutex};
|
||||
perThreadState[std::this_thread::get_id()] = {};
|
||||
}
|
||||
~ThreadInit() {
|
||||
std::unique_lock<std::mutex> _{mutex};
|
||||
perThreadState.erase(std::this_thread::get_id());
|
||||
}
|
||||
};
|
||||
static thread_local ThreadInit thread_init;
|
||||
|
||||
static Counter
|
||||
create_counter(std::string_view name,
|
||||
std::vector<std::pair<std::string, std::string>> labels) {
|
||||
std::unique_lock<std::mutex> _{mutex};
|
||||
Counter result;
|
||||
result.p = &perThreadState[std::this_thread::get_id()]
|
||||
.counters[MetricKey{name, labels}];
|
||||
return result;
|
||||
}
|
||||
|
||||
static Gauge
|
||||
create_gauge(std::string_view name,
|
||||
std::vector<std::pair<std::string, std::string>> labels) {
|
||||
std::unique_lock<std::mutex> _{mutex};
|
||||
Gauge result;
|
||||
result.p = &gauges[MetricKey{name, labels}];
|
||||
return result;
|
||||
}
|
||||
|
||||
static Histogram
|
||||
create_histogram(std::string_view name,
|
||||
std::vector<std::pair<std::string, std::string>> labels) {
|
||||
std::unique_lock<std::mutex> _{mutex};
|
||||
Histogram result;
|
||||
result.p = &perThreadState[std::this_thread::get_id()]
|
||||
.histograms[MetricKey{name, labels}];
|
||||
return result;
|
||||
}
|
||||
};
|
||||
|
||||
// DESIGN: Store doubles in atomic<uint64_t> for lock-free operations
|
||||
// - Preserves full IEEE 754 double precision (no truncation)
|
||||
// - Allows atomic load/store without locks
|
||||
// - Safe bit-wise conversion between double and uint64_t
|
||||
template <class U, class V> U reinterpret(V v) {
|
||||
static_assert(sizeof(U) == sizeof(V));
|
||||
static_assert(std::is_arithmetic_v<U>);
|
||||
@@ -121,9 +84,160 @@ template <class U, class V> U reinterpret(V v) {
|
||||
return u;
|
||||
}
|
||||
|
||||
// Family::State structures own the second-level maps (labels -> instances)
|
||||
template <> struct Family<Counter>::State {
|
||||
std::string name;
|
||||
std::string help;
|
||||
|
||||
struct PerThreadState {
|
||||
std::unordered_map<LabelsKey, std::unique_ptr<Counter::State>> instances;
|
||||
};
|
||||
std::unordered_map<std::thread::id, PerThreadState> perThreadState;
|
||||
};
|
||||
|
||||
template <> struct Family<Gauge>::State {
|
||||
std::string name;
|
||||
std::string help;
|
||||
std::unordered_map<LabelsKey, std::unique_ptr<Gauge::State>> instances;
|
||||
};
|
||||
|
||||
template <> struct Family<Histogram>::State {
|
||||
std::string name;
|
||||
std::string help;
|
||||
std::vector<double> buckets;
|
||||
|
||||
struct PerThreadState {
|
||||
std::unordered_map<LabelsKey, std::unique_ptr<Histogram::State>> instances;
|
||||
};
|
||||
std::unordered_map<std::thread::id, PerThreadState> perThreadState;
|
||||
};
|
||||
|
||||
// Counter: Thread-local, monotonically increasing, single writer per thread
|
||||
struct Counter::State {
|
||||
AtomicWord value; // Stores double as uint64_t bits
|
||||
friend struct Metric;
|
||||
};
|
||||
|
||||
// Gauge: Global, can increase/decrease, multiple writers (requires mutex)
|
||||
struct Gauge::State {
|
||||
std::mutex mutex;
|
||||
double value; // Plain double, protected by mutex
|
||||
friend struct Metric;
|
||||
};
|
||||
|
||||
// Histogram: Thread-local buckets, single writer per thread
|
||||
struct Histogram::State {
|
||||
std::vector<double>
|
||||
thresholds; // Bucket boundaries (sorted, deduplicated, includes +Inf)
|
||||
std::vector<AtomicWord> counts; // Count per bucket (uint64_t)
|
||||
AtomicWord sum; // Sum of observations (double stored as uint64_t bits)
|
||||
AtomicWord observations; // Total observation count (uint64_t)
|
||||
friend struct Metric;
|
||||
};
|
||||
|
||||
struct Metric {
|
||||
static std::mutex mutex;
|
||||
|
||||
// Two-level map: name -> Family
|
||||
static std::unordered_map<std::string,
|
||||
std::unique_ptr<Family<Counter>::State>>
|
||||
counterFamilies;
|
||||
static std::unordered_map<std::string, std::unique_ptr<Family<Gauge>::State>>
|
||||
gaugeFamilies;
|
||||
static std::unordered_map<std::string,
|
||||
std::unique_ptr<Family<Histogram>::State>>
|
||||
histogramFamilies;
|
||||
|
||||
// Thread cleanup for per-family thread-local storage
|
||||
struct ThreadInit {
|
||||
ThreadInit() {
|
||||
// Thread registration happens lazily when metrics are created
|
||||
}
|
||||
~ThreadInit() {
|
||||
// Clean up this thread's storage from all families
|
||||
std::unique_lock<std::mutex> _{mutex};
|
||||
auto thread_id = std::this_thread::get_id();
|
||||
|
||||
// Clean up counter families
|
||||
for (auto &[name, family] : counterFamilies) {
|
||||
family->perThreadState.erase(thread_id);
|
||||
}
|
||||
|
||||
// Clean up histogram families
|
||||
for (auto &[name, family] : histogramFamilies) {
|
||||
family->perThreadState.erase(thread_id);
|
||||
}
|
||||
|
||||
// Gauges are global, no per-thread cleanup needed
|
||||
}
|
||||
};
|
||||
static thread_local ThreadInit thread_init;
|
||||
|
||||
// Thread cleanup now handled by ThreadInit RAII
|
||||
|
||||
static Counter create_counter_instance(
|
||||
Family<Counter> *family,
|
||||
const std::vector<std::pair<std::string, std::string>> &labels) {
|
||||
std::unique_lock<std::mutex> _{mutex};
|
||||
LabelsKey key{labels};
|
||||
auto &ptr =
|
||||
family->p->perThreadState[std::this_thread::get_id()].instances[key];
|
||||
if (!ptr) {
|
||||
ptr = std::make_unique<Counter::State>();
|
||||
}
|
||||
Counter result;
|
||||
result.p = ptr.get();
|
||||
return result;
|
||||
}
|
||||
|
||||
static Gauge create_gauge_instance(
|
||||
Family<Gauge> *family,
|
||||
const std::vector<std::pair<std::string, std::string>> &labels) {
|
||||
std::unique_lock<std::mutex> _{mutex};
|
||||
LabelsKey key{labels};
|
||||
auto &ptr = family->p->instances[key];
|
||||
if (!ptr) {
|
||||
ptr = std::make_unique<Gauge::State>();
|
||||
}
|
||||
Gauge result;
|
||||
result.p = ptr.get();
|
||||
return result;
|
||||
}
|
||||
|
||||
static Histogram create_histogram_instance(
|
||||
Family<Histogram> *family,
|
||||
const std::vector<std::pair<std::string, std::string>> &labels) {
|
||||
std::unique_lock<std::mutex> _{mutex};
|
||||
LabelsKey key{labels};
|
||||
auto &ptr =
|
||||
family->p->perThreadState[std::this_thread::get_id()].instances[key];
|
||||
if (!ptr) {
|
||||
ptr = std::make_unique<Histogram::State>();
|
||||
// DESIGN: Prometheus-compatible histogram buckets
|
||||
// Use buckets from family configuration
|
||||
ptr->thresholds = family->p->buckets; // Already sorted and deduplicated
|
||||
|
||||
// DESIGN: std::atomic is not copy-constructible
|
||||
// Initialize vector with correct size, all atomics default to 0
|
||||
ptr->counts = std::vector<AtomicWord>(ptr->thresholds.size());
|
||||
ptr->sum.store(0, std::memory_order_relaxed);
|
||||
ptr->observations.store(0, std::memory_order_relaxed);
|
||||
}
|
||||
Histogram result;
|
||||
result.p = ptr.get();
|
||||
return result;
|
||||
}
|
||||
};
|
||||
|
||||
void Counter::inc(double x) {
|
||||
assert(x >= 0);
|
||||
p->value.fetch_add(x, std::memory_order_relaxed);
|
||||
|
||||
// DESIGN: Single writer per thread allows simple load-modify-store
|
||||
// No CAS loop needed since only one thread writes to this counter
|
||||
auto current_value =
|
||||
reinterpret<double>(p->value.load(std::memory_order_relaxed));
|
||||
p->value.store(reinterpret<uint64_t>(current_value + x),
|
||||
std::memory_order_relaxed);
|
||||
}
|
||||
void Gauge::inc(double x) {
|
||||
std::unique_lock<std::mutex> _{p->mutex};
|
||||
@@ -139,36 +253,107 @@ void Gauge::set(double x) {
|
||||
}
|
||||
void Histogram::observe(double x) {
|
||||
assert(p->thresholds.size() == p->counts.size());
|
||||
|
||||
// Increment bucket counts (cumulative: each bucket counts all values <=
|
||||
// threshold)
|
||||
for (size_t i = 0; i < p->thresholds.size(); ++i) {
|
||||
p->counts[i].fetch_add(x <= p->thresholds[i], std::memory_order_relaxed);
|
||||
}
|
||||
auto sum = reinterpret<double>(p->sum.load(std::memory_order_relaxed));
|
||||
sum += x;
|
||||
p->sum.store(reinterpret<decltype(p->sum.load())>(sum));
|
||||
|
||||
// DESIGN: Single writer per thread allows simple load-modify-store for sum
|
||||
// No CAS loop needed since only one thread writes to this histogram
|
||||
auto current_sum =
|
||||
reinterpret<double>(p->sum.load(std::memory_order_relaxed));
|
||||
p->sum.store(reinterpret<uint64_t>(current_sum + x),
|
||||
std::memory_order_relaxed);
|
||||
|
||||
p->observations.fetch_add(1, std::memory_order_relaxed);
|
||||
}
|
||||
|
||||
template <>
|
||||
Counter Family<Counter>::create(
|
||||
std::vector<std::pair<std::string, std::string>> labels) {
|
||||
return Metric::create_counter(name, labels);
|
||||
return Metric::create_counter_instance(this, labels);
|
||||
}
|
||||
|
||||
template <>
|
||||
Gauge Family<Gauge>::create(
|
||||
std::vector<std::pair<std::string, std::string>> labels) {
|
||||
return Metric::create_gauge(name, labels);
|
||||
return Metric::create_gauge_instance(this, labels);
|
||||
}
|
||||
|
||||
template <>
|
||||
Histogram Family<Histogram>::create(
|
||||
std::vector<std::pair<std::string, std::string>> labels) {
|
||||
return Metric::create_histogram(name, labels);
|
||||
return Metric::create_histogram_instance(this, labels);
|
||||
}
|
||||
|
||||
Family<Counter> create_counter(std::string_view name, std::string_view help) {}
|
||||
Family<Gauge> create_gauge(std::string_view name, std::string_view help) {}
|
||||
Family<Histogram> create_histogram(std::string_view name, std::string_view help,
|
||||
std::initializer_list<double> buckets) {}
|
||||
Family<Counter> create_counter(std::string name, std::string help) {
|
||||
std::unique_lock<std::mutex> _{Metric::mutex};
|
||||
auto &familyPtr = Metric::counterFamilies[name];
|
||||
if (!familyPtr) {
|
||||
familyPtr = std::make_unique<Family<Counter>::State>();
|
||||
familyPtr->name = std::move(name);
|
||||
familyPtr->help = std::move(help);
|
||||
}
|
||||
Family<Counter> family;
|
||||
family.p = familyPtr.get();
|
||||
return family;
|
||||
}
|
||||
|
||||
std::span<std::string_view> render(ArenaAllocator &) {}
|
||||
Family<Gauge> create_gauge(std::string name, std::string help) {
|
||||
std::unique_lock<std::mutex> _{Metric::mutex};
|
||||
auto &familyPtr = Metric::gaugeFamilies[name];
|
||||
if (!familyPtr) {
|
||||
familyPtr = std::make_unique<Family<Gauge>::State>();
|
||||
familyPtr->name = std::move(name);
|
||||
familyPtr->help = std::move(help);
|
||||
}
|
||||
Family<Gauge> family;
|
||||
family.p = familyPtr.get();
|
||||
return family;
|
||||
}
|
||||
|
||||
Family<Histogram> create_histogram(std::string name, std::string help,
|
||||
std::initializer_list<double> buckets) {
|
||||
std::unique_lock<std::mutex> _{Metric::mutex};
|
||||
auto &familyPtr = Metric::histogramFamilies[name];
|
||||
if (!familyPtr) {
|
||||
familyPtr = std::make_unique<Family<Histogram>::State>();
|
||||
familyPtr->name = std::move(name);
|
||||
familyPtr->help = std::move(help);
|
||||
|
||||
// DESIGN: Prometheus-compatible histogram buckets
|
||||
familyPtr->buckets = std::vector<double>(buckets);
|
||||
std::sort(familyPtr->buckets.begin(), familyPtr->buckets.end());
|
||||
familyPtr->buckets.erase(
|
||||
std::unique(familyPtr->buckets.begin(), familyPtr->buckets.end()),
|
||||
familyPtr->buckets.end());
|
||||
// +Inf bucket captures all observations (Prometheus requirement)
|
||||
if (familyPtr->buckets.empty() ||
|
||||
familyPtr->buckets.back() != std::numeric_limits<double>::infinity()) {
|
||||
familyPtr->buckets.push_back(std::numeric_limits<double>::infinity());
|
||||
}
|
||||
}
|
||||
Family<Histogram> family;
|
||||
family.p = familyPtr.get();
|
||||
return family;
|
||||
}
|
||||
|
||||
std::span<std::string> render(ArenaAllocator &arena) {
|
||||
// TODO: Implement Prometheus text format rendering
|
||||
static std::string empty_result = "";
|
||||
return std::span<std::string>(&empty_result, 0);
|
||||
}
|
||||
|
||||
// Static member definitions
|
||||
std::mutex Metric::mutex;
|
||||
std::unordered_map<std::string, std::unique_ptr<Family<Counter>::State>>
|
||||
Metric::counterFamilies;
|
||||
std::unordered_map<std::string, std::unique_ptr<Family<Gauge>::State>>
|
||||
Metric::gaugeFamilies;
|
||||
std::unordered_map<std::string, std::unique_ptr<Family<Histogram>::State>>
|
||||
Metric::histogramFamilies;
|
||||
thread_local Metric::ThreadInit Metric::thread_init;
|
||||
|
||||
} // namespace metric
|
||||
|
||||
Reference in New Issue
Block a user