Make metrics thread-safe
Some checks failed
Tests / Clang total: 1533, failed: 1, passed: 1532
Tests / Clang - debug total: 1531, passed: 1531
Tests / SIMD fallback total: 1533, failed: 1, passed: 1532
Tests / Release [gcc] total: 1533, failed: 1, passed: 1532
Tests / Release [gcc,aarch64] total: 1144, failed: 2, passed: 1142
Tests / Coverage total: 1151, passed: 1151
weaselab/conflict-set/pipeline/head There was a failure building this commit

Even concurrently with calling non-const methods on the associated
ConflictSet
This commit is contained in:
2024-07-12 13:22:02 -07:00
parent ef14003781
commit 3288c583e4
10 changed files with 111 additions and 22 deletions

View File

@@ -362,10 +362,14 @@ void benchMetrics() {
ankerl::nanobench::Bench bench;
ConflictSet cs{0};
bench.run("metrics", [&]() {
ConflictSet::MetricsV1 *m;
int count;
cs.getMetricsV1(&m, &count);
int count;
ConflictSet::MetricsV1 *m;
cs.getMetricsV1(&m, &count);
bench.batch(count);
bench.run("fetch metric", [&]() {
for (int i = 0; i < count; ++i) {
m[i].getValue();
}
});
}

View File

@@ -60,6 +60,8 @@ cmake_pop_check_state()
option(USE_SIMD_FALLBACK
"Use fallback implementations of functions that use SIMD" OFF)
option(DISABLE_TSAN "Disable TSAN" OFF)
# This is encouraged according to
# https://valgrind.org/docs/manual/manual-core-adv.html#manual-core-adv.clientreq
include_directories(SYSTEM ${CMAKE_CURRENT_SOURCE_DIR}/third_party/valgrind)
@@ -245,7 +247,7 @@ if(CMAKE_SOURCE_DIR STREQUAL CMAKE_CURRENT_SOURCE_DIR AND BUILD_TESTING)
endforeach()
# tsan tests
if(NOT CMAKE_CROSSCOMPILING AND NOT CMAKE_BUILD_TYPE STREQUAL Debug)
if(NOT CMAKE_CROSSCOMPILING AND NOT DISABLE_TSAN)
add_executable(tsan_driver ConflictSet.cpp FuzzTestDriver.cpp)
target_compile_options(tsan_driver PRIVATE ${TEST_FLAGS} -fsanitize=thread)
target_link_options(tsan_driver PRIVATE -fsanitize=thread)

View File

@@ -551,7 +551,7 @@ struct Metric {
const char *name;
const char *help;
ConflictSet::MetricsV1::Type type;
double value;
std::atomic<double> value;
protected:
Metric(ConflictSet::Impl *impl, const char *name, const char *help,
@@ -562,15 +562,27 @@ struct Gauge : private Metric {
Gauge(ConflictSet::Impl *impl, const char *name, const char *help)
: Metric(impl, name, help, ConflictSet::MetricsV1::Gauge) {}
void set(double value) { this->value = value; }
void set(double value) {
this->value.store(value, std::memory_order_relaxed);
}
};
struct Counter : private Metric {
Counter(ConflictSet::Impl *impl, const char *name, const char *help)
: Metric(impl, name, help, ConflictSet::MetricsV1::Counter) {}
// Expensive. Accumulate locally and then call add instead of repeatedly
// calling add.
void add(double value) {
assert(value >= 0);
this->value += value;
static_assert(std::atomic<double>::is_always_lock_free);
double old = this->value.load(std::memory_order_relaxed);
for (;;) {
double newVal = old + value;
if (this->value.compare_exchange_weak(old, newVal,
std::memory_order_relaxed)) {
break;
}
}
}
};
@@ -3235,7 +3247,7 @@ struct __attribute__((visibility("hidden"))) ConflictSet::Impl {
++i, m = m->prev) {
metrics[i].name = m->name;
metrics[i].help = m->help;
metrics[i].value = &m->value;
metrics[i].p = m;
metrics[i].type = m->type;
}
std::sort(metrics, metrics + metricsCount,
@@ -3274,6 +3286,11 @@ struct __attribute__((visibility("hidden"))) ConflictSet::Impl {
// ==================== END METRICS DEFINITIONS ====================
#undef GAUGE
#undef COUNTER
void getMetricsV1(MetricsV1 **metrics, int *count) {
*metrics = this->metrics;
*count = metricsCount;
}
};
Metric::Metric(ConflictSet::Impl *impl, const char *name, const char *help,
@@ -3414,8 +3431,11 @@ int64_t internal_getBytes(ConflictSet::Impl *impl) { return impl->getBytes(); }
void internal_getMetricsV1(ConflictSet::Impl *impl,
ConflictSet::MetricsV1 **metrics, int *count) {
*metrics = impl->metrics;
*count = impl->metricsCount;
impl->getMetricsV1(metrics, count);
}
double internal_getMetricValue(const ConflictSet::MetricsV1 *metric) {
return ((Metric *)metric->p)->value.load(std::memory_order_relaxed);
}
// ==================== END IMPLEMENTATION ====================
@@ -3506,6 +3526,10 @@ void ConflictSet::getMetricsV1(MetricsV1 **metrics, int *count) const {
return internal_getMetricsV1(impl, metrics, count);
}
double ConflictSet::MetricsV1::getValue() const {
return internal_getMetricValue(this);
}
ConflictSet::ConflictSet(int64_t oldestVersion)
: impl(internal_create(oldestVersion)) {}

View File

@@ -702,17 +702,36 @@ template <class ConflictSetImpl> struct TestDriver {
}
}
oldestVersion +=
arbitrary.bounded(10) ? arbitrary.bounded(10) : arbitrary.next();
oldestVersion = std::min(oldestVersion, writeVersion);
#ifdef THREAD_TEST
std::latch ready{1};
std::thread thread2{[&]() {
ready.count_down();
ConflictSet::MetricsV1 *m;
int count;
cs.getMetricsV1(&m, &count);
for (int i = 0; i < count; ++i) {
m[i].getValue();
}
}};
ready.wait();
#endif
CALLGRIND_START_INSTRUMENTATION;
cs.addWrites(writes, numPointWrites + numRangeWrites, v);
CALLGRIND_STOP_INSTRUMENTATION;
refImpl.addWrites(writes, numPointWrites + numRangeWrites, v);
oldestVersion +=
arbitrary.bounded(10) ? arbitrary.bounded(10) : arbitrary.next();
oldestVersion = std::min(oldestVersion, writeVersion);
cs.setOldestVersion(oldestVersion);
refImpl.setOldestVersion(oldestVersion);
#ifdef THREAD_TEST
thread2.join();
#endif
}
{
int numPointReads = arbitrary.bounded(100);
@@ -785,7 +804,15 @@ template <class ConflictSetImpl> struct TestDriver {
std::latch ready{1};
std::thread thread2{[&]() {
ready.count_down();
// Call all const methods
cs.check(reads, results3, numPointReads + numRangeReads);
cs.getBytes();
ConflictSet::MetricsV1 *m;
int count;
cs.getMetricsV1(&m, &count);
for (int i = 0; i < count; ++i) {
m[i].getValue();
}
}};
ready.wait();
#endif
@@ -794,6 +821,15 @@ template <class ConflictSetImpl> struct TestDriver {
cs.check(reads, results1, numPointReads + numRangeReads);
CALLGRIND_STOP_INSTRUMENTATION;
// Call remaining const methods
cs.getBytes();
ConflictSet::MetricsV1 *m;
int count;
cs.getMetricsV1(&m, &count);
for (int i = 0; i < count; ++i) {
m[i].getValue();
}
refImpl.check(reads, results2, numPointReads + numRangeReads);
auto compareResults = [reads](ConflictSet::Result *results1,

14
Jenkinsfile vendored
View File

@@ -48,6 +48,18 @@ pipeline {
recordIssues(tools: [clang()])
}
}
stage('Clang - debug') {
agent {
dockerfile {
args '-v /home/jenkins/ccache:/ccache'
reuseNode true
}
}
steps {
CleanBuildAndTest("-DCMAKE_BUILD_TYPE=Debug")
recordIssues(tools: [clang()])
}
}
stage('SIMD fallback') {
agent {
dockerfile {
@@ -106,7 +118,7 @@ pipeline {
}
}
steps {
CleanBuildAndTest("-DCMAKE_C_COMPILER=gcc -DCMAKE_CXX_COMPILER=g++ -DCMAKE_C_FLAGS=--coverage -DCMAKE_CXX_FLAGS=--coverage -DCMAKE_BUILD_TYPE=Debug")
CleanBuildAndTest("-DCMAKE_C_COMPILER=gcc -DCMAKE_CXX_COMPILER=g++ -DCMAKE_C_FLAGS=--coverage -DCMAKE_CXX_FLAGS=--coverage -DCMAKE_BUILD_TYPE=Debug -DDISABLE_TSAN=ON")
sh '''
gcovr -f ConflictSet.cpp --cobertura > build/coverage.xml
'''

View File

@@ -136,7 +136,7 @@ int main(int argc, const char **argv) {
printf("# HELP %s %s\n", metrics[i].name, metrics[i].help);
printf("# TYPE %s %s\n", metrics[i].name,
metrics[i].type == metrics[i].Counter ? "counter" : "gauge");
printf("%s %g\n", metrics[i].name, *metrics[i].value);
printf("%s %g\n", metrics[i].name, metrics[i].getValue());
}
printf("Check: %g seconds, %g MB/s, Add: %g seconds, %g MB/s, Gc ratio: "

View File

@@ -867,6 +867,10 @@ void internal_getMetricsV1(ConflictSet::Impl *impl,
*count = 0;
}
double internal_getMetricValue(const ConflictSet::MetricsV1 *metric) {
return 0;
}
void ConflictSet::check(const ReadRange *reads, Result *results,
int count) const {
internal_check(impl, reads, results, count);
@@ -887,6 +891,10 @@ void ConflictSet::getMetricsV1(MetricsV1 **metrics, int *count) const {
return internal_getMetricsV1(impl, metrics, count);
}
double ConflictSet::MetricsV1::getValue() const {
return internal_getMetricValue(this);
}
ConflictSet::ConflictSet(int64_t oldestVersion)
: impl(internal_create(oldestVersion)) {}

View File

@@ -15,4 +15,5 @@ __ZN8weaselab11ConflictSetD2Ev
__ZN8weaselab11ConflictSetaSEOS0_
__ZNK8weaselab11ConflictSet12getMetricsV1EPPNS0_9MetricsV1EPi
__ZNK8weaselab11ConflictSet5checkEPKNS0_9ReadRangeEPNS0_6ResultEi
__ZNK8weaselab11ConflictSet8getBytesEv
__ZNK8weaselab11ConflictSet8getBytesEv
__ZNK8weaselab11ConflictSet9MetricsV18getValueEv

View File

@@ -30,6 +30,6 @@ int main(void) {
printf("# HELP %s %s\n", metrics[i].name, metrics[i].help);
printf("# TYPE %s %s\n", metrics[i].name,
metrics[i].type == metrics[i].Counter ? "counter" : "gauge");
printf("%s %g\n", metrics[i].name, *metrics[i].value);
printf("%s %g\n", metrics[i].name, metrics[i].getValue());
}
}

View File

@@ -101,9 +101,10 @@ struct __attribute__((__visibility__("default"))) ConflictSet {
const char *help;
/** Counters are >= 0 and non-decreasing. Gauges are any value. */
enum Type { Counter, Gauge } type;
/** Thread-safety: do not read concurrently with a call to any non-const
* method in the ConflictSet associated with this metric. */
const double *value;
/** Get the most up-to-date value available for this metric. Thread-safe. */
double getValue() const;
/** @private */
void *p;
};
/** Experimental! Store a pointer to an array of MetricsV1 (owned by the
@@ -111,7 +112,8 @@ struct __attribute__((__visibility__("default"))) ConflictSet {
* no guarantees about the contents of the metrics (e.g. names, help text, and
* the meaning of values). A correct implementation is free to return nonsense
* or nothing at all. Not intended to be inspected programmatically. Only
* intended to be plumbed along to e.g. Prometheus. */
* intended to be plumbed along to e.g. Prometheus. Callers may repeatedly
* call `getValue` on the metrics they're interested in. */
void getMetricsV1(MetricsV1 **metrics, int *count) const;
#if __cplusplus > 199711L