Validation + callback api

This commit is contained in:
2025-08-29 11:31:06 -04:00
parent b6d4ae2862
commit e3a2ddbbfb
2 changed files with 198 additions and 19 deletions

View File

@@ -1,4 +1,5 @@
#include "metric.hpp" #include "metric.hpp"
#include <simdutf.h>
// WeaselDB Metrics System Design: // WeaselDB Metrics System Design:
// //
@@ -21,7 +22,10 @@
#include <atomic> #include <atomic>
#include <bit> #include <bit>
#include <cassert> #include <cassert>
#include <cctype>
#include <cstdint> #include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <limits> #include <limits>
#include <memory> #include <memory>
#include <mutex> #include <mutex>
@@ -33,12 +37,30 @@
namespace metric { namespace metric {
// Validation helper that stays active in both debug and release builds
// (unlike assert(), nothing here is compiled out).
//
// When `condition` is false, prints one diagnostic line to stderr and
// terminates the process via std::abort(); otherwise returns without side
// effects.
//   message - short description of the rule that was violated
//   value   - textual form of the offending value, printed in single quotes
static void validate_or_abort(bool condition, const char *message,
                              const char *value) {
  if (condition) {
    return; // Check passed - nothing to report.
  }

  std::fprintf(stderr, "WeaselDB metric validation failed: %s: '%s'\n",
               message, value);
  std::abort();
}
// Labels key for second level of map // Labels key for second level of map
struct LabelsKey { struct LabelsKey {
std::vector<std::pair<std::string, std::string>> labels; std::vector<std::pair<std::string, std::string>> labels;
LabelsKey(std::vector<std::pair<std::string, std::string>> l) LabelsKey(std::vector<std::pair<std::string, std::string>> l)
: labels(std::move(l)) { : labels(std::move(l)) {
// Validate all label keys and values
for (const auto &[key, value] : labels) {
validate_or_abort(is_valid_label_key(key), "invalid label key",
key.c_str());
validate_or_abort(is_valid_label_value(value), "invalid label value",
value.c_str());
}
// Sort labels by key for Prometheus compatibility // Sort labels by key for Prometheus compatibility
std::sort(labels.begin(), labels.end(), std::sort(labels.begin(), labels.end(),
[](const auto &a, const auto &b) { return a.first < b.first; }); [](const auto &a, const auto &b) { return a.first < b.first; });
@@ -85,12 +107,18 @@ template <> struct Family<Counter>::State {
std::unordered_map<LabelsKey, std::unique_ptr<Counter::State>> instances; std::unordered_map<LabelsKey, std::unique_ptr<Counter::State>> instances;
}; };
std::unordered_map<std::thread::id, PerThreadState> perThreadState; std::unordered_map<std::thread::id, PerThreadState> perThreadState;
// Callback-based metrics (global, not per-thread)
std::unordered_map<LabelsKey, MetricCallback<Counter>> callbacks;
}; };
template <> struct Family<Gauge>::State { template <> struct Family<Gauge>::State {
std::string name; std::string name;
std::string help; std::string help;
std::unordered_map<LabelsKey, std::unique_ptr<Gauge::State>> instances; std::unordered_map<LabelsKey, std::unique_ptr<Gauge::State>> instances;
// Callback-based metrics
std::unordered_map<LabelsKey, MetricCallback<Gauge>> callbacks;
}; };
template <> struct Family<Histogram>::State { template <> struct Family<Histogram>::State {
@@ -102,6 +130,8 @@ template <> struct Family<Histogram>::State {
std::unordered_map<LabelsKey, std::unique_ptr<Histogram::State>> instances; std::unordered_map<LabelsKey, std::unique_ptr<Histogram::State>> instances;
}; };
std::unordered_map<std::thread::id, PerThreadState> perThreadState; std::unordered_map<std::thread::id, PerThreadState> perThreadState;
// Note: No callbacks map - histograms don't support callback-based metrics
}; };
// Counter: Thread-local, monotonically increasing, single writer per thread // Counter: Thread-local, monotonically increasing, single writer per thread
@@ -110,10 +140,9 @@ struct Counter::State {
friend struct Metric; friend struct Metric;
}; };
// Gauge: Global, can increase/decrease, multiple writers (requires mutex) // Gauge: Global, can increase/decrease, multiple writers (uses atomic CAS)
struct Gauge::State { struct Gauge::State {
std::mutex mutex; AtomicWord value; // Stores double as uint64_t bits, lock-free
double value; // Plain double, protected by mutex
friend struct Metric; friend struct Metric;
}; };
@@ -172,6 +201,13 @@ struct Metric {
const std::vector<std::pair<std::string, std::string>> &labels) { const std::vector<std::pair<std::string, std::string>> &labels) {
std::unique_lock<std::mutex> _{mutex}; std::unique_lock<std::mutex> _{mutex};
LabelsKey key{labels}; LabelsKey key{labels};
// Validate that labels aren't already registered as callback
validate_or_abort(
family->p->callbacks.find(key) == family->p->callbacks.end(),
"labels already registered as callback",
key.labels.empty() ? "(no labels)" : key.labels[0].first.c_str());
auto &ptr = auto &ptr =
family->p->perThreadState[std::this_thread::get_id()].instances[key]; family->p->perThreadState[std::this_thread::get_id()].instances[key];
if (!ptr) { if (!ptr) {
@@ -188,10 +224,17 @@ struct Metric {
const std::vector<std::pair<std::string, std::string>> &labels) { const std::vector<std::pair<std::string, std::string>> &labels) {
std::unique_lock<std::mutex> _{mutex}; std::unique_lock<std::mutex> _{mutex};
LabelsKey key{labels}; LabelsKey key{labels};
// Validate that labels aren't already registered as callback
validate_or_abort(
family->p->callbacks.find(key) == family->p->callbacks.end(),
"labels already registered as callback",
key.labels.empty() ? "(no labels)" : key.labels[0].first.c_str());
auto &ptr = family->p->instances[key]; auto &ptr = family->p->instances[key];
if (!ptr) { if (!ptr) {
ptr = std::make_unique<Gauge::State>(); ptr = std::make_unique<Gauge::State>();
ptr->value = 0.0; ptr->value.store(0, std::memory_order_relaxed);
} }
Gauge result; Gauge result;
result.p = ptr.get(); result.p = ptr.get();
@@ -228,28 +271,47 @@ struct Metric {
}; };
void Counter::inc(double x) { void Counter::inc(double x) {
assert(x >= 0); validate_or_abort(x >= 0, "counter increment must be >= 0",
std::to_string(x).c_str());
// DESIGN: Single writer per thread allows simple load-modify-store // DESIGN: Single writer per thread allows simple load-modify-store
// No CAS loop needed since only one thread writes to this counter // No CAS loop needed since only one thread writes to this counter
auto current_value = auto current_value =
std::bit_cast<double>(p->value.load(std::memory_order_relaxed)); std::bit_cast<double>(p->value.load(std::memory_order_relaxed));
p->value.store(std::bit_cast<uint64_t>(current_value + x), auto new_value = current_value + x;
std::memory_order_relaxed);
// Validate monotonic property (counter never decreases)
validate_or_abort(new_value >= current_value,
"counter value overflow/wraparound detected",
std::to_string(new_value).c_str());
p->value.store(std::bit_cast<uint64_t>(new_value), std::memory_order_relaxed);
} }
void Gauge::inc(double x) { void Gauge::inc(double x) {
// IMPLEMENTATION DETAIL: Mutex protection used internally for thread safety, // Lock-free increment using CAS loop
// but API contract remains single-writer per instance uint64_t expected = p->value.load(std::memory_order_relaxed);
std::unique_lock<std::mutex> _{p->mutex}; uint64_t desired;
p->value += x; do {
double current_value = std::bit_cast<double>(expected);
double new_value = current_value + x;
desired = std::bit_cast<uint64_t>(new_value);
} while (!p->value.compare_exchange_weak(expected, desired,
std::memory_order_relaxed));
} }
void Gauge::dec(double x) { void Gauge::dec(double x) {
std::unique_lock<std::mutex> _{p->mutex}; // Lock-free decrement using CAS loop
p->value -= x; uint64_t expected = p->value.load(std::memory_order_relaxed);
uint64_t desired;
do {
double current_value = std::bit_cast<double>(expected);
double new_value = current_value - x;
desired = std::bit_cast<uint64_t>(new_value);
} while (!p->value.compare_exchange_weak(expected, desired,
std::memory_order_relaxed));
} }
void Gauge::set(double x) { void Gauge::set(double x) {
std::unique_lock<std::mutex> _{p->mutex}; // Simple atomic store for set operation
p->value = x; p->value.store(std::bit_cast<uint64_t>(x), std::memory_order_relaxed);
} }
void Histogram::observe(double x) { void Histogram::observe(double x) {
assert(p->thresholds.size() == p->counts.size()); assert(p->thresholds.size() == p->counts.size());
@@ -289,6 +351,9 @@ Histogram Family<Histogram>::create(
} }
Family<Counter> create_counter(std::string name, std::string help) { Family<Counter> create_counter(std::string name, std::string help) {
validate_or_abort(is_valid_metric_name(name), "invalid counter name",
name.c_str());
std::unique_lock<std::mutex> _{Metric::mutex}; std::unique_lock<std::mutex> _{Metric::mutex};
auto &familyPtr = Metric::counterFamilies[name]; auto &familyPtr = Metric::counterFamilies[name];
if (!familyPtr) { if (!familyPtr) {
@@ -302,6 +367,9 @@ Family<Counter> create_counter(std::string name, std::string help) {
} }
Family<Gauge> create_gauge(std::string name, std::string help) { Family<Gauge> create_gauge(std::string name, std::string help) {
validate_or_abort(is_valid_metric_name(name), "invalid gauge name",
name.c_str());
std::unique_lock<std::mutex> _{Metric::mutex}; std::unique_lock<std::mutex> _{Metric::mutex};
auto &familyPtr = Metric::gaugeFamilies[name]; auto &familyPtr = Metric::gaugeFamilies[name];
if (!familyPtr) { if (!familyPtr) {
@@ -316,6 +384,9 @@ Family<Gauge> create_gauge(std::string name, std::string help) {
Family<Histogram> create_histogram(std::string name, std::string help, Family<Histogram> create_histogram(std::string name, std::string help,
std::initializer_list<double> buckets) { std::initializer_list<double> buckets) {
validate_or_abort(is_valid_metric_name(name), "invalid histogram name",
name.c_str());
std::unique_lock<std::mutex> _{Metric::mutex}; std::unique_lock<std::mutex> _{Metric::mutex};
auto &familyPtr = Metric::histogramFamilies[name]; auto &familyPtr = Metric::histogramFamilies[name];
if (!familyPtr) { if (!familyPtr) {
@@ -340,10 +411,69 @@ Family<Histogram> create_histogram(std::string name, std::string help,
return family; return family;
} }
std::span<std::string> render(ArenaAllocator &arena) { // Prometheus validation functions
// Metric names must match [a-zA-Z_:][a-zA-Z0-9_:]*
//
// Returns true when `name` is non-empty and every byte fits the grammar above;
// false otherwise.
//
// Character classes are tested with explicit ASCII ranges instead of
// std::isalpha/std::isalnum: those functions depend on the current C locale
// (so they may accept non-ASCII letters) and have undefined behavior when
// given a negative char, which is what any byte >= 0x80 becomes on platforms
// where char is signed.
bool is_valid_metric_name(const std::string &name) {
  if (name.empty())
    return false;

  const auto is_ascii_alpha = [](char c) {
    return (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z');
  };
  const auto is_ascii_digit = [](char c) { return c >= '0' && c <= '9'; };

  // First character must be letter, underscore, or colon
  const char first = name[0];
  if (!is_ascii_alpha(first) && first != '_' && first != ':') {
    return false;
  }

  // Remaining characters must be alphanumeric, underscore, or colon
  for (size_t i = 1; i < name.size(); ++i) {
    const char c = name[i];
    if (!is_ascii_alpha(c) && !is_ascii_digit(c) && c != '_' && c != ':') {
      return false;
    }
  }
  return true;
}
// Label keys must match [a-zA-Z_][a-zA-Z0-9_]*
//
// Returns true when `key` is non-empty, fits the grammar above, and does not
// start with "__" (that prefix is reserved for internal use).
//
// Character classes are tested with explicit ASCII ranges instead of
// std::isalpha/std::isalnum: those functions depend on the current C locale
// (so they may accept non-ASCII letters) and have undefined behavior when
// given a negative char, which is what any byte >= 0x80 becomes on platforms
// where char is signed.
bool is_valid_label_key(const std::string &key) {
  if (key.empty())
    return false;

  const auto is_ascii_alpha = [](char c) {
    return (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z');
  };
  const auto is_ascii_digit = [](char c) { return c >= '0' && c <= '9'; };

  // First character must be letter or underscore
  const char first = key[0];
  if (!is_ascii_alpha(first) && first != '_') {
    return false;
  }

  // Remaining characters must be alphanumeric or underscore
  for (size_t i = 1; i < key.size(); ++i) {
    const char c = key[i];
    if (!is_ascii_alpha(c) && !is_ascii_digit(c) && c != '_') {
      return false;
    }
  }

  // Label keys starting with __ are reserved for internal use
  if (key.size() >= 2 && key[0] == '_' && key[1] == '_') {
    return false;
  }
  return true;
}
// Label values: Prometheus accepts any UTF-8 string, so the only check made
// here is that the bytes form well-formed UTF-8 (delegated to simdutf).
// Returns the result of simdutf::validate_utf8 over the full byte range of
// `value`.
bool is_valid_label_value(const std::string &value) {
  const char *bytes = value.c_str();
  const size_t length = value.size();
  return simdutf::validate_utf8(bytes, length);
}
// Render all metrics. Currently a stub: it always returns a zero-length span
// and does not touch `arena`.
std::span<std::string_view> render(ArenaAllocator &arena) {
  // TODO: Implement Prometheus text format rendering
  // All string data should be allocated in the arena and returned as
  // string_views

  // Static placeholder so the returned span's data pointer stays valid after
  // the function returns, even though the span has no elements.
  static std::string_view empty_result = "";
  std::span<std::string_view> nothing(&empty_result, 0);
  return nothing;
}
// Static member definitions // Static member definitions

View File

@@ -8,12 +8,23 @@
// - Single-writer semantics: Each metric instance bound to creating thread // - Single-writer semantics: Each metric instance bound to creating thread
// - Lock-free operations using atomic<uint64_t> storage for doubles // - Lock-free operations using atomic<uint64_t> storage for doubles
// - Full IEEE 754 double precision preservation via bit reinterpretation // - Full IEEE 754 double precision preservation via bit reinterpretation
// - Single global registry: All metrics registered in one global namespace
// //
// CRITICAL THREAD SAFETY CONSTRAINT: // CRITICAL THREAD SAFETY CONSTRAINT:
// Each metric instance has exactly ONE writer thread (the creating thread). // Each metric instance has exactly ONE writer thread (the creating thread).
// It is undefined behavior to call inc()/dec()/set()/observe() from a different // It is undefined behavior to call inc()/dec()/set()/observe() from a different
// thread. // thread.
// //
// REGISTRY MODEL:
// This implementation uses a single global registry for all metrics, unlike
// typical Prometheus client libraries that support multiple registries.
// This design choice prioritizes simplicity and performance over flexibility.
//
// METRIC LIFECYCLE:
// Metrics are created once and persist for the application lifetime. There is
// no unregistration mechanism - this prevents accidental metric loss and
// simplifies the implementation.
//
// USAGE: // USAGE:
// auto counter_family = metric::create_counter("requests_total", "Total // auto counter_family = metric::create_counter("requests_total", "Total
// requests"); auto counter = counter_family.create({{"method", "GET"}}); // // requests"); auto counter = counter_family.create({{"method", "GET"}}); //
@@ -100,6 +111,8 @@ template <class T> struct Family {
// Create metric instance with specific labels // Create metric instance with specific labels
// Labels are sorted by key for Prometheus compatibility // Labels are sorted by key for Prometheus compatibility
// ERROR: Will abort if labels already registered via register_callback()
// OK: Multiple calls with same labels return same instance (idempotent)
T create(std::vector<std::pair<std::string, std::string>> labels); T create(std::vector<std::pair<std::string, std::string>> labels);
private: private:
@@ -129,6 +142,42 @@ Family<Histogram> create_histogram(std::string name, std::string help,
std::initializer_list<double> buckets); std::initializer_list<double> buckets);
// Render all metrics in Prometheus text format // Render all metrics in Prometheus text format
std::span<std::string> render(ArenaAllocator &arena); // TODO: Implement Prometheus text exposition format
// THREAD SAFETY: Serialized by global mutex - callbacks need not be thread-safe
std::span<std::string_view> render(ArenaAllocator &arena);
// Validation functions for Prometheus compatibility
bool is_valid_metric_name(const std::string &name);
bool is_valid_label_key(const std::string &key);
bool is_valid_label_value(const std::string &value);
// Callback function type for dynamic metric values
// Called during render() to get current metric value
// THREAD SAFETY: May be called from an arbitrary thread, but calls are
// serialized by the render() mutex, so the callback does not need to be
// thread-safe internally.
template <typename T> using MetricCallback = std::function<double()>;
// Register callback-based metric to Family
// Validates that label set isn't already taken by either:
// - A previous register_callback() call (callbacks must be unique)
// - A create() call (static and callback metrics cannot coexist for same
// labels)
//
// Similarly, create() validates that the label set isn't already registered as
// a callback.
// Note: create() can be called multiple times with the same labels (it returns
// the same instance).
template <>
void Family<Counter>::register_callback(
std::vector<std::pair<std::string, std::string>> labels,
MetricCallback<Counter> callback);
template <>
void Family<Gauge>::register_callback(
std::vector<std::pair<std::string, std::string>> labels,
MetricCallback<Gauge> callback);
// Note: Histograms do not support callbacks due to their multi-value nature
// (buckets + sum + count). Use static histogram metrics only.
} // namespace metric } // namespace metric