From 31e751fe75b579c36410d462456b10a51b5911f4 Mon Sep 17 00:00:00 2001 From: Andrew Noyes Date: Mon, 1 Sep 2025 16:52:40 -0400 Subject: [PATCH] Change iteration order to avoid temporary map --- src/arena_allocator.hpp | 7 + src/metric.cpp | 359 +++++++++++++++++++--------------------- 2 files changed, 174 insertions(+), 192 deletions(-) diff --git a/src/arena_allocator.hpp b/src/arena_allocator.hpp index c21e603..460415d 100644 --- a/src/arena_allocator.hpp +++ b/src/arena_allocator.hpp @@ -9,6 +9,7 @@ #include #include #include +#include #include #include #include @@ -639,6 +640,12 @@ template struct ArenaVector { void clear() { size_ = 0; } + // Implicit conversion to std::span + operator std::span() { return std::span(data_, size_); } + operator std::span() const { + return std::span(data_, size_); + } + // Iterator support for range-based for loops T *begin() { return data_; } const T *begin() const { return data_; } diff --git a/src/metric.cpp b/src/metric.cpp index c327bc8..42dbb08 100644 --- a/src/metric.cpp +++ b/src/metric.cpp @@ -13,6 +13,7 @@ #include #include #include +#include #include #include #include @@ -927,11 +928,83 @@ union MetricValue { uint64_t as_uint64; }; +// Label sets for each family type, built once and reused +struct LabelSets { + std::vector< + std::set, ArenaStlAllocator>> + counter_labels; + std::vector< + std::set, ArenaStlAllocator>> + gauge_labels; + std::vector< + std::set, ArenaStlAllocator>> + histogram_labels; +}; + +// Build label sets once for reuse in both phases +static LabelSets build_label_sets(ArenaAllocator &arena) { + LabelSets label_sets; + + // Build counter label sets + for (const auto &[name, family] : Metric::get_counter_families()) { + std::set, ArenaStlAllocator> + all_labels{ArenaStlAllocator(&arena)}; + + for (const auto &[thread_id, per_thread] : family->per_thread_state) { + for (const auto &[labels_key, instance] : per_thread.instances) { + all_labels.insert(labels_key); + } + } + for (const auto &[labels_key, global_state] : + family->global_accumulated_values) { + if (global_state) { + all_labels.insert(labels_key); + } + } + + label_sets.counter_labels.push_back(std::move(all_labels)); + } + + // Build gauge label sets (none needed - gauges iterate directly over + // instances) + for (const auto &[name, family] : Metric::get_gauge_families()) { + (void)name; + (void)family; // Suppress unused variable warnings + std::set, ArenaStlAllocator> + empty_set{ArenaStlAllocator(&arena)}; + label_sets.gauge_labels.push_back(std::move(empty_set)); + } + + // Build histogram label sets + for (const auto &[name, family] : Metric::get_histogram_families()) { + std::set, ArenaStlAllocator> + all_labels{ArenaStlAllocator(&arena)}; + + for (const auto &[thread_id, per_thread] : family->per_thread_state) { + for (const auto &[labels_key, instance] : per_thread.instances) { + all_labels.insert(labels_key); + } + } + for (const auto &[labels_key, global_state] : + family->global_accumulated_values) { + if (global_state) { + all_labels.insert(labels_key); + } + } + + label_sets.histogram_labels.push_back(std::move(all_labels)); + } + + return label_sets; +} + // Phase 1: Compute all metric values in deterministic order -static ArenaVector compute_metric_values(ArenaAllocator &arena) { +static ArenaVector +compute_metric_values(ArenaAllocator &arena, const LabelSets &label_sets) { ArenaVector values(&arena); // Compute counter values - ITERATION ORDER MUST MATCH FORMAT PHASE + size_t counter_family_idx = 0; for (const auto &[name, family] : Metric::get_counter_families()) { // Callback values for (const auto &[labels_key, callback] : family->callbacks) { @@ -939,32 +1012,31 @@ static ArenaVector compute_metric_values(ArenaAllocator &arena) { values.push_back({.as_double = value}); } - // Aggregate all counter values (thread-local + global accumulated) - std::map, - ArenaStlAllocator>> - aggregated_values{ - ArenaStlAllocator>(&arena)}; + // Use pre-built label sets + const auto &all_labels = label_sets.counter_labels[counter_family_idx++]; - // First, add thread-local values - for (const auto &[thread_id, per_thread] : family->per_thread_state) { - for (const auto &[labels_key, instance] : per_thread.instances) { - // Atomic read to match atomic store in Counter::inc() - double value; - __atomic_load(&instance->value, &value, __ATOMIC_RELAXED); - aggregated_values[labels_key] += value; + // Iterate by label, lookup per thread (O(1) unordered_map lookup) + for (const auto &labels_key : all_labels) { + double total_value = 0.0; + + // Sum thread-local values for this label set + for (const auto &[thread_id, per_thread] : family->per_thread_state) { + auto it = per_thread.instances.find(labels_key); + if (it != per_thread.instances.end()) { + // Atomic read to match atomic store in Counter::inc() + double value; + __atomic_load(&it->second->value, &value, __ATOMIC_RELAXED); + total_value += value; + } } - } - // Then, add globally accumulated values from destroyed threads - for (const auto &[labels_key, global_state] : - family->global_accumulated_values) { - if (global_state) { - aggregated_values[labels_key] += global_state->value; + // Add global accumulated value for this label set + auto global_it = family->global_accumulated_values.find(labels_key); + if (global_it != family->global_accumulated_values.end() && + global_it->second) { + total_value += global_it->second->value; } - } - // Store aggregated counter values - for (const auto &[labels_key, total_value] : aggregated_values) { values.push_back({.as_double = total_value}); } } @@ -977,7 +1049,7 @@ static ArenaVector compute_metric_values(ArenaAllocator &arena) { values.push_back({.as_double = value}); } - // Instance values + // Instance values (gauges don't aggregate, just direct values) for (const auto &[labels_key, instance] : family->instances) { auto value = std::bit_cast( instance->value.load(std::memory_order_relaxed)); @@ -986,116 +1058,76 @@ static ArenaVector compute_metric_values(ArenaAllocator &arena) { } // Compute histogram values - ITERATION ORDER MUST MATCH FORMAT PHASE + size_t histogram_family_idx = 0; for (const auto &[name, family] : Metric::get_histogram_families()) { - // Aggregate all histogram values (thread-local + global accumulated) - struct AggregatedHistogram { - ArenaVector thresholds; - ArenaVector counts; - double sum; - uint64_t observations; + // Use pre-built label sets + const auto &all_labels = + label_sets.histogram_labels[histogram_family_idx++]; - AggregatedHistogram(ArenaAllocator &arena) - : thresholds(&arena), counts(&arena), sum(0.0), observations(0) {} - }; - std::map< - LabelsKey, AggregatedHistogram *, std::less, - ArenaStlAllocator>> - aggregated_histograms{ArenaStlAllocator< - std::pair>(&arena)}; + // Iterate by label, lookup per thread (O(1) unordered_map lookup) + for (const auto &labels_key : all_labels) { + // Get bucket count from family config or first instance + size_t bucket_count = family->buckets.size(); - // First, collect thread-local histogram data - for (const auto &[thread_id, per_thread] : family->per_thread_state) { - for (const auto &[labels_key, instance] : per_thread.instances) { - // Extract data under lock - minimize critical section - ArenaVector thresholds_snapshot(&arena); - ArenaVector counts_snapshot(&arena); - double sum_snapshot; - uint64_t observations_snapshot; + ArenaVector total_counts(&arena); + for (size_t i = 0; i < bucket_count; ++i) { + total_counts.push_back(0); + } + double total_sum = 0.0; + uint64_t total_observations = 0; - // Copy data with minimal critical section - { - std::lock_guard lock(instance->mutex); - for (size_t i = 0; i < instance->thresholds.size(); ++i) { - thresholds_snapshot.push_back(instance->thresholds[i]); + // Sum thread-local values for this label set + for (const auto &[thread_id, per_thread] : family->per_thread_state) { + auto it = per_thread.instances.find(labels_key); + if (it != per_thread.instances.end()) { + auto *instance = it->second; + + // Extract data under lock - minimize critical section + ArenaVector counts_snapshot(&arena); + double sum_snapshot; + uint64_t observations_snapshot; + + { + std::lock_guard lock(instance->mutex); + for (size_t i = 0; i < instance->counts.size(); ++i) { + counts_snapshot.push_back(instance->counts[i]); + } + sum_snapshot = instance->sum; + observations_snapshot = instance->observations; } - for (size_t i = 0; i < instance->counts.size(); ++i) { - counts_snapshot.push_back(instance->counts[i]); - } - sum_snapshot = instance->sum; - observations_snapshot = instance->observations; - } - // Initialize or aggregate into aggregated_histograms - auto it = aggregated_histograms.find(labels_key); - if (it == aggregated_histograms.end()) { - // Create new entry - auto *agg_hist = new (arena.allocate_raw( - sizeof(AggregatedHistogram), alignof(AggregatedHistogram))) - AggregatedHistogram(arena); - for (size_t i = 0; i < thresholds_snapshot.size(); ++i) { - agg_hist->thresholds.push_back(thresholds_snapshot[i]); - } + // Add to totals for (size_t i = 0; i < counts_snapshot.size(); ++i) { - agg_hist->counts.push_back(counts_snapshot[i]); + total_counts[i] += counts_snapshot[i]; } - agg_hist->sum = sum_snapshot; - agg_hist->observations = observations_snapshot; - aggregated_histograms[labels_key] = agg_hist; - } else { - // Aggregate with existing entry - auto *agg_hist = it->second; - for (size_t i = 0; i < counts_snapshot.size(); ++i) { - agg_hist->counts[i] += counts_snapshot[i]; - } - agg_hist->sum += sum_snapshot; - agg_hist->observations += observations_snapshot; + total_sum += sum_snapshot; + total_observations += observations_snapshot; } } - } - // Then, add globally accumulated values from destroyed threads - for (const auto &[labels_key, global_state] : - family->global_accumulated_values) { - if (global_state) { - auto it = aggregated_histograms.find(labels_key); - if (it == aggregated_histograms.end()) { - // Create new entry from global state - auto *agg_hist = new (arena.allocate_raw( - sizeof(AggregatedHistogram), alignof(AggregatedHistogram))) - AggregatedHistogram(arena); - for (size_t i = 0; i < global_state->thresholds.size(); ++i) { - agg_hist->thresholds.push_back(global_state->thresholds[i]); - } - for (size_t i = 0; i < global_state->counts.size(); ++i) { - agg_hist->counts.push_back(global_state->counts[i]); - } - agg_hist->sum = global_state->sum; - agg_hist->observations = global_state->observations; - aggregated_histograms[labels_key] = agg_hist; - } else { - // Add global accumulated values to existing entry - auto *agg_hist = it->second; - for (size_t i = 0; i < global_state->counts.size(); ++i) { - agg_hist->counts[i] += global_state->counts[i]; - } - agg_hist->sum += global_state->sum; - agg_hist->observations += global_state->observations; + // Add global accumulated value for this label set + auto global_it = family->global_accumulated_values.find(labels_key); + if (global_it != family->global_accumulated_values.end() && + global_it->second) { + auto *global_state = global_it->second; + for (size_t i = 0; i < global_state->counts.size(); ++i) { + total_counts[i] += global_state->counts[i]; } + total_sum += global_state->sum; + total_observations += global_state->observations; } - } - // Store histogram values - for (const auto &[labels_key, agg_hist] : aggregated_histograms) { + // Store histogram values // Store explicit bucket counts - for (size_t i = 0; i < agg_hist->thresholds.size(); ++i) { - values.push_back({.as_uint64 = agg_hist->counts[i]}); + for (size_t i = 0; i < total_counts.size(); ++i) { + values.push_back({.as_uint64 = total_counts[i]}); } // Store +Inf bucket (total observations) - values.push_back({.as_uint64 = agg_hist->observations}); + values.push_back({.as_uint64 = total_observations}); // Store sum - values.push_back({.as_double = agg_hist->sum}); + values.push_back({.as_double = total_sum}); // Store count - values.push_back({.as_uint64 = agg_hist->observations}); + values.push_back({.as_uint64 = total_observations}); } } @@ -1107,8 +1139,12 @@ std::span render(ArenaAllocator &arena) { // Hold lock throughout both phases to prevent registry changes std::unique_lock _{Metric::mutex}; + // Build label sets once for both phases + LabelSets label_sets = build_label_sets(arena); + // Phase 1: Compute all metric values - ArenaVector metric_values = compute_metric_values(arena); + ArenaVector metric_values = + compute_metric_values(arena, label_sets); const MetricValue *next_value = metric_values.data(); ArenaVector output(&arena); @@ -1170,6 +1206,7 @@ std::span render(ArenaAllocator &arena) { }; // Format counters - ITERATION ORDER MUST MATCH COMPUTE PHASE + size_t counter_family_idx = 0; for (const auto &[name, family] : Metric::get_counter_families()) { output.push_back(format(arena, "# HELP %.*s %.*s\n", static_cast(name.length()), name.data(), @@ -1195,28 +1232,11 @@ std::span render(ArenaAllocator &arena) { value)); } - // Recreate aggregated values map for iteration order - std::map, - ArenaStlAllocator>> - aggregated_values{ - ArenaStlAllocator>(&arena)}; + // Use pre-built label sets (same as compute phase) + const auto &all_labels = label_sets.counter_labels[counter_family_idx++]; - // Populate map to get same iteration order (values ignored, using - // pre-computed) - for (const auto &[thread_id, per_thread] : family->per_thread_state) { - for (const auto &[labels_key, instance] : per_thread.instances) { - aggregated_values[labels_key] = 0.0; // Placeholder - } - } - for (const auto &[labels_key, global_state] : - family->global_accumulated_values) { - if (global_state) { - aggregated_values[labels_key] = 0.0; // Placeholder - } - } - - // Format aggregated counter values - for (const auto &[labels_key, ignored_value] : aggregated_values) { + // Format counter values using pre-computed values + for (const auto &labels_key : all_labels) { auto total_value = next_value++->as_double; labels_sv.clear(); for (size_t i = 0; i < labels_key.labels.size(); ++i) { @@ -1272,6 +1292,7 @@ std::span render(ArenaAllocator &arena) { } // Format histograms - ITERATION ORDER MUST MATCH COMPUTE PHASE + size_t histogram_family_idx = 0; for (const auto &[name, family] : Metric::get_histogram_families()) { output.push_back(format(arena, "# HELP %.*s %.*s\n", static_cast(name.length()), name.data(), @@ -1280,71 +1301,27 @@ std::span render(ArenaAllocator &arena) { output.push_back(format(arena, "# TYPE %.*s histogram\n", static_cast(name.length()), name.data())); - // Recreate aggregated histograms map for iteration order - struct AggregatedHistogram { - ArenaVector thresholds; - ArenaVector counts; - double sum; - uint64_t observations; - - AggregatedHistogram(ArenaAllocator &arena) - : thresholds(&arena), counts(&arena), sum(0.0), observations(0) {} - }; - std::map< - LabelsKey, AggregatedHistogram *, std::less, - ArenaStlAllocator>> - aggregated_histograms{ArenaStlAllocator< - std::pair>(&arena)}; - - // Recreate map structure for iteration order (recompute thresholds for - // formatting) - for (const auto &[thread_id, per_thread] : family->per_thread_state) { - for (const auto &[labels_key, instance] : per_thread.instances) { - auto it = aggregated_histograms.find(labels_key); - if (it == aggregated_histograms.end()) { - auto *agg_hist = new (arena.allocate_raw( - sizeof(AggregatedHistogram), alignof(AggregatedHistogram))) - AggregatedHistogram(arena); - // Copy thresholds for le= formatting - std::lock_guard lock(instance->mutex); - for (size_t i = 0; i < instance->thresholds.size(); ++i) { - agg_hist->thresholds.push_back(instance->thresholds[i]); - } - aggregated_histograms[labels_key] = agg_hist; - } - } - } - for (const auto &[labels_key, global_state] : - family->global_accumulated_values) { - if (global_state) { - auto it = aggregated_histograms.find(labels_key); - if (it == aggregated_histograms.end()) { - auto *agg_hist = new (arena.allocate_raw( - sizeof(AggregatedHistogram), alignof(AggregatedHistogram))) - AggregatedHistogram(arena); - // Copy thresholds for le= formatting - for (size_t i = 0; i < global_state->thresholds.size(); ++i) { - agg_hist->thresholds.push_back(global_state->thresholds[i]); - } - aggregated_histograms[labels_key] = agg_hist; - } - } - } + // Use pre-built label sets (same as compute phase) + const auto &all_labels = + label_sets.histogram_labels[histogram_family_idx++]; ArenaVector> bucket_labels_sv( &arena); // Format histogram data using pre-computed values - for (const auto &[labels_key, agg_hist] : aggregated_histograms) { + for (const auto &labels_key : all_labels) { + // Get bucket thresholds from family config + size_t bucket_count = family->buckets.size(); + // Format explicit bucket counts - for (size_t i = 0; i < agg_hist->thresholds.size(); ++i) { + for (size_t i = 0; i < bucket_count; ++i) { auto count = next_value++->as_uint64; bucket_labels_sv.clear(); for (size_t j = 0; j < labels_key.labels.size(); ++j) { bucket_labels_sv.push_back(labels_key.labels[j]); } bucket_labels_sv.push_back( - {"le", static_format(arena, agg_hist->thresholds[i])}); + {"le", static_format(arena, family->buckets[i])}); auto labels = format_labels(bucket_labels_sv); output.push_back(format( arena, "%.*s_bucket%.*s %llu\n", static_cast(name.length()), @@ -1385,9 +1362,7 @@ std::span render(ArenaAllocator &arena) { } } - auto result = arena.allocate(output.size()); - std::copy(output.data(), output.data() + output.size(), result); - return std::span(result, output.size()); + return output; } // Template specialization implementations for register_callback