Prepare for try_lock optimization

So histogram observations never block
This commit is contained in:
2025-09-15 23:04:54 -04:00
parent 6f421629aa
commit 0659319906

View File

@@ -342,13 +342,20 @@ struct Gauge::State {
struct Histogram::State { struct Histogram::State {
std::span<const double> thresholds; // Bucket boundaries (sorted, std::span<const double> thresholds; // Bucket boundaries (sorted,
// deduplicated, sizes never change) // deduplicated, sizes never change)
std::span<uint64_t> counts; // Count per bucket
double sum; // Sum of observations // Histogram counter data
uint64_t observations; // Total observation count struct Counters {
std::span<uint64_t> bucket_counts; // Count per bucket
double sum = 0.0; // Sum of observations
uint64_t observations = 0; // Total observation count
};
Counters counters; // Main counter data
std::mutex std::mutex
mutex; // Per-thread, per-histogram mutex for consistent reads/writes mutex; // Per-thread, per-histogram mutex for consistent reads/writes
State() : sum(0.0), observations(0) {} State() {}
friend struct Metric; friend struct Metric;
}; };
@@ -499,13 +506,16 @@ struct Metric {
assert(global_state); assert(global_state);
// Accumulate bucket counts (mutex already held) // Accumulate bucket counts (mutex already held)
for (size_t i = 0; i < instance->counts.size(); ++i) { for (size_t i = 0; i < instance->counters.bucket_counts.size();
global_state->counts[i] += instance->counts[i]; ++i) {
global_state->counters.bucket_counts[i] +=
instance->counters.bucket_counts[i];
} }
// Accumulate sum and observations // Accumulate sum and observations
global_state->sum += instance->sum; global_state->counters.sum += instance->counters.sum;
global_state->observations += instance->observations; global_state->counters.observations +=
instance->counters.observations;
} }
family->per_thread_state.erase(thread_it); family->per_thread_state.erase(thread_it);
} }
@@ -682,7 +692,8 @@ struct Metric {
std::memset(counts_data, 0, bucket_count * sizeof(uint64_t)); std::memset(counts_data, 0, bucket_count * sizeof(uint64_t));
ptr->thresholds = std::span<const double>(thresholds_data, bucket_count); ptr->thresholds = std::span<const double>(thresholds_data, bucket_count);
ptr->counts = std::span<uint64_t>(counts_data, bucket_count); ptr->counters.bucket_counts =
std::span<uint64_t>(counts_data, bucket_count);
// Ensure global accumulator exists for this label set // Ensure global accumulator exists for this label set
auto &global_state = family->p->global_accumulated_values[key]; auto &global_state = family->p->global_accumulated_values[key];
@@ -701,7 +712,7 @@ struct Metric {
global_state->thresholds = global_state->thresholds =
std::span<const double>(global_thresholds_data, bucket_count); std::span<const double>(global_thresholds_data, bucket_count);
global_state->counts = global_state->counters.bucket_counts =
std::span<uint64_t>(global_counts_data, bucket_count); std::span<uint64_t>(global_counts_data, bucket_count);
} }
} }
@@ -1128,11 +1139,12 @@ struct Metric {
{ {
std::lock_guard lock(instance->mutex); std::lock_guard lock(instance->mutex);
for (size_t i = 0; i < instance->counts.size(); ++i) { for (size_t i = 0; i < instance->counters.bucket_counts.size();
counts_snapshot[i] = instance->counts[i]; ++i) {
counts_snapshot[i] = instance->counters.bucket_counts[i];
} }
sum_snapshot = instance->sum; sum_snapshot = instance->counters.sum;
observations_snapshot = instance->observations; observations_snapshot = instance->counters.observations;
} }
for (size_t i = 0; i < bucket_count; ++i) { for (size_t i = 0; i < bucket_count; ++i) {
@@ -1145,11 +1157,12 @@ struct Metric {
// Add global accumulated values // Add global accumulated values
if (instruction.aggregate_histogram.global_state) { if (instruction.aggregate_histogram.global_state) {
auto *global_state = instruction.aggregate_histogram.global_state; auto *global_state = instruction.aggregate_histogram.global_state;
for (size_t i = 0; i < global_state->counts.size(); ++i) { for (size_t i = 0; i < global_state->counters.bucket_counts.size();
total_counts[i] += global_state->counts[i]; ++i) {
total_counts[i] += global_state->counters.bucket_counts[i];
} }
total_sum += global_state->sum; total_sum += global_state->counters.sum;
total_observations += global_state->observations; total_observations += global_state->counters.observations;
} }
// Format explicit bucket counts // Format explicit bucket counts
@@ -1411,16 +1424,16 @@ update_histogram_buckets_simd(std::span<const double> thresholds,
} }
void Histogram::observe(double x) { void Histogram::observe(double x) {
assert(p->thresholds.size() == p->counts.size()); assert(p->thresholds.size() == p->counters.bucket_counts.size());
std::lock_guard lock(p->mutex); std::lock_guard lock(p->mutex);
// Update bucket counts using SIMD // Update bucket counts using SIMD
update_histogram_buckets_simd(p->thresholds, p->counts, x, 0); update_histogram_buckets_simd(p->thresholds, p->counters.bucket_counts, x, 0);
// Update sum and observation count // Update sum and observation count
p->sum += x; p->counters.sum += x;
p->observations++; p->counters.observations++;
} }
template <> Family<Counter>::Family() = default; template <> Family<Counter>::Family() = default;