Fix thread destroy bug

This commit is contained in:
2025-08-30 16:20:35 -04:00
parent dcf8af6d43
commit 21ddcb75fb

View File

@@ -121,6 +121,10 @@ template <> struct Family<Counter>::State {
}; };
std::unordered_map<std::thread::id, PerThreadState> perThreadState; std::unordered_map<std::thread::id, PerThreadState> perThreadState;
// Global accumulation state for destroyed threads
std::unordered_map<LabelsKey, std::unique_ptr<Counter::State>>
globalAccumulatedValues;
// Callback-based metrics (global, not per-thread) // Callback-based metrics (global, not per-thread)
std::unordered_map<LabelsKey, MetricCallback<Counter>> callbacks; std::unordered_map<LabelsKey, MetricCallback<Counter>> callbacks;
}; };
@@ -144,6 +148,10 @@ template <> struct Family<Histogram>::State {
}; };
std::unordered_map<std::thread::id, PerThreadState> perThreadState; std::unordered_map<std::thread::id, PerThreadState> perThreadState;
// Global accumulation state for destroyed threads
std::unordered_map<LabelsKey, std::unique_ptr<Histogram::State>>
globalAccumulatedValues;
// Note: No callbacks map - histograms don't support callback-based metrics // Note: No callbacks map - histograms don't support callback-based metrics
}; };
@@ -190,21 +198,62 @@ struct Metric {
// Thread registration happens lazily when metrics are created // Thread registration happens lazily when metrics are created
} }
~ThreadInit() { ~ThreadInit() {
// TODO we need to accumulate this threads counts into a global. Otherwise // Accumulate thread-local state into global state before cleanup
// it can go backwards when we destroy a thread.
// Clean up this thread's storage from all families
std::unique_lock<std::mutex> _{mutex}; std::unique_lock<std::mutex> _{mutex};
auto thread_id = std::this_thread::get_id(); auto thread_id = std::this_thread::get_id();
// Clean up counter families // Accumulate counter families
for (auto &[name, family] : counterFamilies) { for (auto &[name, family] : counterFamilies) {
family->perThreadState.erase(thread_id); auto thread_it = family->perThreadState.find(thread_id);
if (thread_it != family->perThreadState.end()) {
for (auto &[labels_key, instance] : thread_it->second.instances) {
// Get current thread-local value
double current_value = instance->value;
// Ensure global accumulator exists
auto &global_state = family->globalAccumulatedValues[labels_key];
if (!global_state) {
global_state = std::make_unique<Counter::State>();
global_state->value = 0.0;
}
// Add thread-local value to global accumulator (mutex already held)
global_state->value += current_value;
}
family->perThreadState.erase(thread_it);
}
} }
// Clean up histogram families // Accumulate histogram families
for (auto &[name, family] : histogramFamilies) { for (auto &[name, family] : histogramFamilies) {
family->perThreadState.erase(thread_id); auto thread_it = family->perThreadState.find(thread_id);
if (thread_it != family->perThreadState.end()) {
for (auto &[labels_key, instance] : thread_it->second.instances) {
// Acquire lock to get consistent snapshot
std::lock_guard<std::mutex> lock(instance->mutex);
// Ensure global accumulator exists
auto &global_state = family->globalAccumulatedValues[labels_key];
if (!global_state) {
global_state = std::make_unique<Histogram::State>();
global_state->thresholds = instance->thresholds;
global_state->counts =
std::vector<uint64_t>(instance->counts.size(), 0);
global_state->sum = 0.0;
global_state->observations = 0;
}
// Accumulate bucket counts (mutex already held)
for (size_t i = 0; i < instance->counts.size(); ++i) {
global_state->counts[i] += instance->counts[i];
}
// Accumulate sum and observations
global_state->sum += instance->sum;
global_state->observations += instance->observations;
}
family->perThreadState.erase(thread_it);
}
} }
// Gauges are global, no per-thread cleanup needed // Gauges are global, no per-thread cleanup needed
@@ -234,6 +283,13 @@ struct Metric {
if (!ptr) { if (!ptr) {
ptr = std::make_unique<Counter::State>(); ptr = std::make_unique<Counter::State>();
ptr->value = 0.0; ptr->value = 0.0;
// Ensure global accumulator exists for this label set
auto &global_state = family->p->globalAccumulatedValues[key];
if (!global_state) {
global_state = std::make_unique<Counter::State>();
global_state->value = 0.0;
}
} }
Counter result; Counter result;
result.p = ptr.get(); result.p = ptr.get();
@@ -282,6 +338,16 @@ struct Metric {
ptr->counts = std::vector<uint64_t>(ptr->thresholds.size(), 0); ptr->counts = std::vector<uint64_t>(ptr->thresholds.size(), 0);
ptr->sum = 0.0; ptr->sum = 0.0;
ptr->observations = 0; ptr->observations = 0;
// Ensure global accumulator exists for this label set
auto &global_state = family->p->globalAccumulatedValues[key];
if (!global_state) {
global_state = std::make_unique<Histogram::State>();
global_state->thresholds = ptr->thresholds;
global_state->counts = std::vector<uint64_t>(ptr->thresholds.size(), 0);
global_state->sum = 0.0;
global_state->observations = 0;
}
} }
Histogram result; Histogram result;
result.p = ptr.get(); result.p = ptr.get();
@@ -292,8 +358,8 @@ struct Metric {
Counter::Counter() = default; Counter::Counter() = default;
void Counter::inc(double x) { void Counter::inc(double x) {
// DESIGN: Single writer per thread allows simple increment // DESIGN: Single writer per thread, but render thread reads concurrently
// No atomics needed since only one thread writes to this counter // Need atomic store since render thread reads without writer's coordination
auto new_value = p->value + x; auto new_value = p->value + x;
// Validate monotonic property (counter never decreases) // Validate monotonic property (counter never decreases)
@@ -665,22 +731,38 @@ std::span<std::string_view> render(ArenaAllocator &arena) {
value)); value));
} }
// Aggregate all counter values (thread-local + global accumulated)
std::unordered_map<LabelsKey, double> aggregated_values;
// First, add thread-local values
for (const auto &[thread_id, per_thread] : family->perThreadState) { for (const auto &[thread_id, per_thread] : family->perThreadState) {
for (const auto &[labels_key, instance] : per_thread.instances) { for (const auto &[labels_key, instance] : per_thread.instances) {
// Atomic read from render thread - single writer doesn't need atomic // Atomic read to match atomic store in Counter::inc()
// writes
double value; double value;
__atomic_load(&instance->value, &value, __ATOMIC_RELAXED); __atomic_load(&instance->value, &value, __ATOMIC_RELAXED);
labels_sv.clear(); aggregated_values[labels_key] += value;
for (const auto &l : labels_key.labels)
labels_sv.push_back(l);
auto labels = format_labels(labels_sv);
output.push_back(format(arena, "%.*s%.*s %.17g\n",
static_cast<int>(name.length()), name.data(),
static_cast<int>(labels.length()),
labels.data(), value));
} }
} }
// Then, add globally accumulated values from destroyed threads
for (const auto &[labels_key, global_state] :
family->globalAccumulatedValues) {
if (global_state) {
aggregated_values[labels_key] += global_state->value;
}
}
// Render aggregated counter values
for (const auto &[labels_key, total_value] : aggregated_values) {
labels_sv.clear();
for (const auto &l : labels_key.labels)
labels_sv.push_back(l);
auto labels = format_labels(labels_sv);
output.push_back(format(arena, "%.*s%.*s %.17g\n",
static_cast<int>(name.length()), name.data(),
static_cast<int>(labels.length()), labels.data(),
total_value));
}
} }
// Render gauges // Render gauges
@@ -722,7 +804,15 @@ std::span<std::string_view> render(ArenaAllocator &arena) {
format(arena, "# HELP %s %s\n", name.c_str(), family->help.c_str())); format(arena, "# HELP %s %s\n", name.c_str(), family->help.c_str()));
output.push_back(format(arena, "# TYPE %s histogram\n", name.c_str())); output.push_back(format(arena, "# TYPE %s histogram\n", name.c_str()));
// Aggregate all histogram values (thread-local + global accumulated)
std::unordered_map<LabelsKey,
std::tuple<std::vector<double>, std::vector<uint64_t>,
double, uint64_t>>
aggregated_histograms;
std::vector<std::pair<std::string_view, std::string_view>> bucket_labels_sv; std::vector<std::pair<std::string_view, std::string_view>> bucket_labels_sv;
// First, collect thread-local histogram data
for (const auto &[thread_id, per_thread] : family->perThreadState) { for (const auto &[thread_id, per_thread] : family->perThreadState) {
for (const auto &[labels_key, instance] : per_thread.instances) { for (const auto &[labels_key, instance] : per_thread.instances) {
// Extract data under lock - minimize critical section // Extract data under lock - minimize critical section
@@ -739,47 +829,92 @@ std::span<std::string_view> render(ArenaAllocator &arena) {
observations_snapshot = instance->observations; observations_snapshot = instance->observations;
} }
// Render explicit bucket counts outside critical section // Initialize or aggregate into aggregated_histograms
for (size_t i = 0; i < thresholds_snapshot.size(); ++i) { auto &[thresholds, counts, sum, observations] =
bucket_labels_sv.clear(); aggregated_histograms[labels_key];
for (const auto &l : labels_key.labels) if (thresholds.empty()) {
bucket_labels_sv.push_back(l); thresholds = thresholds_snapshot;
counts = counts_snapshot;
bucket_labels_sv.push_back( sum = sum_snapshot;
{"le", static_format(arena, thresholds_snapshot[i])}); observations = observations_snapshot;
auto labels = format_labels(bucket_labels_sv); } else {
output.push_back( // Aggregate counts
format(arena, "%s_bucket%.*s %llu\n", name.c_str(), for (size_t i = 0; i < counts_snapshot.size(); ++i) {
static_cast<int>(labels.length()), labels.data(), counts[i] += counts_snapshot[i];
static_cast<unsigned long long>(counts_snapshot[i]))); }
sum += sum_snapshot;
observations += observations_snapshot;
} }
}
}
// Render +Inf bucket using total observations count // Then, add globally accumulated values from destroyed threads
for (const auto &[labels_key, global_state] :
family->globalAccumulatedValues) {
if (global_state) {
auto &[thresholds, counts, sum, observations] =
aggregated_histograms[labels_key];
if (thresholds.empty()) {
thresholds = global_state->thresholds;
counts = global_state->counts;
sum = global_state->sum;
observations = global_state->observations;
} else {
// Add global accumulated values
for (size_t i = 0; i < global_state->counts.size(); ++i) {
counts[i] += global_state->counts[i];
}
sum += global_state->sum;
observations += global_state->observations;
}
}
}
// Render aggregated histogram data
for (const auto &[labels_key, histogram_data] : aggregated_histograms) {
const auto &[thresholds_snapshot, counts_snapshot, sum_snapshot,
observations_snapshot] = histogram_data;
// Render explicit bucket counts
for (size_t i = 0; i < thresholds_snapshot.size(); ++i) {
bucket_labels_sv.clear(); bucket_labels_sv.clear();
for (const auto &l : labels_key.labels) for (const auto &l : labels_key.labels)
bucket_labels_sv.push_back(l); bucket_labels_sv.push_back(l);
bucket_labels_sv.push_back({"le", "+Inf"});
auto inf_labels = format_labels(bucket_labels_sv); bucket_labels_sv.push_back(
{"le", static_format(arena, thresholds_snapshot[i])});
auto labels = format_labels(bucket_labels_sv);
output.push_back( output.push_back(
format(arena, "%s_bucket%.*s %llu\n", name.c_str(), format(arena, "%s_bucket%.*s %llu\n", name.c_str(),
static_cast<int>(inf_labels.length()), inf_labels.data(),
static_cast<unsigned long long>(observations_snapshot)));
// Render sum outside critical section
bucket_labels_sv.clear();
for (const auto &l : labels_key.labels)
bucket_labels_sv.push_back(l);
auto labels = format_labels(bucket_labels_sv);
output.push_back(format(arena, "%s_sum%.*s %.17g\n", name.c_str(),
static_cast<int>(labels.length()),
labels.data(), sum_snapshot));
// Render count outside critical section
output.push_back(
format(arena, "%s_count%.*s %llu\n", name.c_str(),
static_cast<int>(labels.length()), labels.data(), static_cast<int>(labels.length()), labels.data(),
static_cast<unsigned long long>(observations_snapshot))); static_cast<unsigned long long>(counts_snapshot[i])));
} }
// Render +Inf bucket using total observations count
bucket_labels_sv.clear();
for (const auto &l : labels_key.labels)
bucket_labels_sv.push_back(l);
bucket_labels_sv.push_back({"le", "+Inf"});
auto inf_labels = format_labels(bucket_labels_sv);
output.push_back(
format(arena, "%s_bucket%.*s %llu\n", name.c_str(),
static_cast<int>(inf_labels.length()), inf_labels.data(),
static_cast<unsigned long long>(observations_snapshot)));
// Render sum
bucket_labels_sv.clear();
for (const auto &l : labels_key.labels)
bucket_labels_sv.push_back(l);
auto labels = format_labels(bucket_labels_sv);
output.push_back(format(arena, "%s_sum%.*s %.17g\n", name.c_str(),
static_cast<int>(labels.length()), labels.data(),
sum_snapshot));
// Render count
output.push_back(
format(arena, "%s_count%.*s %llu\n", name.c_str(),
static_cast<int>(labels.length()), labels.data(),
static_cast<unsigned long long>(observations_snapshot)));
} }
} }