diff --git a/src/arena_allocator.hpp b/src/arena_allocator.hpp index fec2507..c21e603 100644 --- a/src/arena_allocator.hpp +++ b/src/arena_allocator.hpp @@ -637,6 +637,14 @@ template struct ArenaVector { T &operator[](size_t index) { return data_[index]; } const T &operator[](size_t index) const { return data_[index]; } + void clear() { size_ = 0; } + + // Iterator support for range-based for loops + T *begin() { return data_; } + const T *begin() const { return data_; } + T *end() { return data_ + size_; } + const T *end() const { return data_ + size_; } + // No destructor - arena cleanup handles memory private: diff --git a/src/metric.cpp b/src/metric.cpp index 91cb853..b490cb8 100644 --- a/src/metric.cpp +++ b/src/metric.cpp @@ -11,24 +11,19 @@ #include #include #include -#include #include #include #include +#include #include #include #include #include +#include "arena_allocator.hpp" #include "format.hpp" -// Verify that malloc provides sufficient alignment for atomic 128-bit -// operations -static_assert(__STDCPP_DEFAULT_NEW_ALIGNMENT__ >= 16, - "Default new alignment must be at least 16 bytes for atomic " - "128-bit stores"); - // WeaselDB Metrics System Design: // // THREADING MODEL: @@ -69,27 +64,53 @@ static void validate_or_abort(bool condition, const char *message, } } -// Labels key for second level of map -struct LabelsKey { - std::vector> labels; +// Helper to copy a string into arena memory +static std::string_view arena_copy_string(const std::string &str, + ArenaAllocator &arena) { + if (str.empty()) { + return std::string_view{}; + } + char *copied = arena.allocate(str.size() + 1); + std::memcpy(copied, str.c_str(), str.size()); + copied[str.size()] = '\0'; + return std::string_view(copied, str.size()); +} - LabelsKey(std::vector> l) - : labels(std::move(l)) { - // Validate all label keys and values - for (const auto &[key, value] : labels) { +// Arena-based labels key for second level of map +// Uses string_view to point to arena-allocated strings +struct LabelsKey { + ArenaVector> labels; + + LabelsKey(const std::vector> &l, + ArenaAllocator &arena) + : labels(&arena) { + // Copy and validate all label keys and values into arena + for (const auto &[key, value] : l) { validate_or_abort(is_valid_label_key(key), "invalid label key", key.c_str()); validate_or_abort(is_valid_label_value(value), "invalid label value", value.c_str()); + + auto key_view = arena_copy_string(key, arena); + auto value_view = arena_copy_string(value, arena); + labels.push_back({key_view, value_view}); } // Sort labels by key for Prometheus compatibility - std::sort(labels.begin(), labels.end(), + std::sort(labels.data(), labels.data() + labels.size(), [](const auto &a, const auto &b) { return a.first < b.first; }); } bool operator==(const LabelsKey &other) const { - return labels == other.labels; + if (labels.size() != other.labels.size()) + return false; + for (size_t i = 0; i < labels.size(); ++i) { + if (labels[i].first != other.labels[i].first || + labels[i].second != other.labels[i].second) { + return false; + } + } + return true; } }; @@ -99,11 +120,12 @@ namespace std { template <> struct hash { std::size_t operator()(const metric::LabelsKey &k) const { std::size_t hash_value = 0; - for (const auto &[key, value] : k.labels) { + for (size_t i = 0; i < k.labels.size(); ++i) { + const auto &[key, value] = k.labels[i]; // Combine hashes using a simple but effective method - hash_value ^= std::hash{}(key) + 0x9e3779b9 + + hash_value ^= std::hash{}(key) + 0x9e3779b9 + (hash_value << 6) + (hash_value >> 2); - hash_value ^= std::hash{}(value) + 0x9e3779b9 + + hash_value ^= std::hash{}(value) + 0x9e3779b9 + (hash_value << 6) + (hash_value >> 2); } return hash_value; @@ -120,45 +142,82 @@ namespace metric { // Family::State structures own the second-level maps (labels -> instances) template <> struct Family::State { - std::string name; - std::string help; + std::string_view name; + std::string_view help; struct PerThreadState { - std::unordered_map> instances; + std::unordered_map instances; }; std::unordered_map per_thread_state; // Global accumulation state for destroyed threads - std::unordered_map> + std::unordered_map< + LabelsKey, Counter::State *, std::hash, + std::equal_to, + ArenaStlAllocator>> global_accumulated_values; // Callback-based metrics (global, not per-thread) - std::unordered_map> callbacks; + std::unordered_map< + LabelsKey, MetricCallback, std::hash, + std::equal_to, + ArenaStlAllocator>>> + callbacks; + + State(ArenaAllocator &arena) + : global_accumulated_values( + ArenaStlAllocator>( + &arena)), + callbacks( + ArenaStlAllocator< + std::pair>>(&arena)) {} }; template <> struct Family::State { - std::string name; - std::string help; - std::unordered_map> instances; + std::string_view name; + std::string_view help; + std::unordered_map< + LabelsKey, Gauge::State *, std::hash, std::equal_to, + ArenaStlAllocator>> + instances; // Callback-based metrics - std::unordered_map> callbacks; + std::unordered_map< + LabelsKey, MetricCallback, std::hash, + std::equal_to, + ArenaStlAllocator>>> + callbacks; + + State(ArenaAllocator &arena) + : instances(ArenaStlAllocator>( + &arena)), + callbacks(ArenaStlAllocator< + std::pair>>(&arena)) {} }; template <> struct Family::State { - std::string name; - std::string help; - std::vector buckets; + std::string_view name; + std::string_view help; + ArenaVector buckets; struct PerThreadState { - std::unordered_map> instances; + std::unordered_map instances; }; std::unordered_map per_thread_state; // Global accumulation state for destroyed threads - std::unordered_map> + std::unordered_map< + LabelsKey, Histogram::State *, std::hash, + std::equal_to, + ArenaStlAllocator>> global_accumulated_values; + State(ArenaAllocator &arena) + : buckets(&arena), + global_accumulated_values( + ArenaStlAllocator>( + &arena)) {} + // Note: No callbacks map - histograms don't support callback-based metrics }; @@ -178,46 +237,76 @@ struct Gauge::State { // Histogram: Thread-local buckets, single writer, mutex protection per thread, // per histogram struct Histogram::State { - std::vector thresholds; // Bucket boundaries (sorted, deduplicated, + ArenaVector thresholds; // Bucket boundaries (sorted, deduplicated, // sizes never change) - std::vector counts; // Count per bucket + ArenaVector counts; // Count per bucket double sum; // Sum of observations uint64_t observations; // Total observation count std::mutex mutex; // Per-thread, per-histogram mutex for consistent reads/writes + + State(ArenaAllocator &arena) + : thresholds(&arena), counts(&arena), sum(0.0), observations(0) {} friend struct Metric; }; struct Metric { + // We use a raw pointer to these in a map, so we don't call their destructors + static_assert(std::is_trivially_destructible_v); + static_assert(std::is_trivially_destructible_v); + static_assert(std::is_trivially_destructible_v); static std::mutex mutex; + // Global arena allocator for metric families and persistent global state + static ArenaAllocator &get_global_arena() { + static ArenaAllocator global_arena(64 * 1024); // 64KB initial size + return global_arena; + } + // Function-local statics to avoid static initialization order fiasco static auto &get_counter_families() { - static std::unordered_map::State>> - counterFamilies; - return counterFamilies; + using FamilyMap = std::unordered_map< + std::string_view, Family::State *, std::hash, + std::equal_to, + ArenaStlAllocator< + std::pair::State *>>>; + static FamilyMap *counterFamilies = new FamilyMap( + ArenaStlAllocator< + std::pair::State *>>( + &get_global_arena())); + return *counterFamilies; } static auto &get_gauge_families() { - static std::unordered_map::State>> - gaugeFamilies; - return gaugeFamilies; + using FamilyMap = std::unordered_map< + std::string_view, Family::State *, std::hash, + std::equal_to, + ArenaStlAllocator< + std::pair::State *>>>; + static FamilyMap *gaugeFamilies = new FamilyMap( + ArenaStlAllocator< + std::pair::State *>>( + &get_global_arena())); + return *gaugeFamilies; } static auto &get_histogram_families() { - static std::unordered_map::State>> - histogramFamilies; - return histogramFamilies; + using FamilyMap = std::unordered_map< + std::string_view, Family::State *, + std::hash, std::equal_to, + ArenaStlAllocator< + std::pair::State *>>>; + static FamilyMap *histogramFamilies = new FamilyMap( + ArenaStlAllocator< + std::pair::State *>>( + &get_global_arena())); + return *histogramFamilies; } // Thread cleanup for per-family thread-local storage struct ThreadInit { - ThreadInit() { - // Thread registration happens lazily when metrics are created - } + ArenaAllocator arena; + ThreadInit() {} ~ThreadInit() { // Accumulate thread-local state into global state before cleanup std::unique_lock _{mutex}; @@ -234,7 +323,7 @@ struct Metric { // Ensure global accumulator exists auto &global_state = family->global_accumulated_values[labels_key]; if (!global_state) { - global_state = std::make_unique(); + global_state = get_global_arena().construct(); global_state->value = 0.0; } @@ -256,12 +345,16 @@ struct Metric { // Ensure global accumulator exists auto &global_state = family->global_accumulated_values[labels_key]; if (!global_state) { - global_state = std::make_unique(); - global_state->thresholds = instance->thresholds; - global_state->counts = - std::vector(instance->counts.size(), 0); - global_state->sum = 0.0; - global_state->observations = 0; + global_state = get_global_arena().construct( + get_global_arena()); + // Copy thresholds from instance + for (size_t i = 0; i < instance->thresholds.size(); ++i) { + global_state->thresholds.push_back(instance->thresholds[i]); + } + // Initialize counts with zeros + for (size_t i = 0; i < instance->counts.size(); ++i) { + global_state->counts.push_back(0); + } } // Accumulate bucket counts (mutex already held) @@ -282,6 +375,9 @@ struct Metric { }; static thread_local ThreadInit thread_init; + // Thread-local arena allocator for metric instances + static ArenaAllocator &get_thread_local_arena() { return thread_init.arena; } + // Thread cleanup now handled by ThreadInit RAII static Counter create_counter_instance( @@ -291,29 +387,33 @@ struct Metric { (void)thread_init; std::unique_lock _{mutex}; - LabelsKey key{labels}; + LabelsKey key{labels, get_thread_local_arena()}; // Validate that labels aren't already registered as callback validate_or_abort( family->p->callbacks.find(key) == family->p->callbacks.end(), "labels already registered as callback", - key.labels.empty() ? "(no labels)" : key.labels[0].first.c_str()); + key.labels.empty() ? "(no labels)" + : std::string(key.labels[0].first).c_str()); - auto &ptr = - family->p->per_thread_state[std::this_thread::get_id()].instances[key]; + // Ensure thread state exists + auto thread_id = std::this_thread::get_id(); + // Thread state is automatically created by operator[] + + auto &ptr = family->p->per_thread_state[thread_id].instances[key]; if (!ptr) { - ptr = std::make_unique(); + ptr = get_thread_local_arena().construct(); ptr->value = 0.0; // Ensure global accumulator exists for this label set auto &global_state = family->p->global_accumulated_values[key]; if (!global_state) { - global_state = std::make_unique(); + global_state = get_global_arena().construct(); global_state->value = 0.0; } } Counter result; - result.p = ptr.get(); + result.p = ptr; return result; } @@ -321,21 +421,22 @@ struct Metric { Family *family, const std::vector> &labels) { std::unique_lock _{mutex}; - LabelsKey key{labels}; + LabelsKey key{labels, get_global_arena()}; // Validate that labels aren't already registered as callback validate_or_abort( family->p->callbacks.find(key) == family->p->callbacks.end(), "labels already registered as callback", - key.labels.empty() ? "(no labels)" : key.labels[0].first.c_str()); + key.labels.empty() ? "(no labels)" + : std::string(key.labels[0].first).c_str()); auto &ptr = family->p->instances[key]; if (!ptr) { - ptr = std::make_unique(); + ptr = get_global_arena().construct(); ptr->value.store(0, std::memory_order_relaxed); } Gauge result; - result.p = ptr.get(); + result.p = ptr; return result; } @@ -346,32 +447,45 @@ struct Metric { (void)thread_init; std::unique_lock _{mutex}; - LabelsKey key{labels}; - auto &ptr = - family->p->per_thread_state[std::this_thread::get_id()].instances[key]; + LabelsKey key{labels, get_thread_local_arena()}; + + // Ensure thread state exists + auto thread_id = std::this_thread::get_id(); + // Thread state is automatically created by operator[] + + auto &ptr = family->p->per_thread_state[thread_id].instances[key]; if (!ptr) { - ptr = std::make_unique(); + ptr = get_thread_local_arena().construct( + get_thread_local_arena()); + // DESIGN: Prometheus-compatible histogram buckets // Use buckets from family configuration - ptr->thresholds = family->p->buckets; // Already sorted and deduplicated + for (size_t i = 0; i < family->p->buckets.size(); ++i) { + ptr->thresholds.push_back(family->p->buckets[i]); + } // Initialize with zero values, mutex protects all operations - ptr->counts = std::vector(ptr->thresholds.size(), 0); - ptr->sum = 0.0; - ptr->observations = 0; + for (size_t i = 0; i < ptr->thresholds.size(); ++i) { + ptr->counts.push_back(0); + } // Ensure global accumulator exists for this label set auto &global_state = family->p->global_accumulated_values[key]; if (!global_state) { - global_state = std::make_unique(); - global_state->thresholds = ptr->thresholds; - global_state->counts = std::vector(ptr->thresholds.size(), 0); - global_state->sum = 0.0; - global_state->observations = 0; + global_state = + get_global_arena().construct(get_global_arena()); + // Copy thresholds + for (size_t i = 0; i < ptr->thresholds.size(); ++i) { + global_state->thresholds.push_back(ptr->thresholds[i]); + } + // Initialize counts with zeros + for (size_t i = 0; i < ptr->thresholds.size(); ++i) { + global_state->counts.push_back(0); + } } } Histogram result; - result.p = ptr.get(); + result.p = ptr; return result; } }; @@ -439,8 +553,8 @@ Histogram::Histogram() = default; // AVX-optimized implementation for high performance __attribute__((target("avx"))) static void -update_histogram_buckets_simd(const std::vector &thresholds, - std::vector &counts, double x, +update_histogram_buckets_simd(const ArenaVector &thresholds, + ArenaVector &counts, double x, size_t start_idx) { const size_t size = thresholds.size(); size_t i = start_idx; @@ -512,11 +626,17 @@ Family create_counter(std::string name, std::string help) { name.c_str()); std::unique_lock _{Metric::mutex}; - auto &familyPtr = Metric::get_counter_families()[name]; + auto &global_arena = Metric::get_global_arena(); + auto name_view = arena_copy_string(name, global_arena); + auto &familyPtr = Metric::get_counter_families()[name_view]; if (!familyPtr) { - familyPtr = std::make_unique::State>(); - familyPtr->name = std::move(name); - familyPtr->help = std::move(help); + // NOTE: Family::State instances are never destroyed - this is fine + // because the number of metric families is bounded by application design + familyPtr = new (global_arena.allocate_raw(sizeof(Family::State), + alignof(Family::State))) + Family::State(global_arena); + familyPtr->name = name_view; + familyPtr->help = arena_copy_string(help, global_arena); } else { validate_or_abort( familyPtr->help == help, @@ -524,7 +644,7 @@ Family create_counter(std::string name, std::string help) { name.c_str()); } Family family; - family.p = familyPtr.get(); + family.p = familyPtr; return family; } @@ -533,11 +653,17 @@ Family create_gauge(std::string name, std::string help) { name.c_str()); std::unique_lock _{Metric::mutex}; - auto &familyPtr = Metric::get_gauge_families()[name]; + auto &global_arena = Metric::get_global_arena(); + auto name_view = arena_copy_string(name, global_arena); + auto &familyPtr = Metric::get_gauge_families()[name_view]; if (!familyPtr) { - familyPtr = std::make_unique::State>(); - familyPtr->name = std::move(name); - familyPtr->help = std::move(help); + // NOTE: Family::State instances are never destroyed - this is fine + // because the number of metric families is bounded by application design + familyPtr = new (global_arena.allocate_raw(sizeof(Family::State), + alignof(Family::State))) + Family::State(global_arena); + familyPtr->name = name_view; + familyPtr->help = arena_copy_string(help, global_arena); } else { validate_or_abort( familyPtr->help == help, @@ -545,7 +671,7 @@ Family create_gauge(std::string name, std::string help) { name.c_str()); } Family family; - family.p = familyPtr.get(); + family.p = familyPtr; return family; } @@ -555,23 +681,34 @@ Family create_histogram(std::string name, std::string help, name.c_str()); std::unique_lock _{Metric::mutex}; - auto &familyPtr = Metric::get_histogram_families()[name]; - if (!familyPtr) { - familyPtr = std::make_unique::State>(); - familyPtr->name = std::move(name); - familyPtr->help = std::move(help); + auto &global_arena = Metric::get_global_arena(); + auto name_view = arena_copy_string(name, global_arena); + auto &family_ptr = Metric::get_histogram_families()[name_view]; + if (!family_ptr) { + // NOTE: Family::State instances are never destroyed - this is fine + // because the number of metric families is bounded by application design + family_ptr = new (global_arena.allocate_raw( + sizeof(Family::State), alignof(Family::State))) + Family::State(global_arena); + family_ptr->name = name_view; + family_ptr->help = arena_copy_string(help, global_arena); // DESIGN: Prometheus-compatible histogram buckets - familyPtr->buckets = std::vector(buckets.begin(), buckets.end()); - std::sort(familyPtr->buckets.begin(), familyPtr->buckets.end()); - familyPtr->buckets.erase( - std::unique(familyPtr->buckets.begin(), familyPtr->buckets.end()), - familyPtr->buckets.end()); + // Convert to vector for sorting + std::vector temp_buckets(buckets.begin(), buckets.end()); + std::sort(temp_buckets.begin(), temp_buckets.end()); + temp_buckets.erase(std::unique(temp_buckets.begin(), temp_buckets.end()), + temp_buckets.end()); + + // Copy sorted buckets to arena vector + for (double bucket : temp_buckets) { + family_ptr->buckets.push_back(bucket); + } // Note: +Inf bucket is not stored explicitly - we use total observations // count } else { validate_or_abort( - familyPtr->help == help, + family_ptr->help == help, "metric family already registered with different help text", name.c_str()); std::vector new_buckets_vec(buckets.begin(), buckets.end()); @@ -581,12 +718,23 @@ Family create_histogram(std::string name, std::string help, new_buckets_vec.end()); // Note: +Inf bucket is not stored explicitly - we use total observations // count - validate_or_abort(familyPtr->buckets == new_buckets_vec, + + // Compare with existing buckets + bool buckets_match = (family_ptr->buckets.size() == new_buckets_vec.size()); + if (buckets_match) { + for (size_t i = 0; i < family_ptr->buckets.size(); ++i) { + if (family_ptr->buckets[i] != new_buckets_vec[i]) { + buckets_match = false; + break; + } + } + } + validate_or_abort(buckets_match, "metric family already registered with different buckets", name.c_str()); } Family family; - family.p = familyPtr.get(); + family.p = family_ptr; return family; } @@ -687,10 +835,10 @@ bool is_valid_label_value(const std::string &value) { std::span render(ArenaAllocator &arena) { std::unique_lock _{Metric::mutex}; - std::vector output; + ArenaVector output(&arena); auto format_labels = - [&](const std::vector> + [&](const ArenaVector> &labels) -> std::string_view { if (labels.empty()) { return ""; @@ -747,16 +895,21 @@ std::span render(ArenaAllocator &arena) { // Render counters for (const auto &[name, family] : Metric::get_counter_families()) { - output.push_back( - format(arena, "# HELP %s %s\n", name.c_str(), family->help.c_str())); - output.push_back(format(arena, "# TYPE %s counter\n", name.c_str())); + output.push_back(format(arena, "# HELP %.*s %.*s\n", + static_cast(name.length()), name.data(), + static_cast(family->help.length()), + family->help.data())); + output.push_back(format(arena, "# TYPE %.*s counter\n", + static_cast(name.length()), name.data())); - std::vector> labels_sv; + ArenaVector> labels_sv( + &arena); for (const auto &[labels_key, callback] : family->callbacks) { auto value = callback(); labels_sv.clear(); - for (const auto &l : labels_key.labels) - labels_sv.push_back(l); + for (size_t i = 0; i < labels_key.labels.size(); ++i) { + labels_sv.push_back(labels_key.labels[i]); + } auto labels = format_labels(labels_sv); output.push_back(format(arena, "%.*s%.*s %.17g\n", static_cast(name.length()), name.data(), @@ -765,7 +918,11 @@ std::span render(ArenaAllocator &arena) { } // Aggregate all counter values (thread-local + global accumulated) - std::unordered_map aggregated_values; + std::unordered_map, + std::equal_to, + ArenaStlAllocator>> + aggregated_values{ + ArenaStlAllocator>(&arena)}; // First, add thread-local values for (const auto &[thread_id, per_thread] : family->per_thread_state) { @@ -788,8 +945,9 @@ std::span render(ArenaAllocator &arena) { // Render aggregated counter values for (const auto &[labels_key, total_value] : aggregated_values) { labels_sv.clear(); - for (const auto &l : labels_key.labels) - labels_sv.push_back(l); + for (size_t i = 0; i < labels_key.labels.size(); ++i) { + labels_sv.push_back(labels_key.labels[i]); + } auto labels = format_labels(labels_sv); output.push_back(format(arena, "%.*s%.*s %.17g\n", static_cast(name.length()), name.data(), @@ -800,16 +958,21 @@ std::span render(ArenaAllocator &arena) { // Render gauges for (const auto &[name, family] : Metric::get_gauge_families()) { - output.push_back( - format(arena, "# HELP %s %s\n", name.c_str(), family->help.c_str())); - output.push_back(format(arena, "# TYPE %s gauge\n", name.c_str())); + output.push_back(format(arena, "# HELP %.*s %.*s\n", + static_cast(name.length()), name.data(), + static_cast(family->help.length()), + family->help.data())); + output.push_back(format(arena, "# TYPE %.*s gauge\n", + static_cast(name.length()), name.data())); - std::vector> labels_sv; + ArenaVector> labels_sv( + &arena); for (const auto &[labels_key, callback] : family->callbacks) { auto value = callback(); labels_sv.clear(); - for (const auto &l : labels_key.labels) - labels_sv.push_back(l); + for (size_t i = 0; i < labels_key.labels.size(); ++i) { + labels_sv.push_back(labels_key.labels[i]); + } auto labels = format_labels(labels_sv); output.push_back(format(arena, "%.*s%.*s %.17g\n", static_cast(name.length()), name.data(), @@ -821,8 +984,9 @@ std::span render(ArenaAllocator &arena) { auto value = std::bit_cast( instance->value.load(std::memory_order_relaxed)); labels_sv.clear(); - for (const auto &l : labels_key.labels) - labels_sv.push_back(l); + for (size_t i = 0; i < labels_key.labels.size(); ++i) { + labels_sv.push_back(labels_key.labels[i]); + } auto labels = format_labels(labels_sv); output.push_back(format(arena, "%.*s%.*s %.17g\n", static_cast(name.length()), name.data(), @@ -833,60 +997,85 @@ std::span render(ArenaAllocator &arena) { // Render histograms for (const auto &[name, family] : Metric::get_histogram_families()) { - output.push_back( - format(arena, "# HELP %s %s\n", name.c_str(), family->help.c_str())); - output.push_back(format(arena, "# TYPE %s histogram\n", name.c_str())); + output.push_back(format(arena, "# HELP %.*s %.*s\n", + static_cast(name.length()), name.data(), + static_cast(family->help.length()), + family->help.data())); + output.push_back(format(arena, "# TYPE %.*s histogram\n", + static_cast(name.length()), name.data())); // Aggregate all histogram values (thread-local + global accumulated) - std::unordered_map, std::vector, - double, uint64_t>> - aggregated_histograms; + // Use a simpler structure to avoid tuple constructor issues + struct AggregatedHistogram { + ArenaVector thresholds; + ArenaVector counts; + double sum; + uint64_t observations; - std::vector> bucket_labels_sv; + AggregatedHistogram(ArenaAllocator &arena) + : thresholds(&arena), counts(&arena), sum(0.0), observations(0) {} + }; + std::unordered_map< + LabelsKey, AggregatedHistogram *, std::hash, + std::equal_to, + ArenaStlAllocator>> + aggregated_histograms{ArenaStlAllocator< + std::pair>(&arena)}; + + ArenaVector> bucket_labels_sv( + &arena); // First, collect thread-local histogram data for (const auto &[thread_id, per_thread] : family->per_thread_state) { for (const auto &[labels_key, instance] : per_thread.instances) { // Extract data under lock - minimize critical section - // Pre-allocate vectors to avoid malloc inside critical section // Note: thresholds and counts sizes never change after histogram // creation - std::vector thresholds_snapshot; - std::vector counts_snapshot; + ArenaVector thresholds_snapshot(&arena); + ArenaVector counts_snapshot(&arena); double sum_snapshot; uint64_t observations_snapshot; - // Pre-allocate outside critical section using immutable sizes - thresholds_snapshot.resize(instance->thresholds.size()); - counts_snapshot.resize(instance->counts.size()); - // Copy data with minimal critical section { std::lock_guard lock(instance->mutex); - std::memcpy(thresholds_snapshot.data(), instance->thresholds.data(), - instance->thresholds.size() * sizeof(double)); - std::memcpy(counts_snapshot.data(), instance->counts.data(), - instance->counts.size() * sizeof(uint64_t)); + // Copy thresholds + for (size_t i = 0; i < instance->thresholds.size(); ++i) { + thresholds_snapshot.push_back(instance->thresholds[i]); + } + // Copy counts + for (size_t i = 0; i < instance->counts.size(); ++i) { + counts_snapshot.push_back(instance->counts[i]); + } sum_snapshot = instance->sum; observations_snapshot = instance->observations; } // Initialize or aggregate into aggregated_histograms - auto &[thresholds, counts, sum, observations] = - aggregated_histograms[labels_key]; - if (thresholds.empty()) { - thresholds = thresholds_snapshot; - counts = counts_snapshot; - sum = sum_snapshot; - observations = observations_snapshot; + auto it = aggregated_histograms.find(labels_key); + if (it == aggregated_histograms.end()) { + // Create new entry + auto *agg_hist = new (arena.allocate_raw( + sizeof(AggregatedHistogram), alignof(AggregatedHistogram))) + AggregatedHistogram(arena); + for (size_t i = 0; i < thresholds_snapshot.size(); ++i) { + agg_hist->thresholds.push_back(thresholds_snapshot[i]); + } + for (size_t i = 0; i < counts_snapshot.size(); ++i) { + agg_hist->counts.push_back(counts_snapshot[i]); + } + agg_hist->sum = sum_snapshot; + agg_hist->observations = observations_snapshot; + aggregated_histograms[labels_key] = agg_hist; } else { + // Aggregate with existing entry + auto *agg_hist = it->second; // Aggregate counts for (size_t i = 0; i < counts_snapshot.size(); ++i) { - counts[i] += counts_snapshot[i]; + agg_hist->counts[i] += counts_snapshot[i]; } - sum += sum_snapshot; - observations += observations_snapshot; + agg_hist->sum += sum_snapshot; + agg_hist->observations += observations_snapshot; } } } @@ -895,74 +1084,85 @@ std::span render(ArenaAllocator &arena) { for (const auto &[labels_key, global_state] : family->global_accumulated_values) { if (global_state) { - auto &[thresholds, counts, sum, observations] = - aggregated_histograms[labels_key]; - if (thresholds.empty()) { - thresholds = global_state->thresholds; - counts = global_state->counts; - sum = global_state->sum; - observations = global_state->observations; - } else { - // Add global accumulated values - for (size_t i = 0; i < global_state->counts.size(); ++i) { - counts[i] += global_state->counts[i]; + auto it = aggregated_histograms.find(labels_key); + if (it == aggregated_histograms.end()) { + // Create new entry from global state + auto *agg_hist = new (arena.allocate_raw( + sizeof(AggregatedHistogram), alignof(AggregatedHistogram))) + AggregatedHistogram(arena); + for (size_t i = 0; i < global_state->thresholds.size(); ++i) { + agg_hist->thresholds.push_back(global_state->thresholds[i]); } - sum += global_state->sum; - observations += global_state->observations; + for (size_t i = 0; i < global_state->counts.size(); ++i) { + agg_hist->counts.push_back(global_state->counts[i]); + } + agg_hist->sum = global_state->sum; + agg_hist->observations = global_state->observations; + aggregated_histograms[labels_key] = agg_hist; + } else { + // Add global accumulated values to existing entry + auto *agg_hist = it->second; + for (size_t i = 0; i < global_state->counts.size(); ++i) { + agg_hist->counts[i] += global_state->counts[i]; + } + agg_hist->sum += global_state->sum; + agg_hist->observations += global_state->observations; } } } // Render aggregated histogram data - for (const auto &[labels_key, histogram_data] : aggregated_histograms) { - const auto &[thresholds_snapshot, counts_snapshot, sum_snapshot, - observations_snapshot] = histogram_data; + for (const auto &[labels_key, agg_hist] : aggregated_histograms) { // Render explicit bucket counts - for (size_t i = 0; i < thresholds_snapshot.size(); ++i) { + for (size_t i = 0; i < agg_hist->thresholds.size(); ++i) { bucket_labels_sv.clear(); - for (const auto &l : labels_key.labels) - bucket_labels_sv.push_back(l); + for (size_t j = 0; j < labels_key.labels.size(); ++j) { + bucket_labels_sv.push_back(labels_key.labels[j]); + } bucket_labels_sv.push_back( - {"le", static_format(arena, thresholds_snapshot[i])}); + {"le", static_format(arena, agg_hist->thresholds[i])}); auto labels = format_labels(bucket_labels_sv); - output.push_back( - format(arena, "%s_bucket%.*s %llu\n", name.c_str(), - static_cast(labels.length()), labels.data(), - static_cast(counts_snapshot[i]))); + output.push_back(format( + arena, "%.*s_bucket%.*s %llu\n", static_cast(name.length()), + name.data(), static_cast(labels.length()), labels.data(), + static_cast(agg_hist->counts[i]))); } // Render +Inf bucket using total observations count bucket_labels_sv.clear(); - for (const auto &l : labels_key.labels) - bucket_labels_sv.push_back(l); + for (size_t j = 0; j < labels_key.labels.size(); ++j) { + bucket_labels_sv.push_back(labels_key.labels[j]); + } bucket_labels_sv.push_back({"le", "+Inf"}); auto inf_labels = format_labels(bucket_labels_sv); - output.push_back( - format(arena, "%s_bucket%.*s %llu\n", name.c_str(), - static_cast(inf_labels.length()), inf_labels.data(), - static_cast(observations_snapshot))); + output.push_back(format( + arena, "%.*s_bucket%.*s %llu\n", static_cast(name.length()), + name.data(), static_cast(inf_labels.length()), inf_labels.data(), + static_cast(agg_hist->observations))); // Render sum bucket_labels_sv.clear(); - for (const auto &l : labels_key.labels) - bucket_labels_sv.push_back(l); + for (size_t j = 0; j < labels_key.labels.size(); ++j) { + bucket_labels_sv.push_back(labels_key.labels[j]); + } auto labels = format_labels(bucket_labels_sv); - output.push_back(format(arena, "%s_sum%.*s %.17g\n", name.c_str(), + output.push_back(format(arena, "%.*s_sum%.*s %.17g\n", + static_cast(name.length()), name.data(), static_cast(labels.length()), labels.data(), - sum_snapshot)); + agg_hist->sum)); // Render count - output.push_back( - format(arena, "%s_count%.*s %llu\n", name.c_str(), - static_cast(labels.length()), labels.data(), - static_cast(observations_snapshot))); + output.push_back(format( + arena, "%.*s_count%.*s %llu\n", static_cast(name.length()), + name.data(), static_cast(labels.length()), labels.data(), + static_cast(agg_hist->observations))); } } auto result = arena.allocate(output.size()); - std::copy(output.begin(), output.end(), result); + std::copy(output.data(), output.data() + output.size(), result); return std::span(result, output.size()); } @@ -972,21 +1172,23 @@ void Family::register_callback( std::vector> labels, MetricCallback callback) { std::unique_lock _{Metric::mutex}; - LabelsKey key{std::move(labels)}; + LabelsKey key{labels, Metric::get_global_arena()}; // Validate that labels aren't already in use by create() calls for (const auto &[thread_id, per_thread] : p->per_thread_state) { validate_or_abort( per_thread.instances.find(key) == per_thread.instances.end(), "labels already registered as static instance", - key.labels.empty() ? "(no labels)" : key.labels[0].first.c_str()); + key.labels.empty() ? "(no labels)" + : std::string(key.labels[0].first).c_str()); } // Validate that callback isn't already registered for these labels validate_or_abort(p->callbacks.find(key) == p->callbacks.end(), "callback already registered for labels", - key.labels.empty() ? "(no labels)" - : key.labels[0].first.c_str()); + key.labels.empty() + ? "(no labels)" + : std::string(key.labels[0].first).c_str()); p->callbacks[std::move(key)] = std::move(callback); } @@ -996,19 +1198,21 @@ void Family::register_callback( std::vector> labels, MetricCallback callback) { std::unique_lock _{Metric::mutex}; - LabelsKey key{std::move(labels)}; + LabelsKey key{labels, Metric::get_global_arena()}; // Validate that labels aren't already in use by create() calls validate_or_abort(p->instances.find(key) == p->instances.end(), "labels already registered as static instance", - key.labels.empty() ? "(no labels)" - : key.labels[0].first.c_str()); + key.labels.empty() + ? "(no labels)" + : std::string(key.labels[0].first).c_str()); // Validate that callback isn't already registered for these labels validate_or_abort(p->callbacks.find(key) == p->callbacks.end(), "callback already registered for labels", - key.labels.empty() ? "(no labels)" - : key.labels[0].first.c_str()); + key.labels.empty() + ? "(no labels)" + : std::string(key.labels[0].first).c_str()); p->callbacks[std::move(key)] = std::move(callback); } diff --git a/src/metric.hpp b/src/metric.hpp index 7340ced..093e283 100644 --- a/src/metric.hpp +++ b/src/metric.hpp @@ -36,7 +36,6 @@ // histogram.observe(0.25); // ONLY call from creating thread #include -#include #include #include #include @@ -197,4 +196,4 @@ bool is_valid_label_value(const std::string &value); // Note: Histograms do not support callbacks due to their multi-value nature // (buckets + sum + count). Use static histogram metrics only. -} // namespace metric \ No newline at end of file +} // namespace metric