WIP adding metrics to skiplist
This commit is contained in:
@@ -17,9 +17,9 @@ limitations under the License.
|
||||
#include "ConflictSet.h"
|
||||
#include "Internal.h"
|
||||
#include "LongestCommonPrefix.h"
|
||||
#include "Metrics.h"
|
||||
|
||||
#include <algorithm>
|
||||
#include <atomic>
|
||||
#include <bit>
|
||||
#include <cassert>
|
||||
#include <cstddef>
|
||||
@@ -578,39 +578,6 @@ static_assert(kBytesPerKey - sizeof(Node0) >= kMinNodeSurplus);
|
||||
|
||||
constexpr int64_t kFreeListMaxMemory = 1 << 20;
|
||||
|
||||
struct Metric {
|
||||
Metric *prev;
|
||||
const char *name;
|
||||
const char *help;
|
||||
ConflictSet::MetricsV1::Type type;
|
||||
std::atomic<int64_t> value;
|
||||
|
||||
protected:
|
||||
Metric(ConflictSet::Impl *impl, const char *name, const char *help,
|
||||
ConflictSet::MetricsV1::Type type);
|
||||
};
|
||||
|
||||
struct Gauge : private Metric {
|
||||
Gauge(ConflictSet::Impl *impl, const char *name, const char *help)
|
||||
: Metric(impl, name, help, ConflictSet::MetricsV1::Gauge) {}
|
||||
|
||||
void set(int64_t value) {
|
||||
this->value.store(value, std::memory_order_relaxed);
|
||||
}
|
||||
};
|
||||
|
||||
struct Counter : private Metric {
|
||||
Counter(ConflictSet::Impl *impl, const char *name, const char *help)
|
||||
: Metric(impl, name, help, ConflictSet::MetricsV1::Counter) {}
|
||||
// Expensive. Accumulate locally and then call add instead of repeatedly
|
||||
// calling add.
|
||||
void add(int64_t value) {
|
||||
assert(value >= 0);
|
||||
static_assert(std::atomic<int64_t>::is_always_lock_free);
|
||||
this->value.fetch_add(value, std::memory_order_relaxed);
|
||||
}
|
||||
};
|
||||
|
||||
template <class T> struct BoundedFreeListAllocator {
|
||||
|
||||
static_assert(sizeof(T) >= sizeof(void *));
|
||||
@@ -3242,7 +3209,6 @@ struct __attribute__((visibility("hidden"))) ConflictSet::Impl {
|
||||
0) *
|
||||
2;
|
||||
|
||||
memory_bytes.set(totalBytes);
|
||||
point_writes_total.add(tls.accum.point_writes);
|
||||
range_writes_total.add(tls.accum.range_writes);
|
||||
nodes_allocated_total.add(tls.accum.nodes_allocated);
|
||||
@@ -3331,7 +3297,6 @@ struct __attribute__((visibility("hidden"))) ConflictSet::Impl {
|
||||
#endif
|
||||
keyUpdates = gcScanStep(keyUpdates);
|
||||
|
||||
memory_bytes.set(totalBytes);
|
||||
nodes_allocated_total.add(std::exchange(tls.accum.nodes_allocated, 0));
|
||||
nodes_released_total.add(std::exchange(tls.accum.nodes_released, 0));
|
||||
entries_inserted_total.add(std::exchange(tls.accum.entries_inserted, 0));
|
||||
@@ -3379,7 +3344,7 @@ struct __attribute__((visibility("hidden"))) ConflictSet::Impl {
|
||||
explicit Impl(int64_t oldestVersion) {
|
||||
assert(oldestVersion >= 0);
|
||||
init(oldestVersion);
|
||||
initMetrics();
|
||||
metrics = initMetrics(metricsList, metricsCount);
|
||||
}
|
||||
~Impl() {
|
||||
eraseTree(root, &tls);
|
||||
@@ -3402,23 +3367,12 @@ struct __attribute__((visibility("hidden"))) ConflictSet::Impl {
|
||||
|
||||
MetricsV1 *metrics;
|
||||
int metricsCount = 0;
|
||||
void initMetrics() {
|
||||
metrics = (MetricsV1 *)safe_malloc(metricsCount * sizeof(metrics[0]));
|
||||
for (auto [i, m] = std::make_tuple(metricsCount - 1, metricList); i >= 0;
|
||||
--i, m = m->prev) {
|
||||
metrics[i].name = m->name;
|
||||
metrics[i].help = m->help;
|
||||
metrics[i].p = m;
|
||||
metrics[i].type = m->type;
|
||||
}
|
||||
}
|
||||
|
||||
Metric *metricList = nullptr;
|
||||
Metric *metricsList = nullptr;
|
||||
|
||||
#define GAUGE(name, help) \
|
||||
Gauge name { this, #name, help }
|
||||
Gauge name { metricsList, metricsCount, #name, help }
|
||||
#define COUNTER(name, help) \
|
||||
Counter name { this, #name, help }
|
||||
Counter name { metricsList, metricsCount, #name, help }
|
||||
// ==================== METRICS DEFINITIONS ====================
|
||||
COUNTER(point_read_total, "Total number of point reads checked");
|
||||
COUNTER(point_read_short_circuit_total,
|
||||
@@ -3484,13 +3438,6 @@ struct __attribute__((visibility("hidden"))) ConflictSet::Impl {
|
||||
}
|
||||
};
|
||||
|
||||
Metric::Metric(ConflictSet::Impl *impl, const char *name, const char *help,
|
||||
ConflictSet::MetricsV1::Type type)
|
||||
: prev(std::exchange(impl->metricList, this)), name(name), help(help),
|
||||
type(type), value(0) {
|
||||
++impl->metricsCount;
|
||||
}
|
||||
|
||||
Node *&getInTree(Node *n, ConflictSet::Impl *impl) {
|
||||
return n->parent == nullptr ? impl->root
|
||||
: getChildExists(n->parent, n->parentsIndex);
|
||||
@@ -3509,6 +3456,7 @@ void internal_addWrites(ConflictSet::Impl *impl,
|
||||
mallocBytesDelta = 0;
|
||||
impl->addWrites(writes, count, writeVersion);
|
||||
impl->totalBytes += mallocBytesDelta;
|
||||
impl->memory_bytes.set(impl->totalBytes);
|
||||
#if SHOW_MEMORY
|
||||
if (impl->totalBytes != mallocBytes) {
|
||||
abort();
|
||||
@@ -3520,6 +3468,7 @@ void internal_setOldestVersion(ConflictSet::Impl *impl, int64_t oldestVersion) {
|
||||
mallocBytesDelta = 0;
|
||||
impl->setOldestVersion(oldestVersion);
|
||||
impl->totalBytes += mallocBytesDelta;
|
||||
impl->memory_bytes.set(impl->totalBytes);
|
||||
#if SHOW_MEMORY
|
||||
if (impl->totalBytes != mallocBytes) {
|
||||
abort();
|
||||
|
2
Jenkinsfile
vendored
2
Jenkinsfile
vendored
@@ -129,7 +129,7 @@ pipeline {
|
||||
}
|
||||
steps {
|
||||
script {
|
||||
filter_args = "-f ConflictSet.cpp -f LongestCommonPrefix.h"
|
||||
filter_args = "-f ConflictSet.cpp -f LongestCommonPrefix.h -f Metrics.h"
|
||||
}
|
||||
CleanBuildAndTest("-DCMAKE_C_COMPILER=gcc -DCMAKE_CXX_COMPILER=g++ -DCMAKE_C_FLAGS=--coverage -DCMAKE_CXX_FLAGS=--coverage -DCMAKE_BUILD_TYPE=Debug -DDISABLE_TSAN=ON")
|
||||
sh """
|
||||
|
64
Metrics.h
Normal file
64
Metrics.h
Normal file
@@ -0,0 +1,64 @@
|
||||
#pragma once
|
||||
|
||||
#include "ConflictSet.h"
|
||||
#include "Internal.h"
|
||||
|
||||
#include <assert.h>
|
||||
#include <atomic>
|
||||
#include <tuple>
|
||||
|
||||
struct Metric {
|
||||
Metric *prev;
|
||||
const char *name;
|
||||
const char *help;
|
||||
weaselab::ConflictSet::MetricsV1::Type type;
|
||||
std::atomic<int64_t> value;
|
||||
|
||||
protected:
|
||||
Metric(Metric *&metricList, int &metricsCount, const char *name,
|
||||
const char *help, weaselab::ConflictSet::MetricsV1::Type type)
|
||||
: prev(std::exchange(metricList, this)), name(name), help(help),
|
||||
type(type), value(0) {
|
||||
++metricsCount;
|
||||
}
|
||||
};
|
||||
|
||||
struct Gauge : private Metric {
|
||||
Gauge(Metric *&metricList, int &metricsCount, const char *name,
|
||||
const char *help)
|
||||
: Metric(metricList, metricsCount, name, help,
|
||||
weaselab::ConflictSet::MetricsV1::Gauge) {}
|
||||
|
||||
void set(int64_t value) {
|
||||
this->value.store(value, std::memory_order_relaxed);
|
||||
}
|
||||
};
|
||||
|
||||
struct Counter : private Metric {
|
||||
Counter(Metric *&metricList, int &metricsCount, const char *name,
|
||||
const char *help)
|
||||
: Metric(metricList, metricsCount, name, help,
|
||||
weaselab::ConflictSet::MetricsV1::Counter) {}
|
||||
// Expensive. Accumulate locally and then call add instead of repeatedly
|
||||
// calling add.
|
||||
void add(int64_t value) {
|
||||
assert(value >= 0);
|
||||
static_assert(std::atomic<int64_t>::is_always_lock_free);
|
||||
this->value.fetch_add(value, std::memory_order_relaxed);
|
||||
}
|
||||
};
|
||||
|
||||
inline weaselab::ConflictSet::MetricsV1 *initMetrics(Metric *metricsList,
|
||||
int metricsCount) {
|
||||
weaselab::ConflictSet::MetricsV1 *metrics =
|
||||
(weaselab::ConflictSet::MetricsV1 *)safe_malloc(metricsCount *
|
||||
sizeof(metrics[0]));
|
||||
for (auto [i, m] = std::make_tuple(metricsCount - 1, metricsList); i >= 0;
|
||||
--i, m = m->prev) {
|
||||
metrics[i].name = m->name;
|
||||
metrics[i].help = m->help;
|
||||
metrics[i].p = m;
|
||||
metrics[i].type = m->type;
|
||||
}
|
||||
return metrics;
|
||||
}
|
@@ -21,7 +21,7 @@
|
||||
|
||||
std::atomic<int64_t> transactions;
|
||||
|
||||
constexpr int kBaseSearchDepth = 32;
|
||||
constexpr int kBaseSearchDepth = 115;
|
||||
constexpr int kWindowSize = 10000000;
|
||||
|
||||
std::string numToKey(int64_t num) {
|
||||
|
97
SkipList.cpp
97
SkipList.cpp
@@ -22,6 +22,7 @@
|
||||
|
||||
#include "ConflictSet.h"
|
||||
#include "Internal.h"
|
||||
#include "Metrics.h"
|
||||
|
||||
#include <algorithm>
|
||||
#include <span>
|
||||
@@ -434,13 +435,14 @@ public:
|
||||
return result;
|
||||
}
|
||||
|
||||
void detectConflicts(ReadConflictRange *ranges, int count,
|
||||
ConflictSet::Result *transactionConflictStatus) const {
|
||||
// Return number of iterations of main loop
|
||||
int detectConflicts(ReadConflictRange *ranges, int count,
|
||||
ConflictSet::Result *transactionConflictStatus) const {
|
||||
const int M = 16;
|
||||
int nextJob[M];
|
||||
CheckMax inProgress[M];
|
||||
if (!count)
|
||||
return;
|
||||
return 0;
|
||||
|
||||
int started = std::min(M, count);
|
||||
for (int i = 0; i < started; i++) {
|
||||
@@ -451,8 +453,9 @@ public:
|
||||
|
||||
int prevJob = started - 1;
|
||||
int job = 0;
|
||||
int iters = 0;
|
||||
// vtune: 340 parts
|
||||
while (true) {
|
||||
for (;; ++iters) {
|
||||
if (inProgress[job].advance()) {
|
||||
if (started == count) {
|
||||
if (prevJob == job)
|
||||
@@ -468,6 +471,7 @@ public:
|
||||
prevJob = job;
|
||||
job = nextJob[job];
|
||||
}
|
||||
return iters;
|
||||
}
|
||||
|
||||
void find(const StringRef *values, Finger *results, int *temp, int count) {
|
||||
@@ -702,15 +706,27 @@ private:
|
||||
};
|
||||
};
|
||||
|
||||
struct ReadContext {
|
||||
int64_t commits_accum = 0;
|
||||
int64_t conflicts_accum = 0;
|
||||
int64_t too_olds_accum = 0;
|
||||
int64_t check_bytes_accum = 0;
|
||||
};
|
||||
|
||||
struct __attribute__((visibility("hidden"))) ConflictSet::Impl {
|
||||
Impl(int64_t oldestVersion)
|
||||
: oldestVersion(oldestVersion), newestVersion(oldestVersion),
|
||||
skipList(oldestVersion) {}
|
||||
skipList(oldestVersion) {
|
||||
metrics = initMetrics(metricsList, metricsCount);
|
||||
}
|
||||
~Impl() { safe_free(metrics, metricsCount * sizeof(metrics[0])); }
|
||||
void check(const ConflictSet::ReadRange *reads, ConflictSet::Result *results,
|
||||
int count) const {
|
||||
int count) {
|
||||
ReadContext tls;
|
||||
Arena arena;
|
||||
auto *ranges = new (arena) ReadConflictRange[count];
|
||||
for (int i = 0; i < count; ++i) {
|
||||
tls.check_bytes_accum += reads[i].begin.len + reads[i].end.len;
|
||||
ranges[i].begin = {reads[i].begin.p, size_t(reads[i].begin.len)};
|
||||
ranges[i].end = reads[i].end.len > 0
|
||||
? StringRef{reads[i].end.p, size_t(reads[i].end.len)}
|
||||
@@ -718,13 +734,22 @@ struct __attribute__((visibility("hidden"))) ConflictSet::Impl {
|
||||
ranges[i].version = reads[i].readVersion;
|
||||
results[i] = ConflictSet::Commit;
|
||||
}
|
||||
skipList.detectConflicts(ranges, count, results);
|
||||
int iters = skipList.detectConflicts(ranges, count, results);
|
||||
for (int i = 0; i < count; ++i) {
|
||||
if (reads[i].readVersion < oldestVersion ||
|
||||
reads[i].readVersion < newestVersion - 2e9) {
|
||||
results[i] = TooOld;
|
||||
}
|
||||
tls.commits_accum += results[i] == Commit;
|
||||
tls.conflicts_accum += results[i] == Conflict;
|
||||
tls.too_olds_accum += results[i] == TooOld;
|
||||
}
|
||||
range_read_iterations_total.add(iters);
|
||||
range_read_total.add(count);
|
||||
commits_total.add(tls.commits_accum);
|
||||
conflicts_total.add(tls.conflicts_accum);
|
||||
too_olds_total.add(tls.too_olds_accum);
|
||||
check_bytes_total.add(tls.check_bytes_accum);
|
||||
}
|
||||
|
||||
void addWrites(const ConflictSet::WriteRange *writes, int count,
|
||||
@@ -788,6 +813,9 @@ struct __attribute__((visibility("hidden"))) ConflictSet::Impl {
|
||||
}
|
||||
|
||||
void setOldestVersion(int64_t oldestVersion) {
|
||||
// This isn't 100% accurate. It overcounts if you hit the end
|
||||
gc_iterations_total.add(keyUpdates);
|
||||
|
||||
assert(oldestVersion >= this->oldestVersion);
|
||||
this->oldestVersion = oldestVersion;
|
||||
SkipList::Finger finger;
|
||||
@@ -802,6 +830,54 @@ struct __attribute__((visibility("hidden"))) ConflictSet::Impl {
|
||||
|
||||
int64_t totalBytes = 0;
|
||||
|
||||
MetricsV1 *metrics;
|
||||
int metricsCount = 0;
|
||||
Metric *metricsList = nullptr;
|
||||
|
||||
#define GAUGE(name, help) \
|
||||
Gauge name { metricsList, metricsCount, #name, help }
|
||||
#define COUNTER(name, help) \
|
||||
Counter name { metricsList, metricsCount, #name, help }
|
||||
// ==================== METRICS DEFINITIONS ====================
|
||||
COUNTER(range_read_total, "Total number of range reads checked");
|
||||
COUNTER(range_read_iterations_total,
|
||||
"Total number of iterations of the main loops for range read checks");
|
||||
COUNTER(commits_total,
|
||||
"Total number of checks where the result is \"commit\"");
|
||||
COUNTER(conflicts_total,
|
||||
"Total number of checks where the result is \"conflict\"");
|
||||
COUNTER(too_olds_total,
|
||||
"Total number of checks where the result is \"too old\"");
|
||||
COUNTER(check_bytes_total, "Total number of key bytes checked");
|
||||
GAUGE(memory_bytes, "Total number of bytes in use");
|
||||
COUNTER(nodes_allocated_total,
|
||||
"The total number of physical tree nodes allocated");
|
||||
COUNTER(nodes_released_total,
|
||||
"The total number of physical tree nodes released");
|
||||
COUNTER(insert_iterations_total,
|
||||
"The total number of iterations of the main loop for insertion. "
|
||||
"Includes searches where the entry already existed, and so insertion "
|
||||
"did not take place");
|
||||
COUNTER(entries_inserted_total,
|
||||
"The total number of entries inserted in the tree");
|
||||
COUNTER(entries_erased_total,
|
||||
"The total number of entries erased from the tree");
|
||||
COUNTER(
|
||||
gc_iterations_total,
|
||||
"The total number of iterations of the main loop for garbage collection");
|
||||
COUNTER(write_bytes_total, "Total number of key bytes in calls to addWrites");
|
||||
GAUGE(oldest_version,
|
||||
"The lowest version that doesn't result in \"TooOld\" for checks");
|
||||
GAUGE(newest_version, "The version of the most recent call to addWrites");
|
||||
// ==================== END METRICS DEFINITIONS ====================
|
||||
#undef GAUGE
|
||||
#undef COUNTER
|
||||
|
||||
void getMetricsV1(MetricsV1 **metrics, int *count) {
|
||||
*metrics = this->metrics;
|
||||
*count = metricsCount;
|
||||
}
|
||||
|
||||
private:
|
||||
int64_t keyUpdates = 0;
|
||||
Arena removalArena;
|
||||
@@ -824,6 +900,7 @@ void internal_addWrites(ConflictSet::Impl *impl,
|
||||
mallocBytesDelta = 0;
|
||||
impl->addWrites(writes, count, writeVersion);
|
||||
impl->totalBytes += mallocBytesDelta;
|
||||
impl->memory_bytes.set(impl->totalBytes);
|
||||
#if SHOW_MEMORY
|
||||
if (impl->totalBytes != mallocBytes) {
|
||||
abort();
|
||||
@@ -835,6 +912,7 @@ void internal_setOldestVersion(ConflictSet::Impl *impl, int64_t oldestVersion) {
|
||||
mallocBytesDelta = 0;
|
||||
impl->setOldestVersion(oldestVersion);
|
||||
impl->totalBytes += mallocBytesDelta;
|
||||
impl->memory_bytes.set(impl->totalBytes);
|
||||
#if SHOW_MEMORY
|
||||
if (impl->totalBytes != mallocBytes) {
|
||||
abort();
|
||||
@@ -858,12 +936,11 @@ int64_t internal_getBytes(ConflictSet::Impl *impl) { return impl->totalBytes; }
|
||||
|
||||
void internal_getMetricsV1(ConflictSet::Impl *impl,
|
||||
ConflictSet::MetricsV1 **metrics, int *count) {
|
||||
*metrics = nullptr;
|
||||
*count = 0;
|
||||
return impl->getMetricsV1(metrics, count);
|
||||
}
|
||||
|
||||
double internal_getMetricValue(const ConflictSet::MetricsV1 *metric) {
|
||||
return 0;
|
||||
return ((Metric *)metric->p)->value.load(std::memory_order_relaxed);
|
||||
}
|
||||
|
||||
void ConflictSet::check(const ReadRange *reads, Result *results,
|
||||
|
Reference in New Issue
Block a user