Update potential misunderstanding about thread safety
This commit is contained in:
@@ -58,9 +58,6 @@ struct ContentionEnvironment {
|
|||||||
bg_counter.inc(1.0);
|
bg_counter.inc(1.0);
|
||||||
bg_gauge.set(dist(rng));
|
bg_gauge.set(dist(rng));
|
||||||
bg_histogram.observe(dist(rng));
|
bg_histogram.observe(dist(rng));
|
||||||
|
|
||||||
// Small delay to avoid spinning too fast
|
|
||||||
std::this_thread::sleep_for(std::chrono::microseconds(1));
|
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
@@ -74,8 +71,6 @@ struct ContentionEnvironment {
|
|||||||
auto output = metric::render(arena);
|
auto output = metric::render(arena);
|
||||||
static_cast<void>(output); // Suppress unused variable warning
|
static_cast<void>(output); // Suppress unused variable warning
|
||||||
arena.reset();
|
arena.reset();
|
||||||
|
|
||||||
std::this_thread::sleep_for(std::chrono::microseconds(100));
|
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
@@ -140,9 +135,6 @@ int main() {
|
|||||||
// Start background threads creating contention
|
// Start background threads creating contention
|
||||||
env.start_background_contention(8);
|
env.start_background_contention(8);
|
||||||
|
|
||||||
std::this_thread::sleep_for(
|
|
||||||
std::chrono::milliseconds(100)); // Let background threads start
|
|
||||||
|
|
||||||
bench.run("counter.inc() - 8 background threads", [&]() {
|
bench.run("counter.inc() - 8 background threads", [&]() {
|
||||||
env.counter.inc(1.0);
|
env.counter.inc(1.0);
|
||||||
ankerl::nanobench::doNotOptimizeAway(env.counter);
|
ankerl::nanobench::doNotOptimizeAway(env.counter);
|
||||||
@@ -172,8 +164,6 @@ int main() {
|
|||||||
env.start_background_contention(4);
|
env.start_background_contention(4);
|
||||||
env.start_render_thread();
|
env.start_render_thread();
|
||||||
|
|
||||||
std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
|
||||||
|
|
||||||
bench.run("counter.inc() - with concurrent render", [&]() {
|
bench.run("counter.inc() - with concurrent render", [&]() {
|
||||||
env.counter.inc(1.0);
|
env.counter.inc(1.0);
|
||||||
ankerl::nanobench::doNotOptimizeAway(env.counter);
|
ankerl::nanobench::doNotOptimizeAway(env.counter);
|
||||||
@@ -192,29 +182,28 @@ int main() {
|
|||||||
|
|
||||||
// Shared gauge contention
|
// Shared gauge contention
|
||||||
{
|
{
|
||||||
// Test the multi-writer CAS behavior of gauges
|
// Test the multi-writer CAS behavior of gauges when multiple threads
|
||||||
|
// create gauges with the same labels. They will all point to the same
|
||||||
|
// underlying state, causing high contention.
|
||||||
auto gauge_family =
|
auto gauge_family =
|
||||||
metric::create_gauge("shared_gauge", "Shared gauge test");
|
metric::create_gauge("shared_gauge", "Shared gauge test");
|
||||||
auto shared_gauge = gauge_family.create({{"shared", "true"}});
|
|
||||||
|
|
||||||
// Background threads all writing to the SAME gauge (high CAS contention)
|
|
||||||
std::atomic<bool> stop_shared{false};
|
std::atomic<bool> stop_shared{false};
|
||||||
std::vector<std::thread> shared_threads;
|
std::vector<std::thread> shared_threads;
|
||||||
|
|
||||||
for (int i = 0; i < 8; ++i) {
|
for (int i = 0; i < 8; ++i) {
|
||||||
shared_threads.emplace_back([&shared_gauge, &stop_shared]() {
|
shared_threads.emplace_back([&gauge_family, &stop_shared]() {
|
||||||
|
auto gauge = gauge_family.create({{"shared", "true"}});
|
||||||
while (!stop_shared.load(std::memory_order_relaxed)) {
|
while (!stop_shared.load(std::memory_order_relaxed)) {
|
||||||
shared_gauge.inc(1.0);
|
gauge.inc(1.0);
|
||||||
std::this_thread::sleep_for(std::chrono::nanoseconds(100));
|
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
std::this_thread::sleep_for(std::chrono::milliseconds(50));
|
auto gauge_for_benchmark = gauge_family.create({{"shared", "true"}});
|
||||||
|
bench.run("gauge.inc() - 8 threads, same labels (contention)", [&]() {
|
||||||
bench.run("gauge.inc() - 8 threads same gauge (CAS contention)", [&]() {
|
gauge_for_benchmark.inc(1.0);
|
||||||
shared_gauge.inc(1.0);
|
ankerl::nanobench::doNotOptimizeAway(gauge_for_benchmark);
|
||||||
ankerl::nanobench::doNotOptimizeAway(shared_gauge);
|
|
||||||
});
|
});
|
||||||
|
|
||||||
stop_shared.store(true);
|
stop_shared.store(true);
|
||||||
@@ -292,7 +281,6 @@ int main() {
|
|||||||
while (!stop_callback.load()) {
|
while (!stop_callback.load()) {
|
||||||
counter_value.fetch_add(1);
|
counter_value.fetch_add(1);
|
||||||
gauge_value.store(gauge_value.load() + 1);
|
gauge_value.store(gauge_value.load() + 1);
|
||||||
std::this_thread::sleep_for(std::chrono::microseconds(10));
|
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|||||||
@@ -55,16 +55,17 @@ template <typename T> struct Family;
|
|||||||
// render() mutex - no need to be thread-safe internally
|
// render() mutex - no need to be thread-safe internally
|
||||||
template <typename T> using MetricCallback = std::function<double()>;
|
template <typename T> using MetricCallback = std::function<double()>;
|
||||||
|
|
||||||
// Counter: Monotonically increasing metric with single-writer semantics
|
// Counter: A metric value that only increases.
|
||||||
// Use for: request counts, error counts, bytes processed, etc.
|
|
||||||
//
|
//
|
||||||
// THREAD SAFETY: Each counter instance has exactly ONE writer thread (the one
|
// THREAD SAFETY RULES:
|
||||||
// that created it). It is an error to call inc() from any thread other than the
|
// 1. Do not call inc() on the same Counter object from multiple threads.
|
||||||
// creating thread. Multiple readers can safely read the value from other
|
// Each object must have only one writer thread.
|
||||||
// threads.
|
// 2. To use Counters concurrently, each thread must create its own Counter
|
||||||
|
// object.
|
||||||
|
// 3. When rendered, the values of all Counter objects with the same labels
|
||||||
|
// are summed together into a single total.
|
||||||
struct Counter {
|
struct Counter {
|
||||||
void
|
void inc(double = 1.0); // Increment counter (must be >= 0)
|
||||||
inc(double = 1.0); // Increment counter (must be >= 0) - SINGLE WRITER ONLY
|
|
||||||
|
|
||||||
private:
|
private:
|
||||||
Counter();
|
Counter();
|
||||||
@@ -74,17 +75,20 @@ private:
|
|||||||
State *p;
|
State *p;
|
||||||
};
|
};
|
||||||
|
|
||||||
// Gauge: Can increase/decrease metric
|
// Gauge: A metric value that can be set, increased, or decreased.
|
||||||
// Use for: memory usage, active connections, queue depth, etc.
|
|
||||||
//
|
//
|
||||||
// THREAD SAFETY: Each gauge instance has exactly ONE writer thread (the one
|
// THREAD SAFETY RULES:
|
||||||
// that created it). It is an error to call inc()/dec()/set() from any thread
|
// 1. Do not call inc(), dec(), or set() on the same Gauge object from
|
||||||
// other than the creating thread.
|
// multiple threads. Each object must have only one writer thread.
|
||||||
// IMPLEMENTATION NOTE: Mutex protection is an internal implementation detail.
|
// 2. To use Gauges concurrently, each thread must create its own Gauge object.
|
||||||
|
// 3. If multiple Gauge objects are created with the same labels, their
|
||||||
|
// operations are combined. For example, increments from different objects
|
||||||
|
// are cumulative.
|
||||||
|
// 4. For independent gauges, create them with unique labels.
|
||||||
struct Gauge {
|
struct Gauge {
|
||||||
void inc(double = 1.0); // Increase gauge value - SINGLE WRITER ONLY
|
void inc(double = 1.0);
|
||||||
void dec(double = 1.0); // Decrease gauge value - SINGLE WRITER ONLY
|
void dec(double = 1.0);
|
||||||
void set(double); // Set absolute value - SINGLE WRITER ONLY
|
void set(double);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
Gauge();
|
Gauge();
|
||||||
@@ -94,17 +98,17 @@ private:
|
|||||||
State *p;
|
State *p;
|
||||||
};
|
};
|
||||||
|
|
||||||
// Histogram: Distribution tracking with single-writer semantics
|
// Histogram: A metric that samples observations into buckets.
|
||||||
// Use for: request latency, response size, processing time, etc.
|
|
||||||
// Buckets are automatically sorted, deduplicated, and include +Inf
|
|
||||||
//
|
//
|
||||||
// THREAD SAFETY: Each histogram instance has exactly ONE writer thread (the one
|
// THREAD SAFETY RULES:
|
||||||
// that created it). It is an error to call observe() from any thread other than
|
// 1. Do not call observe() on the same Histogram object from multiple
|
||||||
// the creating thread. Multiple readers can safely read bucket values from
|
// threads. Each object must have only one writer thread.
|
||||||
// other threads.
|
// 2. To use Histograms concurrently, each thread must create its own
|
||||||
|
// Histogram object.
|
||||||
|
// 3. When rendered, the observations from all Histogram objects with the
|
||||||
|
// same labels are combined into a single histogram.
|
||||||
struct Histogram {
|
struct Histogram {
|
||||||
void observe(
|
void observe(double); // Record observation in appropriate bucket
|
||||||
double); // Record observation in appropriate bucket - SINGLE WRITER ONLY
|
|
||||||
|
|
||||||
private:
|
private:
|
||||||
Histogram();
|
Histogram();
|
||||||
|
|||||||
@@ -402,21 +402,22 @@ TEST_CASE("thread safety") {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
SUBCASE("gauge multi-writer with CAS") {
|
SUBCASE("gauge multi-writer contention") {
|
||||||
auto gauge_family =
|
auto gauge_family =
|
||||||
metric::create_gauge("thread_test_gauge", "Thread test gauge");
|
metric::create_gauge("thread_test_gauge", "Thread test gauge");
|
||||||
auto shared_gauge = gauge_family.create({{"shared", "true"}});
|
|
||||||
|
|
||||||
std::vector<std::thread> threads;
|
std::vector<std::thread> threads;
|
||||||
std::latch start_latch{num_threads};
|
std::latch start_latch{num_threads};
|
||||||
|
|
||||||
// Multiple threads writing to same gauge (uses atomic CAS)
|
// Multiple threads create gauges with the same labels, writing to the same
|
||||||
|
// underlying state, testing CAS contention.
|
||||||
for (int i = 0; i < num_threads; ++i) {
|
for (int i = 0; i < num_threads; ++i) {
|
||||||
threads.emplace_back([&]() {
|
threads.emplace_back([&]() {
|
||||||
|
auto gauge = gauge_family.create({{"shared", "true"}});
|
||||||
start_latch.arrive_and_wait();
|
start_latch.arrive_and_wait();
|
||||||
|
|
||||||
for (int j = 0; j < ops_per_thread; ++j) {
|
for (int j = 0; j < ops_per_thread; ++j) {
|
||||||
shared_gauge.inc(1.0);
|
gauge.inc(1.0);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user