diff --git a/benchmarks/bench_metric.cpp b/benchmarks/bench_metric.cpp
index 74b45f0..8ab4b76 100644
--- a/benchmarks/bench_metric.cpp
+++ b/benchmarks/bench_metric.cpp
@@ -20,29 +20,14 @@ struct ContentionEnvironment {
   std::unique_ptr<std::latch> contention_latch;
   std::unique_ptr<std::latch> render_latch;
 
-  // Metrics for testing
-  metric::Family<metric::Counter> counter_family;
-  metric::Family<metric::Gauge> gauge_family;
-  metric::Family<metric::Histogram> histogram_family;
+  metric::Family<metric::Gauge> gauge_family =
+      metric::create_gauge("gauge", "");
+  metric::Family<metric::Counter> counter_family =
+      metric::create_counter("counter", "");
+  metric::Family<metric::Histogram> histogram_family = metric::create_histogram(
+      "histogram", "", metric::exponential_buckets(0.001, 5, 7));
 
-  // Test instances
-  metric::Counter counter;
-  metric::Gauge gauge;
-  metric::Histogram histogram;
-
-  ContentionEnvironment()
-      : counter_family(
-            metric::create_counter("bench_counter", "Benchmark counter")),
-        gauge_family(metric::create_gauge("bench_gauge", "Benchmark gauge")),
-        histogram_family(
-            metric::create_histogram("bench_histogram", "Benchmark histogram",
-                                     // 7 explicit buckets + automatic +Inf = 8
-                                     // total (optimal for SIMD: 2x4 buckets)
-                                     std::initializer_list<double>{
-                                         0.1, 0.5, 1.0, 2.5, 5.0, 10.0, 25.0})),
-        counter(counter_family.create({{"benchmark", "contention"}})),
-        gauge(gauge_family.create({{"benchmark", "contention"}})),
-        histogram(histogram_family.create({{"benchmark", "contention"}})) {}
+  ContentionEnvironment() = default;
 
   void start_background_contention(int num_threads = 4) {
     stop_flag.store(false);
@@ -50,12 +35,9 @@ struct ContentionEnvironment {
 
     for (int i = 0; i < num_threads; ++i) {
       background_threads.emplace_back([this, i]() {
-        // Each background thread creates its own metrics to avoid conflicts
-        auto bg_counter =
-            counter_family.create({{"thread", std::to_string(i)}});
-        auto bg_gauge = gauge_family.create({{"bg_thread", std::to_string(i)}});
-        auto bg_histogram =
-            histogram_family.create({{"bg_thread", std::to_string(i)}});
+        auto bg_counter = counter_family.create({});
+        auto bg_gauge = gauge_family.create({});
+        auto bg_histogram = histogram_family.create({});
 
         std::mt19937 rng(i);
         std::uniform_real_distribution<double> dist(0.0, 10.0);
@@ -67,6 +49,7 @@ struct ContentionEnvironment {
           // Simulate mixed workload
           bg_counter.inc(1.0);
           bg_gauge.set(dist(rng));
+          bg_gauge.inc(1.0);
           bg_histogram.observe(dist(rng));
         }
       });
@@ -111,21 +94,24 @@ int main() {
   ankerl::nanobench::Bench bench;
   bench.title("WeaselDB Metrics Performance").unit("operation").warmup(1000);
 
+  metric::Family<metric::Gauge> gauge_family =
+      metric::create_gauge("gauge", "");
+  metric::Family<metric::Counter> counter_family =
+      metric::create_counter("counter", "");
+  metric::Family<metric::Histogram> histogram_family = metric::create_histogram(
+      "histogram", "", metric::exponential_buckets(0.001, 5, 7));
+
+  auto counter = counter_family.create({});
+  auto gauge = gauge_family.create({});
+  auto histogram = histogram_family.create({});
+
   // Baseline performance without contention
   {
-    auto counter_family =
-        metric::create_counter("baseline_counter", "Baseline counter");
-    auto counter = counter_family.create({{"type", "baseline"}});
-
     bench.run("counter.inc() - no contention", [&]() {
       counter.inc(1.0);
       ankerl::nanobench::doNotOptimizeAway(counter);
     });
 
-    auto gauge_family =
-        metric::create_gauge("baseline_gauge", "Baseline gauge");
-    auto gauge = gauge_family.create({{"type", "baseline"}});
-
     bench.run("gauge.inc() - no contention", [&]() {
       gauge.inc(1.0);
       ankerl::nanobench::doNotOptimizeAway(gauge);
@@ -136,11 +122,6 @@ int main() {
       ankerl::nanobench::doNotOptimizeAway(gauge);
     });
 
-    auto histogram_family = metric::create_histogram(
-        "baseline_histogram", "Baseline histogram",
-        std::initializer_list<double>{0.1, 0.5, 1.0, 2.5, 5.0, 10.0, 25.0});
-    auto histogram = histogram_family.create({{"type", "baseline"}});
-
     bench.run("histogram.observe() - no contention", [&]() {
       histogram.observe(0.5);
       ankerl::nanobench::doNotOptimizeAway(histogram);
@@ -154,25 +135,15 @@ int main() {
     // Start background threads creating contention
     env.start_background_contention(8);
 
-    bench.run("counter.inc() - 8 background threads", [&]() {
-      env.counter.inc(1.0);
-      ankerl::nanobench::doNotOptimizeAway(env.counter);
-    });
+    bench.run("counter.inc() - 8 background threads",
+              [&]() { counter.inc(1.0); });
 
-    bench.run("gauge.inc() - 8 background threads", [&]() {
-      env.gauge.inc(1.0);
-      ankerl::nanobench::doNotOptimizeAway(env.gauge);
-    });
+    bench.run("gauge.inc() - 8 background threads", [&]() { gauge.inc(1.0); });
 
-    bench.run("gauge.set() - 8 background threads", [&]() {
-      env.gauge.set(42.0);
-      ankerl::nanobench::doNotOptimizeAway(env.gauge);
-    });
+    bench.run("gauge.set() - 8 background threads", [&]() { gauge.set(42.0); });
 
-    bench.run("histogram.observe() - 8 background threads", [&]() {
-      env.histogram.observe(1.5);
-      ankerl::nanobench::doNotOptimizeAway(env.histogram);
-    });
+    bench.run("histogram.observe() - 8 background threads",
+              [&]() { histogram.observe(1.5); });
   }
 
   // Concurrent render contention
@@ -183,58 +154,14 @@ int main() {
     env.start_background_contention(4);
     env.start_render_thread();
 
-    bench.run("counter.inc() - with concurrent render", [&]() {
-      env.counter.inc(1.0);
-      ankerl::nanobench::doNotOptimizeAway(env.counter);
-    });
+    bench.run("counter.inc() - with concurrent render",
+              [&]() { counter.inc(1.0); });
 
-    bench.run("gauge.inc() - with concurrent render", [&]() {
-      env.gauge.inc(1.0);
-      ankerl::nanobench::doNotOptimizeAway(env.gauge);
-    });
+    bench.run("gauge.inc() - with concurrent render",
+              [&]() { gauge.inc(1.0); });
 
-    bench.run("histogram.observe() - with concurrent render", [&]() {
-      env.histogram.observe(2.0);
-      ankerl::nanobench::doNotOptimizeAway(env.histogram);
-    });
-  }
-
-  // Shared gauge contention
-  {
-    // Test the multi-writer CAS behavior of gauges when multiple threads
-    // create gauges with the same labels. They will all point to the same
-    // underlying state, causing high contention.
-    auto gauge_family =
-        metric::create_gauge("shared_gauge", "Shared gauge test");
-
-    std::atomic<bool> stop_shared{false};
-    std::vector<std::thread> shared_threads;
-    std::latch start_latch{
-        9}; // Force threads to start concurrently (8 background + 1 benchmark)
-
-    for (int i = 0; i < 8; ++i) {
-      shared_threads.emplace_back(
-          [&gauge_family, &stop_shared, &start_latch]() {
-            auto gauge = gauge_family.create({{"shared", "true"}});
-            start_latch.arrive_and_wait(); // All threads start together
-            while (!stop_shared.load(std::memory_order_relaxed)) {
-              gauge.inc(1.0);
-            }
-          });
-    }
-
-    auto gauge_for_benchmark = gauge_family.create({{"shared", "true"}});
-    start_latch
-        .arrive_and_wait(); // Benchmark thread waits for all background threads
-    bench.run("gauge.inc() - 8 threads, same labels (contention)", [&]() {
-      gauge_for_benchmark.inc(1.0);
-      ankerl::nanobench::doNotOptimizeAway(gauge_for_benchmark);
-    });
-
-    stop_shared.store(true);
-    for (auto &t : shared_threads) {
-      t.join();
-    }
+    bench.run("histogram.observe() - with concurrent render",
+              [&]() { histogram.observe(2.0); });
   }
 
   // Render performance scaling
diff --git a/src/metric.cpp b/src/metric.cpp
index 1cd5af4..385ea9c 100644
--- a/src/metric.cpp
+++ b/src/metric.cpp
@@ -25,6 +25,12 @@
 
 #include "format.hpp"
 
+// Verify that malloc provides sufficient alignment for atomic 128-bit
+// operations
+static_assert(__STDCPP_DEFAULT_NEW_ALIGNMENT__ >= 16,
+              "Default new alignment must be at least 16 bytes for atomic "
+              "128-bit stores");
+
 // WeaselDB Metrics System Design:
 //
 // THREADING MODEL:
@@ -157,8 +163,8 @@ struct Gauge::State {
 struct Histogram::State {
   std::vector<double>
       thresholds; // Bucket boundaries (sorted, deduplicated, includes +Inf)
-  std::vector<uint64_t>
-      counts; // Count per bucket - single writer, no atomics needed
+  std::vector<uint64_t> counts; // Count per bucket - single writer, malloc
+                                // provides 16-byte alignment
   AtomicWord sum;          // Sum of observations (double stored as uint64_t bits)
   AtomicWord observations; // Total observation count (uint64_t)
   friend struct Metric;
@@ -286,7 +292,7 @@ void Counter::inc(double x) {
                  std::to_string(new_value).c_str());
   }
 
-  p->value = new_value;
+  __atomic_store(&p->value, &new_value, __ATOMIC_RELAXED);
 }
 
 Gauge::Gauge() = default;
@@ -321,55 +327,74 @@ void Gauge::set(double x) {
 
 Histogram::Histogram() = default;
 
 // Vectorized histogram bucket updates using single-writer + atomic-read design
-// Since histograms have single-writer semantics, we can bypass atomic writes!
+// Since histograms have single-writer semantics, we can use architecturally
+// atomic stores
 
-// Default implementation
-__attribute__((target("default"))) static void
-update_histogram_buckets_vectorized(const std::vector<double> &thresholds,
-                                    std::vector<uint64_t> &counts, double x,
-                                    size_t start_idx) {
-  const size_t size = thresholds.size();
-
-  // Single writer - simple increment, no atomics needed
-  for (size_t i = start_idx; i < size; ++i) {
-    if (x <= thresholds[i]) {
-      counts[i]++;
-    }
-  }
-}
-
-// AVX2 version - true vectorization with direct memory access
 #ifdef __x86_64__
-__attribute__((target("avx2"))) static void
-update_histogram_buckets_vectorized(const std::vector<double> &thresholds,
-                                    std::vector<uint64_t> &counts, double x,
-                                    size_t start_idx) {
+// x86-64: 128-bit vectorized with inline assembly atomic stores
+__attribute__((target("avx"))) static void
+update_histogram_buckets(const std::vector<double> &thresholds,
+                         std::vector<uint64_t> &counts, double x,
+                         size_t start_idx) {
   const size_t size = thresholds.size();
   size_t i = start_idx;
 
-  // Process 4 buckets at a time with AVX2
-  const __m256d x_vec = _mm256_set1_pd(x);
+  // Process 2 buckets at a time with 128-bit vectors + inline assembly
+  const __m128d x_vec = _mm_set1_pd(x);
 
-  for (; i + 4 <= size; i += 4) {
-    // Vectorized comparison
-    __m256d thresholds_vec = _mm256_loadu_pd(&thresholds[i]);
-    __m256d cmp_result = _mm256_cmp_pd(x_vec, thresholds_vec, _CMP_LE_OQ);
+  for (; i + 2 <= size; i += 2) {
+    // Ensure alignment for atomic guarantee (malloc provides 16-byte alignment)
+    assert((reinterpret_cast<uintptr_t>(static_cast<void *>(&counts[i])) &
+            15) == 0 &&
+           "counts array must be 16-byte aligned for atomic 128-bit stores");
 
-    // Convert to increment mask
-    __m256i cmp_as_int = _mm256_castpd_si256(cmp_result);
-    __m256i ones = _mm256_set1_epi64x(1);
-    __m256i increments = _mm256_and_si256(cmp_as_int, ones);
+    // 128-bit vectorized comparison and arithmetic
+    __m128d thresholds_vec = _mm_loadu_pd(&thresholds[i]);
+    __m128d cmp_result = _mm_cmp_pd(x_vec, thresholds_vec, _CMP_LE_OQ);
+    __m128i cmp_as_int = _mm_castpd_si128(cmp_result);
+    __m128i ones = _mm_set1_epi64x(1);
+    __m128i increments = _mm_and_si128(cmp_as_int, ones);
 
-    // Vectorized 4-lane add directly to memory
-    __m256i current_counts = _mm256_loadu_si256((__m256i *)&counts[i]);
-    __m256i updated_counts = _mm256_add_epi64(current_counts, increments);
-    _mm256_storeu_si256((__m256i *)&counts[i], updated_counts);
+    // Load current counts and add increments
+    __m128i current_counts = _mm_load_si128((__m128i *)&counts[i]);
+    __m128i updated_counts = _mm_add_epi64(current_counts, increments);
+
+    // Processors that enumerate support for Intel® AVX (by setting the
+    // feature flag CPUID.01H:ECX.AVX[bit 28]) guarantee that the 16-byte
+    // memory operations performed by the following instructions will always
+    // be carried out atomically:
+    //   • MOVAPD, MOVAPS, and MOVDQA.
+    //   • VMOVAPD, VMOVAPS, and VMOVDQA when encoded with VEX.128.
+    //   • VMOVAPD, VMOVAPS, VMOVDQA32, and VMOVDQA64 when encoded with
+    //     EVEX.128 and k0 (masking disabled).
+    // (Note that these instructions require the linear addresses of their
+    // memory operands to be 16-byte aligned.)
+    __asm__ __volatile__(
+        "vmovdqa %1, %0"
+        : "=m"(*((__m128i *)&counts[i])) // Output: aligned memory location
+        : "x"(updated_counts)            // Input: xmm register
+        : "memory"                       // Memory clobber
+    );
   }
 
-  // Handle remainder
+  // Handle remainder with atomic stores
   for (; i < size; ++i) {
     if (x <= thresholds[i]) {
-      counts[i]++;
+      __atomic_store_n(&counts[i], counts[i] + 1, __ATOMIC_RELAXED);
+    }
+  }
+}
+#else
+// Fallback implementation for non-x86 architectures
+static void
+update_histogram_buckets(const std::vector<double> &thresholds,
+                         std::vector<uint64_t> &counts, double x,
+                         size_t start_idx) {
+  const size_t size = thresholds.size();
+
+  // Scalar implementation with atomic stores for TSAN compatibility
+  for (size_t i = start_idx; i < size; ++i) {
+    if (x <= thresholds[i]) {
+      __atomic_store_n(&counts[i], counts[i] + 1, __ATOMIC_RELAXED);
     }
   }
 }
@@ -378,9 +403,7 @@ update_histogram_buckets_vectorized(const std::vector<double> &thresholds,
 void Histogram::observe(double x) {
   assert(p->thresholds.size() == p->counts.size());
 
-  // Use multiversioned auto-vectorized function
-  // Compiler automatically selects best implementation for current CPU
-  update_histogram_buckets_vectorized(p->thresholds, p->counts, x, 0);
+  update_histogram_buckets(p->thresholds, p->counts, x, 0);
 
   // DESIGN: Single writer per thread allows simple load-modify-store for sum
   // No CAS loop needed since only one thread writes to this histogram
@@ -830,11 +853,6 @@ void Family<T>::register_callback(
 }
 
 // Explicit template instantiations to provide member implementations
-template void Family<Counter>::register_callback(
-    std::vector<std::pair<std::string, std::string>>, MetricCallback);
-
-template void Family<Gauge>::register_callback(
-    std::vector<std::pair<std::string, std::string>>, MetricCallback);
 
 // Static member definitions
 std::mutex Metric::mutex;
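
Note on the read side (not part of the patch): the vmovdqa store above is only half of the tear-free scheme; whatever renders the histogram must read each 16-byte pair with an equally atomic aligned load. Below is a minimal sketch of such a reader, assuming the same AVX atomicity guarantee quoted in the patch comment; the helper name atomic_load_counts_pair is illustrative and does not appear in metric.cpp.

    #include <immintrin.h>
    #include <cstdint>

    // Hypothetical reader-side counterpart to the patch's vmovdqa store: load
    // two adjacent uint64_t bucket counters with one aligned 16-byte VMOVDQA.
    // On CPUs that enumerate AVX support this load is architecturally atomic,
    // so the pair is never torn even while the single writer is updating it.
    static inline __m128i atomic_load_counts_pair(const uint64_t *p) {
      __m128i v;
      __asm__ __volatile__("vmovdqa %1, %0"
                           : "=x"(v)                  // output: xmm register
                           : "m"(*(const __m128i *)p) // input: 16-byte aligned memory
                           : "memory");
      return v;
    }

Using literal inline assembly (rather than _mm_load_si128) mirrors the writer's design choice: it prevents the compiler from folding the load into another instruction and losing the single-16-byte-access property.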
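The benchmarks now size histograms with metric::exponential_buckets(0.001, 5, 7) instead of an explicit bucket list. That helper's definition is not part of this diff; assuming the conventional Prometheus-style (start, factor, count) signature, it would produce seven thresholds as in the sketch below, and with the automatically appended +Inf bucket (per the Histogram::State comment) the counts vector has 8 entries, so the two-wide SIMD loop covers every bucket with no scalar remainder.

    #include <vector>

    // Hypothetical stand-in for metric::exponential_buckets, assuming
    // (start, factor, count) semantics; the real helper may differ.
    static std::vector<double> exponential_buckets_sketch(double start,
                                                          double factor,
                                                          int count) {
      std::vector<double> thresholds;
      thresholds.reserve(count);
      for (int i = 0; i < count; ++i) {
        // For (0.001, 5, 7): 0.001, 0.005, 0.025, 0.125, 0.625, 3.125, 15.625
        thresholds.push_back(start);
        start *= factor;
      }
      return thresholds;
    }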