From 9f8562e30f07f04267be94e28d5688285f271978 Mon Sep 17 00:00:00 2001 From: Andrew Noyes Date: Thu, 18 Sep 2025 12:39:47 -0400 Subject: [PATCH] Fix histogram thread death bug --- src/metric.cpp | 14 ++++++ tests/test_metric.cpp | 107 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 121 insertions(+) diff --git a/src/metric.cpp b/src/metric.cpp index 58a1135..0c1efae 100644 --- a/src/metric.cpp +++ b/src/metric.cpp @@ -501,6 +501,20 @@ struct Metric { // Acquire lock to get consistent snapshot std::lock_guard lock(instance->mutex); + // BUGFIX: Flush pending observations into shared before + // accumulating + if (instance->pending.observations > 0) { + // Add pending to shared + for (size_t i = 0; i < instance->pending.bucket_counts.size(); + ++i) { + instance->shared.bucket_counts[i] += + instance->pending.bucket_counts[i]; + } + instance->shared.sum += instance->pending.sum; + instance->shared.observations += instance->pending.observations; + // No need to reset pending since instance is being destroyed + } + // Global accumulator should have been created when we made the // histogram auto &global_state = family->global_accumulated_values[labels_key]; diff --git a/tests/test_metric.cpp b/tests/test_metric.cpp index 0d2187c..de4b851 100644 --- a/tests/test_metric.cpp +++ b/tests/test_metric.cpp @@ -627,6 +627,113 @@ TEST_CASE("memory management") { } } +TEST_CASE("histogram pending buffer thread cleanup bug") { + for (int iterations = 0; iterations < 1000; ++iterations) { + // This test demonstrates the bug where pending histogram observations + // are lost when a thread dies because ThreadInit destructor doesn't + // flush pending data into shared before accumulating into global state. + + metric::reset_metrics_for_testing(); + + auto hist_family = metric::create_histogram( + "pending_bug_test", "Test histogram for pending buffer bug", + {1.0}); // Single bucket for simplicity + + std::atomic keep_rendering{true}; + constexpr int num_threads = 100; + + std::latch ready{2}; + + // Background thread that calls render in a tight loop to hold global mutex + std::thread render_thread([&]() { + ready.arrive_and_wait(); + Arena arena; + while (keep_rendering.load(std::memory_order_relaxed)) { + metric::render(arena); + arena.reset(); + } + }); + // Don't spawn threads until render thread is running + ready.arrive_and_wait(); + + // Spawn threads that observe once and exit + std::vector observer_threads; + for (int i = 0; i < num_threads; ++i) { + observer_threads.emplace_back([&hist_family]() { + auto hist = hist_family.create({{"test", "observer"}}); + hist.observe(0.5); // Goes into first bucket (le="1.0") + // Thread dies here - pending observations should be lost due to bug + }); + } + + // Join all observer threads + for (auto &t : observer_threads) { + t.join(); + } + + // Stop render thread + keep_rendering.store(false, std::memory_order_relaxed); + render_thread.join(); + + // Check if the worker's observations were preserved + Arena arena; + auto output = metric::render(arena); + + // First, let's debug what we actually got + std::ostringstream debug_output; + for (const auto &line : output) { + debug_output << line; + } + std::string full_output = debug_output.str(); + + // Parse the output to find the worker's bucket count for le="2.0" + uint64_t worker_bucket_2_count = 0; + bool found_worker_metric = false; + + // The render output alternates between metric name and value in separate + // string_views + for (size_t i = 0; i < output.size(); ++i) { + const auto &line = output[i]; + // Look for: pending_bug_test_bucket{test="observer",le="1.0"} + if (line.find("pending_bug_test_bucket{test=\"observer\",le=\"1.0\"}") != + std::string_view::npos) { + found_worker_metric = true; + // The value should be in the next element + if (i + 1 < output.size()) { + auto value_str = output[i + 1]; + // Remove trailing newline if present + while (!value_str.empty() && + (value_str.back() == '\n' || value_str.back() == '\r')) { + value_str.remove_suffix(1); + } + try { + worker_bucket_2_count = std::stoull(std::string(value_str)); + } catch (const std::exception &e) { + MESSAGE("Failed to parse value: '" + << value_str << "' from metric line: '" << line << "'"); + MESSAGE("Full output:\n" << full_output); + throw; + } + } + break; + } + } + + REQUIRE(found_worker_metric); // The metric should exist + + // BUG: This will fail because pending observations are lost on thread death + // Expected: num_threads observations (each thread made 1 observation) + // Actual: less than num_threads (observations stuck in pending are lost + // when threads die) + CHECK_MESSAGE( + worker_bucket_2_count == num_threads, + "Expected " + << num_threads << " observations but got " << worker_bucket_2_count + << ". This indicates the pending buffer bug where observations " + << "stuck in pending are lost when thread dies."); + } +} + TEST_CASE("render output deterministic order golden test") { // Clean slate - reset all metrics before this test metric::reset_metrics_for_testing();