Separate compute and format phases for render
This commit is contained in:
330
src/metric.cpp
330
src/metric.cpp
@@ -922,9 +922,195 @@ bool is_valid_label_value(std::string_view value) {
|
|||||||
return simdutf::validate_utf8(value.data(), value.size());
|
return simdutf::validate_utf8(value.data(), value.size());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
union MetricValue {
|
||||||
|
double as_double;
|
||||||
|
uint64_t as_uint64;
|
||||||
|
};
|
||||||
|
|
||||||
|
// Phase 1: Compute all metric values in deterministic order
|
||||||
|
static ArenaVector<MetricValue> compute_metric_values(ArenaAllocator &arena) {
|
||||||
|
ArenaVector<MetricValue> values(&arena);
|
||||||
|
|
||||||
|
// Compute counter values - ITERATION ORDER MUST MATCH FORMAT PHASE
|
||||||
|
for (const auto &[name, family] : Metric::get_counter_families()) {
|
||||||
|
// Callback values
|
||||||
|
for (const auto &[labels_key, callback] : family->callbacks) {
|
||||||
|
auto value = callback();
|
||||||
|
values.push_back({.as_double = value});
|
||||||
|
}
|
||||||
|
|
||||||
|
// Aggregate all counter values (thread-local + global accumulated)
|
||||||
|
std::map<LabelsKey, double, std::less<LabelsKey>,
|
||||||
|
ArenaStlAllocator<std::pair<const LabelsKey, double>>>
|
||||||
|
aggregated_values{
|
||||||
|
ArenaStlAllocator<std::pair<const LabelsKey, double>>(&arena)};
|
||||||
|
|
||||||
|
// First, add thread-local values
|
||||||
|
for (const auto &[thread_id, per_thread] : family->per_thread_state) {
|
||||||
|
for (const auto &[labels_key, instance] : per_thread.instances) {
|
||||||
|
// Atomic read to match atomic store in Counter::inc()
|
||||||
|
double value;
|
||||||
|
__atomic_load(&instance->value, &value, __ATOMIC_RELAXED);
|
||||||
|
aggregated_values[labels_key] += value;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Then, add globally accumulated values from destroyed threads
|
||||||
|
for (const auto &[labels_key, global_state] :
|
||||||
|
family->global_accumulated_values) {
|
||||||
|
if (global_state) {
|
||||||
|
aggregated_values[labels_key] += global_state->value;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Store aggregated counter values
|
||||||
|
for (const auto &[labels_key, total_value] : aggregated_values) {
|
||||||
|
values.push_back({.as_double = total_value});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Compute gauge values - ITERATION ORDER MUST MATCH FORMAT PHASE
|
||||||
|
for (const auto &[name, family] : Metric::get_gauge_families()) {
|
||||||
|
// Callback values
|
||||||
|
for (const auto &[labels_key, callback] : family->callbacks) {
|
||||||
|
auto value = callback();
|
||||||
|
values.push_back({.as_double = value});
|
||||||
|
}
|
||||||
|
|
||||||
|
// Instance values
|
||||||
|
for (const auto &[labels_key, instance] : family->instances) {
|
||||||
|
auto value = std::bit_cast<double>(
|
||||||
|
instance->value.load(std::memory_order_relaxed));
|
||||||
|
values.push_back({.as_double = value});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Compute histogram values - ITERATION ORDER MUST MATCH FORMAT PHASE
|
||||||
|
for (const auto &[name, family] : Metric::get_histogram_families()) {
|
||||||
|
// Aggregate all histogram values (thread-local + global accumulated)
|
||||||
|
struct AggregatedHistogram {
|
||||||
|
ArenaVector<double> thresholds;
|
||||||
|
ArenaVector<uint64_t> counts;
|
||||||
|
double sum;
|
||||||
|
uint64_t observations;
|
||||||
|
|
||||||
|
AggregatedHistogram(ArenaAllocator &arena)
|
||||||
|
: thresholds(&arena), counts(&arena), sum(0.0), observations(0) {}
|
||||||
|
};
|
||||||
|
std::map<
|
||||||
|
LabelsKey, AggregatedHistogram *, std::less<LabelsKey>,
|
||||||
|
ArenaStlAllocator<std::pair<const LabelsKey, AggregatedHistogram *>>>
|
||||||
|
aggregated_histograms{ArenaStlAllocator<
|
||||||
|
std::pair<const LabelsKey, AggregatedHistogram *>>(&arena)};
|
||||||
|
|
||||||
|
// First, collect thread-local histogram data
|
||||||
|
for (const auto &[thread_id, per_thread] : family->per_thread_state) {
|
||||||
|
for (const auto &[labels_key, instance] : per_thread.instances) {
|
||||||
|
// Extract data under lock - minimize critical section
|
||||||
|
ArenaVector<double> thresholds_snapshot(&arena);
|
||||||
|
ArenaVector<uint64_t> counts_snapshot(&arena);
|
||||||
|
double sum_snapshot;
|
||||||
|
uint64_t observations_snapshot;
|
||||||
|
|
||||||
|
// Copy data with minimal critical section
|
||||||
|
{
|
||||||
|
std::lock_guard<std::mutex> lock(instance->mutex);
|
||||||
|
for (size_t i = 0; i < instance->thresholds.size(); ++i) {
|
||||||
|
thresholds_snapshot.push_back(instance->thresholds[i]);
|
||||||
|
}
|
||||||
|
for (size_t i = 0; i < instance->counts.size(); ++i) {
|
||||||
|
counts_snapshot.push_back(instance->counts[i]);
|
||||||
|
}
|
||||||
|
sum_snapshot = instance->sum;
|
||||||
|
observations_snapshot = instance->observations;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Initialize or aggregate into aggregated_histograms
|
||||||
|
auto it = aggregated_histograms.find(labels_key);
|
||||||
|
if (it == aggregated_histograms.end()) {
|
||||||
|
// Create new entry
|
||||||
|
auto *agg_hist = new (arena.allocate_raw(
|
||||||
|
sizeof(AggregatedHistogram), alignof(AggregatedHistogram)))
|
||||||
|
AggregatedHistogram(arena);
|
||||||
|
for (size_t i = 0; i < thresholds_snapshot.size(); ++i) {
|
||||||
|
agg_hist->thresholds.push_back(thresholds_snapshot[i]);
|
||||||
|
}
|
||||||
|
for (size_t i = 0; i < counts_snapshot.size(); ++i) {
|
||||||
|
agg_hist->counts.push_back(counts_snapshot[i]);
|
||||||
|
}
|
||||||
|
agg_hist->sum = sum_snapshot;
|
||||||
|
agg_hist->observations = observations_snapshot;
|
||||||
|
aggregated_histograms[labels_key] = agg_hist;
|
||||||
|
} else {
|
||||||
|
// Aggregate with existing entry
|
||||||
|
auto *agg_hist = it->second;
|
||||||
|
for (size_t i = 0; i < counts_snapshot.size(); ++i) {
|
||||||
|
agg_hist->counts[i] += counts_snapshot[i];
|
||||||
|
}
|
||||||
|
agg_hist->sum += sum_snapshot;
|
||||||
|
agg_hist->observations += observations_snapshot;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Then, add globally accumulated values from destroyed threads
|
||||||
|
for (const auto &[labels_key, global_state] :
|
||||||
|
family->global_accumulated_values) {
|
||||||
|
if (global_state) {
|
||||||
|
auto it = aggregated_histograms.find(labels_key);
|
||||||
|
if (it == aggregated_histograms.end()) {
|
||||||
|
// Create new entry from global state
|
||||||
|
auto *agg_hist = new (arena.allocate_raw(
|
||||||
|
sizeof(AggregatedHistogram), alignof(AggregatedHistogram)))
|
||||||
|
AggregatedHistogram(arena);
|
||||||
|
for (size_t i = 0; i < global_state->thresholds.size(); ++i) {
|
||||||
|
agg_hist->thresholds.push_back(global_state->thresholds[i]);
|
||||||
|
}
|
||||||
|
for (size_t i = 0; i < global_state->counts.size(); ++i) {
|
||||||
|
agg_hist->counts.push_back(global_state->counts[i]);
|
||||||
|
}
|
||||||
|
agg_hist->sum = global_state->sum;
|
||||||
|
agg_hist->observations = global_state->observations;
|
||||||
|
aggregated_histograms[labels_key] = agg_hist;
|
||||||
|
} else {
|
||||||
|
// Add global accumulated values to existing entry
|
||||||
|
auto *agg_hist = it->second;
|
||||||
|
for (size_t i = 0; i < global_state->counts.size(); ++i) {
|
||||||
|
agg_hist->counts[i] += global_state->counts[i];
|
||||||
|
}
|
||||||
|
agg_hist->sum += global_state->sum;
|
||||||
|
agg_hist->observations += global_state->observations;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Store histogram values
|
||||||
|
for (const auto &[labels_key, agg_hist] : aggregated_histograms) {
|
||||||
|
// Store explicit bucket counts
|
||||||
|
for (size_t i = 0; i < agg_hist->thresholds.size(); ++i) {
|
||||||
|
values.push_back({.as_uint64 = agg_hist->counts[i]});
|
||||||
|
}
|
||||||
|
// Store +Inf bucket (total observations)
|
||||||
|
values.push_back({.as_uint64 = agg_hist->observations});
|
||||||
|
// Store sum
|
||||||
|
values.push_back({.as_double = agg_hist->sum});
|
||||||
|
// Store count
|
||||||
|
values.push_back({.as_uint64 = agg_hist->observations});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return values;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Phase 2: Format metrics using pre-computed values
|
||||||
std::span<std::string_view> render(ArenaAllocator &arena) {
|
std::span<std::string_view> render(ArenaAllocator &arena) {
|
||||||
|
// Hold lock throughout both phases to prevent registry changes
|
||||||
std::unique_lock<std::mutex> _{Metric::mutex};
|
std::unique_lock<std::mutex> _{Metric::mutex};
|
||||||
|
|
||||||
|
// Phase 1: Compute all metric values
|
||||||
|
ArenaVector<MetricValue> metric_values = compute_metric_values(arena);
|
||||||
|
const MetricValue *next_value = metric_values.data();
|
||||||
|
|
||||||
ArenaVector<std::string_view> output(&arena);
|
ArenaVector<std::string_view> output(&arena);
|
||||||
|
|
||||||
auto format_labels =
|
auto format_labels =
|
||||||
@@ -983,7 +1169,7 @@ std::span<std::string_view> render(ArenaAllocator &arena) {
|
|||||||
return std::string_view(buf, p - buf);
|
return std::string_view(buf, p - buf);
|
||||||
};
|
};
|
||||||
|
|
||||||
// Render counters
|
// Format counters - ITERATION ORDER MUST MATCH COMPUTE PHASE
|
||||||
for (const auto &[name, family] : Metric::get_counter_families()) {
|
for (const auto &[name, family] : Metric::get_counter_families()) {
|
||||||
output.push_back(format(arena, "# HELP %.*s %.*s\n",
|
output.push_back(format(arena, "# HELP %.*s %.*s\n",
|
||||||
static_cast<int>(name.length()), name.data(),
|
static_cast<int>(name.length()), name.data(),
|
||||||
@@ -994,8 +1180,10 @@ std::span<std::string_view> render(ArenaAllocator &arena) {
|
|||||||
|
|
||||||
ArenaVector<std::pair<std::string_view, std::string_view>> labels_sv(
|
ArenaVector<std::pair<std::string_view, std::string_view>> labels_sv(
|
||||||
&arena);
|
&arena);
|
||||||
|
|
||||||
|
// Format callback values
|
||||||
for (const auto &[labels_key, callback] : family->callbacks) {
|
for (const auto &[labels_key, callback] : family->callbacks) {
|
||||||
auto value = callback();
|
auto value = next_value++->as_double;
|
||||||
labels_sv.clear();
|
labels_sv.clear();
|
||||||
for (size_t i = 0; i < labels_key.labels.size(); ++i) {
|
for (size_t i = 0; i < labels_key.labels.size(); ++i) {
|
||||||
labels_sv.push_back(labels_key.labels[i]);
|
labels_sv.push_back(labels_key.labels[i]);
|
||||||
@@ -1007,32 +1195,29 @@ std::span<std::string_view> render(ArenaAllocator &arena) {
|
|||||||
value));
|
value));
|
||||||
}
|
}
|
||||||
|
|
||||||
// Aggregate all counter values (thread-local + global accumulated)
|
// Recreate aggregated values map for iteration order
|
||||||
std::map<LabelsKey, double, std::less<LabelsKey>,
|
std::map<LabelsKey, double, std::less<LabelsKey>,
|
||||||
ArenaStlAllocator<std::pair<const LabelsKey, double>>>
|
ArenaStlAllocator<std::pair<const LabelsKey, double>>>
|
||||||
aggregated_values{
|
aggregated_values{
|
||||||
ArenaStlAllocator<std::pair<const LabelsKey, double>>(&arena)};
|
ArenaStlAllocator<std::pair<const LabelsKey, double>>(&arena)};
|
||||||
|
|
||||||
// First, add thread-local values
|
// Populate map to get same iteration order (values ignored, using
|
||||||
|
// pre-computed)
|
||||||
for (const auto &[thread_id, per_thread] : family->per_thread_state) {
|
for (const auto &[thread_id, per_thread] : family->per_thread_state) {
|
||||||
for (const auto &[labels_key, instance] : per_thread.instances) {
|
for (const auto &[labels_key, instance] : per_thread.instances) {
|
||||||
// Atomic read to match atomic store in Counter::inc()
|
aggregated_values[labels_key] = 0.0; // Placeholder
|
||||||
double value;
|
|
||||||
__atomic_load(&instance->value, &value, __ATOMIC_RELAXED);
|
|
||||||
aggregated_values[labels_key] += value;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Then, add globally accumulated values from destroyed threads
|
|
||||||
for (const auto &[labels_key, global_state] :
|
for (const auto &[labels_key, global_state] :
|
||||||
family->global_accumulated_values) {
|
family->global_accumulated_values) {
|
||||||
if (global_state) {
|
if (global_state) {
|
||||||
aggregated_values[labels_key] += global_state->value;
|
aggregated_values[labels_key] = 0.0; // Placeholder
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Render aggregated counter values
|
// Format aggregated counter values
|
||||||
for (const auto &[labels_key, total_value] : aggregated_values) {
|
for (const auto &[labels_key, ignored_value] : aggregated_values) {
|
||||||
|
auto total_value = next_value++->as_double;
|
||||||
labels_sv.clear();
|
labels_sv.clear();
|
||||||
for (size_t i = 0; i < labels_key.labels.size(); ++i) {
|
for (size_t i = 0; i < labels_key.labels.size(); ++i) {
|
||||||
labels_sv.push_back(labels_key.labels[i]);
|
labels_sv.push_back(labels_key.labels[i]);
|
||||||
@@ -1045,7 +1230,7 @@ std::span<std::string_view> render(ArenaAllocator &arena) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Render gauges
|
// Format gauges - ITERATION ORDER MUST MATCH COMPUTE PHASE
|
||||||
for (const auto &[name, family] : Metric::get_gauge_families()) {
|
for (const auto &[name, family] : Metric::get_gauge_families()) {
|
||||||
output.push_back(format(arena, "# HELP %.*s %.*s\n",
|
output.push_back(format(arena, "# HELP %.*s %.*s\n",
|
||||||
static_cast<int>(name.length()), name.data(),
|
static_cast<int>(name.length()), name.data(),
|
||||||
@@ -1056,8 +1241,10 @@ std::span<std::string_view> render(ArenaAllocator &arena) {
|
|||||||
|
|
||||||
ArenaVector<std::pair<std::string_view, std::string_view>> labels_sv(
|
ArenaVector<std::pair<std::string_view, std::string_view>> labels_sv(
|
||||||
&arena);
|
&arena);
|
||||||
|
|
||||||
|
// Format callback values
|
||||||
for (const auto &[labels_key, callback] : family->callbacks) {
|
for (const auto &[labels_key, callback] : family->callbacks) {
|
||||||
auto value = callback();
|
auto value = next_value++->as_double;
|
||||||
labels_sv.clear();
|
labels_sv.clear();
|
||||||
for (size_t i = 0; i < labels_key.labels.size(); ++i) {
|
for (size_t i = 0; i < labels_key.labels.size(); ++i) {
|
||||||
labels_sv.push_back(labels_key.labels[i]);
|
labels_sv.push_back(labels_key.labels[i]);
|
||||||
@@ -1069,9 +1256,9 @@ std::span<std::string_view> render(ArenaAllocator &arena) {
|
|||||||
value));
|
value));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Format instance values
|
||||||
for (const auto &[labels_key, instance] : family->instances) {
|
for (const auto &[labels_key, instance] : family->instances) {
|
||||||
auto value = std::bit_cast<double>(
|
auto value = next_value++->as_double;
|
||||||
instance->value.load(std::memory_order_relaxed));
|
|
||||||
labels_sv.clear();
|
labels_sv.clear();
|
||||||
for (size_t i = 0; i < labels_key.labels.size(); ++i) {
|
for (size_t i = 0; i < labels_key.labels.size(); ++i) {
|
||||||
labels_sv.push_back(labels_key.labels[i]);
|
labels_sv.push_back(labels_key.labels[i]);
|
||||||
@@ -1084,7 +1271,7 @@ std::span<std::string_view> render(ArenaAllocator &arena) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Render histograms
|
// Format histograms - ITERATION ORDER MUST MATCH COMPUTE PHASE
|
||||||
for (const auto &[name, family] : Metric::get_histogram_families()) {
|
for (const auto &[name, family] : Metric::get_histogram_families()) {
|
||||||
output.push_back(format(arena, "# HELP %.*s %.*s\n",
|
output.push_back(format(arena, "# HELP %.*s %.*s\n",
|
||||||
static_cast<int>(name.length()), name.data(),
|
static_cast<int>(name.length()), name.data(),
|
||||||
@@ -1093,8 +1280,7 @@ std::span<std::string_view> render(ArenaAllocator &arena) {
|
|||||||
output.push_back(format(arena, "# TYPE %.*s histogram\n",
|
output.push_back(format(arena, "# TYPE %.*s histogram\n",
|
||||||
static_cast<int>(name.length()), name.data()));
|
static_cast<int>(name.length()), name.data()));
|
||||||
|
|
||||||
// Aggregate all histogram values (thread-local + global accumulated)
|
// Recreate aggregated histograms map for iteration order
|
||||||
// Use a simpler structure to avoid tuple constructor issues
|
|
||||||
struct AggregatedHistogram {
|
struct AggregatedHistogram {
|
||||||
ArenaVector<double> thresholds;
|
ArenaVector<double> thresholds;
|
||||||
ArenaVector<uint64_t> counts;
|
ArenaVector<uint64_t> counts;
|
||||||
@@ -1110,115 +1296,64 @@ std::span<std::string_view> render(ArenaAllocator &arena) {
|
|||||||
aggregated_histograms{ArenaStlAllocator<
|
aggregated_histograms{ArenaStlAllocator<
|
||||||
std::pair<const LabelsKey, AggregatedHistogram *>>(&arena)};
|
std::pair<const LabelsKey, AggregatedHistogram *>>(&arena)};
|
||||||
|
|
||||||
ArenaVector<std::pair<std::string_view, std::string_view>> bucket_labels_sv(
|
// Recreate map structure for iteration order (recompute thresholds for
|
||||||
&arena);
|
// formatting)
|
||||||
|
|
||||||
// First, collect thread-local histogram data
|
|
||||||
for (const auto &[thread_id, per_thread] : family->per_thread_state) {
|
for (const auto &[thread_id, per_thread] : family->per_thread_state) {
|
||||||
for (const auto &[labels_key, instance] : per_thread.instances) {
|
for (const auto &[labels_key, instance] : per_thread.instances) {
|
||||||
// Extract data under lock - minimize critical section
|
|
||||||
// Note: thresholds and counts sizes never change after histogram
|
|
||||||
// creation
|
|
||||||
ArenaVector<double> thresholds_snapshot(&arena);
|
|
||||||
ArenaVector<uint64_t> counts_snapshot(&arena);
|
|
||||||
double sum_snapshot;
|
|
||||||
uint64_t observations_snapshot;
|
|
||||||
|
|
||||||
// Copy data with minimal critical section
|
|
||||||
{
|
|
||||||
std::lock_guard<std::mutex> lock(instance->mutex);
|
|
||||||
// Copy thresholds
|
|
||||||
for (size_t i = 0; i < instance->thresholds.size(); ++i) {
|
|
||||||
thresholds_snapshot.push_back(instance->thresholds[i]);
|
|
||||||
}
|
|
||||||
// Copy counts
|
|
||||||
for (size_t i = 0; i < instance->counts.size(); ++i) {
|
|
||||||
counts_snapshot.push_back(instance->counts[i]);
|
|
||||||
}
|
|
||||||
sum_snapshot = instance->sum;
|
|
||||||
observations_snapshot = instance->observations;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Initialize or aggregate into aggregated_histograms
|
|
||||||
auto it = aggregated_histograms.find(labels_key);
|
auto it = aggregated_histograms.find(labels_key);
|
||||||
if (it == aggregated_histograms.end()) {
|
if (it == aggregated_histograms.end()) {
|
||||||
// Create new entry
|
|
||||||
auto *agg_hist = new (arena.allocate_raw(
|
auto *agg_hist = new (arena.allocate_raw(
|
||||||
sizeof(AggregatedHistogram), alignof(AggregatedHistogram)))
|
sizeof(AggregatedHistogram), alignof(AggregatedHistogram)))
|
||||||
AggregatedHistogram(arena);
|
AggregatedHistogram(arena);
|
||||||
for (size_t i = 0; i < thresholds_snapshot.size(); ++i) {
|
// Copy thresholds for le= formatting
|
||||||
agg_hist->thresholds.push_back(thresholds_snapshot[i]);
|
std::lock_guard<std::mutex> lock(instance->mutex);
|
||||||
|
for (size_t i = 0; i < instance->thresholds.size(); ++i) {
|
||||||
|
agg_hist->thresholds.push_back(instance->thresholds[i]);
|
||||||
}
|
}
|
||||||
for (size_t i = 0; i < counts_snapshot.size(); ++i) {
|
|
||||||
agg_hist->counts.push_back(counts_snapshot[i]);
|
|
||||||
}
|
|
||||||
agg_hist->sum = sum_snapshot;
|
|
||||||
agg_hist->observations = observations_snapshot;
|
|
||||||
aggregated_histograms[labels_key] = agg_hist;
|
aggregated_histograms[labels_key] = agg_hist;
|
||||||
} else {
|
|
||||||
// Aggregate with existing entry
|
|
||||||
auto *agg_hist = it->second;
|
|
||||||
// Aggregate counts
|
|
||||||
for (size_t i = 0; i < counts_snapshot.size(); ++i) {
|
|
||||||
agg_hist->counts[i] += counts_snapshot[i];
|
|
||||||
}
|
|
||||||
agg_hist->sum += sum_snapshot;
|
|
||||||
agg_hist->observations += observations_snapshot;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Then, add globally accumulated values from destroyed threads
|
|
||||||
for (const auto &[labels_key, global_state] :
|
for (const auto &[labels_key, global_state] :
|
||||||
family->global_accumulated_values) {
|
family->global_accumulated_values) {
|
||||||
if (global_state) {
|
if (global_state) {
|
||||||
auto it = aggregated_histograms.find(labels_key);
|
auto it = aggregated_histograms.find(labels_key);
|
||||||
if (it == aggregated_histograms.end()) {
|
if (it == aggregated_histograms.end()) {
|
||||||
// Create new entry from global state
|
|
||||||
auto *agg_hist = new (arena.allocate_raw(
|
auto *agg_hist = new (arena.allocate_raw(
|
||||||
sizeof(AggregatedHistogram), alignof(AggregatedHistogram)))
|
sizeof(AggregatedHistogram), alignof(AggregatedHistogram)))
|
||||||
AggregatedHistogram(arena);
|
AggregatedHistogram(arena);
|
||||||
|
// Copy thresholds for le= formatting
|
||||||
for (size_t i = 0; i < global_state->thresholds.size(); ++i) {
|
for (size_t i = 0; i < global_state->thresholds.size(); ++i) {
|
||||||
agg_hist->thresholds.push_back(global_state->thresholds[i]);
|
agg_hist->thresholds.push_back(global_state->thresholds[i]);
|
||||||
}
|
}
|
||||||
for (size_t i = 0; i < global_state->counts.size(); ++i) {
|
|
||||||
agg_hist->counts.push_back(global_state->counts[i]);
|
|
||||||
}
|
|
||||||
agg_hist->sum = global_state->sum;
|
|
||||||
agg_hist->observations = global_state->observations;
|
|
||||||
aggregated_histograms[labels_key] = agg_hist;
|
aggregated_histograms[labels_key] = agg_hist;
|
||||||
} else {
|
|
||||||
// Add global accumulated values to existing entry
|
|
||||||
auto *agg_hist = it->second;
|
|
||||||
for (size_t i = 0; i < global_state->counts.size(); ++i) {
|
|
||||||
agg_hist->counts[i] += global_state->counts[i];
|
|
||||||
}
|
|
||||||
agg_hist->sum += global_state->sum;
|
|
||||||
agg_hist->observations += global_state->observations;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Render aggregated histogram data
|
ArenaVector<std::pair<std::string_view, std::string_view>> bucket_labels_sv(
|
||||||
for (const auto &[labels_key, agg_hist] : aggregated_histograms) {
|
&arena);
|
||||||
|
|
||||||
// Render explicit bucket counts
|
// Format histogram data using pre-computed values
|
||||||
|
for (const auto &[labels_key, agg_hist] : aggregated_histograms) {
|
||||||
|
// Format explicit bucket counts
|
||||||
for (size_t i = 0; i < agg_hist->thresholds.size(); ++i) {
|
for (size_t i = 0; i < agg_hist->thresholds.size(); ++i) {
|
||||||
|
auto count = next_value++->as_uint64;
|
||||||
bucket_labels_sv.clear();
|
bucket_labels_sv.clear();
|
||||||
for (size_t j = 0; j < labels_key.labels.size(); ++j) {
|
for (size_t j = 0; j < labels_key.labels.size(); ++j) {
|
||||||
bucket_labels_sv.push_back(labels_key.labels[j]);
|
bucket_labels_sv.push_back(labels_key.labels[j]);
|
||||||
}
|
}
|
||||||
|
|
||||||
bucket_labels_sv.push_back(
|
bucket_labels_sv.push_back(
|
||||||
{"le", static_format(arena, agg_hist->thresholds[i])});
|
{"le", static_format(arena, agg_hist->thresholds[i])});
|
||||||
auto labels = format_labels(bucket_labels_sv);
|
auto labels = format_labels(bucket_labels_sv);
|
||||||
output.push_back(format(
|
output.push_back(format(
|
||||||
arena, "%.*s_bucket%.*s %llu\n", static_cast<int>(name.length()),
|
arena, "%.*s_bucket%.*s %llu\n", static_cast<int>(name.length()),
|
||||||
name.data(), static_cast<int>(labels.length()), labels.data(),
|
name.data(), static_cast<int>(labels.length()), labels.data(),
|
||||||
static_cast<unsigned long long>(agg_hist->counts[i])));
|
static_cast<unsigned long long>(count)));
|
||||||
}
|
}
|
||||||
|
|
||||||
// Render +Inf bucket using total observations count
|
// Format +Inf bucket
|
||||||
|
auto observations = next_value++->as_uint64;
|
||||||
bucket_labels_sv.clear();
|
bucket_labels_sv.clear();
|
||||||
for (size_t j = 0; j < labels_key.labels.size(); ++j) {
|
for (size_t j = 0; j < labels_key.labels.size(); ++j) {
|
||||||
bucket_labels_sv.push_back(labels_key.labels[j]);
|
bucket_labels_sv.push_back(labels_key.labels[j]);
|
||||||
@@ -1228,24 +1363,25 @@ std::span<std::string_view> render(ArenaAllocator &arena) {
|
|||||||
output.push_back(format(
|
output.push_back(format(
|
||||||
arena, "%.*s_bucket%.*s %llu\n", static_cast<int>(name.length()),
|
arena, "%.*s_bucket%.*s %llu\n", static_cast<int>(name.length()),
|
||||||
name.data(), static_cast<int>(inf_labels.length()), inf_labels.data(),
|
name.data(), static_cast<int>(inf_labels.length()), inf_labels.data(),
|
||||||
static_cast<unsigned long long>(agg_hist->observations)));
|
static_cast<unsigned long long>(observations)));
|
||||||
|
|
||||||
// Render sum
|
// Format sum
|
||||||
|
auto sum = next_value++->as_double;
|
||||||
bucket_labels_sv.clear();
|
bucket_labels_sv.clear();
|
||||||
for (size_t j = 0; j < labels_key.labels.size(); ++j) {
|
for (size_t j = 0; j < labels_key.labels.size(); ++j) {
|
||||||
bucket_labels_sv.push_back(labels_key.labels[j]);
|
bucket_labels_sv.push_back(labels_key.labels[j]);
|
||||||
}
|
}
|
||||||
auto labels = format_labels(bucket_labels_sv);
|
auto labels = format_labels(bucket_labels_sv);
|
||||||
output.push_back(format(arena, "%.*s_sum%.*s %.17g\n",
|
output.push_back(format(
|
||||||
|
arena, "%.*s_sum%.*s %.17g\n", static_cast<int>(name.length()),
|
||||||
|
name.data(), static_cast<int>(labels.length()), labels.data(), sum));
|
||||||
|
|
||||||
|
// Format count
|
||||||
|
auto count = next_value++->as_uint64;
|
||||||
|
output.push_back(format(arena, "%.*s_count%.*s %llu\n",
|
||||||
static_cast<int>(name.length()), name.data(),
|
static_cast<int>(name.length()), name.data(),
|
||||||
static_cast<int>(labels.length()), labels.data(),
|
static_cast<int>(labels.length()), labels.data(),
|
||||||
agg_hist->sum));
|
static_cast<unsigned long long>(count)));
|
||||||
|
|
||||||
// Render count
|
|
||||||
output.push_back(format(
|
|
||||||
arena, "%.*s_count%.*s %llu\n", static_cast<int>(name.length()),
|
|
||||||
name.data(), static_cast<int>(labels.length()), labels.data(),
|
|
||||||
static_cast<unsigned long long>(agg_hist->observations)));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user