Use arenas to manage Metric memory where appropriate

This commit is contained in:
2025-08-31 11:54:17 -04:00
parent b52d6e5a13
commit 93ccd2eb71
3 changed files with 416 additions and 205 deletions

View File

@@ -637,6 +637,14 @@ template <typename T> struct ArenaVector {
T &operator[](size_t index) { return data_[index]; } T &operator[](size_t index) { return data_[index]; }
const T &operator[](size_t index) const { return data_[index]; } const T &operator[](size_t index) const { return data_[index]; }
void clear() { size_ = 0; }
// Iterator support for range-based for loops
T *begin() { return data_; }
const T *begin() const { return data_; }
T *end() { return data_ + size_; }
const T *end() const { return data_ + size_; }
// No destructor - arena cleanup handles memory // No destructor - arena cleanup handles memory
private: private:

View File

@@ -11,24 +11,19 @@
#include <cstdlib> #include <cstdlib>
#include <cstring> #include <cstring>
#include <functional> #include <functional>
#include <memory>
#include <mutex> #include <mutex>
#include <string> #include <string>
#include <thread> #include <thread>
#include <type_traits>
#include <unordered_map> #include <unordered_map>
#include <vector> #include <vector>
#include <immintrin.h> #include <immintrin.h>
#include <simdutf.h> #include <simdutf.h>
#include "arena_allocator.hpp"
#include "format.hpp" #include "format.hpp"
// Verify that malloc provides sufficient alignment for atomic 128-bit
// operations
static_assert(__STDCPP_DEFAULT_NEW_ALIGNMENT__ >= 16,
"Default new alignment must be at least 16 bytes for atomic "
"128-bit stores");
// WeaselDB Metrics System Design: // WeaselDB Metrics System Design:
// //
// THREADING MODEL: // THREADING MODEL:
@@ -69,27 +64,53 @@ static void validate_or_abort(bool condition, const char *message,
} }
} }
// Labels key for second level of map // Helper to copy a string into arena memory
struct LabelsKey { static std::string_view arena_copy_string(const std::string &str,
std::vector<std::pair<std::string, std::string>> labels; ArenaAllocator &arena) {
if (str.empty()) {
return std::string_view{};
}
char *copied = arena.allocate<char>(str.size() + 1);
std::memcpy(copied, str.c_str(), str.size());
copied[str.size()] = '\0';
return std::string_view(copied, str.size());
}
LabelsKey(std::vector<std::pair<std::string, std::string>> l) // Arena-based labels key for second level of map
: labels(std::move(l)) { // Uses string_view to point to arena-allocated strings
// Validate all label keys and values struct LabelsKey {
for (const auto &[key, value] : labels) { ArenaVector<std::pair<std::string_view, std::string_view>> labels;
LabelsKey(const std::vector<std::pair<std::string, std::string>> &l,
ArenaAllocator &arena)
: labels(&arena) {
// Copy and validate all label keys and values into arena
for (const auto &[key, value] : l) {
validate_or_abort(is_valid_label_key(key), "invalid label key", validate_or_abort(is_valid_label_key(key), "invalid label key",
key.c_str()); key.c_str());
validate_or_abort(is_valid_label_value(value), "invalid label value", validate_or_abort(is_valid_label_value(value), "invalid label value",
value.c_str()); value.c_str());
auto key_view = arena_copy_string(key, arena);
auto value_view = arena_copy_string(value, arena);
labels.push_back({key_view, value_view});
} }
// Sort labels by key for Prometheus compatibility // Sort labels by key for Prometheus compatibility
std::sort(labels.begin(), labels.end(), std::sort(labels.data(), labels.data() + labels.size(),
[](const auto &a, const auto &b) { return a.first < b.first; }); [](const auto &a, const auto &b) { return a.first < b.first; });
} }
bool operator==(const LabelsKey &other) const { bool operator==(const LabelsKey &other) const {
return labels == other.labels; if (labels.size() != other.labels.size())
return false;
for (size_t i = 0; i < labels.size(); ++i) {
if (labels[i].first != other.labels[i].first ||
labels[i].second != other.labels[i].second) {
return false;
}
}
return true;
} }
}; };
@@ -99,11 +120,12 @@ namespace std {
template <> struct hash<metric::LabelsKey> { template <> struct hash<metric::LabelsKey> {
std::size_t operator()(const metric::LabelsKey &k) const { std::size_t operator()(const metric::LabelsKey &k) const {
std::size_t hash_value = 0; std::size_t hash_value = 0;
for (const auto &[key, value] : k.labels) { for (size_t i = 0; i < k.labels.size(); ++i) {
const auto &[key, value] = k.labels[i];
// Combine hashes using a simple but effective method // Combine hashes using a simple but effective method
hash_value ^= std::hash<std::string>{}(key) + 0x9e3779b9 + hash_value ^= std::hash<std::string_view>{}(key) + 0x9e3779b9 +
(hash_value << 6) + (hash_value >> 2); (hash_value << 6) + (hash_value >> 2);
hash_value ^= std::hash<std::string>{}(value) + 0x9e3779b9 + hash_value ^= std::hash<std::string_view>{}(value) + 0x9e3779b9 +
(hash_value << 6) + (hash_value >> 2); (hash_value << 6) + (hash_value >> 2);
} }
return hash_value; return hash_value;
@@ -120,45 +142,82 @@ namespace metric {
// Family::State structures own the second-level maps (labels -> instances) // Family::State structures own the second-level maps (labels -> instances)
template <> struct Family<Counter>::State { template <> struct Family<Counter>::State {
std::string name; std::string_view name;
std::string help; std::string_view help;
struct PerThreadState { struct PerThreadState {
std::unordered_map<LabelsKey, std::unique_ptr<Counter::State>> instances; std::unordered_map<LabelsKey, Counter::State *> instances;
}; };
std::unordered_map<std::thread::id, PerThreadState> per_thread_state; std::unordered_map<std::thread::id, PerThreadState> per_thread_state;
// Global accumulation state for destroyed threads // Global accumulation state for destroyed threads
std::unordered_map<LabelsKey, std::unique_ptr<Counter::State>> std::unordered_map<
LabelsKey, Counter::State *, std::hash<LabelsKey>,
std::equal_to<LabelsKey>,
ArenaStlAllocator<std::pair<const LabelsKey, Counter::State *>>>
global_accumulated_values; global_accumulated_values;
// Callback-based metrics (global, not per-thread) // Callback-based metrics (global, not per-thread)
std::unordered_map<LabelsKey, MetricCallback<Counter>> callbacks; std::unordered_map<
LabelsKey, MetricCallback<Counter>, std::hash<LabelsKey>,
std::equal_to<LabelsKey>,
ArenaStlAllocator<std::pair<const LabelsKey, MetricCallback<Counter>>>>
callbacks;
State(ArenaAllocator &arena)
: global_accumulated_values(
ArenaStlAllocator<std::pair<const LabelsKey, Counter::State *>>(
&arena)),
callbacks(
ArenaStlAllocator<
std::pair<const LabelsKey, MetricCallback<Counter>>>(&arena)) {}
}; };
template <> struct Family<Gauge>::State { template <> struct Family<Gauge>::State {
std::string name; std::string_view name;
std::string help; std::string_view help;
std::unordered_map<LabelsKey, std::unique_ptr<Gauge::State>> instances; std::unordered_map<
LabelsKey, Gauge::State *, std::hash<LabelsKey>, std::equal_to<LabelsKey>,
ArenaStlAllocator<std::pair<const LabelsKey, Gauge::State *>>>
instances;
// Callback-based metrics // Callback-based metrics
std::unordered_map<LabelsKey, MetricCallback<Gauge>> callbacks; std::unordered_map<
LabelsKey, MetricCallback<Gauge>, std::hash<LabelsKey>,
std::equal_to<LabelsKey>,
ArenaStlAllocator<std::pair<const LabelsKey, MetricCallback<Gauge>>>>
callbacks;
State(ArenaAllocator &arena)
: instances(ArenaStlAllocator<std::pair<const LabelsKey, Gauge::State *>>(
&arena)),
callbacks(ArenaStlAllocator<
std::pair<const LabelsKey, MetricCallback<Gauge>>>(&arena)) {}
}; };
template <> struct Family<Histogram>::State { template <> struct Family<Histogram>::State {
std::string name; std::string_view name;
std::string help; std::string_view help;
std::vector<double> buckets; ArenaVector<double> buckets;
struct PerThreadState { struct PerThreadState {
std::unordered_map<LabelsKey, std::unique_ptr<Histogram::State>> instances; std::unordered_map<LabelsKey, Histogram::State *> instances;
}; };
std::unordered_map<std::thread::id, PerThreadState> per_thread_state; std::unordered_map<std::thread::id, PerThreadState> per_thread_state;
// Global accumulation state for destroyed threads // Global accumulation state for destroyed threads
std::unordered_map<LabelsKey, std::unique_ptr<Histogram::State>> std::unordered_map<
LabelsKey, Histogram::State *, std::hash<LabelsKey>,
std::equal_to<LabelsKey>,
ArenaStlAllocator<std::pair<const LabelsKey, Histogram::State *>>>
global_accumulated_values; global_accumulated_values;
State(ArenaAllocator &arena)
: buckets(&arena),
global_accumulated_values(
ArenaStlAllocator<std::pair<const LabelsKey, Histogram::State *>>(
&arena)) {}
// Note: No callbacks map - histograms don't support callback-based metrics // Note: No callbacks map - histograms don't support callback-based metrics
}; };
@@ -178,46 +237,76 @@ struct Gauge::State {
// Histogram: Thread-local buckets, single writer, mutex protection per thread, // Histogram: Thread-local buckets, single writer, mutex protection per thread,
// per histogram // per histogram
struct Histogram::State { struct Histogram::State {
std::vector<double> thresholds; // Bucket boundaries (sorted, deduplicated, ArenaVector<double> thresholds; // Bucket boundaries (sorted, deduplicated,
// sizes never change) // sizes never change)
std::vector<uint64_t> counts; // Count per bucket ArenaVector<uint64_t> counts; // Count per bucket
double sum; // Sum of observations double sum; // Sum of observations
uint64_t observations; // Total observation count uint64_t observations; // Total observation count
std::mutex std::mutex
mutex; // Per-thread, per-histogram mutex for consistent reads/writes mutex; // Per-thread, per-histogram mutex for consistent reads/writes
State(ArenaAllocator &arena)
: thresholds(&arena), counts(&arena), sum(0.0), observations(0) {}
friend struct Metric; friend struct Metric;
}; };
struct Metric { struct Metric {
// We use a raw pointer to these in a map, so we don't call their destructors
static_assert(std::is_trivially_destructible_v<Counter::State>);
static_assert(std::is_trivially_destructible_v<Gauge::State>);
static_assert(std::is_trivially_destructible_v<Histogram::State>);
static std::mutex mutex; static std::mutex mutex;
// Global arena allocator for metric families and persistent global state
static ArenaAllocator &get_global_arena() {
static ArenaAllocator global_arena(64 * 1024); // 64KB initial size
return global_arena;
}
// Function-local statics to avoid static initialization order fiasco // Function-local statics to avoid static initialization order fiasco
static auto &get_counter_families() { static auto &get_counter_families() {
static std::unordered_map<std::string, using FamilyMap = std::unordered_map<
std::unique_ptr<Family<Counter>::State>> std::string_view, Family<Counter>::State *, std::hash<std::string_view>,
counterFamilies; std::equal_to<std::string_view>,
return counterFamilies; ArenaStlAllocator<
std::pair<const std::string_view, Family<Counter>::State *>>>;
static FamilyMap *counterFamilies = new FamilyMap(
ArenaStlAllocator<
std::pair<const std::string_view, Family<Counter>::State *>>(
&get_global_arena()));
return *counterFamilies;
} }
static auto &get_gauge_families() { static auto &get_gauge_families() {
static std::unordered_map<std::string, using FamilyMap = std::unordered_map<
std::unique_ptr<Family<Gauge>::State>> std::string_view, Family<Gauge>::State *, std::hash<std::string_view>,
gaugeFamilies; std::equal_to<std::string_view>,
return gaugeFamilies; ArenaStlAllocator<
std::pair<const std::string_view, Family<Gauge>::State *>>>;
static FamilyMap *gaugeFamilies = new FamilyMap(
ArenaStlAllocator<
std::pair<const std::string_view, Family<Gauge>::State *>>(
&get_global_arena()));
return *gaugeFamilies;
} }
static auto &get_histogram_families() { static auto &get_histogram_families() {
static std::unordered_map<std::string, using FamilyMap = std::unordered_map<
std::unique_ptr<Family<Histogram>::State>> std::string_view, Family<Histogram>::State *,
histogramFamilies; std::hash<std::string_view>, std::equal_to<std::string_view>,
return histogramFamilies; ArenaStlAllocator<
std::pair<const std::string_view, Family<Histogram>::State *>>>;
static FamilyMap *histogramFamilies = new FamilyMap(
ArenaStlAllocator<
std::pair<const std::string_view, Family<Histogram>::State *>>(
&get_global_arena()));
return *histogramFamilies;
} }
// Thread cleanup for per-family thread-local storage // Thread cleanup for per-family thread-local storage
struct ThreadInit { struct ThreadInit {
ThreadInit() { ArenaAllocator arena;
// Thread registration happens lazily when metrics are created ThreadInit() {}
}
~ThreadInit() { ~ThreadInit() {
// Accumulate thread-local state into global state before cleanup // Accumulate thread-local state into global state before cleanup
std::unique_lock<std::mutex> _{mutex}; std::unique_lock<std::mutex> _{mutex};
@@ -234,7 +323,7 @@ struct Metric {
// Ensure global accumulator exists // Ensure global accumulator exists
auto &global_state = family->global_accumulated_values[labels_key]; auto &global_state = family->global_accumulated_values[labels_key];
if (!global_state) { if (!global_state) {
global_state = std::make_unique<Counter::State>(); global_state = get_global_arena().construct<Counter::State>();
global_state->value = 0.0; global_state->value = 0.0;
} }
@@ -256,12 +345,16 @@ struct Metric {
// Ensure global accumulator exists // Ensure global accumulator exists
auto &global_state = family->global_accumulated_values[labels_key]; auto &global_state = family->global_accumulated_values[labels_key];
if (!global_state) { if (!global_state) {
global_state = std::make_unique<Histogram::State>(); global_state = get_global_arena().construct<Histogram::State>(
global_state->thresholds = instance->thresholds; get_global_arena());
global_state->counts = // Copy thresholds from instance
std::vector<uint64_t>(instance->counts.size(), 0); for (size_t i = 0; i < instance->thresholds.size(); ++i) {
global_state->sum = 0.0; global_state->thresholds.push_back(instance->thresholds[i]);
global_state->observations = 0; }
// Initialize counts with zeros
for (size_t i = 0; i < instance->counts.size(); ++i) {
global_state->counts.push_back(0);
}
} }
// Accumulate bucket counts (mutex already held) // Accumulate bucket counts (mutex already held)
@@ -282,6 +375,9 @@ struct Metric {
}; };
static thread_local ThreadInit thread_init; static thread_local ThreadInit thread_init;
// Thread-local arena allocator for metric instances
static ArenaAllocator &get_thread_local_arena() { return thread_init.arena; }
// Thread cleanup now handled by ThreadInit RAII // Thread cleanup now handled by ThreadInit RAII
static Counter create_counter_instance( static Counter create_counter_instance(
@@ -291,29 +387,33 @@ struct Metric {
(void)thread_init; (void)thread_init;
std::unique_lock<std::mutex> _{mutex}; std::unique_lock<std::mutex> _{mutex};
LabelsKey key{labels}; LabelsKey key{labels, get_thread_local_arena()};
// Validate that labels aren't already registered as callback // Validate that labels aren't already registered as callback
validate_or_abort( validate_or_abort(
family->p->callbacks.find(key) == family->p->callbacks.end(), family->p->callbacks.find(key) == family->p->callbacks.end(),
"labels already registered as callback", "labels already registered as callback",
key.labels.empty() ? "(no labels)" : key.labels[0].first.c_str()); key.labels.empty() ? "(no labels)"
: std::string(key.labels[0].first).c_str());
auto &ptr = // Ensure thread state exists
family->p->per_thread_state[std::this_thread::get_id()].instances[key]; auto thread_id = std::this_thread::get_id();
// Thread state is automatically created by operator[]
auto &ptr = family->p->per_thread_state[thread_id].instances[key];
if (!ptr) { if (!ptr) {
ptr = std::make_unique<Counter::State>(); ptr = get_thread_local_arena().construct<Counter::State>();
ptr->value = 0.0; ptr->value = 0.0;
// Ensure global accumulator exists for this label set // Ensure global accumulator exists for this label set
auto &global_state = family->p->global_accumulated_values[key]; auto &global_state = family->p->global_accumulated_values[key];
if (!global_state) { if (!global_state) {
global_state = std::make_unique<Counter::State>(); global_state = get_global_arena().construct<Counter::State>();
global_state->value = 0.0; global_state->value = 0.0;
} }
} }
Counter result; Counter result;
result.p = ptr.get(); result.p = ptr;
return result; return result;
} }
@@ -321,21 +421,22 @@ struct Metric {
Family<Gauge> *family, Family<Gauge> *family,
const std::vector<std::pair<std::string, std::string>> &labels) { const std::vector<std::pair<std::string, std::string>> &labels) {
std::unique_lock<std::mutex> _{mutex}; std::unique_lock<std::mutex> _{mutex};
LabelsKey key{labels}; LabelsKey key{labels, get_global_arena()};
// Validate that labels aren't already registered as callback // Validate that labels aren't already registered as callback
validate_or_abort( validate_or_abort(
family->p->callbacks.find(key) == family->p->callbacks.end(), family->p->callbacks.find(key) == family->p->callbacks.end(),
"labels already registered as callback", "labels already registered as callback",
key.labels.empty() ? "(no labels)" : key.labels[0].first.c_str()); key.labels.empty() ? "(no labels)"
: std::string(key.labels[0].first).c_str());
auto &ptr = family->p->instances[key]; auto &ptr = family->p->instances[key];
if (!ptr) { if (!ptr) {
ptr = std::make_unique<Gauge::State>(); ptr = get_global_arena().construct<Gauge::State>();
ptr->value.store(0, std::memory_order_relaxed); ptr->value.store(0, std::memory_order_relaxed);
} }
Gauge result; Gauge result;
result.p = ptr.get(); result.p = ptr;
return result; return result;
} }
@@ -346,32 +447,45 @@ struct Metric {
(void)thread_init; (void)thread_init;
std::unique_lock<std::mutex> _{mutex}; std::unique_lock<std::mutex> _{mutex};
LabelsKey key{labels}; LabelsKey key{labels, get_thread_local_arena()};
auto &ptr =
family->p->per_thread_state[std::this_thread::get_id()].instances[key]; // Ensure thread state exists
auto thread_id = std::this_thread::get_id();
// Thread state is automatically created by operator[]
auto &ptr = family->p->per_thread_state[thread_id].instances[key];
if (!ptr) { if (!ptr) {
ptr = std::make_unique<Histogram::State>(); ptr = get_thread_local_arena().construct<Histogram::State>(
get_thread_local_arena());
// DESIGN: Prometheus-compatible histogram buckets // DESIGN: Prometheus-compatible histogram buckets
// Use buckets from family configuration // Use buckets from family configuration
ptr->thresholds = family->p->buckets; // Already sorted and deduplicated for (size_t i = 0; i < family->p->buckets.size(); ++i) {
ptr->thresholds.push_back(family->p->buckets[i]);
}
// Initialize with zero values, mutex protects all operations // Initialize with zero values, mutex protects all operations
ptr->counts = std::vector<uint64_t>(ptr->thresholds.size(), 0); for (size_t i = 0; i < ptr->thresholds.size(); ++i) {
ptr->sum = 0.0; ptr->counts.push_back(0);
ptr->observations = 0; }
// Ensure global accumulator exists for this label set // Ensure global accumulator exists for this label set
auto &global_state = family->p->global_accumulated_values[key]; auto &global_state = family->p->global_accumulated_values[key];
if (!global_state) { if (!global_state) {
global_state = std::make_unique<Histogram::State>(); global_state =
global_state->thresholds = ptr->thresholds; get_global_arena().construct<Histogram::State>(get_global_arena());
global_state->counts = std::vector<uint64_t>(ptr->thresholds.size(), 0); // Copy thresholds
global_state->sum = 0.0; for (size_t i = 0; i < ptr->thresholds.size(); ++i) {
global_state->observations = 0; global_state->thresholds.push_back(ptr->thresholds[i]);
}
// Initialize counts with zeros
for (size_t i = 0; i < ptr->thresholds.size(); ++i) {
global_state->counts.push_back(0);
}
} }
} }
Histogram result; Histogram result;
result.p = ptr.get(); result.p = ptr;
return result; return result;
} }
}; };
@@ -439,8 +553,8 @@ Histogram::Histogram() = default;
// AVX-optimized implementation for high performance // AVX-optimized implementation for high performance
__attribute__((target("avx"))) static void __attribute__((target("avx"))) static void
update_histogram_buckets_simd(const std::vector<double> &thresholds, update_histogram_buckets_simd(const ArenaVector<double> &thresholds,
std::vector<uint64_t> &counts, double x, ArenaVector<uint64_t> &counts, double x,
size_t start_idx) { size_t start_idx) {
const size_t size = thresholds.size(); const size_t size = thresholds.size();
size_t i = start_idx; size_t i = start_idx;
@@ -512,11 +626,17 @@ Family<Counter> create_counter(std::string name, std::string help) {
name.c_str()); name.c_str());
std::unique_lock<std::mutex> _{Metric::mutex}; std::unique_lock<std::mutex> _{Metric::mutex};
auto &familyPtr = Metric::get_counter_families()[name]; auto &global_arena = Metric::get_global_arena();
auto name_view = arena_copy_string(name, global_arena);
auto &familyPtr = Metric::get_counter_families()[name_view];
if (!familyPtr) { if (!familyPtr) {
familyPtr = std::make_unique<Family<Counter>::State>(); // NOTE: Family<T>::State instances are never destroyed - this is fine
familyPtr->name = std::move(name); // because the number of metric families is bounded by application design
familyPtr->help = std::move(help); familyPtr = new (global_arena.allocate_raw(sizeof(Family<Counter>::State),
alignof(Family<Counter>::State)))
Family<Counter>::State(global_arena);
familyPtr->name = name_view;
familyPtr->help = arena_copy_string(help, global_arena);
} else { } else {
validate_or_abort( validate_or_abort(
familyPtr->help == help, familyPtr->help == help,
@@ -524,7 +644,7 @@ Family<Counter> create_counter(std::string name, std::string help) {
name.c_str()); name.c_str());
} }
Family<Counter> family; Family<Counter> family;
family.p = familyPtr.get(); family.p = familyPtr;
return family; return family;
} }
@@ -533,11 +653,17 @@ Family<Gauge> create_gauge(std::string name, std::string help) {
name.c_str()); name.c_str());
std::unique_lock<std::mutex> _{Metric::mutex}; std::unique_lock<std::mutex> _{Metric::mutex};
auto &familyPtr = Metric::get_gauge_families()[name]; auto &global_arena = Metric::get_global_arena();
auto name_view = arena_copy_string(name, global_arena);
auto &familyPtr = Metric::get_gauge_families()[name_view];
if (!familyPtr) { if (!familyPtr) {
familyPtr = std::make_unique<Family<Gauge>::State>(); // NOTE: Family<T>::State instances are never destroyed - this is fine
familyPtr->name = std::move(name); // because the number of metric families is bounded by application design
familyPtr->help = std::move(help); familyPtr = new (global_arena.allocate_raw(sizeof(Family<Gauge>::State),
alignof(Family<Gauge>::State)))
Family<Gauge>::State(global_arena);
familyPtr->name = name_view;
familyPtr->help = arena_copy_string(help, global_arena);
} else { } else {
validate_or_abort( validate_or_abort(
familyPtr->help == help, familyPtr->help == help,
@@ -545,7 +671,7 @@ Family<Gauge> create_gauge(std::string name, std::string help) {
name.c_str()); name.c_str());
} }
Family<Gauge> family; Family<Gauge> family;
family.p = familyPtr.get(); family.p = familyPtr;
return family; return family;
} }
@@ -555,23 +681,34 @@ Family<Histogram> create_histogram(std::string name, std::string help,
name.c_str()); name.c_str());
std::unique_lock<std::mutex> _{Metric::mutex}; std::unique_lock<std::mutex> _{Metric::mutex};
auto &familyPtr = Metric::get_histogram_families()[name]; auto &global_arena = Metric::get_global_arena();
if (!familyPtr) { auto name_view = arena_copy_string(name, global_arena);
familyPtr = std::make_unique<Family<Histogram>::State>(); auto &family_ptr = Metric::get_histogram_families()[name_view];
familyPtr->name = std::move(name); if (!family_ptr) {
familyPtr->help = std::move(help); // NOTE: Family<T>::State instances are never destroyed - this is fine
// because the number of metric families is bounded by application design
family_ptr = new (global_arena.allocate_raw(
sizeof(Family<Histogram>::State), alignof(Family<Histogram>::State)))
Family<Histogram>::State(global_arena);
family_ptr->name = name_view;
family_ptr->help = arena_copy_string(help, global_arena);
// DESIGN: Prometheus-compatible histogram buckets // DESIGN: Prometheus-compatible histogram buckets
familyPtr->buckets = std::vector<double>(buckets.begin(), buckets.end()); // Convert to vector for sorting
std::sort(familyPtr->buckets.begin(), familyPtr->buckets.end()); std::vector<double> temp_buckets(buckets.begin(), buckets.end());
familyPtr->buckets.erase( std::sort(temp_buckets.begin(), temp_buckets.end());
std::unique(familyPtr->buckets.begin(), familyPtr->buckets.end()), temp_buckets.erase(std::unique(temp_buckets.begin(), temp_buckets.end()),
familyPtr->buckets.end()); temp_buckets.end());
// Copy sorted buckets to arena vector
for (double bucket : temp_buckets) {
family_ptr->buckets.push_back(bucket);
}
// Note: +Inf bucket is not stored explicitly - we use total observations // Note: +Inf bucket is not stored explicitly - we use total observations
// count // count
} else { } else {
validate_or_abort( validate_or_abort(
familyPtr->help == help, family_ptr->help == help,
"metric family already registered with different help text", "metric family already registered with different help text",
name.c_str()); name.c_str());
std::vector<double> new_buckets_vec(buckets.begin(), buckets.end()); std::vector<double> new_buckets_vec(buckets.begin(), buckets.end());
@@ -581,12 +718,23 @@ Family<Histogram> create_histogram(std::string name, std::string help,
new_buckets_vec.end()); new_buckets_vec.end());
// Note: +Inf bucket is not stored explicitly - we use total observations // Note: +Inf bucket is not stored explicitly - we use total observations
// count // count
validate_or_abort(familyPtr->buckets == new_buckets_vec,
// Compare with existing buckets
bool buckets_match = (family_ptr->buckets.size() == new_buckets_vec.size());
if (buckets_match) {
for (size_t i = 0; i < family_ptr->buckets.size(); ++i) {
if (family_ptr->buckets[i] != new_buckets_vec[i]) {
buckets_match = false;
break;
}
}
}
validate_or_abort(buckets_match,
"metric family already registered with different buckets", "metric family already registered with different buckets",
name.c_str()); name.c_str());
} }
Family<Histogram> family; Family<Histogram> family;
family.p = familyPtr.get(); family.p = family_ptr;
return family; return family;
} }
@@ -687,10 +835,10 @@ bool is_valid_label_value(const std::string &value) {
std::span<std::string_view> render(ArenaAllocator &arena) { std::span<std::string_view> render(ArenaAllocator &arena) {
std::unique_lock<std::mutex> _{Metric::mutex}; std::unique_lock<std::mutex> _{Metric::mutex};
std::vector<std::string_view> output; ArenaVector<std::string_view> output(&arena);
auto format_labels = auto format_labels =
[&](const std::vector<std::pair<std::string_view, std::string_view>> [&](const ArenaVector<std::pair<std::string_view, std::string_view>>
&labels) -> std::string_view { &labels) -> std::string_view {
if (labels.empty()) { if (labels.empty()) {
return ""; return "";
@@ -747,16 +895,21 @@ std::span<std::string_view> render(ArenaAllocator &arena) {
// Render counters // Render counters
for (const auto &[name, family] : Metric::get_counter_families()) { for (const auto &[name, family] : Metric::get_counter_families()) {
output.push_back( output.push_back(format(arena, "# HELP %.*s %.*s\n",
format(arena, "# HELP %s %s\n", name.c_str(), family->help.c_str())); static_cast<int>(name.length()), name.data(),
output.push_back(format(arena, "# TYPE %s counter\n", name.c_str())); static_cast<int>(family->help.length()),
family->help.data()));
output.push_back(format(arena, "# TYPE %.*s counter\n",
static_cast<int>(name.length()), name.data()));
std::vector<std::pair<std::string_view, std::string_view>> labels_sv; ArenaVector<std::pair<std::string_view, std::string_view>> labels_sv(
&arena);
for (const auto &[labels_key, callback] : family->callbacks) { for (const auto &[labels_key, callback] : family->callbacks) {
auto value = callback(); auto value = callback();
labels_sv.clear(); labels_sv.clear();
for (const auto &l : labels_key.labels) for (size_t i = 0; i < labels_key.labels.size(); ++i) {
labels_sv.push_back(l); labels_sv.push_back(labels_key.labels[i]);
}
auto labels = format_labels(labels_sv); auto labels = format_labels(labels_sv);
output.push_back(format(arena, "%.*s%.*s %.17g\n", output.push_back(format(arena, "%.*s%.*s %.17g\n",
static_cast<int>(name.length()), name.data(), static_cast<int>(name.length()), name.data(),
@@ -765,7 +918,11 @@ std::span<std::string_view> render(ArenaAllocator &arena) {
} }
// Aggregate all counter values (thread-local + global accumulated) // Aggregate all counter values (thread-local + global accumulated)
std::unordered_map<LabelsKey, double> aggregated_values; std::unordered_map<LabelsKey, double, std::hash<LabelsKey>,
std::equal_to<LabelsKey>,
ArenaStlAllocator<std::pair<const LabelsKey, double>>>
aggregated_values{
ArenaStlAllocator<std::pair<const LabelsKey, double>>(&arena)};
// First, add thread-local values // First, add thread-local values
for (const auto &[thread_id, per_thread] : family->per_thread_state) { for (const auto &[thread_id, per_thread] : family->per_thread_state) {
@@ -788,8 +945,9 @@ std::span<std::string_view> render(ArenaAllocator &arena) {
// Render aggregated counter values // Render aggregated counter values
for (const auto &[labels_key, total_value] : aggregated_values) { for (const auto &[labels_key, total_value] : aggregated_values) {
labels_sv.clear(); labels_sv.clear();
for (const auto &l : labels_key.labels) for (size_t i = 0; i < labels_key.labels.size(); ++i) {
labels_sv.push_back(l); labels_sv.push_back(labels_key.labels[i]);
}
auto labels = format_labels(labels_sv); auto labels = format_labels(labels_sv);
output.push_back(format(arena, "%.*s%.*s %.17g\n", output.push_back(format(arena, "%.*s%.*s %.17g\n",
static_cast<int>(name.length()), name.data(), static_cast<int>(name.length()), name.data(),
@@ -800,16 +958,21 @@ std::span<std::string_view> render(ArenaAllocator &arena) {
// Render gauges // Render gauges
for (const auto &[name, family] : Metric::get_gauge_families()) { for (const auto &[name, family] : Metric::get_gauge_families()) {
output.push_back( output.push_back(format(arena, "# HELP %.*s %.*s\n",
format(arena, "# HELP %s %s\n", name.c_str(), family->help.c_str())); static_cast<int>(name.length()), name.data(),
output.push_back(format(arena, "# TYPE %s gauge\n", name.c_str())); static_cast<int>(family->help.length()),
family->help.data()));
output.push_back(format(arena, "# TYPE %.*s gauge\n",
static_cast<int>(name.length()), name.data()));
std::vector<std::pair<std::string_view, std::string_view>> labels_sv; ArenaVector<std::pair<std::string_view, std::string_view>> labels_sv(
&arena);
for (const auto &[labels_key, callback] : family->callbacks) { for (const auto &[labels_key, callback] : family->callbacks) {
auto value = callback(); auto value = callback();
labels_sv.clear(); labels_sv.clear();
for (const auto &l : labels_key.labels) for (size_t i = 0; i < labels_key.labels.size(); ++i) {
labels_sv.push_back(l); labels_sv.push_back(labels_key.labels[i]);
}
auto labels = format_labels(labels_sv); auto labels = format_labels(labels_sv);
output.push_back(format(arena, "%.*s%.*s %.17g\n", output.push_back(format(arena, "%.*s%.*s %.17g\n",
static_cast<int>(name.length()), name.data(), static_cast<int>(name.length()), name.data(),
@@ -821,8 +984,9 @@ std::span<std::string_view> render(ArenaAllocator &arena) {
auto value = std::bit_cast<double>( auto value = std::bit_cast<double>(
instance->value.load(std::memory_order_relaxed)); instance->value.load(std::memory_order_relaxed));
labels_sv.clear(); labels_sv.clear();
for (const auto &l : labels_key.labels) for (size_t i = 0; i < labels_key.labels.size(); ++i) {
labels_sv.push_back(l); labels_sv.push_back(labels_key.labels[i]);
}
auto labels = format_labels(labels_sv); auto labels = format_labels(labels_sv);
output.push_back(format(arena, "%.*s%.*s %.17g\n", output.push_back(format(arena, "%.*s%.*s %.17g\n",
static_cast<int>(name.length()), name.data(), static_cast<int>(name.length()), name.data(),
@@ -833,60 +997,85 @@ std::span<std::string_view> render(ArenaAllocator &arena) {
// Render histograms // Render histograms
for (const auto &[name, family] : Metric::get_histogram_families()) { for (const auto &[name, family] : Metric::get_histogram_families()) {
output.push_back( output.push_back(format(arena, "# HELP %.*s %.*s\n",
format(arena, "# HELP %s %s\n", name.c_str(), family->help.c_str())); static_cast<int>(name.length()), name.data(),
output.push_back(format(arena, "# TYPE %s histogram\n", name.c_str())); static_cast<int>(family->help.length()),
family->help.data()));
output.push_back(format(arena, "# TYPE %.*s histogram\n",
static_cast<int>(name.length()), name.data()));
// Aggregate all histogram values (thread-local + global accumulated) // Aggregate all histogram values (thread-local + global accumulated)
std::unordered_map<LabelsKey, // Use a simpler structure to avoid tuple constructor issues
std::tuple<std::vector<double>, std::vector<uint64_t>, struct AggregatedHistogram {
double, uint64_t>> ArenaVector<double> thresholds;
aggregated_histograms; ArenaVector<uint64_t> counts;
double sum;
uint64_t observations;
std::vector<std::pair<std::string_view, std::string_view>> bucket_labels_sv; AggregatedHistogram(ArenaAllocator &arena)
: thresholds(&arena), counts(&arena), sum(0.0), observations(0) {}
};
std::unordered_map<
LabelsKey, AggregatedHistogram *, std::hash<LabelsKey>,
std::equal_to<LabelsKey>,
ArenaStlAllocator<std::pair<const LabelsKey, AggregatedHistogram *>>>
aggregated_histograms{ArenaStlAllocator<
std::pair<const LabelsKey, AggregatedHistogram *>>(&arena)};
ArenaVector<std::pair<std::string_view, std::string_view>> bucket_labels_sv(
&arena);
// First, collect thread-local histogram data // First, collect thread-local histogram data
for (const auto &[thread_id, per_thread] : family->per_thread_state) { for (const auto &[thread_id, per_thread] : family->per_thread_state) {
for (const auto &[labels_key, instance] : per_thread.instances) { for (const auto &[labels_key, instance] : per_thread.instances) {
// Extract data under lock - minimize critical section // Extract data under lock - minimize critical section
// Pre-allocate vectors to avoid malloc inside critical section
// Note: thresholds and counts sizes never change after histogram // Note: thresholds and counts sizes never change after histogram
// creation // creation
std::vector<double> thresholds_snapshot; ArenaVector<double> thresholds_snapshot(&arena);
std::vector<uint64_t> counts_snapshot; ArenaVector<uint64_t> counts_snapshot(&arena);
double sum_snapshot; double sum_snapshot;
uint64_t observations_snapshot; uint64_t observations_snapshot;
// Pre-allocate outside critical section using immutable sizes
thresholds_snapshot.resize(instance->thresholds.size());
counts_snapshot.resize(instance->counts.size());
// Copy data with minimal critical section // Copy data with minimal critical section
{ {
std::lock_guard<std::mutex> lock(instance->mutex); std::lock_guard<std::mutex> lock(instance->mutex);
std::memcpy(thresholds_snapshot.data(), instance->thresholds.data(), // Copy thresholds
instance->thresholds.size() * sizeof(double)); for (size_t i = 0; i < instance->thresholds.size(); ++i) {
std::memcpy(counts_snapshot.data(), instance->counts.data(), thresholds_snapshot.push_back(instance->thresholds[i]);
instance->counts.size() * sizeof(uint64_t)); }
// Copy counts
for (size_t i = 0; i < instance->counts.size(); ++i) {
counts_snapshot.push_back(instance->counts[i]);
}
sum_snapshot = instance->sum; sum_snapshot = instance->sum;
observations_snapshot = instance->observations; observations_snapshot = instance->observations;
} }
// Initialize or aggregate into aggregated_histograms // Initialize or aggregate into aggregated_histograms
auto &[thresholds, counts, sum, observations] = auto it = aggregated_histograms.find(labels_key);
aggregated_histograms[labels_key]; if (it == aggregated_histograms.end()) {
if (thresholds.empty()) { // Create new entry
thresholds = thresholds_snapshot; auto *agg_hist = new (arena.allocate_raw(
counts = counts_snapshot; sizeof(AggregatedHistogram), alignof(AggregatedHistogram)))
sum = sum_snapshot; AggregatedHistogram(arena);
observations = observations_snapshot; for (size_t i = 0; i < thresholds_snapshot.size(); ++i) {
agg_hist->thresholds.push_back(thresholds_snapshot[i]);
}
for (size_t i = 0; i < counts_snapshot.size(); ++i) {
agg_hist->counts.push_back(counts_snapshot[i]);
}
agg_hist->sum = sum_snapshot;
agg_hist->observations = observations_snapshot;
aggregated_histograms[labels_key] = agg_hist;
} else { } else {
// Aggregate with existing entry
auto *agg_hist = it->second;
// Aggregate counts // Aggregate counts
for (size_t i = 0; i < counts_snapshot.size(); ++i) { for (size_t i = 0; i < counts_snapshot.size(); ++i) {
counts[i] += counts_snapshot[i]; agg_hist->counts[i] += counts_snapshot[i];
} }
sum += sum_snapshot; agg_hist->sum += sum_snapshot;
observations += observations_snapshot; agg_hist->observations += observations_snapshot;
} }
} }
} }
@@ -895,74 +1084,85 @@ std::span<std::string_view> render(ArenaAllocator &arena) {
for (const auto &[labels_key, global_state] : for (const auto &[labels_key, global_state] :
family->global_accumulated_values) { family->global_accumulated_values) {
if (global_state) { if (global_state) {
auto &[thresholds, counts, sum, observations] = auto it = aggregated_histograms.find(labels_key);
aggregated_histograms[labels_key]; if (it == aggregated_histograms.end()) {
if (thresholds.empty()) { // Create new entry from global state
thresholds = global_state->thresholds; auto *agg_hist = new (arena.allocate_raw(
counts = global_state->counts; sizeof(AggregatedHistogram), alignof(AggregatedHistogram)))
sum = global_state->sum; AggregatedHistogram(arena);
observations = global_state->observations; for (size_t i = 0; i < global_state->thresholds.size(); ++i) {
} else { agg_hist->thresholds.push_back(global_state->thresholds[i]);
// Add global accumulated values
for (size_t i = 0; i < global_state->counts.size(); ++i) {
counts[i] += global_state->counts[i];
} }
sum += global_state->sum; for (size_t i = 0; i < global_state->counts.size(); ++i) {
observations += global_state->observations; agg_hist->counts.push_back(global_state->counts[i]);
}
agg_hist->sum = global_state->sum;
agg_hist->observations = global_state->observations;
aggregated_histograms[labels_key] = agg_hist;
} else {
// Add global accumulated values to existing entry
auto *agg_hist = it->second;
for (size_t i = 0; i < global_state->counts.size(); ++i) {
agg_hist->counts[i] += global_state->counts[i];
}
agg_hist->sum += global_state->sum;
agg_hist->observations += global_state->observations;
} }
} }
} }
// Render aggregated histogram data // Render aggregated histogram data
for (const auto &[labels_key, histogram_data] : aggregated_histograms) { for (const auto &[labels_key, agg_hist] : aggregated_histograms) {
const auto &[thresholds_snapshot, counts_snapshot, sum_snapshot,
observations_snapshot] = histogram_data;
// Render explicit bucket counts // Render explicit bucket counts
for (size_t i = 0; i < thresholds_snapshot.size(); ++i) { for (size_t i = 0; i < agg_hist->thresholds.size(); ++i) {
bucket_labels_sv.clear(); bucket_labels_sv.clear();
for (const auto &l : labels_key.labels) for (size_t j = 0; j < labels_key.labels.size(); ++j) {
bucket_labels_sv.push_back(l); bucket_labels_sv.push_back(labels_key.labels[j]);
}
bucket_labels_sv.push_back( bucket_labels_sv.push_back(
{"le", static_format(arena, thresholds_snapshot[i])}); {"le", static_format(arena, agg_hist->thresholds[i])});
auto labels = format_labels(bucket_labels_sv); auto labels = format_labels(bucket_labels_sv);
output.push_back( output.push_back(format(
format(arena, "%s_bucket%.*s %llu\n", name.c_str(), arena, "%.*s_bucket%.*s %llu\n", static_cast<int>(name.length()),
static_cast<int>(labels.length()), labels.data(), name.data(), static_cast<int>(labels.length()), labels.data(),
static_cast<unsigned long long>(counts_snapshot[i]))); static_cast<unsigned long long>(agg_hist->counts[i])));
} }
// Render +Inf bucket using total observations count // Render +Inf bucket using total observations count
bucket_labels_sv.clear(); bucket_labels_sv.clear();
for (const auto &l : labels_key.labels) for (size_t j = 0; j < labels_key.labels.size(); ++j) {
bucket_labels_sv.push_back(l); bucket_labels_sv.push_back(labels_key.labels[j]);
}
bucket_labels_sv.push_back({"le", "+Inf"}); bucket_labels_sv.push_back({"le", "+Inf"});
auto inf_labels = format_labels(bucket_labels_sv); auto inf_labels = format_labels(bucket_labels_sv);
output.push_back( output.push_back(format(
format(arena, "%s_bucket%.*s %llu\n", name.c_str(), arena, "%.*s_bucket%.*s %llu\n", static_cast<int>(name.length()),
static_cast<int>(inf_labels.length()), inf_labels.data(), name.data(), static_cast<int>(inf_labels.length()), inf_labels.data(),
static_cast<unsigned long long>(observations_snapshot))); static_cast<unsigned long long>(agg_hist->observations)));
// Render sum // Render sum
bucket_labels_sv.clear(); bucket_labels_sv.clear();
for (const auto &l : labels_key.labels) for (size_t j = 0; j < labels_key.labels.size(); ++j) {
bucket_labels_sv.push_back(l); bucket_labels_sv.push_back(labels_key.labels[j]);
}
auto labels = format_labels(bucket_labels_sv); auto labels = format_labels(bucket_labels_sv);
output.push_back(format(arena, "%s_sum%.*s %.17g\n", name.c_str(), output.push_back(format(arena, "%.*s_sum%.*s %.17g\n",
static_cast<int>(name.length()), name.data(),
static_cast<int>(labels.length()), labels.data(), static_cast<int>(labels.length()), labels.data(),
sum_snapshot)); agg_hist->sum));
// Render count // Render count
output.push_back( output.push_back(format(
format(arena, "%s_count%.*s %llu\n", name.c_str(), arena, "%.*s_count%.*s %llu\n", static_cast<int>(name.length()),
static_cast<int>(labels.length()), labels.data(), name.data(), static_cast<int>(labels.length()), labels.data(),
static_cast<unsigned long long>(observations_snapshot))); static_cast<unsigned long long>(agg_hist->observations)));
} }
} }
auto result = arena.allocate<std::string_view>(output.size()); auto result = arena.allocate<std::string_view>(output.size());
std::copy(output.begin(), output.end(), result); std::copy(output.data(), output.data() + output.size(), result);
return std::span<std::string_view>(result, output.size()); return std::span<std::string_view>(result, output.size());
} }
@@ -972,21 +1172,23 @@ void Family<Counter>::register_callback(
std::vector<std::pair<std::string, std::string>> labels, std::vector<std::pair<std::string, std::string>> labels,
MetricCallback<Counter> callback) { MetricCallback<Counter> callback) {
std::unique_lock<std::mutex> _{Metric::mutex}; std::unique_lock<std::mutex> _{Metric::mutex};
LabelsKey key{std::move(labels)}; LabelsKey key{labels, Metric::get_global_arena()};
// Validate that labels aren't already in use by create() calls // Validate that labels aren't already in use by create() calls
for (const auto &[thread_id, per_thread] : p->per_thread_state) { for (const auto &[thread_id, per_thread] : p->per_thread_state) {
validate_or_abort( validate_or_abort(
per_thread.instances.find(key) == per_thread.instances.end(), per_thread.instances.find(key) == per_thread.instances.end(),
"labels already registered as static instance", "labels already registered as static instance",
key.labels.empty() ? "(no labels)" : key.labels[0].first.c_str()); key.labels.empty() ? "(no labels)"
: std::string(key.labels[0].first).c_str());
} }
// Validate that callback isn't already registered for these labels // Validate that callback isn't already registered for these labels
validate_or_abort(p->callbacks.find(key) == p->callbacks.end(), validate_or_abort(p->callbacks.find(key) == p->callbacks.end(),
"callback already registered for labels", "callback already registered for labels",
key.labels.empty() ? "(no labels)" key.labels.empty()
: key.labels[0].first.c_str()); ? "(no labels)"
: std::string(key.labels[0].first).c_str());
p->callbacks[std::move(key)] = std::move(callback); p->callbacks[std::move(key)] = std::move(callback);
} }
@@ -996,19 +1198,21 @@ void Family<Gauge>::register_callback(
std::vector<std::pair<std::string, std::string>> labels, std::vector<std::pair<std::string, std::string>> labels,
MetricCallback<Gauge> callback) { MetricCallback<Gauge> callback) {
std::unique_lock<std::mutex> _{Metric::mutex}; std::unique_lock<std::mutex> _{Metric::mutex};
LabelsKey key{std::move(labels)}; LabelsKey key{labels, Metric::get_global_arena()};
// Validate that labels aren't already in use by create() calls // Validate that labels aren't already in use by create() calls
validate_or_abort(p->instances.find(key) == p->instances.end(), validate_or_abort(p->instances.find(key) == p->instances.end(),
"labels already registered as static instance", "labels already registered as static instance",
key.labels.empty() ? "(no labels)" key.labels.empty()
: key.labels[0].first.c_str()); ? "(no labels)"
: std::string(key.labels[0].first).c_str());
// Validate that callback isn't already registered for these labels // Validate that callback isn't already registered for these labels
validate_or_abort(p->callbacks.find(key) == p->callbacks.end(), validate_or_abort(p->callbacks.find(key) == p->callbacks.end(),
"callback already registered for labels", "callback already registered for labels",
key.labels.empty() ? "(no labels)" key.labels.empty()
: key.labels[0].first.c_str()); ? "(no labels)"
: std::string(key.labels[0].first).c_str());
p->callbacks[std::move(key)] = std::move(callback); p->callbacks[std::move(key)] = std::move(callback);
} }

View File

@@ -36,7 +36,6 @@
// histogram.observe(0.25); // ONLY call from creating thread // histogram.observe(0.25); // ONLY call from creating thread
#include <functional> #include <functional>
#include <initializer_list>
#include <span> #include <span>
#include <string> #include <string>
#include <type_traits> #include <type_traits>
@@ -197,4 +196,4 @@ bool is_valid_label_value(const std::string &value);
// Note: Histograms do not support callbacks due to their multi-value nature // Note: Histograms do not support callbacks due to their multi-value nature
// (buckets + sum + count). Use static histogram metrics only. // (buckets + sum + count). Use static histogram metrics only.
} // namespace metric } // namespace metric