From 4d015fa3dc606688df7dc1c103d0595383d0acbc Mon Sep 17 00:00:00 2001
From: Andrew Noyes
Date: Mon, 15 Sep 2025 23:34:30 -0400
Subject: [PATCH] Make recording metrics never block

---
 src/metric.cpp | 99 +++++++++++++++++++++++++++++++-------------------
 src/metric.hpp | 11 +++---
 2 files changed, 67 insertions(+), 43 deletions(-)

diff --git a/src/metric.cpp b/src/metric.cpp
index b420c80..58a1135 100644
--- a/src/metric.cpp
+++ b/src/metric.cpp
@@ -350,7 +350,8 @@ struct Histogram::State {
     uint64_t observations = 0; // Total observation count
   };
 
-  Counters counters; // Main counter data
+  Counters shared;  // Protected by mutex, read by scrapes
+  Counters pending; // Lock-free accumulation when mutex busy
 
   std::mutex mutex; // Per-thread, per-histogram mutex for consistent
                     // reads/writes
@@ -506,16 +507,14 @@ struct Metric {
         assert(global_state);
 
         // Accumulate bucket counts (mutex already held)
-        for (size_t i = 0; i < instance->counters.bucket_counts.size();
-             ++i) {
-          global_state->counters.bucket_counts[i] +=
-              instance->counters.bucket_counts[i];
+        for (size_t i = 0; i < instance->shared.bucket_counts.size(); ++i) {
+          global_state->shared.bucket_counts[i] +=
+              instance->shared.bucket_counts[i];
         }
 
         // Accumulate sum and observations
-        global_state->counters.sum += instance->counters.sum;
-        global_state->counters.observations +=
-            instance->counters.observations;
+        global_state->shared.sum += instance->shared.sum;
+        global_state->shared.observations += instance->shared.observations;
       }
       family->per_thread_state.erase(thread_it);
     }
@@ -683,17 +682,23 @@ struct Metric {
     size_t bucket_count = family->p->buckets.size();
     double *thresholds_data =
         get_thread_local_arena().allocate(bucket_count);
-    uint64_t *counts_data =
-        get_thread_local_arena().allocate(bucket_count);
 
-    // Copy thresholds and initialize counts
+    // Initialize thresholds
     std::memcpy(thresholds_data, family->p->buckets.data(),
                 bucket_count * sizeof(double));
-    std::memset(counts_data, 0, bucket_count * sizeof(uint64_t));
-
     ptr->thresholds = std::span(thresholds_data, bucket_count);
-    ptr->counters.bucket_counts =
-        std::span(counts_data, bucket_count);
+
+    // Initialize shared counts
+    auto shared_counts_span =
+        get_thread_local_arena().allocate_span(bucket_count);
+    std::fill(shared_counts_span.begin(), shared_counts_span.end(), 0);
+    ptr->shared.bucket_counts = shared_counts_span;
+
+    // Initialize pending counts
+    auto pending_counts_span =
+        get_thread_local_arena().allocate_span(bucket_count);
+    std::fill(pending_counts_span.begin(), pending_counts_span.end(), 0);
+    ptr->pending.bucket_counts = pending_counts_span;
 
     // Ensure global accumulator exists for this label set
    auto &global_state = family->p->global_accumulated_values[key];
@@ -703,17 +708,16 @@
       // Allocate and copy thresholds, initialize counts
       double *global_thresholds_data =
           get_global_arena().allocate(bucket_count);
-      uint64_t *global_counts_data =
-          get_global_arena().allocate(bucket_count);
-
       std::memcpy(global_thresholds_data, ptr->thresholds.data(),
                   bucket_count * sizeof(double));
-      std::memset(global_counts_data, 0, bucket_count * sizeof(uint64_t));
-
       global_state->thresholds =
           std::span(global_thresholds_data, bucket_count);
-      global_state->counters.bucket_counts =
-          std::span(global_counts_data, bucket_count);
+
+      auto global_shared_counts_span =
+          get_global_arena().allocate_span(bucket_count);
+      std::fill(global_shared_counts_span.begin(),
+                global_shared_counts_span.end(), 0);
+      global_state->shared.bucket_counts = global_shared_counts_span;
     }
   }
   Histogram result;
