Fix histogram thread death bug
This commit is contained in:
@@ -501,6 +501,20 @@ struct Metric {
|
|||||||
// Acquire lock to get consistent snapshot
|
// Acquire lock to get consistent snapshot
|
||||||
std::lock_guard lock(instance->mutex);
|
std::lock_guard lock(instance->mutex);
|
||||||
|
|
||||||
|
// BUGFIX: Flush pending observations into shared before
|
||||||
|
// accumulating
|
||||||
|
if (instance->pending.observations > 0) {
|
||||||
|
// Add pending to shared
|
||||||
|
for (size_t i = 0; i < instance->pending.bucket_counts.size();
|
||||||
|
++i) {
|
||||||
|
instance->shared.bucket_counts[i] +=
|
||||||
|
instance->pending.bucket_counts[i];
|
||||||
|
}
|
||||||
|
instance->shared.sum += instance->pending.sum;
|
||||||
|
instance->shared.observations += instance->pending.observations;
|
||||||
|
// No need to reset pending since instance is being destroyed
|
||||||
|
}
|
||||||
|
|
||||||
// Global accumulator should have been created when we made the
|
// Global accumulator should have been created when we made the
|
||||||
// histogram
|
// histogram
|
||||||
auto &global_state = family->global_accumulated_values[labels_key];
|
auto &global_state = family->global_accumulated_values[labels_key];
|
||||||
|
|||||||
@@ -627,6 +627,113 @@ TEST_CASE("memory management") {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_CASE("histogram pending buffer thread cleanup bug") {
|
||||||
|
for (int iterations = 0; iterations < 1000; ++iterations) {
|
||||||
|
// This test demonstrates the bug where pending histogram observations
|
||||||
|
// are lost when a thread dies because ThreadInit destructor doesn't
|
||||||
|
// flush pending data into shared before accumulating into global state.
|
||||||
|
|
||||||
|
metric::reset_metrics_for_testing();
|
||||||
|
|
||||||
|
auto hist_family = metric::create_histogram(
|
||||||
|
"pending_bug_test", "Test histogram for pending buffer bug",
|
||||||
|
{1.0}); // Single bucket for simplicity
|
||||||
|
|
||||||
|
std::atomic<bool> keep_rendering{true};
|
||||||
|
constexpr int num_threads = 100;
|
||||||
|
|
||||||
|
std::latch ready{2};
|
||||||
|
|
||||||
|
// Background thread that calls render in a tight loop to hold global mutex
|
||||||
|
std::thread render_thread([&]() {
|
||||||
|
ready.arrive_and_wait();
|
||||||
|
Arena arena;
|
||||||
|
while (keep_rendering.load(std::memory_order_relaxed)) {
|
||||||
|
metric::render(arena);
|
||||||
|
arena.reset();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
// Don't spawn threads until render thread is running
|
||||||
|
ready.arrive_and_wait();
|
||||||
|
|
||||||
|
// Spawn threads that observe once and exit
|
||||||
|
std::vector<std::thread> observer_threads;
|
||||||
|
for (int i = 0; i < num_threads; ++i) {
|
||||||
|
observer_threads.emplace_back([&hist_family]() {
|
||||||
|
auto hist = hist_family.create({{"test", "observer"}});
|
||||||
|
hist.observe(0.5); // Goes into first bucket (le="1.0")
|
||||||
|
// Thread dies here - pending observations should be lost due to bug
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// Join all observer threads
|
||||||
|
for (auto &t : observer_threads) {
|
||||||
|
t.join();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stop render thread
|
||||||
|
keep_rendering.store(false, std::memory_order_relaxed);
|
||||||
|
render_thread.join();
|
||||||
|
|
||||||
|
// Check if the worker's observations were preserved
|
||||||
|
Arena arena;
|
||||||
|
auto output = metric::render(arena);
|
||||||
|
|
||||||
|
// First, let's debug what we actually got
|
||||||
|
std::ostringstream debug_output;
|
||||||
|
for (const auto &line : output) {
|
||||||
|
debug_output << line;
|
||||||
|
}
|
||||||
|
std::string full_output = debug_output.str();
|
||||||
|
|
||||||
|
// Parse the output to find the worker's bucket count for le="2.0"
|
||||||
|
uint64_t worker_bucket_2_count = 0;
|
||||||
|
bool found_worker_metric = false;
|
||||||
|
|
||||||
|
// The render output alternates between metric name and value in separate
|
||||||
|
// string_views
|
||||||
|
for (size_t i = 0; i < output.size(); ++i) {
|
||||||
|
const auto &line = output[i];
|
||||||
|
// Look for: pending_bug_test_bucket{test="observer",le="1.0"}
|
||||||
|
if (line.find("pending_bug_test_bucket{test=\"observer\",le=\"1.0\"}") !=
|
||||||
|
std::string_view::npos) {
|
||||||
|
found_worker_metric = true;
|
||||||
|
// The value should be in the next element
|
||||||
|
if (i + 1 < output.size()) {
|
||||||
|
auto value_str = output[i + 1];
|
||||||
|
// Remove trailing newline if present
|
||||||
|
while (!value_str.empty() &&
|
||||||
|
(value_str.back() == '\n' || value_str.back() == '\r')) {
|
||||||
|
value_str.remove_suffix(1);
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
worker_bucket_2_count = std::stoull(std::string(value_str));
|
||||||
|
} catch (const std::exception &e) {
|
||||||
|
MESSAGE("Failed to parse value: '"
|
||||||
|
<< value_str << "' from metric line: '" << line << "'");
|
||||||
|
MESSAGE("Full output:\n" << full_output);
|
||||||
|
throw;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
REQUIRE(found_worker_metric); // The metric should exist
|
||||||
|
|
||||||
|
// BUG: This will fail because pending observations are lost on thread death
|
||||||
|
// Expected: num_threads observations (each thread made 1 observation)
|
||||||
|
// Actual: less than num_threads (observations stuck in pending are lost
|
||||||
|
// when threads die)
|
||||||
|
CHECK_MESSAGE(
|
||||||
|
worker_bucket_2_count == num_threads,
|
||||||
|
"Expected "
|
||||||
|
<< num_threads << " observations but got " << worker_bucket_2_count
|
||||||
|
<< ". This indicates the pending buffer bug where observations "
|
||||||
|
<< "stuck in pending are lost when thread dies.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
TEST_CASE("render output deterministic order golden test") {
|
TEST_CASE("render output deterministic order golden test") {
|
||||||
// Clean slate - reset all metrics before this test
|
// Clean slate - reset all metrics before this test
|
||||||
metric::reset_metrics_for_testing();
|
metric::reset_metrics_for_testing();
|
||||||
|
|||||||
Reference in New Issue
Block a user