From 953ec3ad43ec8d4d33ee9782c9153bfb45bd3d2b Mon Sep 17 00:00:00 2001 From: Andrew Noyes Date: Mon, 1 Sep 2025 15:05:27 -0400 Subject: [PATCH] Separate compute and format phases for render --- src/metric.cpp | 330 ++++++++++++++++++++++++++++++++++--------------- 1 file changed, 233 insertions(+), 97 deletions(-) diff --git a/src/metric.cpp b/src/metric.cpp index 2b33d5b..c327bc8 100644 --- a/src/metric.cpp +++ b/src/metric.cpp @@ -922,9 +922,195 @@ bool is_valid_label_value(std::string_view value) { return simdutf::validate_utf8(value.data(), value.size()); } +union MetricValue { + double as_double; + uint64_t as_uint64; +}; + +// Phase 1: Compute all metric values in deterministic order +static ArenaVector compute_metric_values(ArenaAllocator &arena) { + ArenaVector values(&arena); + + // Compute counter values - ITERATION ORDER MUST MATCH FORMAT PHASE + for (const auto &[name, family] : Metric::get_counter_families()) { + // Callback values + for (const auto &[labels_key, callback] : family->callbacks) { + auto value = callback(); + values.push_back({.as_double = value}); + } + + // Aggregate all counter values (thread-local + global accumulated) + std::map, + ArenaStlAllocator>> + aggregated_values{ + ArenaStlAllocator>(&arena)}; + + // First, add thread-local values + for (const auto &[thread_id, per_thread] : family->per_thread_state) { + for (const auto &[labels_key, instance] : per_thread.instances) { + // Atomic read to match atomic store in Counter::inc() + double value; + __atomic_load(&instance->value, &value, __ATOMIC_RELAXED); + aggregated_values[labels_key] += value; + } + } + + // Then, add globally accumulated values from destroyed threads + for (const auto &[labels_key, global_state] : + family->global_accumulated_values) { + if (global_state) { + aggregated_values[labels_key] += global_state->value; + } + } + + // Store aggregated counter values + for (const auto &[labels_key, total_value] : aggregated_values) { + values.push_back({.as_double = total_value}); + } + } + + // Compute gauge values - ITERATION ORDER MUST MATCH FORMAT PHASE + for (const auto &[name, family] : Metric::get_gauge_families()) { + // Callback values + for (const auto &[labels_key, callback] : family->callbacks) { + auto value = callback(); + values.push_back({.as_double = value}); + } + + // Instance values + for (const auto &[labels_key, instance] : family->instances) { + auto value = std::bit_cast( + instance->value.load(std::memory_order_relaxed)); + values.push_back({.as_double = value}); + } + } + + // Compute histogram values - ITERATION ORDER MUST MATCH FORMAT PHASE + for (const auto &[name, family] : Metric::get_histogram_families()) { + // Aggregate all histogram values (thread-local + global accumulated) + struct AggregatedHistogram { + ArenaVector thresholds; + ArenaVector counts; + double sum; + uint64_t observations; + + AggregatedHistogram(ArenaAllocator &arena) + : thresholds(&arena), counts(&arena), sum(0.0), observations(0) {} + }; + std::map< + LabelsKey, AggregatedHistogram *, std::less, + ArenaStlAllocator>> + aggregated_histograms{ArenaStlAllocator< + std::pair>(&arena)}; + + // First, collect thread-local histogram data + for (const auto &[thread_id, per_thread] : family->per_thread_state) { + for (const auto &[labels_key, instance] : per_thread.instances) { + // Extract data under lock - minimize critical section + ArenaVector thresholds_snapshot(&arena); + ArenaVector counts_snapshot(&arena); + double sum_snapshot; + uint64_t observations_snapshot; + + // Copy data with minimal critical section + { + std::lock_guard lock(instance->mutex); + for (size_t i = 0; i < instance->thresholds.size(); ++i) { + thresholds_snapshot.push_back(instance->thresholds[i]); + } + for (size_t i = 0; i < instance->counts.size(); ++i) { + counts_snapshot.push_back(instance->counts[i]); + } + sum_snapshot = instance->sum; + observations_snapshot = instance->observations; + } + + // Initialize or aggregate into aggregated_histograms + auto it = aggregated_histograms.find(labels_key); + if (it == aggregated_histograms.end()) { + // Create new entry + auto *agg_hist = new (arena.allocate_raw( + sizeof(AggregatedHistogram), alignof(AggregatedHistogram))) + AggregatedHistogram(arena); + for (size_t i = 0; i < thresholds_snapshot.size(); ++i) { + agg_hist->thresholds.push_back(thresholds_snapshot[i]); + } + for (size_t i = 0; i < counts_snapshot.size(); ++i) { + agg_hist->counts.push_back(counts_snapshot[i]); + } + agg_hist->sum = sum_snapshot; + agg_hist->observations = observations_snapshot; + aggregated_histograms[labels_key] = agg_hist; + } else { + // Aggregate with existing entry + auto *agg_hist = it->second; + for (size_t i = 0; i < counts_snapshot.size(); ++i) { + agg_hist->counts[i] += counts_snapshot[i]; + } + agg_hist->sum += sum_snapshot; + agg_hist->observations += observations_snapshot; + } + } + } + + // Then, add globally accumulated values from destroyed threads + for (const auto &[labels_key, global_state] : + family->global_accumulated_values) { + if (global_state) { + auto it = aggregated_histograms.find(labels_key); + if (it == aggregated_histograms.end()) { + // Create new entry from global state + auto *agg_hist = new (arena.allocate_raw( + sizeof(AggregatedHistogram), alignof(AggregatedHistogram))) + AggregatedHistogram(arena); + for (size_t i = 0; i < global_state->thresholds.size(); ++i) { + agg_hist->thresholds.push_back(global_state->thresholds[i]); + } + for (size_t i = 0; i < global_state->counts.size(); ++i) { + agg_hist->counts.push_back(global_state->counts[i]); + } + agg_hist->sum = global_state->sum; + agg_hist->observations = global_state->observations; + aggregated_histograms[labels_key] = agg_hist; + } else { + // Add global accumulated values to existing entry + auto *agg_hist = it->second; + for (size_t i = 0; i < global_state->counts.size(); ++i) { + agg_hist->counts[i] += global_state->counts[i]; + } + agg_hist->sum += global_state->sum; + agg_hist->observations += global_state->observations; + } + } + } + + // Store histogram values + for (const auto &[labels_key, agg_hist] : aggregated_histograms) { + // Store explicit bucket counts + for (size_t i = 0; i < agg_hist->thresholds.size(); ++i) { + values.push_back({.as_uint64 = agg_hist->counts[i]}); + } + // Store +Inf bucket (total observations) + values.push_back({.as_uint64 = agg_hist->observations}); + // Store sum + values.push_back({.as_double = agg_hist->sum}); + // Store count + values.push_back({.as_uint64 = agg_hist->observations}); + } + } + + return values; +} + +// Phase 2: Format metrics using pre-computed values std::span render(ArenaAllocator &arena) { + // Hold lock throughout both phases to prevent registry changes std::unique_lock _{Metric::mutex}; + // Phase 1: Compute all metric values + ArenaVector metric_values = compute_metric_values(arena); + const MetricValue *next_value = metric_values.data(); + ArenaVector output(&arena); auto format_labels = @@ -983,7 +1169,7 @@ std::span render(ArenaAllocator &arena) { return std::string_view(buf, p - buf); }; - // Render counters + // Format counters - ITERATION ORDER MUST MATCH COMPUTE PHASE for (const auto &[name, family] : Metric::get_counter_families()) { output.push_back(format(arena, "# HELP %.*s %.*s\n", static_cast(name.length()), name.data(), @@ -994,8 +1180,10 @@ std::span render(ArenaAllocator &arena) { ArenaVector> labels_sv( &arena); + + // Format callback values for (const auto &[labels_key, callback] : family->callbacks) { - auto value = callback(); + auto value = next_value++->as_double; labels_sv.clear(); for (size_t i = 0; i < labels_key.labels.size(); ++i) { labels_sv.push_back(labels_key.labels[i]); @@ -1007,32 +1195,29 @@ std::span render(ArenaAllocator &arena) { value)); } - // Aggregate all counter values (thread-local + global accumulated) + // Recreate aggregated values map for iteration order std::map, ArenaStlAllocator>> aggregated_values{ ArenaStlAllocator>(&arena)}; - // First, add thread-local values + // Populate map to get same iteration order (values ignored, using + // pre-computed) for (const auto &[thread_id, per_thread] : family->per_thread_state) { for (const auto &[labels_key, instance] : per_thread.instances) { - // Atomic read to match atomic store in Counter::inc() - double value; - __atomic_load(&instance->value, &value, __ATOMIC_RELAXED); - aggregated_values[labels_key] += value; + aggregated_values[labels_key] = 0.0; // Placeholder } } - - // Then, add globally accumulated values from destroyed threads for (const auto &[labels_key, global_state] : family->global_accumulated_values) { if (global_state) { - aggregated_values[labels_key] += global_state->value; + aggregated_values[labels_key] = 0.0; // Placeholder } } - // Render aggregated counter values - for (const auto &[labels_key, total_value] : aggregated_values) { + // Format aggregated counter values + for (const auto &[labels_key, ignored_value] : aggregated_values) { + auto total_value = next_value++->as_double; labels_sv.clear(); for (size_t i = 0; i < labels_key.labels.size(); ++i) { labels_sv.push_back(labels_key.labels[i]); @@ -1045,7 +1230,7 @@ std::span render(ArenaAllocator &arena) { } } - // Render gauges + // Format gauges - ITERATION ORDER MUST MATCH COMPUTE PHASE for (const auto &[name, family] : Metric::get_gauge_families()) { output.push_back(format(arena, "# HELP %.*s %.*s\n", static_cast(name.length()), name.data(), @@ -1056,8 +1241,10 @@ std::span render(ArenaAllocator &arena) { ArenaVector> labels_sv( &arena); + + // Format callback values for (const auto &[labels_key, callback] : family->callbacks) { - auto value = callback(); + auto value = next_value++->as_double; labels_sv.clear(); for (size_t i = 0; i < labels_key.labels.size(); ++i) { labels_sv.push_back(labels_key.labels[i]); @@ -1069,9 +1256,9 @@ std::span render(ArenaAllocator &arena) { value)); } + // Format instance values for (const auto &[labels_key, instance] : family->instances) { - auto value = std::bit_cast( - instance->value.load(std::memory_order_relaxed)); + auto value = next_value++->as_double; labels_sv.clear(); for (size_t i = 0; i < labels_key.labels.size(); ++i) { labels_sv.push_back(labels_key.labels[i]); @@ -1084,7 +1271,7 @@ std::span render(ArenaAllocator &arena) { } } - // Render histograms + // Format histograms - ITERATION ORDER MUST MATCH COMPUTE PHASE for (const auto &[name, family] : Metric::get_histogram_families()) { output.push_back(format(arena, "# HELP %.*s %.*s\n", static_cast(name.length()), name.data(), @@ -1093,8 +1280,7 @@ std::span render(ArenaAllocator &arena) { output.push_back(format(arena, "# TYPE %.*s histogram\n", static_cast(name.length()), name.data())); - // Aggregate all histogram values (thread-local + global accumulated) - // Use a simpler structure to avoid tuple constructor issues + // Recreate aggregated histograms map for iteration order struct AggregatedHistogram { ArenaVector thresholds; ArenaVector counts; @@ -1110,115 +1296,64 @@ std::span render(ArenaAllocator &arena) { aggregated_histograms{ArenaStlAllocator< std::pair>(&arena)}; - ArenaVector> bucket_labels_sv( - &arena); - - // First, collect thread-local histogram data + // Recreate map structure for iteration order (recompute thresholds for + // formatting) for (const auto &[thread_id, per_thread] : family->per_thread_state) { for (const auto &[labels_key, instance] : per_thread.instances) { - // Extract data under lock - minimize critical section - // Note: thresholds and counts sizes never change after histogram - // creation - ArenaVector thresholds_snapshot(&arena); - ArenaVector counts_snapshot(&arena); - double sum_snapshot; - uint64_t observations_snapshot; - - // Copy data with minimal critical section - { - std::lock_guard lock(instance->mutex); - // Copy thresholds - for (size_t i = 0; i < instance->thresholds.size(); ++i) { - thresholds_snapshot.push_back(instance->thresholds[i]); - } - // Copy counts - for (size_t i = 0; i < instance->counts.size(); ++i) { - counts_snapshot.push_back(instance->counts[i]); - } - sum_snapshot = instance->sum; - observations_snapshot = instance->observations; - } - - // Initialize or aggregate into aggregated_histograms auto it = aggregated_histograms.find(labels_key); if (it == aggregated_histograms.end()) { - // Create new entry auto *agg_hist = new (arena.allocate_raw( sizeof(AggregatedHistogram), alignof(AggregatedHistogram))) AggregatedHistogram(arena); - for (size_t i = 0; i < thresholds_snapshot.size(); ++i) { - agg_hist->thresholds.push_back(thresholds_snapshot[i]); + // Copy thresholds for le= formatting + std::lock_guard lock(instance->mutex); + for (size_t i = 0; i < instance->thresholds.size(); ++i) { + agg_hist->thresholds.push_back(instance->thresholds[i]); } - for (size_t i = 0; i < counts_snapshot.size(); ++i) { - agg_hist->counts.push_back(counts_snapshot[i]); - } - agg_hist->sum = sum_snapshot; - agg_hist->observations = observations_snapshot; aggregated_histograms[labels_key] = agg_hist; - } else { - // Aggregate with existing entry - auto *agg_hist = it->second; - // Aggregate counts - for (size_t i = 0; i < counts_snapshot.size(); ++i) { - agg_hist->counts[i] += counts_snapshot[i]; - } - agg_hist->sum += sum_snapshot; - agg_hist->observations += observations_snapshot; } } } - - // Then, add globally accumulated values from destroyed threads for (const auto &[labels_key, global_state] : family->global_accumulated_values) { if (global_state) { auto it = aggregated_histograms.find(labels_key); if (it == aggregated_histograms.end()) { - // Create new entry from global state auto *agg_hist = new (arena.allocate_raw( sizeof(AggregatedHistogram), alignof(AggregatedHistogram))) AggregatedHistogram(arena); + // Copy thresholds for le= formatting for (size_t i = 0; i < global_state->thresholds.size(); ++i) { agg_hist->thresholds.push_back(global_state->thresholds[i]); } - for (size_t i = 0; i < global_state->counts.size(); ++i) { - agg_hist->counts.push_back(global_state->counts[i]); - } - agg_hist->sum = global_state->sum; - agg_hist->observations = global_state->observations; aggregated_histograms[labels_key] = agg_hist; - } else { - // Add global accumulated values to existing entry - auto *agg_hist = it->second; - for (size_t i = 0; i < global_state->counts.size(); ++i) { - agg_hist->counts[i] += global_state->counts[i]; - } - agg_hist->sum += global_state->sum; - agg_hist->observations += global_state->observations; } } } - // Render aggregated histogram data - for (const auto &[labels_key, agg_hist] : aggregated_histograms) { + ArenaVector> bucket_labels_sv( + &arena); - // Render explicit bucket counts + // Format histogram data using pre-computed values + for (const auto &[labels_key, agg_hist] : aggregated_histograms) { + // Format explicit bucket counts for (size_t i = 0; i < agg_hist->thresholds.size(); ++i) { + auto count = next_value++->as_uint64; bucket_labels_sv.clear(); for (size_t j = 0; j < labels_key.labels.size(); ++j) { bucket_labels_sv.push_back(labels_key.labels[j]); } - bucket_labels_sv.push_back( {"le", static_format(arena, agg_hist->thresholds[i])}); auto labels = format_labels(bucket_labels_sv); output.push_back(format( arena, "%.*s_bucket%.*s %llu\n", static_cast(name.length()), name.data(), static_cast(labels.length()), labels.data(), - static_cast(agg_hist->counts[i]))); + static_cast(count))); } - // Render +Inf bucket using total observations count + // Format +Inf bucket + auto observations = next_value++->as_uint64; bucket_labels_sv.clear(); for (size_t j = 0; j < labels_key.labels.size(); ++j) { bucket_labels_sv.push_back(labels_key.labels[j]); @@ -1228,24 +1363,25 @@ std::span render(ArenaAllocator &arena) { output.push_back(format( arena, "%.*s_bucket%.*s %llu\n", static_cast(name.length()), name.data(), static_cast(inf_labels.length()), inf_labels.data(), - static_cast(agg_hist->observations))); + static_cast(observations))); - // Render sum + // Format sum + auto sum = next_value++->as_double; bucket_labels_sv.clear(); for (size_t j = 0; j < labels_key.labels.size(); ++j) { bucket_labels_sv.push_back(labels_key.labels[j]); } auto labels = format_labels(bucket_labels_sv); - output.push_back(format(arena, "%.*s_sum%.*s %.17g\n", + output.push_back(format( + arena, "%.*s_sum%.*s %.17g\n", static_cast(name.length()), + name.data(), static_cast(labels.length()), labels.data(), sum)); + + // Format count + auto count = next_value++->as_uint64; + output.push_back(format(arena, "%.*s_count%.*s %llu\n", static_cast(name.length()), name.data(), static_cast(labels.length()), labels.data(), - agg_hist->sum)); - - // Render count - output.push_back(format( - arena, "%.*s_count%.*s %llu\n", static_cast(name.length()), - name.data(), static_cast(labels.length()), labels.data(), - static_cast(agg_hist->observations))); + static_cast(count))); } }