Make histograms atomic
E.g. count and sum should be consistent with each other
This commit is contained in:
@@ -25,7 +25,7 @@ struct ContentionEnvironment {
|
|||||||
metric::Family<metric::Counter> counter_family =
|
metric::Family<metric::Counter> counter_family =
|
||||||
metric::create_counter("counter", "");
|
metric::create_counter("counter", "");
|
||||||
metric::Family<metric::Histogram> histogram_family = metric::create_histogram(
|
metric::Family<metric::Histogram> histogram_family = metric::create_histogram(
|
||||||
"histogram", "", metric::exponential_buckets(0.001, 5, 7));
|
"histogram", "", metric::exponential_buckets(0.001, 5, 8));
|
||||||
|
|
||||||
ContentionEnvironment() = default;
|
ContentionEnvironment() = default;
|
||||||
|
|
||||||
@@ -99,7 +99,7 @@ int main() {
|
|||||||
metric::Family<metric::Counter> counter_family =
|
metric::Family<metric::Counter> counter_family =
|
||||||
metric::create_counter("counter", "");
|
metric::create_counter("counter", "");
|
||||||
metric::Family<metric::Histogram> histogram_family = metric::create_histogram(
|
metric::Family<metric::Histogram> histogram_family = metric::create_histogram(
|
||||||
"histogram", "", metric::exponential_buckets(0.001, 5, 7));
|
"histogram", "", metric::exponential_buckets(0.001, 5, 8));
|
||||||
|
|
||||||
auto counter = counter_family.create({});
|
auto counter = counter_family.create({});
|
||||||
auto gauge = gauge_family.create({});
|
auto gauge = gauge_family.create({});
|
||||||
@@ -176,7 +176,8 @@ int main() {
|
|||||||
auto gauge_family = metric::create_gauge("scale_gauge", "Scale gauge");
|
auto gauge_family = metric::create_gauge("scale_gauge", "Scale gauge");
|
||||||
auto histogram_family = metric::create_histogram(
|
auto histogram_family = metric::create_histogram(
|
||||||
"scale_histogram", "Scale histogram",
|
"scale_histogram", "Scale histogram",
|
||||||
std::initializer_list<double>{0.1, 0.5, 1.0, 2.5, 5.0, 10.0, 25.0});
|
std::initializer_list<double>{0.1, 0.5, 1.0, 2.5, 5.0, 10.0, 25.0,
|
||||||
|
50.0});
|
||||||
|
|
||||||
// Create varying numbers of metrics
|
// Create varying numbers of metrics
|
||||||
for (int scale : {10, 100, 1000}) {
|
for (int scale : {10, 100, 1000}) {
|
||||||
|
|||||||
172
src/metric.cpp
172
src/metric.cpp
@@ -35,7 +35,8 @@ static_assert(__STDCPP_DEFAULT_NEW_ALIGNMENT__ >= 16,
|
|||||||
// WeaselDB Metrics System Design:
|
// WeaselDB Metrics System Design:
|
||||||
//
|
//
|
||||||
// THREADING MODEL:
|
// THREADING MODEL:
|
||||||
// - Counters and Histograms: Per-thread storage, single writer per thread
|
// - Counters: Per-thread storage, single writer per thread
|
||||||
|
// - Histograms: Per-thread storage with mutex protection for consistent reads
|
||||||
// - Gauges: Global storage with mutex protection (multi-writer)
|
// - Gauges: Global storage with mutex protection (multi-writer)
|
||||||
//
|
//
|
||||||
// PRECISION STRATEGY:
|
// PRECISION STRATEGY:
|
||||||
@@ -159,16 +160,14 @@ struct Gauge::State {
|
|||||||
friend struct Metric;
|
friend struct Metric;
|
||||||
};
|
};
|
||||||
|
|
||||||
// Histogram: Thread-local buckets, single writer per thread
|
// Histogram: Thread-local buckets with mutex protection per thread
|
||||||
struct Histogram::State {
|
struct Histogram::State {
|
||||||
std::vector<double>
|
std::vector<double>
|
||||||
thresholds; // Bucket boundaries (sorted, deduplicated, includes +Inf)
|
thresholds; // Bucket boundaries (sorted, deduplicated, includes +Inf)
|
||||||
std::vector<uint64_t> counts; // Count per bucket - single writer, malloc
|
std::vector<uint64_t> counts; // Count per bucket
|
||||||
// provides 16-byte alignment
|
double sum; // Sum of observations
|
||||||
// TODO this should just be a double like in counter
|
uint64_t observations; // Total observation count
|
||||||
std::atomic<uint64_t>
|
std::mutex mutex; // Per-histogram mutex for consistent reads/writes
|
||||||
sum; // Sum of observations (double stored as uint64_t bits)
|
|
||||||
std::atomic<uint64_t> observations; // Total observation count (uint64_t)
|
|
||||||
friend struct Metric;
|
friend struct Metric;
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -273,10 +272,10 @@ struct Metric {
|
|||||||
// Use buckets from family configuration
|
// Use buckets from family configuration
|
||||||
ptr->thresholds = family->p->buckets; // Already sorted and deduplicated
|
ptr->thresholds = family->p->buckets; // Already sorted and deduplicated
|
||||||
|
|
||||||
// Single writer semantics - no atomics needed for bucket counts
|
// Initialize with zero values, mutex protects all operations
|
||||||
ptr->counts = std::vector<uint64_t>(ptr->thresholds.size(), 0);
|
ptr->counts = std::vector<uint64_t>(ptr->thresholds.size(), 0);
|
||||||
ptr->sum.store(0, std::memory_order_relaxed);
|
ptr->sum = 0.0;
|
||||||
ptr->observations.store(0, std::memory_order_relaxed);
|
ptr->observations = 0;
|
||||||
}
|
}
|
||||||
Histogram result;
|
Histogram result;
|
||||||
result.p = ptr.get();
|
result.p = ptr.get();
|
||||||
@@ -331,28 +330,20 @@ void Gauge::set(double x) {
|
|||||||
|
|
||||||
Histogram::Histogram() = default;
|
Histogram::Histogram() = default;
|
||||||
|
|
||||||
// Vectorized histogram bucket updates using single-writer + atomic-read design
|
// Vectorized histogram bucket updates with mutex protection for consistency
|
||||||
// Since histograms have single-writer semantics, we can use architecturally
|
// AVX-optimized implementation for high performance
|
||||||
// atomic stores
|
|
||||||
|
|
||||||
#ifdef __x86_64__
|
|
||||||
// x86-64: 128-bit vectorized with inline assembly atomic stores
|
|
||||||
__attribute__((target("avx"))) static void
|
__attribute__((target("avx"))) static void
|
||||||
update_histogram_buckets(const std::vector<double> &thresholds,
|
update_histogram_buckets_simd(const std::vector<double> &thresholds,
|
||||||
std::vector<uint64_t> &counts, double x,
|
std::vector<uint64_t> &counts, double x,
|
||||||
size_t start_idx) {
|
size_t start_idx) {
|
||||||
const size_t size = thresholds.size();
|
const size_t size = thresholds.size();
|
||||||
size_t i = start_idx;
|
size_t i = start_idx;
|
||||||
|
|
||||||
// Process 2 buckets at a time with 128-bit vectors + inline assembly
|
// Process 2 buckets at a time with 128-bit vectors
|
||||||
const __m128d x_vec = _mm_set1_pd(x);
|
const __m128d x_vec = _mm_set1_pd(x);
|
||||||
|
|
||||||
for (; i + 2 <= size; i += 2) {
|
for (; i + 2 <= size; i += 2) {
|
||||||
// Ensure alignment for atomic guarantee (malloc provides 16-byte alignment)
|
|
||||||
assert((reinterpret_cast<uintptr_t>(static_cast<void *>(&counts[i])) &
|
|
||||||
15) == 0 &&
|
|
||||||
"counts array must be 16-byte aligned for atomic 128-bit stores");
|
|
||||||
|
|
||||||
// 128-bit vectorized comparison and arithmetic
|
// 128-bit vectorized comparison and arithmetic
|
||||||
__m128d thresholds_vec = _mm_loadu_pd(&thresholds[i]);
|
__m128d thresholds_vec = _mm_loadu_pd(&thresholds[i]);
|
||||||
__m128d cmp_result = _mm_cmp_pd(x_vec, thresholds_vec, _CMP_LE_OQ);
|
__m128d cmp_result = _mm_cmp_pd(x_vec, thresholds_vec, _CMP_LE_OQ);
|
||||||
@@ -361,65 +352,32 @@ update_histogram_buckets(const std::vector<double> &thresholds,
|
|||||||
__m128i increments = _mm_and_si128(cmp_as_int, ones);
|
__m128i increments = _mm_and_si128(cmp_as_int, ones);
|
||||||
|
|
||||||
// Load current counts and add increments
|
// Load current counts and add increments
|
||||||
__m128i current_counts = _mm_load_si128((__m128i *)&counts[i]);
|
__m128i current_counts = _mm_loadu_si128((__m128i *)&counts[i]);
|
||||||
__m128i updated_counts = _mm_add_epi64(current_counts, increments);
|
__m128i updated_counts = _mm_add_epi64(current_counts, increments);
|
||||||
|
|
||||||
// Processors that enumerate support for Intel® AVX (by setting the feature
|
// Store updated counts
|
||||||
// flag CPUID.01H:ECX.AVX[bit 28])
|
_mm_storeu_si128((__m128i *)&counts[i], updated_counts);
|
||||||
// guarantee that the 16-byte memory operations performed by the
|
|
||||||
// following instructions will always be carried out atomically: •
|
|
||||||
// MOVAPD, MOVAPS, and MOVDQA. • VMOVAPD, VMOVAPS, and VMOVDQA when
|
|
||||||
// encoded with VEX.128. • VMOVAPD, VMOVAPS, VMOVDQA32, and VMOVDQA64
|
|
||||||
// when encoded with EVEX.128 and k0 (masking disabled). (Note that
|
|
||||||
// these instructions require the linear addresses of their memory
|
|
||||||
// operands to be 16-byte aligned.)
|
|
||||||
__asm__ __volatile__(
|
|
||||||
"vmovdqa %%xmm0, %0"
|
|
||||||
: "=m"(*((__m128i *)&counts[i])) // Output: aligned memory location
|
|
||||||
: "x"(updated_counts) // Input: xmm register
|
|
||||||
: "memory" // Memory clobber
|
|
||||||
);
|
|
||||||
// We don't actually need it to be atomic across 128-bits, but that's
|
|
||||||
// sufficient to guarantee each 64 bit half is atomic.
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Handle remainder with atomic stores
|
// Handle remainder with scalar operations
|
||||||
for (; i < size; ++i) {
|
for (; i < size; ++i) {
|
||||||
if (x <= thresholds[i]) {
|
if (x <= thresholds[i]) {
|
||||||
__atomic_store_n(&counts[i], counts[i] + 1, __ATOMIC_RELAXED);
|
counts[i]++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
#else
|
|
||||||
// Fallback implementation for non-x86 architectures
|
|
||||||
static void
|
|
||||||
update_histogram_buckets_vectorized(const std::vector<double> &thresholds,
|
|
||||||
std::vector<uint64_t> &counts, double x,
|
|
||||||
size_t start_idx) {
|
|
||||||
const size_t size = thresholds.size();
|
|
||||||
|
|
||||||
// Scalar implementation with atomic stores for TSAN compatibility
|
|
||||||
for (size_t i = start_idx; i < size; ++i) {
|
|
||||||
if (x <= thresholds[i]) {
|
|
||||||
__atomic_store_n(&counts[i], counts[i] + 1, __ATOMIC_RELAXED);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
void Histogram::observe(double x) {
|
void Histogram::observe(double x) {
|
||||||
assert(p->thresholds.size() == p->counts.size());
|
assert(p->thresholds.size() == p->counts.size());
|
||||||
|
|
||||||
update_histogram_buckets(p->thresholds, p->counts, x, 0);
|
std::lock_guard<std::mutex> lock(p->mutex);
|
||||||
|
|
||||||
// DESIGN: Single writer per thread allows simple load-modify-store for sum
|
// Update bucket counts using SIMD
|
||||||
// No CAS loop needed since only one thread writes to this histogram
|
update_histogram_buckets_simd(p->thresholds, p->counts, x, 0);
|
||||||
auto current_sum =
|
|
||||||
std::bit_cast<double>(p->sum.load(std::memory_order_relaxed));
|
|
||||||
p->sum.store(std::bit_cast<uint64_t>(current_sum + x),
|
|
||||||
std::memory_order_relaxed);
|
|
||||||
|
|
||||||
p->observations.fetch_add(1, std::memory_order_relaxed);
|
// Update sum and observation count
|
||||||
|
p->sum += x;
|
||||||
|
p->observations++;
|
||||||
}
|
}
|
||||||
|
|
||||||
template <> Family<Counter>::Family() = default;
|
template <> Family<Counter>::Family() = default;
|
||||||
@@ -504,11 +462,8 @@ Family<Histogram> create_histogram(std::string name, std::string help,
|
|||||||
familyPtr->buckets.erase(
|
familyPtr->buckets.erase(
|
||||||
std::unique(familyPtr->buckets.begin(), familyPtr->buckets.end()),
|
std::unique(familyPtr->buckets.begin(), familyPtr->buckets.end()),
|
||||||
familyPtr->buckets.end());
|
familyPtr->buckets.end());
|
||||||
// +Inf bucket captures all observations (Prometheus requirement)
|
// Note: +Inf bucket is not stored explicitly - we use total observations
|
||||||
if (familyPtr->buckets.empty() ||
|
// count
|
||||||
familyPtr->buckets.back() != std::numeric_limits<double>::infinity()) {
|
|
||||||
familyPtr->buckets.push_back(std::numeric_limits<double>::infinity());
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
validate_or_abort(
|
validate_or_abort(
|
||||||
familyPtr->help == help,
|
familyPtr->help == help,
|
||||||
@@ -519,10 +474,8 @@ Family<Histogram> create_histogram(std::string name, std::string help,
|
|||||||
new_buckets_vec.erase(
|
new_buckets_vec.erase(
|
||||||
std::unique(new_buckets_vec.begin(), new_buckets_vec.end()),
|
std::unique(new_buckets_vec.begin(), new_buckets_vec.end()),
|
||||||
new_buckets_vec.end());
|
new_buckets_vec.end());
|
||||||
if (new_buckets_vec.empty() ||
|
// Note: +Inf bucket is not stored explicitly - we use total observations
|
||||||
new_buckets_vec.back() != std::numeric_limits<double>::infinity()) {
|
// count
|
||||||
new_buckets_vec.push_back(std::numeric_limits<double>::infinity());
|
|
||||||
}
|
|
||||||
validate_or_abort(familyPtr->buckets == new_buckets_vec,
|
validate_or_abort(familyPtr->buckets == new_buckets_vec,
|
||||||
"metric family already registered with different buckets",
|
"metric family already registered with different buckets",
|
||||||
name.c_str());
|
name.c_str());
|
||||||
@@ -766,43 +719,60 @@ std::span<std::string_view> render(ArenaAllocator &arena) {
|
|||||||
std::vector<std::pair<std::string_view, std::string_view>> bucket_labels_sv;
|
std::vector<std::pair<std::string_view, std::string_view>> bucket_labels_sv;
|
||||||
for (const auto &[thread_id, per_thread] : family->perThreadState) {
|
for (const auto &[thread_id, per_thread] : family->perThreadState) {
|
||||||
for (const auto &[labels_key, instance] : per_thread.instances) {
|
for (const auto &[labels_key, instance] : per_thread.instances) {
|
||||||
for (size_t i = 0; i < instance->thresholds.size(); ++i) {
|
// Extract data under lock - minimize critical section
|
||||||
|
std::vector<double> thresholds_snapshot;
|
||||||
|
std::vector<uint64_t> counts_snapshot;
|
||||||
|
double sum_snapshot;
|
||||||
|
uint64_t observations_snapshot;
|
||||||
|
|
||||||
|
{
|
||||||
|
std::lock_guard<std::mutex> lock(instance->mutex);
|
||||||
|
thresholds_snapshot = instance->thresholds;
|
||||||
|
counts_snapshot = instance->counts;
|
||||||
|
sum_snapshot = instance->sum;
|
||||||
|
observations_snapshot = instance->observations;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Render explicit bucket counts outside critical section
|
||||||
|
for (size_t i = 0; i < thresholds_snapshot.size(); ++i) {
|
||||||
bucket_labels_sv.clear();
|
bucket_labels_sv.clear();
|
||||||
for (const auto &l : labels_key.labels)
|
for (const auto &l : labels_key.labels)
|
||||||
bucket_labels_sv.push_back(l);
|
bucket_labels_sv.push_back(l);
|
||||||
|
|
||||||
if (std::isinf(instance->thresholds[i])) {
|
bucket_labels_sv.push_back(
|
||||||
bucket_labels_sv.push_back({"le", "+Inf"});
|
{"le", static_format(arena, thresholds_snapshot[i])});
|
||||||
} else {
|
|
||||||
bucket_labels_sv.push_back(
|
|
||||||
{"le", format(arena, "%.17g", instance->thresholds[i])});
|
|
||||||
}
|
|
||||||
// Atomic read from render thread - single writer doesn't need atomic
|
|
||||||
// writes
|
|
||||||
auto count = __atomic_load_n(&instance->counts[i], __ATOMIC_RELAXED);
|
|
||||||
auto labels = format_labels(bucket_labels_sv);
|
auto labels = format_labels(bucket_labels_sv);
|
||||||
output.push_back(format(arena, "%s_bucket%.*s %llu\n", name.c_str(),
|
output.push_back(
|
||||||
static_cast<int>(labels.length()),
|
format(arena, "%s_bucket%.*s %llu\n", name.c_str(),
|
||||||
labels.data(),
|
static_cast<int>(labels.length()), labels.data(),
|
||||||
static_cast<unsigned long long>(count)));
|
static_cast<unsigned long long>(counts_snapshot[i])));
|
||||||
}
|
}
|
||||||
|
|
||||||
auto sum_value = std::bit_cast<double>(
|
// Render +Inf bucket using total observations count
|
||||||
instance->sum.load(std::memory_order_relaxed));
|
bucket_labels_sv.clear();
|
||||||
|
for (const auto &l : labels_key.labels)
|
||||||
|
bucket_labels_sv.push_back(l);
|
||||||
|
bucket_labels_sv.push_back({"le", "+Inf"});
|
||||||
|
auto inf_labels = format_labels(bucket_labels_sv);
|
||||||
|
output.push_back(
|
||||||
|
format(arena, "%s_bucket%.*s %llu\n", name.c_str(),
|
||||||
|
static_cast<int>(inf_labels.length()), inf_labels.data(),
|
||||||
|
static_cast<unsigned long long>(observations_snapshot)));
|
||||||
|
|
||||||
|
// Render sum outside critical section
|
||||||
bucket_labels_sv.clear();
|
bucket_labels_sv.clear();
|
||||||
for (const auto &l : labels_key.labels)
|
for (const auto &l : labels_key.labels)
|
||||||
bucket_labels_sv.push_back(l);
|
bucket_labels_sv.push_back(l);
|
||||||
auto labels = format_labels(bucket_labels_sv);
|
auto labels = format_labels(bucket_labels_sv);
|
||||||
output.push_back(format(arena, "%s_sum%.*s %.17g\n", name.c_str(),
|
output.push_back(format(arena, "%s_sum%.*s %.17g\n", name.c_str(),
|
||||||
static_cast<int>(labels.length()),
|
static_cast<int>(labels.length()),
|
||||||
labels.data(), sum_value));
|
labels.data(), sum_snapshot));
|
||||||
|
|
||||||
auto count_value =
|
// Render count outside critical section
|
||||||
instance->observations.load(std::memory_order_relaxed);
|
output.push_back(
|
||||||
output.push_back(format(arena, "%s_count%.*s %llu\n", name.c_str(),
|
format(arena, "%s_count%.*s %llu\n", name.c_str(),
|
||||||
static_cast<int>(labels.length()),
|
static_cast<int>(labels.length()), labels.data(),
|
||||||
labels.data(),
|
static_cast<unsigned long long>(observations_snapshot)));
|
||||||
static_cast<unsigned long long>(count_value)));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user