Make recording metrics never block

This commit is contained in:
2025-09-15 23:34:30 -04:00
parent 0659319906
commit 4d015fa3dc
2 changed files with 67 additions and 43 deletions

View File

@@ -350,7 +350,8 @@ struct Histogram::State {
uint64_t observations = 0; // Total observation count uint64_t observations = 0; // Total observation count
}; };
Counters counters; // Main counter data Counters shared; // Protected by mutex, read by scrapes
Counters pending; // Lock-free accumulation when mutex busy
std::mutex std::mutex
mutex; // Per-thread, per-histogram mutex for consistent reads/writes mutex; // Per-thread, per-histogram mutex for consistent reads/writes
@@ -506,16 +507,14 @@ struct Metric {
assert(global_state); assert(global_state);
// Accumulate bucket counts (mutex already held) // Accumulate bucket counts (mutex already held)
for (size_t i = 0; i < instance->counters.bucket_counts.size(); for (size_t i = 0; i < instance->shared.bucket_counts.size(); ++i) {
++i) { global_state->shared.bucket_counts[i] +=
global_state->counters.bucket_counts[i] += instance->shared.bucket_counts[i];
instance->counters.bucket_counts[i];
} }
// Accumulate sum and observations // Accumulate sum and observations
global_state->counters.sum += instance->counters.sum; global_state->shared.sum += instance->shared.sum;
global_state->counters.observations += global_state->shared.observations += instance->shared.observations;
instance->counters.observations;
} }
family->per_thread_state.erase(thread_it); family->per_thread_state.erase(thread_it);
} }
@@ -683,17 +682,23 @@ struct Metric {
size_t bucket_count = family->p->buckets.size(); size_t bucket_count = family->p->buckets.size();
double *thresholds_data = double *thresholds_data =
get_thread_local_arena().allocate<double>(bucket_count); get_thread_local_arena().allocate<double>(bucket_count);
uint64_t *counts_data =
get_thread_local_arena().allocate<uint64_t>(bucket_count);
// Copy thresholds and initialize counts // Initialize thresholds
std::memcpy(thresholds_data, family->p->buckets.data(), std::memcpy(thresholds_data, family->p->buckets.data(),
bucket_count * sizeof(double)); bucket_count * sizeof(double));
std::memset(counts_data, 0, bucket_count * sizeof(uint64_t));
ptr->thresholds = std::span<const double>(thresholds_data, bucket_count); ptr->thresholds = std::span<const double>(thresholds_data, bucket_count);
ptr->counters.bucket_counts =
std::span<uint64_t>(counts_data, bucket_count); // Initialize shared counts
auto shared_counts_span =
get_thread_local_arena().allocate_span<uint64_t>(bucket_count);
std::fill(shared_counts_span.begin(), shared_counts_span.end(), 0);
ptr->shared.bucket_counts = shared_counts_span;
// Initialize pending counts
auto pending_counts_span =
get_thread_local_arena().allocate_span<uint64_t>(bucket_count);
std::fill(pending_counts_span.begin(), pending_counts_span.end(), 0);
ptr->pending.bucket_counts = pending_counts_span;
// Ensure global accumulator exists for this label set // Ensure global accumulator exists for this label set
auto &global_state = family->p->global_accumulated_values[key]; auto &global_state = family->p->global_accumulated_values[key];
@@ -703,17 +708,16 @@ struct Metric {
// Allocate and copy thresholds, initialize counts // Allocate and copy thresholds, initialize counts
double *global_thresholds_data = double *global_thresholds_data =
get_global_arena().allocate<double>(bucket_count); get_global_arena().allocate<double>(bucket_count);
uint64_t *global_counts_data =
get_global_arena().allocate<uint64_t>(bucket_count);
std::memcpy(global_thresholds_data, ptr->thresholds.data(), std::memcpy(global_thresholds_data, ptr->thresholds.data(),
bucket_count * sizeof(double)); bucket_count * sizeof(double));
std::memset(global_counts_data, 0, bucket_count * sizeof(uint64_t));
global_state->thresholds = global_state->thresholds =
std::span<const double>(global_thresholds_data, bucket_count); std::span<const double>(global_thresholds_data, bucket_count);
global_state->counters.bucket_counts =
std::span<uint64_t>(global_counts_data, bucket_count); auto global_shared_counts_span =
get_global_arena().allocate_span<uint64_t>(bucket_count);
std::fill(global_shared_counts_span.begin(),
global_shared_counts_span.end(), 0);
global_state->shared.bucket_counts = global_shared_counts_span;
} }
} }
Histogram result; Histogram result;
@@ -1139,12 +1143,11 @@ struct Metric {
{ {
std::lock_guard lock(instance->mutex); std::lock_guard lock(instance->mutex);
for (size_t i = 0; i < instance->counters.bucket_counts.size(); for (size_t i = 0; i < instance->shared.bucket_counts.size(); ++i) {
++i) { counts_snapshot[i] = instance->shared.bucket_counts[i];
counts_snapshot[i] = instance->counters.bucket_counts[i];
} }
sum_snapshot = instance->counters.sum; sum_snapshot = instance->shared.sum;
observations_snapshot = instance->counters.observations; observations_snapshot = instance->shared.observations;
} }
for (size_t i = 0; i < bucket_count; ++i) { for (size_t i = 0; i < bucket_count; ++i) {
@@ -1157,12 +1160,12 @@ struct Metric {
// Add global accumulated values // Add global accumulated values
if (instruction.aggregate_histogram.global_state) { if (instruction.aggregate_histogram.global_state) {
auto *global_state = instruction.aggregate_histogram.global_state; auto *global_state = instruction.aggregate_histogram.global_state;
for (size_t i = 0; i < global_state->counters.bucket_counts.size(); for (size_t i = 0; i < global_state->shared.bucket_counts.size();
++i) { ++i) {
total_counts[i] += global_state->counters.bucket_counts[i]; total_counts[i] += global_state->shared.bucket_counts[i];
} }
total_sum += global_state->counters.sum; total_sum += global_state->shared.sum;
total_observations += global_state->counters.observations; total_observations += global_state->shared.observations;
} }
// Format explicit bucket counts // Format explicit bucket counts
@@ -1424,16 +1427,36 @@ update_histogram_buckets_simd(std::span<const double> thresholds,
} }
void Histogram::observe(double x) { void Histogram::observe(double x) {
assert(p->thresholds.size() == p->counters.bucket_counts.size()); assert(p->thresholds.size() == p->shared.bucket_counts.size());
std::lock_guard lock(p->mutex); // Try to get lock immediately
if (p->mutex.try_lock()) {
// Fast path: got lock, flush any pending first
if (p->pending.observations > 0) {
// Add pending to shared
for (size_t i = 0; i < p->pending.bucket_counts.size(); ++i) {
p->shared.bucket_counts[i] += p->pending.bucket_counts[i];
p->pending.bucket_counts[i] = 0;
}
p->shared.sum += p->pending.sum;
p->shared.observations += p->pending.observations;
p->pending.sum = 0.0;
p->pending.observations = 0;
}
// Update bucket counts using SIMD // Update shared directly
update_histogram_buckets_simd(p->thresholds, p->counters.bucket_counts, x, 0); update_histogram_buckets_simd(p->thresholds, p->shared.bucket_counts, x, 0);
p->shared.sum += x;
p->shared.observations++;
// Update sum and observation count p->mutex.unlock();
p->counters.sum += x; } else {
p->counters.observations++; // Slow path: accumulate in pending (lock-free)
update_histogram_buckets_simd(p->thresholds, p->pending.bucket_counts, x,
0);
p->pending.sum += x;
p->pending.observations++;
}
} }
template <> Family<Counter>::Family() = default; template <> Family<Counter>::Family() = default;

View File

@@ -73,7 +73,7 @@ template <typename T> using MetricCallback = std::function<double()>;
// 3. When rendered, the values of all Counter objects with the same labels // 3. When rendered, the values of all Counter objects with the same labels
// are summed together into a single total. // are summed together into a single total.
struct Counter { struct Counter {
void inc(double = 1.0); // Increment counter (must be >= 0) void inc(double = 1.0); // Increment counter (must be >= 0, never blocks)
private: private:
Counter(); Counter();
@@ -94,9 +94,9 @@ private:
// are cumulative. // are cumulative.
// 4. For independent gauges, create them with unique labels. // 4. For independent gauges, create them with unique labels.
struct Gauge { struct Gauge {
void inc(double = 1.0); void inc(double = 1.0); // (never blocks)
void dec(double = 1.0); void dec(double = 1.0); // (never blocks)
void set(double); void set(double); // (never blocks)
private: private:
Gauge(); Gauge();
@@ -116,7 +116,8 @@ private:
// 3. When rendered, the observations from all Histogram objects with the // 3. When rendered, the observations from all Histogram objects with the
// same labels are combined into a single histogram. // same labels are combined into a single histogram.
struct Histogram { struct Histogram {
void observe(double); // Record observation in appropriate bucket void
observe(double); // Record observation in appropriate bucket (never blocks)
private: private:
Histogram(); Histogram();