From 21ddcb75fb2845eea8098faed47cf523a3a13321 Mon Sep 17 00:00:00 2001 From: Andrew Noyes Date: Sat, 30 Aug 2025 16:20:35 -0400 Subject: [PATCH] Fix thread destroy bug --- src/metric.cpp | 239 ++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 187 insertions(+), 52 deletions(-) diff --git a/src/metric.cpp b/src/metric.cpp index 1bd815b..d982cae 100644 --- a/src/metric.cpp +++ b/src/metric.cpp @@ -121,6 +121,10 @@ template <> struct Family::State { }; std::unordered_map perThreadState; + // Global accumulation state for destroyed threads + std::unordered_map> + globalAccumulatedValues; + // Callback-based metrics (global, not per-thread) std::unordered_map> callbacks; }; @@ -144,6 +148,10 @@ template <> struct Family::State { }; std::unordered_map perThreadState; + // Global accumulation state for destroyed threads + std::unordered_map> + globalAccumulatedValues; + // Note: No callbacks map - histograms don't support callback-based metrics }; @@ -190,21 +198,62 @@ struct Metric { // Thread registration happens lazily when metrics are created } ~ThreadInit() { - // TODO we need to accumulate this threads counts into a global. Otherwise - // it can go backwards when we destroy a thread. - - // Clean up this thread's storage from all families + // Accumulate thread-local state into global state before cleanup std::unique_lock _{mutex}; auto thread_id = std::this_thread::get_id(); - // Clean up counter families + // Accumulate counter families for (auto &[name, family] : counterFamilies) { - family->perThreadState.erase(thread_id); + auto thread_it = family->perThreadState.find(thread_id); + if (thread_it != family->perThreadState.end()) { + for (auto &[labels_key, instance] : thread_it->second.instances) { + // Get current thread-local value + double current_value = instance->value; + + // Ensure global accumulator exists + auto &global_state = family->globalAccumulatedValues[labels_key]; + if (!global_state) { + global_state = std::make_unique(); + global_state->value = 0.0; + } + + // Add thread-local value to global accumulator (mutex already held) + global_state->value += current_value; + } + family->perThreadState.erase(thread_it); + } } - // Clean up histogram families + // Accumulate histogram families for (auto &[name, family] : histogramFamilies) { - family->perThreadState.erase(thread_id); + auto thread_it = family->perThreadState.find(thread_id); + if (thread_it != family->perThreadState.end()) { + for (auto &[labels_key, instance] : thread_it->second.instances) { + // Acquire lock to get consistent snapshot + std::lock_guard lock(instance->mutex); + + // Ensure global accumulator exists + auto &global_state = family->globalAccumulatedValues[labels_key]; + if (!global_state) { + global_state = std::make_unique(); + global_state->thresholds = instance->thresholds; + global_state->counts = + std::vector(instance->counts.size(), 0); + global_state->sum = 0.0; + global_state->observations = 0; + } + + // Accumulate bucket counts (mutex already held) + for (size_t i = 0; i < instance->counts.size(); ++i) { + global_state->counts[i] += instance->counts[i]; + } + + // Accumulate sum and observations + global_state->sum += instance->sum; + global_state->observations += instance->observations; + } + family->perThreadState.erase(thread_it); + } } // Gauges are global, no per-thread cleanup needed @@ -234,6 +283,13 @@ struct Metric { if (!ptr) { ptr = std::make_unique(); ptr->value = 0.0; + + // Ensure global accumulator exists for this label set + auto &global_state = family->p->globalAccumulatedValues[key]; + if (!global_state) { + global_state = std::make_unique(); + global_state->value = 0.0; + } } Counter result; result.p = ptr.get(); @@ -282,6 +338,16 @@ struct Metric { ptr->counts = std::vector(ptr->thresholds.size(), 0); ptr->sum = 0.0; ptr->observations = 0; + + // Ensure global accumulator exists for this label set + auto &global_state = family->p->globalAccumulatedValues[key]; + if (!global_state) { + global_state = std::make_unique(); + global_state->thresholds = ptr->thresholds; + global_state->counts = std::vector(ptr->thresholds.size(), 0); + global_state->sum = 0.0; + global_state->observations = 0; + } } Histogram result; result.p = ptr.get(); @@ -292,8 +358,8 @@ struct Metric { Counter::Counter() = default; void Counter::inc(double x) { - // DESIGN: Single writer per thread allows simple increment - // No atomics needed since only one thread writes to this counter + // DESIGN: Single writer per thread, but render thread reads concurrently + // Need atomic store since render thread reads without writer's coordination auto new_value = p->value + x; // Validate monotonic property (counter never decreases) @@ -665,22 +731,38 @@ std::span render(ArenaAllocator &arena) { value)); } + // Aggregate all counter values (thread-local + global accumulated) + std::unordered_map aggregated_values; + + // First, add thread-local values for (const auto &[thread_id, per_thread] : family->perThreadState) { for (const auto &[labels_key, instance] : per_thread.instances) { - // Atomic read from render thread - single writer doesn't need atomic - // writes + // Atomic read to match atomic store in Counter::inc() double value; __atomic_load(&instance->value, &value, __ATOMIC_RELAXED); - labels_sv.clear(); - for (const auto &l : labels_key.labels) - labels_sv.push_back(l); - auto labels = format_labels(labels_sv); - output.push_back(format(arena, "%.*s%.*s %.17g\n", - static_cast(name.length()), name.data(), - static_cast(labels.length()), - labels.data(), value)); + aggregated_values[labels_key] += value; } } + + // Then, add globally accumulated values from destroyed threads + for (const auto &[labels_key, global_state] : + family->globalAccumulatedValues) { + if (global_state) { + aggregated_values[labels_key] += global_state->value; + } + } + + // Render aggregated counter values + for (const auto &[labels_key, total_value] : aggregated_values) { + labels_sv.clear(); + for (const auto &l : labels_key.labels) + labels_sv.push_back(l); + auto labels = format_labels(labels_sv); + output.push_back(format(arena, "%.*s%.*s %.17g\n", + static_cast(name.length()), name.data(), + static_cast(labels.length()), labels.data(), + total_value)); + } } // Render gauges @@ -722,7 +804,15 @@ std::span render(ArenaAllocator &arena) { format(arena, "# HELP %s %s\n", name.c_str(), family->help.c_str())); output.push_back(format(arena, "# TYPE %s histogram\n", name.c_str())); + // Aggregate all histogram values (thread-local + global accumulated) + std::unordered_map, std::vector, + double, uint64_t>> + aggregated_histograms; + std::vector> bucket_labels_sv; + + // First, collect thread-local histogram data for (const auto &[thread_id, per_thread] : family->perThreadState) { for (const auto &[labels_key, instance] : per_thread.instances) { // Extract data under lock - minimize critical section @@ -739,47 +829,92 @@ std::span render(ArenaAllocator &arena) { observations_snapshot = instance->observations; } - // Render explicit bucket counts outside critical section - for (size_t i = 0; i < thresholds_snapshot.size(); ++i) { - bucket_labels_sv.clear(); - for (const auto &l : labels_key.labels) - bucket_labels_sv.push_back(l); - - bucket_labels_sv.push_back( - {"le", static_format(arena, thresholds_snapshot[i])}); - auto labels = format_labels(bucket_labels_sv); - output.push_back( - format(arena, "%s_bucket%.*s %llu\n", name.c_str(), - static_cast(labels.length()), labels.data(), - static_cast(counts_snapshot[i]))); + // Initialize or aggregate into aggregated_histograms + auto &[thresholds, counts, sum, observations] = + aggregated_histograms[labels_key]; + if (thresholds.empty()) { + thresholds = thresholds_snapshot; + counts = counts_snapshot; + sum = sum_snapshot; + observations = observations_snapshot; + } else { + // Aggregate counts + for (size_t i = 0; i < counts_snapshot.size(); ++i) { + counts[i] += counts_snapshot[i]; + } + sum += sum_snapshot; + observations += observations_snapshot; } + } + } - // Render +Inf bucket using total observations count + // Then, add globally accumulated values from destroyed threads + for (const auto &[labels_key, global_state] : + family->globalAccumulatedValues) { + if (global_state) { + auto &[thresholds, counts, sum, observations] = + aggregated_histograms[labels_key]; + if (thresholds.empty()) { + thresholds = global_state->thresholds; + counts = global_state->counts; + sum = global_state->sum; + observations = global_state->observations; + } else { + // Add global accumulated values + for (size_t i = 0; i < global_state->counts.size(); ++i) { + counts[i] += global_state->counts[i]; + } + sum += global_state->sum; + observations += global_state->observations; + } + } + } + + // Render aggregated histogram data + for (const auto &[labels_key, histogram_data] : aggregated_histograms) { + const auto &[thresholds_snapshot, counts_snapshot, sum_snapshot, + observations_snapshot] = histogram_data; + + // Render explicit bucket counts + for (size_t i = 0; i < thresholds_snapshot.size(); ++i) { bucket_labels_sv.clear(); for (const auto &l : labels_key.labels) bucket_labels_sv.push_back(l); - bucket_labels_sv.push_back({"le", "+Inf"}); - auto inf_labels = format_labels(bucket_labels_sv); + + bucket_labels_sv.push_back( + {"le", static_format(arena, thresholds_snapshot[i])}); + auto labels = format_labels(bucket_labels_sv); output.push_back( format(arena, "%s_bucket%.*s %llu\n", name.c_str(), - static_cast(inf_labels.length()), inf_labels.data(), - static_cast(observations_snapshot))); - - // Render sum outside critical section - bucket_labels_sv.clear(); - for (const auto &l : labels_key.labels) - bucket_labels_sv.push_back(l); - auto labels = format_labels(bucket_labels_sv); - output.push_back(format(arena, "%s_sum%.*s %.17g\n", name.c_str(), - static_cast(labels.length()), - labels.data(), sum_snapshot)); - - // Render count outside critical section - output.push_back( - format(arena, "%s_count%.*s %llu\n", name.c_str(), static_cast(labels.length()), labels.data(), - static_cast(observations_snapshot))); + static_cast(counts_snapshot[i]))); } + + // Render +Inf bucket using total observations count + bucket_labels_sv.clear(); + for (const auto &l : labels_key.labels) + bucket_labels_sv.push_back(l); + bucket_labels_sv.push_back({"le", "+Inf"}); + auto inf_labels = format_labels(bucket_labels_sv); + output.push_back( + format(arena, "%s_bucket%.*s %llu\n", name.c_str(), + static_cast(inf_labels.length()), inf_labels.data(), + static_cast(observations_snapshot))); + + // Render sum + bucket_labels_sv.clear(); + for (const auto &l : labels_key.labels) + bucket_labels_sv.push_back(l); + auto labels = format_labels(bucket_labels_sv); + output.push_back(format(arena, "%s_sum%.*s %.17g\n", name.c_str(), + static_cast(labels.length()), labels.data(), + sum_snapshot)); + + // Render count + output.push_back( + format(arena, "%s_count%.*s %llu\n", name.c_str(), + static_cast(labels.length()), labels.data(), + static_cast(observations_snapshot))); } }