@@ -1139,12 +1143,11 @@ struct Metric {
 
     {
       std::lock_guard lock(instance->mutex);
-      for (size_t i = 0; i < instance->counters.bucket_counts.size();
-           ++i) {
-        counts_snapshot[i] = instance->counters.bucket_counts[i];
+      for (size_t i = 0; i < instance->shared.bucket_counts.size(); ++i) {
+        counts_snapshot[i] = instance->shared.bucket_counts[i];
       }
-      sum_snapshot = instance->counters.sum;
-      observations_snapshot = instance->counters.observations;
+      sum_snapshot = instance->shared.sum;
+      observations_snapshot = instance->shared.observations;
    }
 
     for (size_t i = 0; i < bucket_count; ++i) {
@@ -1157,12 +1160,12 @@
     // Add global accumulated values
     if (instruction.aggregate_histogram.global_state) {
       auto *global_state = instruction.aggregate_histogram.global_state;
-      for (size_t i = 0; i < global_state->counters.bucket_counts.size();
+      for (size_t i = 0; i < global_state->shared.bucket_counts.size();
            ++i) {
-        total_counts[i] += global_state->counters.bucket_counts[i];
+        total_counts[i] += global_state->shared.bucket_counts[i];
       }
-      total_sum += global_state->counters.sum;
-      total_observations += global_state->counters.observations;
+      total_sum += global_state->shared.sum;
+      total_observations += global_state->shared.observations;
     }
 
     // Format explicit bucket counts
@@ -1424,16 +1427,36 @@ update_histogram_buckets_simd(std::span thresholds,
 }
 
 void Histogram::observe(double x) {
-  assert(p->thresholds.size() == p->counters.bucket_counts.size());
+  assert(p->thresholds.size() == p->shared.bucket_counts.size());
 
-  std::lock_guard lock(p->mutex);
+  // Try to get lock immediately
+  if (p->mutex.try_lock()) {
+    // Fast path: got lock, flush any pending first
+    if (p->pending.observations > 0) {
+      // Add pending to shared
+      for (size_t i = 0; i < p->pending.bucket_counts.size(); ++i) {
+        p->shared.bucket_counts[i] += p->pending.bucket_counts[i];
+        p->pending.bucket_counts[i] = 0;
+      }
+      p->shared.sum += p->pending.sum;
+      p->shared.observations += p->pending.observations;
+      p->pending.sum = 0.0;
+      p->pending.observations = 0;
+    }
 
-  // Update bucket counts using SIMD
-  update_histogram_buckets_simd(p->thresholds, p->counters.bucket_counts, x, 0);
+    // Update shared directly
+    update_histogram_buckets_simd(p->thresholds, p->shared.bucket_counts, x, 0);
+    p->shared.sum += x;
+    p->shared.observations++;
 
-  // Update sum and observation count
-  p->counters.sum += x;
-  p->counters.observations++;
+    p->mutex.unlock();
+  } else {
+    // Slow path: accumulate in pending (lock-free)
+    update_histogram_buckets_simd(p->thresholds, p->pending.bucket_counts, x,
+                                  0);
+    p->pending.sum += x;
+    p->pending.observations++;
+  }
 }
 
 template <> Family::Family() = default;
diff --git a/src/metric.hpp b/src/metric.hpp
index 3b0c37e..cf21209 100644
--- a/src/metric.hpp
+++ b/src/metric.hpp
@@ -73,7 +73,7 @@ template using MetricCallback = std::function;
 // 3. When rendered, the values of all Counter objects with the same labels
 //    are summed together into a single total.
 struct Counter {
-  void inc(double = 1.0); // Increment counter (must be >= 0)
+  void inc(double = 1.0); // Increment counter (must be >= 0, never blocks)
 
 private:
   Counter();
@@ -94,9 +94,9 @@
 //    are cumulative.
 // 4. For independent gauges, create them with unique labels.
 struct Gauge {
-  void inc(double = 1.0);
-  void dec(double = 1.0);
-  void set(double);
+  void inc(double = 1.0); // (never blocks)
+  void dec(double = 1.0); // (never blocks)
+  void set(double);       // (never blocks)
 
 private:
   Gauge();
@@ -116,7 +116,8 @@
 // 3. When rendered, the observations from all Histogram objects with the
 //    same labels are combined into a single histogram.
 struct Histogram {
-  void observe(double); // Record observation in appropriate bucket
+  void
+  observe(double); // Record observation in appropriate bucket (never blocks)
 
 private:
   Histogram();