diff --git a/benchmarks/bench_metric.cpp b/benchmarks/bench_metric.cpp
index 8ab4b76..4fae4be 100644
--- a/benchmarks/bench_metric.cpp
+++ b/benchmarks/bench_metric.cpp
@@ -25,7 +25,7 @@ struct ContentionEnvironment {
 
   metric::Family<metric::Counter> counter_family = metric::create_counter("counter", "");
   metric::Family<metric::Histogram> histogram_family = metric::create_histogram(
-      "histogram", "", metric::exponential_buckets(0.001, 5, 7));
+      "histogram", "", metric::exponential_buckets(0.001, 5, 8));
 
   ContentionEnvironment() = default;
 
@@ -99,7 +99,7 @@ int main() {
 
   metric::Family<metric::Counter> counter_family = metric::create_counter("counter", "");
   metric::Family<metric::Histogram> histogram_family = metric::create_histogram(
-      "histogram", "", metric::exponential_buckets(0.001, 5, 7));
+      "histogram", "", metric::exponential_buckets(0.001, 5, 8));
 
   auto counter = counter_family.create({});
   auto gauge = gauge_family.create({});
@@ -176,7 +176,8 @@ int main() {
   auto gauge_family = metric::create_gauge("scale_gauge", "Scale gauge");
   auto histogram_family = metric::create_histogram(
       "scale_histogram", "Scale histogram",
-      std::initializer_list<double>{0.1, 0.5, 1.0, 2.5, 5.0, 10.0, 25.0});
+      std::initializer_list<double>{0.1, 0.5, 1.0, 2.5, 5.0, 10.0, 25.0,
+                                    50.0});
 
   // Create varying numbers of metrics
   for (int scale : {10, 100, 1000}) {
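
Note on the benchmark changes above: both bucket lists grow from 7 to 8 entries, presumably so the 2-wide SIMD loop introduced below covers every bucket pair without a scalar tail. For orientation, here is a minimal sketch of what exponential_buckets(0.001, 5, 8) produces, assuming it follows the usual Prometheus ExponentialBuckets convention (start value, growth factor, bucket count); exponential_buckets_sketch is a hypothetical stand-in, not the library's implementation:

    #include <cmath>
    #include <cstdio>
    #include <vector>

    // Hypothetical equivalent of metric::exponential_buckets(start, factor, count):
    // `count` boundaries, each `factor` times the previous one.
    static std::vector<double> exponential_buckets_sketch(double start,
                                                          double factor,
                                                          int count) {
      std::vector<double> buckets;
      for (int i = 0; i < count; ++i)
        buckets.push_back(start * std::pow(factor, i));
      return buckets;
    }

    int main() {
      // exponential_buckets(0.001, 5, 8) would yield:
      // 0.001 0.005 0.025 0.125 0.625 3.125 15.625 78.125
      for (double b : exponential_buckets_sketch(0.001, 5, 8))
        std::printf("%g ", b);
      std::printf("\n");
    }
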
diff --git a/src/metric.cpp b/src/metric.cpp
index ea934b0..a5eb903 100644
--- a/src/metric.cpp
+++ b/src/metric.cpp
@@ -35,7 +35,8 @@ static_assert(__STDCPP_DEFAULT_NEW_ALIGNMENT__ >= 16,
 // WeaselDB Metrics System Design:
 //
 // THREADING MODEL:
-// - Counters and Histograms: Per-thread storage, single writer per thread
+// - Counters: Per-thread storage, single writer per thread
+// - Histograms: Per-thread storage with mutex protection for consistent reads
 // - Gauges: Global storage with mutex protection (multi-writer)
 //
 // PRECISION STRATEGY:
@@ -159,16 +160,14 @@ struct Gauge::State {
   friend struct Metric;
 };
 
-// Histogram: Thread-local buckets, single writer per thread
+// Histogram: Thread-local buckets with mutex protection per thread
 struct Histogram::State {
   std::vector<double>
       thresholds; // Bucket boundaries (sorted, deduplicated, includes +Inf)
-  std::vector<uint64_t> counts; // Count per bucket - single writer, malloc
-                                // provides 16-byte alignment
-  // TODO this should just be a double like in counter
-  std::atomic<uint64_t>
-      sum; // Sum of observations (double stored as uint64_t bits)
-  std::atomic<uint64_t> observations; // Total observation count (uint64_t)
+  std::vector<uint64_t> counts; // Count per bucket
+  double sum;                   // Sum of observations
+  uint64_t observations;        // Total observation count
+  std::mutex mutex; // Per-histogram mutex for consistent reads/writes
 
   friend struct Metric;
 };
@@ -273,10 +272,10 @@ struct Metric {
       // Use buckets from family configuration
       ptr->thresholds = family->p->buckets; // Already sorted and deduplicated
 
-      // Single writer semantics - no atomics needed for bucket counts
+      // Initialize with zero values, mutex protects all operations
      ptr->counts = std::vector<uint64_t>(ptr->thresholds.size(), 0);
-      ptr->sum.store(0, std::memory_order_relaxed);
-      ptr->observations.store(0, std::memory_order_relaxed);
+      ptr->sum = 0.0;
+      ptr->observations = 0;
     }
     Histogram result;
     result.p = ptr.get();
@@ -331,28 +330,20 @@ void Gauge::set(double x) {
 
 Histogram::Histogram() = default;
 
-// Vectorized histogram bucket updates using single-writer + atomic-read design
-// Since histograms have single-writer semantics, we can use architecturally
-// atomic stores
+// Vectorized histogram bucket updates with mutex protection for consistency
+// AVX-optimized implementation for high performance
 
-#ifdef __x86_64__
-// x86-64: 128-bit vectorized with inline assembly atomic stores
 __attribute__((target("avx"))) static void
-update_histogram_buckets(const std::vector<double> &thresholds,
-                         std::vector<uint64_t> &counts, double x,
-                         size_t start_idx) {
+update_histogram_buckets_simd(const std::vector<double> &thresholds,
+                              std::vector<uint64_t> &counts, double x,
+                              size_t start_idx) {
   const size_t size = thresholds.size();
   size_t i = start_idx;
 
-  // Process 2 buckets at a time with 128-bit vectors + inline assembly
+  // Process 2 buckets at a time with 128-bit vectors
   const __m128d x_vec = _mm_set1_pd(x);
   for (; i + 2 <= size; i += 2) {
-    // Ensure alignment for atomic guarantee (malloc provides 16-byte alignment)
-    assert((reinterpret_cast<uintptr_t>(static_cast<void *>(&counts[i])) &
-            15) == 0 &&
-           "counts array must be 16-byte aligned for atomic 128-bit stores");
-
     // 128-bit vectorized comparison and arithmetic
     __m128d thresholds_vec = _mm_loadu_pd(&thresholds[i]);
     __m128d cmp_result = _mm_cmp_pd(x_vec, thresholds_vec, _CMP_LE_OQ);
@@ -361,65 +352,32 @@ update_histogram_buckets(const std::vector<double> &thresholds,
     __m128i increments = _mm_and_si128(cmp_as_int, ones);
 
     // Load current counts and add increments
-    __m128i current_counts = _mm_load_si128((__m128i *)&counts[i]);
+    __m128i current_counts = _mm_loadu_si128((__m128i *)&counts[i]);
     __m128i updated_counts = _mm_add_epi64(current_counts, increments);
 
-    // Processors that enumerate support for Intel® AVX (by setting the feature
-    // flag CPUID.01H:ECX.AVX[bit 28]) guarantee that the 16-byte memory
-    // operations performed by the following instructions will always be
-    // carried out atomically:
-    //   • MOVAPD, MOVAPS, and MOVDQA.
-    //   • VMOVAPD, VMOVAPS, and VMOVDQA when encoded with VEX.128.
-    //   • VMOVAPD, VMOVAPS, VMOVDQA32, and VMOVDQA64 when encoded with
-    //     EVEX.128 and k0 (masking disabled).
-    // (Note that these instructions require the linear addresses of their
-    // memory operands to be 16-byte aligned.)
-    __asm__ __volatile__(
-        "vmovdqa %%xmm0, %0"
-        : "=m"(*((__m128i *)&counts[i])) // Output: aligned memory location
-        : "x"(updated_counts)            // Input: xmm register
-        : "memory"                       // Memory clobber
-    );
-    // We don't actually need it to be atomic across 128-bits, but that's
-    // sufficient to guarantee each 64 bit half is atomic.
+    // Store updated counts
+    _mm_storeu_si128((__m128i *)&counts[i], updated_counts);
   }
 
-  // Handle remainder with atomic stores
+  // Handle remainder with scalar operations
   for (; i < size; ++i) {
     if (x <= thresholds[i]) {
-      __atomic_store_n(&counts[i], counts[i] + 1, __ATOMIC_RELAXED);
+      counts[i]++;
     }
   }
 }
-#else
-// Fallback implementation for non-x86 architectures
-static void
-update_histogram_buckets_vectorized(const std::vector<double> &thresholds,
-                                    std::vector<uint64_t> &counts, double x,
-                                    size_t start_idx) {
-  const size_t size = thresholds.size();
-
-  // Scalar implementation with atomic stores for TSAN compatibility
-  for (size_t i = start_idx; i < size; ++i) {
-    if (x <= thresholds[i]) {
-      __atomic_store_n(&counts[i], counts[i] + 1, __ATOMIC_RELAXED);
-    }
-  }
-}
-#endif
 
 void Histogram::observe(double x) {
   assert(p->thresholds.size() == p->counts.size());
 
-  update_histogram_buckets(p->thresholds, p->counts, x, 0);
+  std::lock_guard lock(p->mutex);
 
-  // DESIGN: Single writer per thread allows simple load-modify-store for sum
-  // No CAS loop needed since only one thread writes to this histogram
-  auto current_sum =
-      std::bit_cast<double>(p->sum.load(std::memory_order_relaxed));
-  p->sum.store(std::bit_cast<uint64_t>(current_sum + x),
-               std::memory_order_relaxed);
+  // Update bucket counts using SIMD
+  update_histogram_buckets_simd(p->thresholds, p->counts, x, 0);
 
-  p->observations.fetch_add(1, std::memory_order_relaxed);
+  // Update sum and observation count
+  p->sum += x;
+  p->observations++;
 }
 
 template <> Family<Histogram>::Family() = default;
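
The branch-free trick in update_histogram_buckets_simd is worth seeing in isolation: _CMP_LE_OQ fills a 64-bit lane with all ones when x <= threshold, and ANDing that mask with a vector of 1s yields a per-bucket increment of exactly 0 or 1. A self-contained sketch of the same pattern (bump_buckets is a hypothetical demo name, not the library function; builds with GCC/Clang on x86-64):

    #include <cstdint>
    #include <cstdio>
    #include <immintrin.h>
    #include <vector>

    // Same mask-to-increment trick as update_histogram_buckets_simd, in isolation.
    __attribute__((target("avx"))) static void
    bump_buckets(const std::vector<double> &thresholds,
                 std::vector<uint64_t> &counts, double x) {
      const __m128d x_vec = _mm_set1_pd(x);
      const __m128i ones = _mm_set1_epi64x(1);
      size_t i = 0;
      for (; i + 2 <= thresholds.size(); i += 2) {
        // x <= threshold per lane: all-ones mask where true, zero where false
        __m128d cmp = _mm_cmp_pd(x_vec, _mm_loadu_pd(&thresholds[i]), _CMP_LE_OQ);
        // mask AND 1 = per-lane increment of exactly 0 or 1
        __m128i inc = _mm_and_si128(_mm_castpd_si128(cmp), ones);
        __m128i cur = _mm_loadu_si128((const __m128i *)&counts[i]);
        _mm_storeu_si128((__m128i *)&counts[i], _mm_add_epi64(cur, inc));
      }
      for (; i < thresholds.size(); ++i) // scalar tail
        counts[i] += (x <= thresholds[i]) ? 1 : 0;
    }

    int main() {
      std::vector<double> thresholds{1.0, 2.0, 4.0, 8.0};
      std::vector<uint64_t> counts(thresholds.size(), 0);
      bump_buckets(thresholds, counts, 3.0);
      // Buckets are cumulative: an observation of 3.0 lands in le=4 and le=8.
      for (size_t i = 0; i < counts.size(); ++i)
        std::printf("le=%g -> %llu\n", thresholds[i],
                    (unsigned long long)counts[i]);
    }

With the mutex now serializing writers and readers, plain unaligned loads and stores (_mm_loadu_si128 / _mm_storeu_si128) suffice; the old code needed the aligned vmovdqa store only for its 16-byte atomicity guarantee.
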
@@ -504,11 +462,8 @@ Family<Histogram> create_histogram(std::string name, std::string help,
     familyPtr->buckets.erase(
         std::unique(familyPtr->buckets.begin(), familyPtr->buckets.end()),
         familyPtr->buckets.end());
-    // +Inf bucket captures all observations (Prometheus requirement)
-    if (familyPtr->buckets.empty() ||
-        familyPtr->buckets.back() != std::numeric_limits<double>::infinity()) {
-      familyPtr->buckets.push_back(std::numeric_limits<double>::infinity());
-    }
+    // Note: +Inf bucket is not stored explicitly - we use total observations
+    // count
   } else {
     validate_or_abort(
         familyPtr->help == help,
@@ -519,10 +474,8 @@ Family<Histogram> create_histogram(std::string name, std::string help,
     new_buckets_vec.erase(
         std::unique(new_buckets_vec.begin(), new_buckets_vec.end()),
         new_buckets_vec.end());
-    if (new_buckets_vec.empty() ||
-        new_buckets_vec.back() != std::numeric_limits<double>::infinity()) {
-      new_buckets_vec.push_back(std::numeric_limits<double>::infinity());
-    }
+    // Note: +Inf bucket is not stored explicitly - we use total observations
+    // count
     validate_or_abort(familyPtr->buckets == new_buckets_vec,
                       "metric family already registered with different buckets",
                       name.c_str());
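
The two hunks above stop appending an explicit +Inf threshold at registration; the render path below instead synthesizes the +Inf line from the total observation count. That works because bucket counts are cumulative (observe() increments every bucket whose threshold is >= x), so the +Inf bucket always equals the observation total. A tiny sketch of that invariant, with made-up numbers and a hypothetical metric name:

    #include <cstdint>
    #include <cstdio>
    #include <vector>

    // Sketch of the implicit-+Inf invariant (free-standing data, not the
    // library's types): buckets are cumulative, so the +Inf line is just
    // the total observation count.
    int main() {
      std::vector<double> thresholds{0.1, 0.5, 1.0};
      std::vector<uint64_t> counts{3, 7, 9}; // cumulative counts per "le" bucket
      uint64_t observations = 12;            // includes 3 samples above 1.0

      for (size_t i = 0; i < thresholds.size(); ++i)
        std::printf("demo_bucket{le=\"%g\"} %llu\n", thresholds[i],
                    (unsigned long long)counts[i]);
      // No stored +Inf bucket: render it from the total instead.
      std::printf("demo_bucket{le=\"+Inf\"} %llu\n",
                  (unsigned long long)observations);
    }
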
@@ -766,43 +719,60 @@ std::span<std::string_view> render(ArenaAllocator &arena) {
     std::vector<std::pair<std::string_view, std::string_view>> bucket_labels_sv;
     for (const auto &[thread_id, per_thread] : family->perThreadState) {
       for (const auto &[labels_key, instance] : per_thread.instances) {
-        for (size_t i = 0; i < instance->thresholds.size(); ++i) {
+        // Extract data under lock - minimize critical section
+        std::vector<double> thresholds_snapshot;
+        std::vector<uint64_t> counts_snapshot;
+        double sum_snapshot;
+        uint64_t observations_snapshot;
+
+        {
+          std::lock_guard lock(instance->mutex);
+          thresholds_snapshot = instance->thresholds;
+          counts_snapshot = instance->counts;
+          sum_snapshot = instance->sum;
+          observations_snapshot = instance->observations;
+        }
+
+        // Render explicit bucket counts outside critical section
+        for (size_t i = 0; i < thresholds_snapshot.size(); ++i) {
           bucket_labels_sv.clear();
           for (const auto &l : labels_key.labels)
             bucket_labels_sv.push_back(l);
-          if (std::isinf(instance->thresholds[i])) {
-            bucket_labels_sv.push_back({"le", "+Inf"});
-          } else {
-            bucket_labels_sv.push_back(
-                {"le", format(arena, "%.17g", instance->thresholds[i])});
-          }
-          // Atomic read from render thread - single writer doesn't need atomic
-          // writes
-          auto count = __atomic_load_n(&instance->counts[i], __ATOMIC_RELAXED);
+          bucket_labels_sv.push_back(
+              {"le", static_format(arena, thresholds_snapshot[i])});
           auto labels = format_labels(bucket_labels_sv);
-          output.push_back(format(arena, "%s_bucket%.*s %llu\n", name.c_str(),
-                                  static_cast<int>(labels.length()),
-                                  labels.data(),
-                                  static_cast<unsigned long long>(count)));
+          output.push_back(
+              format(arena, "%s_bucket%.*s %llu\n", name.c_str(),
                     static_cast<int>(labels.length()), labels.data(),
                     static_cast<unsigned long long>(counts_snapshot[i])));
         }
 
-        auto sum_value = std::bit_cast<double>(
-            instance->sum.load(std::memory_order_relaxed));
+        // Render +Inf bucket using total observations count
+        bucket_labels_sv.clear();
+        for (const auto &l : labels_key.labels)
+          bucket_labels_sv.push_back(l);
+        bucket_labels_sv.push_back({"le", "+Inf"});
+        auto inf_labels = format_labels(bucket_labels_sv);
+        output.push_back(
+            format(arena, "%s_bucket%.*s %llu\n", name.c_str(),
+                   static_cast<int>(inf_labels.length()), inf_labels.data(),
+                   static_cast<unsigned long long>(observations_snapshot)));
+
+        // Render sum outside critical section
         bucket_labels_sv.clear();
         for (const auto &l : labels_key.labels)
           bucket_labels_sv.push_back(l);
         auto labels = format_labels(bucket_labels_sv);
         output.push_back(format(arena, "%s_sum%.*s %.17g\n", name.c_str(),
                                 static_cast<int>(labels.length()),
-                                labels.data(), sum_value));
+                                labels.data(), sum_snapshot));
 
-        auto count_value =
-            instance->observations.load(std::memory_order_relaxed);
-        output.push_back(format(arena, "%s_count%.*s %llu\n", name.c_str(),
-                                static_cast<int>(labels.length()),
-                                labels.data(),
-                                static_cast<unsigned long long>(count_value)));
+        // Render count outside critical section
+        output.push_back(
+            format(arena, "%s_count%.*s %llu\n", name.c_str(),
+                   static_cast<int>(labels.length()), labels.data(),
+                   static_cast<unsigned long long>(observations_snapshot)));
       }
     }
   }
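
For reference, the histogram section emitted by the format strings above would look roughly like this in the Prometheus text exposition format (metric name and values are illustrative only, not taken from the code):

    http_request_seconds_bucket{le="0.1"} 3
    http_request_seconds_bucket{le="0.5"} 7
    http_request_seconds_bucket{le="1"} 9
    http_request_seconds_bucket{le="+Inf"} 12
    http_request_seconds_sum 6.42
    http_request_seconds_count 12

The +Inf bucket and the _count line are always equal by construction, since both are rendered from observations_snapshot.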