Change iteration order to avoid temporary map

2025-09-01 16:52:40 -04:00
parent 953ec3ad43
commit 31e751fe75
2 changed files with 174 additions and 192 deletions
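In short: the old code aggregated per-thread metric shards into a temporary, arena-allocated std::map on every render, twice (once to compute values, and once more just to replay the same iteration order while formatting). The new code builds one sorted label set per family up front and has both phases walk it, turning the inner work into O(1) unordered_map lookups. Below is a minimal sketch of the before/after pattern, with simplified stand-in types (Family, LabelsKey, and the shard maps are illustrative, not this repo's real types):

    #include <map>
    #include <set>
    #include <string>
    #include <unordered_map>
    #include <vector>

    using LabelsKey = std::string;

    struct Family {
      // Per-thread shards: thread id -> (labels -> value)
      std::unordered_map<int, std::unordered_map<LabelsKey, double>> per_thread;
    };

    // Before: each phase rebuilt a temporary std::map just to get a
    // deterministic iteration order, inserting every instance it saw.
    std::vector<double> aggregate_with_temp_map(const Family &f) {
      std::map<LabelsKey, double> tmp; // rebuilt on every call
      for (const auto &[tid, shard] : f.per_thread)
        for (const auto &[key, value] : shard)
          tmp[key] += value;
      std::vector<double> out;
      for (const auto &[key, total] : tmp)
        out.push_back(total);
      return out;
    }

    // After: build the sorted label set once, then iterate labels in that
    // fixed order and probe each shard by key. Both the compute and format
    // phases can walk the same set, so their orders always match.
    std::vector<double>
    aggregate_with_label_set(const Family &f, const std::set<LabelsKey> &labels) {
      std::vector<double> out;
      for (const auto &key : labels) {
        double total = 0.0;
        for (const auto &[tid, shard] : f.per_thread)
          if (auto it = shard.find(key); it != shard.end())
            total += it->second;
        out.push_back(total);
      }
      return out;
    }

The sorted std::set is what keeps the output deterministic; the per-shard maps can stay unordered because they are only ever probed by key.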

View File

@@ -9,6 +9,7 @@
 #include <iostream>
 #include <limits>
 #include <new>
+#include <span>
 #include <type_traits>
 #include <typeinfo>
 #include <utility>
@@ -639,6 +640,12 @@ template <typename T> struct ArenaVector {
   void clear() { size_ = 0; }

+  // Implicit conversion to std::span
+  operator std::span<T>() { return std::span<T>(data_, size_); }
+  operator std::span<const T>() const {
+    return std::span<const T>(data_, size_);
+  }
+
   // Iterator support for range-based for loops
   T *begin() { return data_; }
   const T *begin() const { return data_; }
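The new conversion operators let an ArenaVector<T> pass straight into any API that takes std::span<T> or std::span<const T>, with no copy. A minimal sketch of the pattern outside this codebase (TinyVec and sum are illustrative stand-ins, not types from this repo):

    #include <cstddef>
    #include <span>

    template <typename T> struct TinyVec {
      T *data_ = nullptr;
      std::size_t size_ = 0;
      // Same shape as the operators added to ArenaVector above
      operator std::span<T>() { return std::span<T>(data_, size_); }
      operator std::span<const T>() const {
        return std::span<const T>(data_, size_);
      }
    };

    // Any span-taking function now accepts TinyVec directly:
    // TinyVec<double> v{...}; double s = sum(v);
    double sum(std::span<const double> xs) {
      double s = 0.0;
      for (double x : xs)
        s += x;
      return s;
    }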

View File

@@ -13,6 +13,7 @@
 #include <functional>
 #include <map>
 #include <mutex>
+#include <set>
 #include <string>
 #include <thread>
 #include <type_traits>
@@ -927,11 +928,83 @@ union MetricValue {
   uint64_t as_uint64;
 };

+// Label sets for each family type, built once and reused
+struct LabelSets {
+  std::vector<
+      std::set<LabelsKey, std::less<LabelsKey>, ArenaStlAllocator<LabelsKey>>>
+      counter_labels;
+  std::vector<
+      std::set<LabelsKey, std::less<LabelsKey>, ArenaStlAllocator<LabelsKey>>>
+      gauge_labels;
+  std::vector<
+      std::set<LabelsKey, std::less<LabelsKey>, ArenaStlAllocator<LabelsKey>>>
+      histogram_labels;
+};
+
+// Build label sets once for reuse in both phases
+static LabelSets build_label_sets(ArenaAllocator &arena) {
+  LabelSets label_sets;
+
+  // Build counter label sets
+  for (const auto &[name, family] : Metric::get_counter_families()) {
+    std::set<LabelsKey, std::less<LabelsKey>, ArenaStlAllocator<LabelsKey>>
+        all_labels{ArenaStlAllocator<LabelsKey>(&arena)};
+    for (const auto &[thread_id, per_thread] : family->per_thread_state) {
+      for (const auto &[labels_key, instance] : per_thread.instances) {
+        all_labels.insert(labels_key);
+      }
+    }
+    for (const auto &[labels_key, global_state] :
+         family->global_accumulated_values) {
+      if (global_state) {
+        all_labels.insert(labels_key);
+      }
+    }
+    label_sets.counter_labels.push_back(std::move(all_labels));
+  }
+
+  // Build gauge label sets (none needed - gauges iterate directly over
+  // instances)
+  for (const auto &[name, family] : Metric::get_gauge_families()) {
+    (void)name;
+    (void)family; // Suppress unused variable warnings
+    std::set<LabelsKey, std::less<LabelsKey>, ArenaStlAllocator<LabelsKey>>
+        empty_set{ArenaStlAllocator<LabelsKey>(&arena)};
+    label_sets.gauge_labels.push_back(std::move(empty_set));
+  }
+
+  // Build histogram label sets
+  for (const auto &[name, family] : Metric::get_histogram_families()) {
+    std::set<LabelsKey, std::less<LabelsKey>, ArenaStlAllocator<LabelsKey>>
+        all_labels{ArenaStlAllocator<LabelsKey>(&arena)};
+    for (const auto &[thread_id, per_thread] : family->per_thread_state) {
+      for (const auto &[labels_key, instance] : per_thread.instances) {
+        all_labels.insert(labels_key);
+      }
+    }
+    for (const auto &[labels_key, global_state] :
+         family->global_accumulated_values) {
+      if (global_state) {
+        all_labels.insert(labels_key);
+      }
+    }
+    label_sets.histogram_labels.push_back(std::move(all_labels));
+  }
+
+  return label_sets;
+}
+
 // Phase 1: Compute all metric values in deterministic order
-static ArenaVector<MetricValue> compute_metric_values(ArenaAllocator &arena) {
+static ArenaVector<MetricValue>
+compute_metric_values(ArenaAllocator &arena, const LabelSets &label_sets) {
   ArenaVector<MetricValue> values(&arena);

   // Compute counter values - ITERATION ORDER MUST MATCH FORMAT PHASE
+  size_t counter_family_idx = 0;
   for (const auto &[name, family] : Metric::get_counter_families()) {
     // Callback values
     for (const auto &[labels_key, callback] : family->callbacks) {
@@ -939,32 +1012,31 @@ static ArenaVector<MetricValue> compute_metric_values(ArenaAllocator &arena) {
       values.push_back({.as_double = value});
     }

-    // Aggregate all counter values (thread-local + global accumulated)
-    std::map<LabelsKey, double, std::less<LabelsKey>,
-             ArenaStlAllocator<std::pair<const LabelsKey, double>>>
-        aggregated_values{
-            ArenaStlAllocator<std::pair<const LabelsKey, double>>(&arena)};
+    // Use pre-built label sets
+    const auto &all_labels = label_sets.counter_labels[counter_family_idx++];

-    // First, add thread-local values
-    for (const auto &[thread_id, per_thread] : family->per_thread_state) {
-      for (const auto &[labels_key, instance] : per_thread.instances) {
-        // Atomic read to match atomic store in Counter::inc()
-        double value;
-        __atomic_load(&instance->value, &value, __ATOMIC_RELAXED);
-        aggregated_values[labels_key] += value;
+    // Iterate by label, lookup per thread (O(1) unordered_map lookup)
+    for (const auto &labels_key : all_labels) {
+      double total_value = 0.0;
+
+      // Sum thread-local values for this label set
+      for (const auto &[thread_id, per_thread] : family->per_thread_state) {
+        auto it = per_thread.instances.find(labels_key);
+        if (it != per_thread.instances.end()) {
+          // Atomic read to match atomic store in Counter::inc()
+          double value;
+          __atomic_load(&it->second->value, &value, __ATOMIC_RELAXED);
+          total_value += value;
+        }
       }
-    }

-    // Then, add globally accumulated values from destroyed threads
-    for (const auto &[labels_key, global_state] :
-         family->global_accumulated_values) {
-      if (global_state) {
-        aggregated_values[labels_key] += global_state->value;
+      // Add global accumulated value for this label set
+      auto global_it = family->global_accumulated_values.find(labels_key);
+      if (global_it != family->global_accumulated_values.end() &&
+          global_it->second) {
+        total_value += global_it->second->value;
       }
-    }

-    // Store aggregated counter values
-    for (const auto &[labels_key, total_value] : aggregated_values) {
       values.push_back({.as_double = total_value});
     }
   }
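Both phases lean on one invariant: phase 1 pushes exactly one value per label in the set's sorted order, and phase 2 consumes them with a bump pointer (next_value++). A minimal sketch of why the shared std::set makes that safe (simplified types; the printf line stands in for the real formatter):

    #include <cstdio>
    #include <set>
    #include <string>
    #include <vector>

    int main() {
      std::set<std::string> labels = {"get", "post", "put"}; // sorted, stable

      // Phase 1: compute one value per label, in set order
      std::vector<double> values;
      for (const auto &key : labels)
        values.push_back(static_cast<double>(key.size())); // stand-in compute

      // Phase 2: walk the SAME set; the bump pointer stays aligned
      const double *next_value = values.data();
      for (const auto &key : labels)
        std::printf("requests{method=\"%s\"} %g\n", key.c_str(), *next_value++);
    }

Any divergence between the two walks would silently pair values with the wrong labels, which is why both phases now read from the one pre-built set.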
@@ -977,7 +1049,7 @@ static ArenaVector<MetricValue> compute_metric_values(ArenaAllocator &arena) {
       values.push_back({.as_double = value});
     }

-    // Instance values
+    // Instance values (gauges don't aggregate, just direct values)
     for (const auto &[labels_key, instance] : family->instances) {
       auto value = std::bit_cast<double>(
           instance->value.load(std::memory_order_relaxed));
@@ -986,38 +1058,37 @@ static ArenaVector<MetricValue> compute_metric_values(ArenaAllocator &arena) {
   }

   // Compute histogram values - ITERATION ORDER MUST MATCH FORMAT PHASE
+  size_t histogram_family_idx = 0;
   for (const auto &[name, family] : Metric::get_histogram_families()) {
-    // Aggregate all histogram values (thread-local + global accumulated)
-    struct AggregatedHistogram {
-      ArenaVector<double> thresholds;
-      ArenaVector<uint64_t> counts;
-      double sum;
-      uint64_t observations;
-      AggregatedHistogram(ArenaAllocator &arena)
-          : thresholds(&arena), counts(&arena), sum(0.0), observations(0) {}
-    };
-    std::map<
-        LabelsKey, AggregatedHistogram *, std::less<LabelsKey>,
-        ArenaStlAllocator<std::pair<const LabelsKey, AggregatedHistogram *>>>
-        aggregated_histograms{ArenaStlAllocator<
-            std::pair<const LabelsKey, AggregatedHistogram *>>(&arena)};
+    // Use pre-built label sets
+    const auto &all_labels =
+        label_sets.histogram_labels[histogram_family_idx++];

-    // First, collect thread-local histogram data
-    for (const auto &[thread_id, per_thread] : family->per_thread_state) {
-      for (const auto &[labels_key, instance] : per_thread.instances) {
+    // Iterate by label, lookup per thread (O(1) unordered_map lookup)
+    for (const auto &labels_key : all_labels) {
+      // Get bucket count from family config or first instance
+      size_t bucket_count = family->buckets.size();
+
+      ArenaVector<uint64_t> total_counts(&arena);
+      for (size_t i = 0; i < bucket_count; ++i) {
+        total_counts.push_back(0);
+      }
+      double total_sum = 0.0;
+      uint64_t total_observations = 0;
+
+      // Sum thread-local values for this label set
+      for (const auto &[thread_id, per_thread] : family->per_thread_state) {
+        auto it = per_thread.instances.find(labels_key);
+        if (it != per_thread.instances.end()) {
+          auto *instance = it->second;
           // Extract data under lock - minimize critical section
-          ArenaVector<double> thresholds_snapshot(&arena);
           ArenaVector<uint64_t> counts_snapshot(&arena);
           double sum_snapshot;
           uint64_t observations_snapshot;
+
+          // Copy data with minimal critical section
           {
             std::lock_guard<std::mutex> lock(instance->mutex);
-            for (size_t i = 0; i < instance->thresholds.size(); ++i) {
-              thresholds_snapshot.push_back(instance->thresholds[i]);
-            }
             for (size_t i = 0; i < instance->counts.size(); ++i) {
               counts_snapshot.push_back(instance->counts[i]);
             }
@@ -1025,77 +1096,38 @@ static ArenaVector<MetricValue> compute_metric_values(ArenaAllocator &arena) {
            observations_snapshot = instance->observations;
           }

-        // Initialize or aggregate into aggregated_histograms
-        auto it = aggregated_histograms.find(labels_key);
-        if (it == aggregated_histograms.end()) {
-          // Create new entry
-          auto *agg_hist = new (arena.allocate_raw(
-              sizeof(AggregatedHistogram), alignof(AggregatedHistogram)))
-              AggregatedHistogram(arena);
-          for (size_t i = 0; i < thresholds_snapshot.size(); ++i) {
-            agg_hist->thresholds.push_back(thresholds_snapshot[i]);
-          }
-          for (size_t i = 0; i < counts_snapshot.size(); ++i) {
-            agg_hist->counts.push_back(counts_snapshot[i]);
-          }
-          agg_hist->sum = sum_snapshot;
-          agg_hist->observations = observations_snapshot;
-          aggregated_histograms[labels_key] = agg_hist;
-        } else {
-          // Aggregate with existing entry
-          auto *agg_hist = it->second;
-          for (size_t i = 0; i < counts_snapshot.size(); ++i) {
-            agg_hist->counts[i] += counts_snapshot[i];
-          }
-          agg_hist->sum += sum_snapshot;
-          agg_hist->observations += observations_snapshot;
+          // Add to totals
+          for (size_t i = 0; i < counts_snapshot.size(); ++i) {
+            total_counts[i] += counts_snapshot[i];
+          }
+          total_sum += sum_snapshot;
+          total_observations += observations_snapshot;
         }
       }

-    // Then, add globally accumulated values from destroyed threads
-    for (const auto &[labels_key, global_state] :
-         family->global_accumulated_values) {
-      if (global_state) {
-        auto it = aggregated_histograms.find(labels_key);
-        if (it == aggregated_histograms.end()) {
-          // Create new entry from global state
-          auto *agg_hist = new (arena.allocate_raw(
-              sizeof(AggregatedHistogram), alignof(AggregatedHistogram)))
-              AggregatedHistogram(arena);
-          for (size_t i = 0; i < global_state->thresholds.size(); ++i) {
-            agg_hist->thresholds.push_back(global_state->thresholds[i]);
-          }
-          for (size_t i = 0; i < global_state->counts.size(); ++i) {
-            agg_hist->counts.push_back(global_state->counts[i]);
-          }
-          agg_hist->sum = global_state->sum;
-          agg_hist->observations = global_state->observations;
-          aggregated_histograms[labels_key] = agg_hist;
-        } else {
-          // Add global accumulated values to existing entry
-          auto *agg_hist = it->second;
-          for (size_t i = 0; i < global_state->counts.size(); ++i) {
-            agg_hist->counts[i] += global_state->counts[i];
-          }
-          agg_hist->sum += global_state->sum;
-          agg_hist->observations += global_state->observations;
-        }
+      // Add global accumulated value for this label set
+      auto global_it = family->global_accumulated_values.find(labels_key);
+      if (global_it != family->global_accumulated_values.end() &&
+          global_it->second) {
+        auto *global_state = global_it->second;
+        for (size_t i = 0; i < global_state->counts.size(); ++i) {
+          total_counts[i] += global_state->counts[i];
+        }
+        total_sum += global_state->sum;
+        total_observations += global_state->observations;
       }

       // Store histogram values
-    for (const auto &[labels_key, agg_hist] : aggregated_histograms) {
       // Store explicit bucket counts
-      for (size_t i = 0; i < agg_hist->thresholds.size(); ++i) {
-        values.push_back({.as_uint64 = agg_hist->counts[i]});
+      for (size_t i = 0; i < total_counts.size(); ++i) {
+        values.push_back({.as_uint64 = total_counts[i]});
       }
       // Store +Inf bucket (total observations)
-      values.push_back({.as_uint64 = agg_hist->observations});
+      values.push_back({.as_uint64 = total_observations});
       // Store sum
-      values.push_back({.as_double = agg_hist->sum});
+      values.push_back({.as_double = total_sum});
       // Store count
-      values.push_back({.as_uint64 = agg_hist->observations});
+      values.push_back({.as_uint64 = total_observations});
     }
   }
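The histogram path follows the same shape: since every shard of a family shares the family's bucket layout, per-label aggregation reduces to element-wise addition of the count arrays plus summing sum and observations. A small self-contained sketch of that merge (HistShard is an illustrative stand-in, not a type from this repo):

    #include <cstdint>
    #include <vector>

    struct HistShard {
      std::vector<uint64_t> counts; // one entry per configured bucket
      double sum = 0.0;
      uint64_t observations = 0;
    };

    HistShard merge(const std::vector<HistShard> &shards, size_t bucket_count) {
      HistShard total;
      total.counts.assign(bucket_count, 0);
      for (const auto &s : shards) {
        // Same bucket layout across shards, so indices line up
        for (size_t i = 0; i < s.counts.size(); ++i)
          total.counts[i] += s.counts[i];
        total.sum += s.sum;
        total.observations += s.observations;
      }
      return total;
    }

This shared-layout assumption is also what lets the diff drop the per-instance thresholds snapshots: the format phase can take the le= thresholds from family->buckets instead.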
@@ -1107,8 +1139,12 @@ std::span<std::string_view> render(ArenaAllocator &arena) {
   // Hold lock throughout both phases to prevent registry changes
   std::unique_lock<std::mutex> _{Metric::mutex};

+  // Build label sets once for both phases
+  LabelSets label_sets = build_label_sets(arena);
+
   // Phase 1: Compute all metric values
-  ArenaVector<MetricValue> metric_values = compute_metric_values(arena);
+  ArenaVector<MetricValue> metric_values =
+      compute_metric_values(arena, label_sets);
   const MetricValue *next_value = metric_values.data();

   ArenaVector<std::string_view> output(&arena);
@@ -1170,6 +1206,7 @@ std::span<std::string_view> render(ArenaAllocator &arena) {
   };

   // Format counters - ITERATION ORDER MUST MATCH COMPUTE PHASE
+  size_t counter_family_idx = 0;
   for (const auto &[name, family] : Metric::get_counter_families()) {
     output.push_back(format(arena, "# HELP %.*s %.*s\n",
                             static_cast<int>(name.length()), name.data(),
@@ -1195,28 +1232,11 @@ std::span<std::string_view> render(ArenaAllocator &arena) {
                               value));
     }

-    // Recreate aggregated values map for iteration order
-    std::map<LabelsKey, double, std::less<LabelsKey>,
-             ArenaStlAllocator<std::pair<const LabelsKey, double>>>
-        aggregated_values{
-            ArenaStlAllocator<std::pair<const LabelsKey, double>>(&arena)};
+    // Use pre-built label sets (same as compute phase)
+    const auto &all_labels = label_sets.counter_labels[counter_family_idx++];

-    // Populate map to get same iteration order (values ignored, using
-    // pre-computed)
-    for (const auto &[thread_id, per_thread] : family->per_thread_state) {
-      for (const auto &[labels_key, instance] : per_thread.instances) {
-        aggregated_values[labels_key] = 0.0; // Placeholder
-      }
-    }
-    for (const auto &[labels_key, global_state] :
-         family->global_accumulated_values) {
-      if (global_state) {
-        aggregated_values[labels_key] = 0.0; // Placeholder
-      }
-    }
-
-    // Format aggregated counter values
-    for (const auto &[labels_key, ignored_value] : aggregated_values) {
+    // Format counter values using pre-computed values
+    for (const auto &labels_key : all_labels) {
       auto total_value = next_value++->as_double;
       labels_sv.clear();
       for (size_t i = 0; i < labels_key.labels.size(); ++i) {
@@ -1272,6 +1292,7 @@ std::span<std::string_view> render(ArenaAllocator &arena) {
   }

   // Format histograms - ITERATION ORDER MUST MATCH COMPUTE PHASE
+  size_t histogram_family_idx = 0;
   for (const auto &[name, family] : Metric::get_histogram_families()) {
     output.push_back(format(arena, "# HELP %.*s %.*s\n",
                             static_cast<int>(name.length()), name.data(),
@@ -1280,71 +1301,27 @@ std::span<std::string_view> render(ArenaAllocator &arena) {
     output.push_back(format(arena, "# TYPE %.*s histogram\n",
                             static_cast<int>(name.length()), name.data()));

-    // Recreate aggregated histograms map for iteration order
-    struct AggregatedHistogram {
-      ArenaVector<double> thresholds;
-      ArenaVector<uint64_t> counts;
-      double sum;
-      uint64_t observations;
-      AggregatedHistogram(ArenaAllocator &arena)
-          : thresholds(&arena), counts(&arena), sum(0.0), observations(0) {}
-    };
-    std::map<
-        LabelsKey, AggregatedHistogram *, std::less<LabelsKey>,
-        ArenaStlAllocator<std::pair<const LabelsKey, AggregatedHistogram *>>>
-        aggregated_histograms{ArenaStlAllocator<
-            std::pair<const LabelsKey, AggregatedHistogram *>>(&arena)};
-
-    // Recreate map structure for iteration order (recompute thresholds for
-    // formatting)
-    for (const auto &[thread_id, per_thread] : family->per_thread_state) {
-      for (const auto &[labels_key, instance] : per_thread.instances) {
-        auto it = aggregated_histograms.find(labels_key);
-        if (it == aggregated_histograms.end()) {
-          auto *agg_hist = new (arena.allocate_raw(
-              sizeof(AggregatedHistogram), alignof(AggregatedHistogram)))
-              AggregatedHistogram(arena);
-          // Copy thresholds for le= formatting
-          std::lock_guard<std::mutex> lock(instance->mutex);
-          for (size_t i = 0; i < instance->thresholds.size(); ++i) {
-            agg_hist->thresholds.push_back(instance->thresholds[i]);
-          }
-          aggregated_histograms[labels_key] = agg_hist;
-        }
-      }
-    }
-    for (const auto &[labels_key, global_state] :
-         family->global_accumulated_values) {
-      if (global_state) {
-        auto it = aggregated_histograms.find(labels_key);
-        if (it == aggregated_histograms.end()) {
-          auto *agg_hist = new (arena.allocate_raw(
-              sizeof(AggregatedHistogram), alignof(AggregatedHistogram)))
-              AggregatedHistogram(arena);
-          // Copy thresholds for le= formatting
-          for (size_t i = 0; i < global_state->thresholds.size(); ++i) {
-            agg_hist->thresholds.push_back(global_state->thresholds[i]);
-          }
-          aggregated_histograms[labels_key] = agg_hist;
-        }
-      }
-    }
+    // Use pre-built label sets (same as compute phase)
+    const auto &all_labels =
+        label_sets.histogram_labels[histogram_family_idx++];

     ArenaVector<std::pair<std::string_view, std::string_view>> bucket_labels_sv(
         &arena);

     // Format histogram data using pre-computed values
-    for (const auto &[labels_key, agg_hist] : aggregated_histograms) {
+    for (const auto &labels_key : all_labels) {
+      // Get bucket thresholds from family config
+      size_t bucket_count = family->buckets.size();
+
       // Format explicit bucket counts
-      for (size_t i = 0; i < agg_hist->thresholds.size(); ++i) {
+      for (size_t i = 0; i < bucket_count; ++i) {
         auto count = next_value++->as_uint64;
         bucket_labels_sv.clear();
         for (size_t j = 0; j < labels_key.labels.size(); ++j) {
           bucket_labels_sv.push_back(labels_key.labels[j]);
         }
         bucket_labels_sv.push_back(
-            {"le", static_format(arena, agg_hist->thresholds[i])});
+            {"le", static_format(arena, family->buckets[i])});
         auto labels = format_labels(bucket_labels_sv);
         output.push_back(format(
             arena, "%.*s_bucket%.*s %llu\n", static_cast<int>(name.length()),
@@ -1385,9 +1362,7 @@ std::span<std::string_view> render(ArenaAllocator &arena) {
     }
   }

-  auto result = arena.allocate<std::string_view>(output.size());
-  std::copy(output.data(), output.data() + output.size(), result);
-  return std::span<std::string_view>(result, output.size());
+  return output;
 }

 // Template specialization implementations for register_callback
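The shortened tail works because render() returns std::span<std::string_view> and ArenaVector now converts to span implicitly, so the old allocate-and-copy epilogue was pure overhead. The one caveat is lifetime: the span aliases arena-owned storage, so the arena passed to render() must outlive the returned view. A toy stand-in (Lines and render_like are illustrative, not this repo's types):

    #include <span>
    #include <string_view>
    #include <vector>

    struct Lines {
      std::vector<std::string_view> storage; // stand-in for the arena-backed vector
      operator std::span<std::string_view>() { return storage; }
    };

    std::span<std::string_view> render_like(Lines &out) {
      out.storage.push_back("# HELP demo A demo metric\n");
      return out; // implicit conversion, no copy; valid only while `out` lives
    }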