Actually have contention in benchmark
This commit is contained in:
@@ -20,29 +20,14 @@ struct ContentionEnvironment {
|
||||
std::unique_ptr<std::latch> contention_latch;
|
||||
std::unique_ptr<std::latch> render_latch;
|
||||
|
||||
// Metrics for testing
|
||||
metric::Family<metric::Counter> counter_family;
|
||||
metric::Family<metric::Gauge> gauge_family;
|
||||
metric::Family<metric::Histogram> histogram_family;
|
||||
metric::Family<metric::Gauge> gauge_family =
|
||||
metric::create_gauge("gauge", "");
|
||||
metric::Family<metric::Counter> counter_family =
|
||||
metric::create_counter("counter", "");
|
||||
metric::Family<metric::Histogram> histogram_family = metric::create_histogram(
|
||||
"histogram", "", metric::exponential_buckets(0.001, 5, 7));
|
||||
|
||||
// Test instances
|
||||
metric::Counter counter;
|
||||
metric::Gauge gauge;
|
||||
metric::Histogram histogram;
|
||||
|
||||
ContentionEnvironment()
|
||||
: counter_family(
|
||||
metric::create_counter("bench_counter", "Benchmark counter")),
|
||||
gauge_family(metric::create_gauge("bench_gauge", "Benchmark gauge")),
|
||||
histogram_family(
|
||||
metric::create_histogram("bench_histogram", "Benchmark histogram",
|
||||
// 7 explicit buckets + automatic +Inf = 8
|
||||
// total (optimal for SIMD: 2x4 buckets)
|
||||
std::initializer_list<double>{
|
||||
0.1, 0.5, 1.0, 2.5, 5.0, 10.0, 25.0})),
|
||||
counter(counter_family.create({{"benchmark", "contention"}})),
|
||||
gauge(gauge_family.create({{"benchmark", "contention"}})),
|
||||
histogram(histogram_family.create({{"benchmark", "contention"}})) {}
|
||||
ContentionEnvironment() = default;
|
||||
|
||||
void start_background_contention(int num_threads = 4) {
|
||||
stop_flag.store(false);
|
||||
@@ -50,12 +35,9 @@ struct ContentionEnvironment {
|
||||
|
||||
for (int i = 0; i < num_threads; ++i) {
|
||||
background_threads.emplace_back([this, i]() {
|
||||
// Each background thread creates its own metrics to avoid conflicts
|
||||
auto bg_counter =
|
||||
counter_family.create({{"thread", std::to_string(i)}});
|
||||
auto bg_gauge = gauge_family.create({{"bg_thread", std::to_string(i)}});
|
||||
auto bg_histogram =
|
||||
histogram_family.create({{"bg_thread", std::to_string(i)}});
|
||||
auto bg_counter = counter_family.create({});
|
||||
auto bg_gauge = gauge_family.create({});
|
||||
auto bg_histogram = histogram_family.create({});
|
||||
|
||||
std::mt19937 rng(i);
|
||||
std::uniform_real_distribution<double> dist(0.0, 10.0);
|
||||
@@ -67,6 +49,7 @@ struct ContentionEnvironment {
|
||||
// Simulate mixed workload
|
||||
bg_counter.inc(1.0);
|
||||
bg_gauge.set(dist(rng));
|
||||
bg_gauge.inc(1.0);
|
||||
bg_histogram.observe(dist(rng));
|
||||
}
|
||||
});
|
||||
@@ -111,21 +94,24 @@ int main() {
|
||||
ankerl::nanobench::Bench bench;
|
||||
bench.title("WeaselDB Metrics Performance").unit("operation").warmup(1000);
|
||||
|
||||
metric::Family<metric::Gauge> gauge_family =
|
||||
metric::create_gauge("gauge", "");
|
||||
metric::Family<metric::Counter> counter_family =
|
||||
metric::create_counter("counter", "");
|
||||
metric::Family<metric::Histogram> histogram_family = metric::create_histogram(
|
||||
"histogram", "", metric::exponential_buckets(0.001, 5, 7));
|
||||
|
||||
auto counter = counter_family.create({});
|
||||
auto gauge = gauge_family.create({});
|
||||
auto histogram = histogram_family.create({});
|
||||
|
||||
// Baseline performance without contention
|
||||
{
|
||||
auto counter_family =
|
||||
metric::create_counter("baseline_counter", "Baseline counter");
|
||||
auto counter = counter_family.create({{"type", "baseline"}});
|
||||
|
||||
bench.run("counter.inc() - no contention", [&]() {
|
||||
counter.inc(1.0);
|
||||
ankerl::nanobench::doNotOptimizeAway(counter);
|
||||
});
|
||||
|
||||
auto gauge_family =
|
||||
metric::create_gauge("baseline_gauge", "Baseline gauge");
|
||||
auto gauge = gauge_family.create({{"type", "baseline"}});
|
||||
|
||||
bench.run("gauge.inc() - no contention", [&]() {
|
||||
gauge.inc(1.0);
|
||||
ankerl::nanobench::doNotOptimizeAway(gauge);
|
||||
@@ -136,11 +122,6 @@ int main() {
|
||||
ankerl::nanobench::doNotOptimizeAway(gauge);
|
||||
});
|
||||
|
||||
auto histogram_family = metric::create_histogram(
|
||||
"baseline_histogram", "Baseline histogram",
|
||||
std::initializer_list<double>{0.1, 0.5, 1.0, 2.5, 5.0, 10.0, 25.0});
|
||||
auto histogram = histogram_family.create({{"type", "baseline"}});
|
||||
|
||||
bench.run("histogram.observe() - no contention", [&]() {
|
||||
histogram.observe(0.5);
|
||||
ankerl::nanobench::doNotOptimizeAway(histogram);
|
||||
@@ -154,25 +135,15 @@ int main() {
|
||||
// Start background threads creating contention
|
||||
env.start_background_contention(8);
|
||||
|
||||
bench.run("counter.inc() - 8 background threads", [&]() {
|
||||
env.counter.inc(1.0);
|
||||
ankerl::nanobench::doNotOptimizeAway(env.counter);
|
||||
});
|
||||
bench.run("counter.inc() - 8 background threads",
|
||||
[&]() { counter.inc(1.0); });
|
||||
|
||||
bench.run("gauge.inc() - 8 background threads", [&]() {
|
||||
env.gauge.inc(1.0);
|
||||
ankerl::nanobench::doNotOptimizeAway(env.gauge);
|
||||
});
|
||||
bench.run("gauge.inc() - 8 background threads", [&]() { gauge.inc(1.0); });
|
||||
|
||||
bench.run("gauge.set() - 8 background threads", [&]() {
|
||||
env.gauge.set(42.0);
|
||||
ankerl::nanobench::doNotOptimizeAway(env.gauge);
|
||||
});
|
||||
bench.run("gauge.set() - 8 background threads", [&]() { gauge.set(42.0); });
|
||||
|
||||
bench.run("histogram.observe() - 8 background threads", [&]() {
|
||||
env.histogram.observe(1.5);
|
||||
ankerl::nanobench::doNotOptimizeAway(env.histogram);
|
||||
});
|
||||
bench.run("histogram.observe() - 8 background threads",
|
||||
[&]() { histogram.observe(1.5); });
|
||||
}
|
||||
|
||||
// Concurrent render contention
|
||||
@@ -183,58 +154,14 @@ int main() {
|
||||
env.start_background_contention(4);
|
||||
env.start_render_thread();
|
||||
|
||||
bench.run("counter.inc() - with concurrent render", [&]() {
|
||||
env.counter.inc(1.0);
|
||||
ankerl::nanobench::doNotOptimizeAway(env.counter);
|
||||
});
|
||||
bench.run("counter.inc() - with concurrent render",
|
||||
[&]() { counter.inc(1.0); });
|
||||
|
||||
bench.run("gauge.inc() - with concurrent render", [&]() {
|
||||
env.gauge.inc(1.0);
|
||||
ankerl::nanobench::doNotOptimizeAway(env.gauge);
|
||||
});
|
||||
bench.run("gauge.inc() - with concurrent render",
|
||||
[&]() { gauge.inc(1.0); });
|
||||
|
||||
bench.run("histogram.observe() - with concurrent render", [&]() {
|
||||
env.histogram.observe(2.0);
|
||||
ankerl::nanobench::doNotOptimizeAway(env.histogram);
|
||||
});
|
||||
}
|
||||
|
||||
// Shared gauge contention
|
||||
{
|
||||
// Test the multi-writer CAS behavior of gauges when multiple threads
|
||||
// create gauges with the same labels. They will all point to the same
|
||||
// underlying state, causing high contention.
|
||||
auto gauge_family =
|
||||
metric::create_gauge("shared_gauge", "Shared gauge test");
|
||||
|
||||
std::atomic<bool> stop_shared{false};
|
||||
std::vector<std::thread> shared_threads;
|
||||
std::latch start_latch{
|
||||
9}; // Force threads to start concurrently (8 background + 1 benchmark)
|
||||
|
||||
for (int i = 0; i < 8; ++i) {
|
||||
shared_threads.emplace_back(
|
||||
[&gauge_family, &stop_shared, &start_latch]() {
|
||||
auto gauge = gauge_family.create({{"shared", "true"}});
|
||||
start_latch.arrive_and_wait(); // All threads start together
|
||||
while (!stop_shared.load(std::memory_order_relaxed)) {
|
||||
gauge.inc(1.0);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
auto gauge_for_benchmark = gauge_family.create({{"shared", "true"}});
|
||||
start_latch
|
||||
.arrive_and_wait(); // Benchmark thread waits for all background threads
|
||||
bench.run("gauge.inc() - 8 threads, same labels (contention)", [&]() {
|
||||
gauge_for_benchmark.inc(1.0);
|
||||
ankerl::nanobench::doNotOptimizeAway(gauge_for_benchmark);
|
||||
});
|
||||
|
||||
stop_shared.store(true);
|
||||
for (auto &t : shared_threads) {
|
||||
t.join();
|
||||
}
|
||||
bench.run("histogram.observe() - with concurrent render",
|
||||
[&]() { histogram.observe(2.0); });
|
||||
}
|
||||
|
||||
// Render performance scaling
|
||||
|
||||
110
src/metric.cpp
110
src/metric.cpp
@@ -25,6 +25,12 @@
|
||||
|
||||
#include "format.hpp"
|
||||
|
||||
// Verify that malloc provides sufficient alignment for atomic 128-bit
|
||||
// operations
|
||||
static_assert(__STDCPP_DEFAULT_NEW_ALIGNMENT__ >= 16,
|
||||
"Default new alignment must be at least 16 bytes for atomic "
|
||||
"128-bit stores");
|
||||
|
||||
// WeaselDB Metrics System Design:
|
||||
//
|
||||
// THREADING MODEL:
|
||||
@@ -157,8 +163,8 @@ struct Gauge::State {
|
||||
struct Histogram::State {
|
||||
std::vector<double>
|
||||
thresholds; // Bucket boundaries (sorted, deduplicated, includes +Inf)
|
||||
std::vector<uint64_t>
|
||||
counts; // Count per bucket - single writer, no atomics needed
|
||||
std::vector<uint64_t> counts; // Count per bucket - single writer, malloc
|
||||
// provides 16-byte alignment
|
||||
AtomicWord sum; // Sum of observations (double stored as uint64_t bits)
|
||||
AtomicWord observations; // Total observation count (uint64_t)
|
||||
friend struct Metric;
|
||||
@@ -286,7 +292,7 @@ void Counter::inc(double x) {
|
||||
std::to_string(new_value).c_str());
|
||||
}
|
||||
|
||||
p->value = new_value;
|
||||
__atomic_store(&p->value, &new_value, __ATOMIC_RELAXED);
|
||||
}
|
||||
|
||||
Gauge::Gauge() = default;
|
||||
@@ -321,55 +327,74 @@ void Gauge::set(double x) {
|
||||
Histogram::Histogram() = default;
|
||||
|
||||
// Vectorized histogram bucket updates using single-writer + atomic-read design
|
||||
// Since histograms have single-writer semantics, we can bypass atomic writes!
|
||||
// Since histograms have single-writer semantics, we can use architecturally
|
||||
// atomic stores
|
||||
|
||||
// Default implementation
|
||||
__attribute__((target("default"))) static void
|
||||
update_histogram_buckets_vectorized(const std::vector<double> &thresholds,
|
||||
std::vector<uint64_t> &counts, double x,
|
||||
size_t start_idx) {
|
||||
const size_t size = thresholds.size();
|
||||
|
||||
// Single writer - simple increment, no atomics needed
|
||||
for (size_t i = start_idx; i < size; ++i) {
|
||||
if (x <= thresholds[i]) {
|
||||
counts[i]++;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// AVX2 version - true vectorization with direct memory access
|
||||
#ifdef __x86_64__
|
||||
__attribute__((target("avx2"))) static void
|
||||
update_histogram_buckets_vectorized(const std::vector<double> &thresholds,
|
||||
// x86-64: 128-bit vectorized with inline assembly atomic stores
|
||||
__attribute__((target("avx"))) static void
|
||||
update_histogram_buckets(const std::vector<double> &thresholds,
|
||||
std::vector<uint64_t> &counts, double x,
|
||||
size_t start_idx) {
|
||||
const size_t size = thresholds.size();
|
||||
size_t i = start_idx;
|
||||
|
||||
// Process 4 buckets at a time with AVX2
|
||||
const __m256d x_vec = _mm256_set1_pd(x);
|
||||
// Process 2 buckets at a time with 128-bit vectors + inline assembly
|
||||
const __m128d x_vec = _mm_set1_pd(x);
|
||||
|
||||
for (; i + 4 <= size; i += 4) {
|
||||
// Vectorized comparison
|
||||
__m256d thresholds_vec = _mm256_loadu_pd(&thresholds[i]);
|
||||
__m256d cmp_result = _mm256_cmp_pd(x_vec, thresholds_vec, _CMP_LE_OQ);
|
||||
for (; i + 2 <= size; i += 2) {
|
||||
// Ensure alignment for atomic guarantee (malloc provides 16-byte alignment)
|
||||
assert((reinterpret_cast<uintptr_t>(static_cast<void *>(&counts[i])) &
|
||||
15) == 0 &&
|
||||
"counts array must be 16-byte aligned for atomic 128-bit stores");
|
||||
|
||||
// Convert to increment mask
|
||||
__m256i cmp_as_int = _mm256_castpd_si256(cmp_result);
|
||||
__m256i ones = _mm256_set1_epi64x(1);
|
||||
__m256i increments = _mm256_and_si256(cmp_as_int, ones);
|
||||
// 128-bit vectorized comparison and arithmetic
|
||||
__m128d thresholds_vec = _mm_loadu_pd(&thresholds[i]);
|
||||
__m128d cmp_result = _mm_cmp_pd(x_vec, thresholds_vec, _CMP_LE_OQ);
|
||||
__m128i cmp_as_int = _mm_castpd_si128(cmp_result);
|
||||
__m128i ones = _mm_set1_epi64x(1);
|
||||
__m128i increments = _mm_and_si128(cmp_as_int, ones);
|
||||
|
||||
// Vectorized 4-lane add directly to memory
|
||||
__m256i current_counts = _mm256_loadu_si256((__m256i *)&counts[i]);
|
||||
__m256i updated_counts = _mm256_add_epi64(current_counts, increments);
|
||||
_mm256_storeu_si256((__m256i *)&counts[i], updated_counts);
|
||||
// Load current counts and add increments
|
||||
__m128i current_counts = _mm_load_si128((__m128i *)&counts[i]);
|
||||
__m128i updated_counts = _mm_add_epi64(current_counts, increments);
|
||||
|
||||
// Processors that enumerate support for Intel® AVX (by setting the feature
|
||||
// flag CPUID.01H:ECX.AVX[bit 28])
|
||||
// guarantee that the 16-byte memory operations performed by the
|
||||
// following instructions will always be carried out atomically: â¢
|
||||
// MOVAPD, MOVAPS, and MOVDQA. ⢠VMOVAPD, VMOVAPS, and VMOVDQA when
|
||||
// encoded with VEX.128. ⢠VMOVAPD, VMOVAPS, VMOVDQA32, and VMOVDQA64
|
||||
// when encoded with EVEX.128 and k0 (masking disabled). (Note that
|
||||
// these instructions require the linear addresses of their memory
|
||||
// operands to be 16-byte aligned.)
|
||||
__asm__ __volatile__(
|
||||
"vmovdqa %%xmm0, %0"
|
||||
: "=m"(*((__m128i *)&counts[i])) // Output: aligned memory location
|
||||
: "x"(updated_counts) // Input: xmm register
|
||||
: "memory" // Memory clobber
|
||||
);
|
||||
}
|
||||
|
||||
// Handle remainder
|
||||
// Handle remainder with atomic stores
|
||||
for (; i < size; ++i) {
|
||||
if (x <= thresholds[i]) {
|
||||
counts[i]++;
|
||||
__atomic_store_n(&counts[i], counts[i] + 1, __ATOMIC_RELAXED);
|
||||
}
|
||||
}
|
||||
}
|
||||
#else
|
||||
// Fallback implementation for non-x86 architectures
|
||||
static void
|
||||
update_histogram_buckets_vectorized(const std::vector<double> &thresholds,
|
||||
std::vector<uint64_t> &counts, double x,
|
||||
size_t start_idx) {
|
||||
const size_t size = thresholds.size();
|
||||
|
||||
// Scalar implementation with atomic stores for TSAN compatibility
|
||||
for (size_t i = start_idx; i < size; ++i) {
|
||||
if (x <= thresholds[i]) {
|
||||
__atomic_store_n(&counts[i], counts[i] + 1, __ATOMIC_RELAXED);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -378,9 +403,7 @@ update_histogram_buckets_vectorized(const std::vector<double> &thresholds,
|
||||
void Histogram::observe(double x) {
|
||||
assert(p->thresholds.size() == p->counts.size());
|
||||
|
||||
// Use multiversioned auto-vectorized function
|
||||
// Compiler automatically selects best implementation for current CPU
|
||||
update_histogram_buckets_vectorized(p->thresholds, p->counts, x, 0);
|
||||
update_histogram_buckets(p->thresholds, p->counts, x, 0);
|
||||
|
||||
// DESIGN: Single writer per thread allows simple load-modify-store for sum
|
||||
// No CAS loop needed since only one thread writes to this histogram
|
||||
@@ -830,11 +853,6 @@ void Family<Gauge>::register_callback(
|
||||
}
|
||||
|
||||
// Explicit template instantiations to provide member implementations
|
||||
template void Family<Counter>::register_callback(
|
||||
std::vector<std::pair<std::string, std::string>>, MetricCallback<Counter>);
|
||||
|
||||
template void Family<Gauge>::register_callback(
|
||||
std::vector<std::pair<std::string, std::string>>, MetricCallback<Gauge>);
|
||||
|
||||
// Static member definitions
|
||||
std::mutex Metric::mutex;
|
||||
|
||||
Reference in New Issue
Block a user