Metrics implementation, WIP
This commit is contained in:
378
src/metric.cpp
378
src/metric.cpp
@@ -1,6 +1,29 @@
|
||||
#include "metric.hpp"
|
||||
|
||||
#include <algorithm>
|
||||
#include <atomic>
|
||||
#include <bit>
|
||||
#include <cassert>
|
||||
#include <cctype>
|
||||
#include <cmath>
|
||||
#include <cstdint>
|
||||
#include <cstdio>
|
||||
#include <cstdlib>
|
||||
#include <cstring>
|
||||
#include <functional>
|
||||
#include <limits>
|
||||
#include <memory>
|
||||
#include <mutex>
|
||||
#include <string>
|
||||
#include <thread>
|
||||
#include <type_traits>
|
||||
#include <unordered_map>
|
||||
#include <vector>
|
||||
|
||||
#include <simdutf.h>
|
||||
|
||||
#include "format.hpp"
|
||||
|
||||
// WeaselDB Metrics System Design:
|
||||
//
|
||||
// THREADING MODEL:
|
||||
@@ -18,23 +41,6 @@
|
||||
// - Global metrics (gauges) persist for application lifetime
|
||||
// - Histogram buckets are sorted, deduplicated, and include +Inf bucket
|
||||
|
||||
#include <algorithm>
|
||||
#include <atomic>
|
||||
#include <bit>
|
||||
#include <cassert>
|
||||
#include <cctype>
|
||||
#include <cstdint>
|
||||
#include <cstdio>
|
||||
#include <cstdlib>
|
||||
#include <limits>
|
||||
#include <memory>
|
||||
#include <mutex>
|
||||
#include <string>
|
||||
#include <thread>
|
||||
#include <type_traits>
|
||||
#include <unordered_map>
|
||||
#include <vector>
|
||||
|
||||
namespace metric {
|
||||
|
||||
// Validation helper that works in both debug and release builds
|
||||
@@ -76,7 +82,15 @@ struct LabelsKey {
|
||||
namespace std {
|
||||
template <> struct hash<metric::LabelsKey> {
|
||||
std::size_t operator()(const metric::LabelsKey &k) const {
|
||||
return std::hash<decltype(k.labels)>{}(k.labels);
|
||||
std::size_t hash_value = 0;
|
||||
for (const auto &[key, value] : k.labels) {
|
||||
// Combine hashes using a simple but effective method
|
||||
hash_value ^= std::hash<std::string>{}(key) + 0x9e3779b9 +
|
||||
(hash_value << 6) + (hash_value >> 2);
|
||||
hash_value ^= std::hash<std::string>{}(value) + 0x9e3779b9 +
|
||||
(hash_value << 6) + (hash_value >> 2);
|
||||
}
|
||||
return hash_value;
|
||||
}
|
||||
};
|
||||
} // namespace std
|
||||
@@ -262,10 +276,9 @@ struct Metric {
|
||||
}
|
||||
};
|
||||
|
||||
void Counter::inc(double x) {
|
||||
validate_or_abort(x >= 0, "counter increment must be >= 0",
|
||||
std::to_string(x).c_str());
|
||||
Counter::Counter() = default;
|
||||
|
||||
void Counter::inc(double x) {
|
||||
// DESIGN: Single writer per thread allows simple load-modify-store
|
||||
// No CAS loop needed since only one thread writes to this counter
|
||||
auto current_value =
|
||||
@@ -273,12 +286,16 @@ void Counter::inc(double x) {
|
||||
auto new_value = current_value + x;
|
||||
|
||||
// Validate monotonic property (counter never decreases)
|
||||
validate_or_abort(new_value >= current_value,
|
||||
"counter value overflow/wraparound detected",
|
||||
std::to_string(new_value).c_str());
|
||||
if (new_value < current_value) [[unlikely]] {
|
||||
validate_or_abort(false, "counter value overflow/wraparound detected",
|
||||
std::to_string(new_value).c_str());
|
||||
}
|
||||
|
||||
p->value.store(std::bit_cast<uint64_t>(new_value), std::memory_order_relaxed);
|
||||
}
|
||||
|
||||
Gauge::Gauge() = default;
|
||||
|
||||
void Gauge::inc(double x) {
|
||||
// Lock-free increment using CAS loop
|
||||
uint64_t expected = p->value.load(std::memory_order_relaxed);
|
||||
@@ -305,6 +322,9 @@ void Gauge::set(double x) {
|
||||
// Simple atomic store for set operation
|
||||
p->value.store(std::bit_cast<uint64_t>(x), std::memory_order_relaxed);
|
||||
}
|
||||
|
||||
Histogram::Histogram() = default;
|
||||
|
||||
void Histogram::observe(double x) {
|
||||
assert(p->thresholds.size() == p->counts.size());
|
||||
|
||||
@@ -324,6 +344,10 @@ void Histogram::observe(double x) {
|
||||
p->observations.fetch_add(1, std::memory_order_relaxed);
|
||||
}
|
||||
|
||||
template <> Family<Counter>::Family() = default;
|
||||
template <> Family<Gauge>::Family() = default;
|
||||
template <> Family<Histogram>::Family() = default;
|
||||
|
||||
template <>
|
||||
Counter Family<Counter>::create(
|
||||
std::vector<std::pair<std::string, std::string>> labels) {
|
||||
@@ -352,6 +376,11 @@ Family<Counter> create_counter(std::string name, std::string help) {
|
||||
familyPtr = std::make_unique<Family<Counter>::State>();
|
||||
familyPtr->name = std::move(name);
|
||||
familyPtr->help = std::move(help);
|
||||
} else {
|
||||
validate_or_abort(
|
||||
familyPtr->help == help,
|
||||
"metric family already registered with different help text",
|
||||
name.c_str());
|
||||
}
|
||||
Family<Counter> family;
|
||||
family.p = familyPtr.get();
|
||||
@@ -368,6 +397,11 @@ Family<Gauge> create_gauge(std::string name, std::string help) {
|
||||
familyPtr = std::make_unique<Family<Gauge>::State>();
|
||||
familyPtr->name = std::move(name);
|
||||
familyPtr->help = std::move(help);
|
||||
} else {
|
||||
validate_or_abort(
|
||||
familyPtr->help == help,
|
||||
"metric family already registered with different help text",
|
||||
name.c_str());
|
||||
}
|
||||
Family<Gauge> family;
|
||||
family.p = familyPtr.get();
|
||||
@@ -375,7 +409,7 @@ Family<Gauge> create_gauge(std::string name, std::string help) {
|
||||
}
|
||||
|
||||
Family<Histogram> create_histogram(std::string name, std::string help,
|
||||
std::initializer_list<double> buckets) {
|
||||
std::span<const double> buckets) {
|
||||
validate_or_abort(is_valid_metric_name(name), "invalid histogram name",
|
||||
name.c_str());
|
||||
|
||||
@@ -387,7 +421,7 @@ Family<Histogram> create_histogram(std::string name, std::string help,
|
||||
familyPtr->help = std::move(help);
|
||||
|
||||
// DESIGN: Prometheus-compatible histogram buckets
|
||||
familyPtr->buckets = std::vector<double>(buckets);
|
||||
familyPtr->buckets = std::vector<double>(buckets.begin(), buckets.end());
|
||||
std::sort(familyPtr->buckets.begin(), familyPtr->buckets.end());
|
||||
familyPtr->buckets.erase(
|
||||
std::unique(familyPtr->buckets.begin(), familyPtr->buckets.end()),
|
||||
@@ -397,12 +431,66 @@ Family<Histogram> create_histogram(std::string name, std::string help,
|
||||
familyPtr->buckets.back() != std::numeric_limits<double>::infinity()) {
|
||||
familyPtr->buckets.push_back(std::numeric_limits<double>::infinity());
|
||||
}
|
||||
} else {
|
||||
validate_or_abort(
|
||||
familyPtr->help == help,
|
||||
"metric family already registered with different help text",
|
||||
name.c_str());
|
||||
std::vector<double> new_buckets_vec(buckets.begin(), buckets.end());
|
||||
std::sort(new_buckets_vec.begin(), new_buckets_vec.end());
|
||||
new_buckets_vec.erase(
|
||||
std::unique(new_buckets_vec.begin(), new_buckets_vec.end()),
|
||||
new_buckets_vec.end());
|
||||
if (new_buckets_vec.empty() ||
|
||||
new_buckets_vec.back() != std::numeric_limits<double>::infinity()) {
|
||||
new_buckets_vec.push_back(std::numeric_limits<double>::infinity());
|
||||
}
|
||||
validate_or_abort(familyPtr->buckets == new_buckets_vec,
|
||||
"metric family already registered with different buckets",
|
||||
name.c_str());
|
||||
}
|
||||
Family<Histogram> family;
|
||||
family.p = familyPtr.get();
|
||||
return family;
|
||||
}
|
||||
|
||||
std::vector<double> linear_buckets(double start, double width, int count) {
|
||||
validate_or_abort(width > 0, "linear bucket width must be positive",
|
||||
std::to_string(width).c_str());
|
||||
validate_or_abort(count >= 0, "linear bucket count must be non-negative",
|
||||
std::to_string(count).c_str());
|
||||
|
||||
std::vector<double> buckets;
|
||||
buckets.reserve(count);
|
||||
|
||||
for (int i = 0; i < count; ++i) {
|
||||
buckets.push_back(start + i * width);
|
||||
}
|
||||
|
||||
return buckets;
|
||||
}
|
||||
|
||||
std::vector<double> exponential_buckets(double start, double factor,
|
||||
int count) {
|
||||
validate_or_abort(start > 0, "exponential bucket start must be positive",
|
||||
std::to_string(start).c_str());
|
||||
validate_or_abort(factor > 1, "exponential bucket factor must be > 1",
|
||||
std::to_string(factor).c_str());
|
||||
validate_or_abort(count >= 0, "exponential bucket count must be non-negative",
|
||||
std::to_string(count).c_str());
|
||||
|
||||
std::vector<double> buckets;
|
||||
buckets.reserve(count);
|
||||
|
||||
double current = start;
|
||||
for (int i = 0; i < count; ++i) {
|
||||
buckets.push_back(current);
|
||||
current *= factor;
|
||||
}
|
||||
|
||||
return buckets;
|
||||
}
|
||||
|
||||
// Prometheus validation functions
|
||||
// Metric names must match [a-zA-Z_:][a-zA-Z0-9_:]*
|
||||
bool is_valid_metric_name(const std::string &name) {
|
||||
@@ -461,13 +549,241 @@ bool is_valid_label_value(const std::string &value) {
|
||||
}
|
||||
|
||||
std::span<std::string_view> render(ArenaAllocator &arena) {
|
||||
// TODO: Implement Prometheus text format rendering
|
||||
// All string data should be allocated in the arena and returned as
|
||||
// string_views
|
||||
static std::string_view empty_result = "";
|
||||
return std::span<std::string_view>(&empty_result, 0);
|
||||
std::unique_lock<std::mutex> _{Metric::mutex};
|
||||
|
||||
std::vector<std::string_view> output;
|
||||
|
||||
auto format_labels =
|
||||
[&](const std::vector<std::pair<std::string_view, std::string_view>>
|
||||
&labels) -> std::string_view {
|
||||
if (labels.empty()) {
|
||||
return "";
|
||||
}
|
||||
|
||||
size_t required_size = 2; // {}
|
||||
for (const auto &[key, value] : labels) {
|
||||
required_size += key.length() + 3 + value.length(); // key="value"
|
||||
for (char c : value) {
|
||||
if (c == '\\' || c == '"' || c == '\n') {
|
||||
required_size++;
|
||||
}
|
||||
}
|
||||
}
|
||||
if (!labels.empty()) {
|
||||
required_size += labels.size() - 1; // commas
|
||||
}
|
||||
|
||||
char *buf = arena.allocate<char>(required_size);
|
||||
char *p = buf;
|
||||
|
||||
*p++ = '{';
|
||||
for (size_t i = 0; i < labels.size(); ++i) {
|
||||
if (i > 0)
|
||||
*p++ = ',';
|
||||
std::memcpy(p, labels[i].first.data(), labels[i].first.length());
|
||||
p += labels[i].first.length();
|
||||
*p++ = '=';
|
||||
*p++ = '"';
|
||||
for (char c : labels[i].second) {
|
||||
switch (c) {
|
||||
case '\\':
|
||||
*p++ = '\\';
|
||||
*p++ = '\\';
|
||||
break;
|
||||
case '"':
|
||||
*p++ = '\\';
|
||||
*p++ = '"';
|
||||
break;
|
||||
case '\n':
|
||||
*p++ = '\\';
|
||||
*p++ = 'n';
|
||||
break;
|
||||
default:
|
||||
*p++ = c;
|
||||
break;
|
||||
}
|
||||
}
|
||||
*p++ = '"';
|
||||
}
|
||||
*p++ = '}';
|
||||
return std::string_view(buf, p - buf);
|
||||
};
|
||||
|
||||
// Render counters
|
||||
for (const auto &[name, family] : Metric::counterFamilies) {
|
||||
output.push_back(
|
||||
format(arena, "# HELP %s %s\n", name.c_str(), family->help.c_str()));
|
||||
output.push_back(format(arena, "# TYPE %s counter\n", name.c_str()));
|
||||
|
||||
std::vector<std::pair<std::string_view, std::string_view>> labels_sv;
|
||||
for (const auto &[labels_key, callback] : family->callbacks) {
|
||||
auto value = callback();
|
||||
labels_sv.clear();
|
||||
for (const auto &l : labels_key.labels)
|
||||
labels_sv.push_back(l);
|
||||
auto labels = format_labels(labels_sv);
|
||||
output.push_back(format(arena, "%.*s%.*s %.17g\n",
|
||||
static_cast<int>(name.length()), name.data(),
|
||||
static_cast<int>(labels.length()), labels.data(),
|
||||
value));
|
||||
}
|
||||
|
||||
for (const auto &[thread_id, per_thread] : family->perThreadState) {
|
||||
for (const auto &[labels_key, instance] : per_thread.instances) {
|
||||
auto value = std::bit_cast<double>(
|
||||
instance->value.load(std::memory_order_relaxed));
|
||||
labels_sv.clear();
|
||||
for (const auto &l : labels_key.labels)
|
||||
labels_sv.push_back(l);
|
||||
auto labels = format_labels(labels_sv);
|
||||
output.push_back(format(arena, "%.*s%.*s %.17g\n",
|
||||
static_cast<int>(name.length()), name.data(),
|
||||
static_cast<int>(labels.length()),
|
||||
labels.data(), value));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Render gauges
|
||||
for (const auto &[name, family] : Metric::gaugeFamilies) {
|
||||
output.push_back(
|
||||
format(arena, "# HELP %s %s\n", name.c_str(), family->help.c_str()));
|
||||
output.push_back(format(arena, "# TYPE %s gauge\n", name.c_str()));
|
||||
|
||||
std::vector<std::pair<std::string_view, std::string_view>> labels_sv;
|
||||
for (const auto &[labels_key, callback] : family->callbacks) {
|
||||
auto value = callback();
|
||||
labels_sv.clear();
|
||||
for (const auto &l : labels_key.labels)
|
||||
labels_sv.push_back(l);
|
||||
auto labels = format_labels(labels_sv);
|
||||
output.push_back(format(arena, "%.*s%.*s %.17g\n",
|
||||
static_cast<int>(name.length()), name.data(),
|
||||
static_cast<int>(labels.length()), labels.data(),
|
||||
value));
|
||||
}
|
||||
|
||||
for (const auto &[labels_key, instance] : family->instances) {
|
||||
auto value = std::bit_cast<double>(
|
||||
instance->value.load(std::memory_order_relaxed));
|
||||
labels_sv.clear();
|
||||
for (const auto &l : labels_key.labels)
|
||||
labels_sv.push_back(l);
|
||||
auto labels = format_labels(labels_sv);
|
||||
output.push_back(format(arena, "%.*s%.*s %.17g\n",
|
||||
static_cast<int>(name.length()), name.data(),
|
||||
static_cast<int>(labels.length()), labels.data(),
|
||||
value));
|
||||
}
|
||||
}
|
||||
|
||||
// Render histograms
|
||||
for (const auto &[name, family] : Metric::histogramFamilies) {
|
||||
output.push_back(
|
||||
format(arena, "# HELP %s %s\n", name.c_str(), family->help.c_str()));
|
||||
output.push_back(format(arena, "# TYPE %s histogram\n", name.c_str()));
|
||||
|
||||
std::vector<std::pair<std::string_view, std::string_view>> bucket_labels_sv;
|
||||
for (const auto &[thread_id, per_thread] : family->perThreadState) {
|
||||
for (const auto &[labels_key, instance] : per_thread.instances) {
|
||||
for (size_t i = 0; i < instance->thresholds.size(); ++i) {
|
||||
bucket_labels_sv.clear();
|
||||
for (const auto &l : labels_key.labels)
|
||||
bucket_labels_sv.push_back(l);
|
||||
|
||||
if (std::isinf(instance->thresholds[i])) {
|
||||
bucket_labels_sv.push_back({"le", "+Inf"});
|
||||
} else {
|
||||
bucket_labels_sv.push_back(
|
||||
{"le", format(arena, "%.17g", instance->thresholds[i])});
|
||||
}
|
||||
auto count = instance->counts[i].load(std::memory_order_relaxed);
|
||||
auto labels = format_labels(bucket_labels_sv);
|
||||
output.push_back(format(arena, "%s_bucket%.*s %llu\n", name.c_str(),
|
||||
static_cast<int>(labels.length()),
|
||||
labels.data(),
|
||||
static_cast<unsigned long long>(count)));
|
||||
}
|
||||
|
||||
auto sum_value = std::bit_cast<double>(
|
||||
instance->sum.load(std::memory_order_relaxed));
|
||||
bucket_labels_sv.clear();
|
||||
for (const auto &l : labels_key.labels)
|
||||
bucket_labels_sv.push_back(l);
|
||||
auto labels = format_labels(bucket_labels_sv);
|
||||
output.push_back(format(arena, "%s_sum%.*s %.17g\n", name.c_str(),
|
||||
static_cast<int>(labels.length()),
|
||||
labels.data(), sum_value));
|
||||
|
||||
auto count_value =
|
||||
instance->observations.load(std::memory_order_relaxed);
|
||||
output.push_back(format(arena, "%s_count%.*s %llu\n", name.c_str(),
|
||||
static_cast<int>(labels.length()),
|
||||
labels.data(),
|
||||
static_cast<unsigned long long>(count_value)));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
auto result = arena.allocate<std::string_view>(output.size());
|
||||
std::copy(output.begin(), output.end(), result);
|
||||
return std::span<std::string_view>(result, output.size());
|
||||
}
|
||||
|
||||
// Template specialization implementations for register_callback
|
||||
template <>
|
||||
void Family<Counter>::register_callback(
|
||||
std::vector<std::pair<std::string, std::string>> labels,
|
||||
MetricCallback<Counter> callback) {
|
||||
std::unique_lock<std::mutex> _{Metric::mutex};
|
||||
LabelsKey key{std::move(labels)};
|
||||
|
||||
// Validate that labels aren't already in use by create() calls
|
||||
for (const auto &[thread_id, per_thread] : p->perThreadState) {
|
||||
validate_or_abort(
|
||||
per_thread.instances.find(key) == per_thread.instances.end(),
|
||||
"labels already registered as static instance",
|
||||
key.labels.empty() ? "(no labels)" : key.labels[0].first.c_str());
|
||||
}
|
||||
|
||||
// Validate that callback isn't already registered for these labels
|
||||
validate_or_abort(p->callbacks.find(key) == p->callbacks.end(),
|
||||
"callback already registered for labels",
|
||||
key.labels.empty() ? "(no labels)"
|
||||
: key.labels[0].first.c_str());
|
||||
|
||||
p->callbacks[std::move(key)] = std::move(callback);
|
||||
}
|
||||
|
||||
template <>
|
||||
void Family<Gauge>::register_callback(
|
||||
std::vector<std::pair<std::string, std::string>> labels,
|
||||
MetricCallback<Gauge> callback) {
|
||||
std::unique_lock<std::mutex> _{Metric::mutex};
|
||||
LabelsKey key{std::move(labels)};
|
||||
|
||||
// Validate that labels aren't already in use by create() calls
|
||||
validate_or_abort(p->instances.find(key) == p->instances.end(),
|
||||
"labels already registered as static instance",
|
||||
key.labels.empty() ? "(no labels)"
|
||||
: key.labels[0].first.c_str());
|
||||
|
||||
// Validate that callback isn't already registered for these labels
|
||||
validate_or_abort(p->callbacks.find(key) == p->callbacks.end(),
|
||||
"callback already registered for labels",
|
||||
key.labels.empty() ? "(no labels)"
|
||||
: key.labels[0].first.c_str());
|
||||
|
||||
p->callbacks[std::move(key)] = std::move(callback);
|
||||
}
|
||||
|
||||
// Explicit template instantiations to provide member implementations
|
||||
template void Family<Counter>::register_callback(
|
||||
std::vector<std::pair<std::string, std::string>>, MetricCallback<Counter>);
|
||||
|
||||
template void Family<Gauge>::register_callback(
|
||||
std::vector<std::pair<std::string, std::string>>, MetricCallback<Gauge>);
|
||||
|
||||
// Static member definitions
|
||||
std::mutex Metric::mutex;
|
||||
std::unordered_map<std::string, std::unique_ptr<Family<Counter>::State>>
|
||||
|
||||
Reference in New Issue
Block a user