Use plain arrays and atomic read with intrinsics for render
@@ -34,9 +34,12 @@ struct ContentionEnvironment {
       : counter_family(
             metric::create_counter("bench_counter", "Benchmark counter")),
         gauge_family(metric::create_gauge("bench_gauge", "Benchmark gauge")),
-        histogram_family(metric::create_histogram(
-            "bench_histogram", "Benchmark histogram",
-            std::initializer_list<double>{0.1, 0.5, 1.0, 2.5, 5.0})),
+        histogram_family(
+            metric::create_histogram("bench_histogram", "Benchmark histogram",
+                                     // 7 explicit buckets + automatic +Inf = 8
+                                     // total (optimal for SIMD: 2x4 buckets)
+                                     std::initializer_list<double>{
+                                         0.1, 0.5, 1.0, 2.5, 5.0, 10.0, 25.0})),
         counter(counter_family.create({{"benchmark", "contention"}})),
         gauge(gauge_family.create({{"benchmark", "contention"}})),
         histogram(histogram_family.create({{"benchmark", "contention"}})) {}
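A note on the bucket math in the comment above: the family declares 7 finite thresholds and the implementation appends +Inf, giving 8 stored buckets, which splits into exactly two 4-lane AVX2 iterations (a __m256d holds 4 doubles) with no scalar remainder. A minimal sketch of that arithmetic, assuming the +Inf append happens at family construction as the State comment later in this diff says:

#include <cstddef>
#include <cstdio>
#include <limits>
#include <vector>

// Sketch: why 7 explicit thresholds are "optimal for SIMD: 2x4 buckets".
int main() {
  std::vector<double> thresholds{0.1, 0.5, 1.0, 2.5, 5.0, 10.0, 25.0};
  thresholds.push_back(std::numeric_limits<double>::infinity()); // +Inf bucket

  const std::size_t lanes = 4; // doubles per 256-bit AVX2 register
  std::printf("buckets=%zu full_vectors=%zu remainder=%zu\n",
              thresholds.size(), thresholds.size() / lanes,
              thresholds.size() % lanes); // buckets=8 full_vectors=2 remainder=0
}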
@@ -133,9 +136,9 @@ int main() {
     ankerl::nanobench::doNotOptimizeAway(gauge);
   });

-  auto histogram_family =
-      metric::create_histogram("baseline_histogram", "Baseline histogram",
-                               std::initializer_list<double>{0.1, 0.5, 1.0});
+  auto histogram_family = metric::create_histogram(
+      "baseline_histogram", "Baseline histogram",
+      std::initializer_list<double>{0.1, 0.5, 1.0, 2.5, 5.0, 10.0, 25.0});
   auto histogram = histogram_family.create({{"type", "baseline"}});

   bench.run("histogram.observe() - no contention", [&]() {
@@ -244,9 +247,9 @@ int main() {
   auto counter_family =
       metric::create_counter("scale_counter", "Scale counter");
   auto gauge_family = metric::create_gauge("scale_gauge", "Scale gauge");
-  auto histogram_family =
-      metric::create_histogram("scale_histogram", "Scale histogram",
-                               std::initializer_list<double>{0.1, 0.5, 1.0});
+  auto histogram_family = metric::create_histogram(
+      "scale_histogram", "Scale histogram",
+      std::initializer_list<double>{0.1, 0.5, 1.0, 2.5, 5.0, 10.0, 25.0});

   // Create varying numbers of metrics
   for (int scale : {10, 100, 1000}) {
src/metric.cpp (102 changed lines)
@@ -20,6 +20,7 @@
 #include <unordered_map>
 #include <vector>

+#include <immintrin.h>
 #include <simdutf.h>

 #include "format.hpp"
@@ -142,7 +143,7 @@ template <> struct Family<Histogram>::State {

 // Counter: Thread-local, monotonically increasing, single writer per thread
 struct Counter::State {
-  AtomicWord value; // Stores double as uint64_t bits
+  double value; // Single writer, no atomics needed
   friend struct Metric;
 };

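This one-line change is the core of the commit: each Counter::State is written by exactly one thread, so the hot path drops atomic stores entirely, and only the render thread performs an atomic read (via __atomic_load, later in this diff). A minimal sketch of the same single-writer/atomic-reader split, expressed with C++20 std::atomic_ref rather than the GCC/Clang builtins the commit uses; the names here are illustrative, not from the codebase:

#include <atomic>

// Illustrative single-writer / atomic-reader pattern (not the commit's code).
struct CounterState {
  double value = 0.0; // written by its owning thread only
};

// Owning thread: relaxed load + relaxed store, no fetch_add/CAS loop.
// On x86-64 both relaxed operations compile to plain movs, so this costs
// the same as an ordinary `value += x`.
void inc(CounterState &s, double x) {
  std::atomic_ref<double> ref(s.value);
  ref.store(ref.load(std::memory_order_relaxed) + x,
            std::memory_order_relaxed);
}

// Render thread: a relaxed atomic load keeps the cross-thread read
// well-defined without slowing down the writer.
double snapshot(CounterState &s) {
  return std::atomic_ref<double>(s.value).load(std::memory_order_relaxed);
}

The commit goes a step further and keeps the writer's stores entirely plain, pairing them with __atomic_load on the render side; atomic_ref is the closest way to express the same machine-level behavior within the standard memory model.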
@@ -156,7 +157,8 @@ struct Gauge::State {
 struct Histogram::State {
   std::vector<double>
       thresholds; // Bucket boundaries (sorted, deduplicated, includes +Inf)
-  std::vector<AtomicWord> counts; // Count per bucket (uint64_t)
+  std::vector<uint64_t>
+      counts; // Count per bucket - single writer, no atomics needed
   AtomicWord sum; // Sum of observations (double stored as uint64_t bits)
   AtomicWord observations; // Total observation count (uint64_t)
   friend struct Metric;
@@ -218,7 +220,7 @@ struct Metric {
         family->p->perThreadState[std::this_thread::get_id()].instances[key];
     if (!ptr) {
       ptr = std::make_unique<Counter::State>();
-      ptr->value.store(0, std::memory_order_relaxed);
+      ptr->value = 0.0;
     }
     Counter result;
     result.p = ptr.get();
@@ -260,13 +262,8 @@ struct Metric {
       // Use buckets from family configuration
       ptr->thresholds = family->p->buckets; // Already sorted and deduplicated

-      // DESIGN: std::atomic is not copy-constructible
-      // Initialize vector with correct size, all atomics explicitly initialized
-      // to 0
-      ptr->counts = std::vector<AtomicWord>(ptr->thresholds.size());
-      for (auto &count : ptr->counts) {
-        count.store(0, std::memory_order_relaxed);
-      }
+      // Single writer semantics - no atomics needed for bucket counts
+      ptr->counts = std::vector<uint64_t>(ptr->thresholds.size(), 0);
       ptr->sum.store(0, std::memory_order_relaxed);
       ptr->observations.store(0, std::memory_order_relaxed);
     }
@@ -279,19 +276,17 @@ struct Metric {
 Counter::Counter() = default;

 void Counter::inc(double x) {
-  // DESIGN: Single writer per thread allows simple load-modify-store
-  // No CAS loop needed since only one thread writes to this counter
-  auto current_value =
-      std::bit_cast<double>(p->value.load(std::memory_order_relaxed));
-  auto new_value = current_value + x;
+  // DESIGN: Single writer per thread allows simple increment
+  // No atomics needed since only one thread writes to this counter
+  auto new_value = p->value + x;

   // Validate monotonic property (counter never decreases)
-  if (new_value < current_value) [[unlikely]] {
+  if (new_value < p->value) [[unlikely]] {
     validate_or_abort(false, "counter value overflow/wraparound detected",
                       std::to_string(new_value).c_str());
   }

-  p->value.store(std::bit_cast<uint64_t>(new_value), std::memory_order_relaxed);
+  p->value = new_value;
 }

 Gauge::Gauge() = default;
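One behavioral detail of the rewritten guard is worth spelling out: with a plain double, `new_value < p->value` can only become true when x is negative, because IEEE-754 addition saturates at +Inf instead of wrapping (and NaN comparisons are false). The check therefore effectively rejects negative increments rather than integer-style wraparound. A quick illustration with hypothetical values:

#include <cassert>
#include <limits>

// When does `current + x < current` hold for IEEE-754 doubles?
int main() {
  double current = 1.0;
  double huge = std::numeric_limits<double>::max();

  assert(!(current + 2.0 < current)); // normal increment: guard never fires
  assert(current + -1.5 < current);   // negative x: guard fires
  assert(!(huge + huge < huge));      // overflow saturates to +Inf: no fire
}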
@@ -325,14 +320,67 @@ void Gauge::set(double x) {

 Histogram::Histogram() = default;

+// Vectorized histogram bucket updates using single-writer + atomic-read design
+// Since histograms have single-writer semantics, we can bypass atomic writes!
+
+// Default implementation
+__attribute__((target("default"))) static void
+update_histogram_buckets_vectorized(const std::vector<double> &thresholds,
+                                    std::vector<uint64_t> &counts, double x,
+                                    size_t start_idx) {
+  const size_t size = thresholds.size();
+
+  // Single writer - simple increment, no atomics needed
+  for (size_t i = start_idx; i < size; ++i) {
+    if (x <= thresholds[i]) {
+      counts[i]++;
+    }
+  }
+}
+
+// AVX2 version - true vectorization with direct memory access
+#ifdef __x86_64__
+__attribute__((target("avx2"))) static void
+update_histogram_buckets_vectorized(const std::vector<double> &thresholds,
+                                    std::vector<uint64_t> &counts, double x,
+                                    size_t start_idx) {
+  const size_t size = thresholds.size();
+  size_t i = start_idx;
+
+  // Process 4 buckets at a time with AVX2
+  const __m256d x_vec = _mm256_set1_pd(x);
+
+  for (; i + 4 <= size; i += 4) {
+    // Vectorized comparison
+    __m256d thresholds_vec = _mm256_loadu_pd(&thresholds[i]);
+    __m256d cmp_result = _mm256_cmp_pd(x_vec, thresholds_vec, _CMP_LE_OQ);
+
+    // Convert to increment mask
+    __m256i cmp_as_int = _mm256_castpd_si256(cmp_result);
+    __m256i ones = _mm256_set1_epi64x(1);
+    __m256i increments = _mm256_and_si256(cmp_as_int, ones);
+
+    // Vectorized 4-lane add directly to memory
+    __m256i current_counts = _mm256_loadu_si256((__m256i *)&counts[i]);
+    __m256i updated_counts = _mm256_add_epi64(current_counts, increments);
+    _mm256_storeu_si256((__m256i *)&counts[i], updated_counts);
+  }
+
+  // Handle remainder
+  for (; i < size; ++i) {
+    if (x <= thresholds[i]) {
+      counts[i]++;
+    }
+  }
+}
+#endif
+
 void Histogram::observe(double x) {
   assert(p->thresholds.size() == p->counts.size());

-  // Increment bucket counts (cumulative: each bucket counts all values <=
-  // threshold)
-  for (size_t i = 0; i < p->thresholds.size(); ++i) {
-    p->counts[i].fetch_add(x <= p->thresholds[i], std::memory_order_relaxed);
-  }
+  // Use multiversioned auto-vectorized function
+  // Compiler automatically selects best implementation for current CPU
+  update_histogram_buckets_vectorized(p->thresholds, p->counts, x, 0);

   // DESIGN: Single writer per thread allows simple load-modify-store for sum
   // No CAS loop needed since only one thread writes to this histogram
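Two techniques in this hunk are easy to miss. First, defining the same function twice with __attribute__((target("default"))) and __attribute__((target("avx2"))) is GCC/Clang function multiversioning: the compiler emits a resolver that picks the AVX2 body at load time on CPUs that support it, so "automatically selects" means a runtime dispatch, not a compile-time one. Second, the branch-free increment works because _mm256_cmp_pd sets a lane to all-ones (0xFFFF...FFFF) where x <= threshold holds and to zero otherwise, so ANDing with 1 yields a per-lane 0-or-1 addend. A standalone sketch of the mask trick with hypothetical sample values (build with -mavx2):

// Standalone demonstration of the comparison-mask trick used above.
// Build: g++ -O2 -mavx2 mask_demo.cpp
#include <cstdint>
#include <cstdio>
#include <immintrin.h>

int main() {
  alignas(32) double thresholds[4] = {0.1, 0.5, 1.0, 2.5};
  alignas(32) uint64_t counts[4] = {0, 0, 0, 0};
  double x = 0.7; // observation: lands in the cumulative 1.0 and 2.5 buckets

  __m256d x_vec = _mm256_set1_pd(x);
  __m256d thr = _mm256_load_pd(thresholds);
  // Each lane becomes 0xFFFFFFFFFFFFFFFF where x <= threshold, else 0.
  __m256d mask = _mm256_cmp_pd(x_vec, thr, _CMP_LE_OQ);
  // AND with 1 turns the all-ones mask into a per-lane 0/1 increment.
  __m256i inc = _mm256_and_si256(_mm256_castpd_si256(mask),
                                 _mm256_set1_epi64x(1));
  __m256i c = _mm256_load_si256((const __m256i *)counts);
  _mm256_store_si256((__m256i *)counts, _mm256_add_epi64(c, inc));

  for (int i = 0; i < 4; ++i)
    std::printf("le=%g count=%llu\n", thresholds[i],
                (unsigned long long)counts[i]);
  // Prints counts 0, 0, 1, 1: cumulative buckets for x = 0.7.
}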
@@ -630,8 +678,10 @@ std::span<std::string_view> render(ArenaAllocator &arena) {

   for (const auto &[thread_id, per_thread] : family->perThreadState) {
     for (const auto &[labels_key, instance] : per_thread.instances) {
-      auto value = std::bit_cast<double>(
-          instance->value.load(std::memory_order_relaxed));
+      // Atomic read from render thread - single writer doesn't need atomic
+      // writes
+      double value;
+      __atomic_load(&instance->value, &value, __ATOMIC_RELAXED);
       labels_sv.clear();
       for (const auto &l : labels_key.labels)
         labels_sv.push_back(l);
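The builtin used here and the _n form used for bucket counts below differ only in shape: __atomic_load writes the value through an output pointer (convenient for double, avoiding bit casts), while __atomic_load_n returns it directly. Both are GCC/Clang builtins rather than standard C++. A compact illustration with hypothetical wrapper names:

#include <cstdint>

// Hypothetical wrappers showing the two GCC/Clang builtin load forms.
double read_value(double *value) {
  double out;
  __atomic_load(value, &out, __ATOMIC_RELAXED); // generic form, out-pointer
  return out;
}

uint64_t read_count(uint64_t *count) {
  return __atomic_load_n(count, __ATOMIC_RELAXED); // _n form returns the value
}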
@@ -697,7 +747,9 @@ std::span<std::string_view> render(ArenaAllocator &arena) {
         bucket_labels_sv.push_back(
             {"le", format(arena, "%.17g", instance->thresholds[i])});
       }
-      auto count = instance->counts[i].load(std::memory_order_relaxed);
+      // Atomic read from render thread - single writer doesn't need atomic
+      // writes
+      auto count = __atomic_load_n(&instance->counts[i], __ATOMIC_RELAXED);
       auto labels = format_labels(bucket_labels_sv);
       output.push_back(format(arena, "%s_bucket%.*s %llu\n", name.c_str(),
                               static_cast<int>(labels.length()),
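For orientation, these _bucket lines follow the Prometheus text exposition format: one cumulative sample per le threshold, including the automatic +Inf bucket, conventionally alongside _sum and _count. Hypothetical output for the baseline histogram above, with made-up counts and simplified label rendering (the code's %.17g would actually print the 0.1 threshold as 0.10000000000000001):

baseline_histogram_bucket{type="baseline",le="0.1"} 3
baseline_histogram_bucket{type="baseline",le="0.5"} 10
baseline_histogram_bucket{type="baseline",le="1"} 17
baseline_histogram_bucket{type="baseline",le="2.5"} 21
baseline_histogram_bucket{type="baseline",le="5"} 24
baseline_histogram_bucket{type="baseline",le="10"} 25
baseline_histogram_bucket{type="baseline",le="25"} 25
baseline_histogram_bucket{type="baseline",le="+Inf"} 25
baseline_histogram_sum 31.7
baseline_histogram_count 25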