Files
conflict-set/ServerBench.cpp
Andrew Noyes ce54746a4a
All checks were successful
Tests / Clang total: 2500, passed: 2500
Clang |Total|New|Outstanding|Fixed|Trend |:-:|:-:|:-:|:-:|:-: |0|0|0|0|:clap:
Tests / Debug total: 2498, passed: 2498
Tests / SIMD fallback total: 2500, passed: 2500
Tests / Release [gcc] total: 2500, passed: 2500
GNU C Compiler (gcc) |Total|New|Outstanding|Fixed|Trend |:-:|:-:|:-:|:-:|:-: |0|0|0|0|:clap:
Tests / Release [gcc,aarch64] total: 1867, passed: 1867
Tests / Coverage total: 1877, passed: 1877
Code Coverage #### Project Overview No changes detected, that affect the code coverage. * Line Coverage: 99.29% (1829/1842) * Branch Coverage: 67.35% (1479/2196) * Complexity Density: 0.00 * Lines of Code: 1842 #### Quality Gates Summary Output truncated.
weaselab/conflict-set/pipeline/head This commit looks good
Add several new cache events to metrics
2024-08-16 12:48:22 -07:00

366 lines
11 KiB
C++

#include <atomic>
#include <errno.h>
#include <netdb.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <string>
#include <string_view>
#include <sys/resource.h>
#include <sys/socket.h>
#include <sys/uio.h>
#include <thread>
#include <unistd.h>
#include <utility>
#include <vector>
#include "ConflictSet.h"
#include "third_party/nadeau.h"
// Number of completed workload() loop iterations. Incremented (relaxed) by
// workload() and read by main() to report the transactions_total metric.
std::atomic<int64_t> transactions;
// Number of zero bytes numToKey() places in front of the 8-byte number in
// every key it builds.
constexpr int kBaseSearchDepth = 32;
// Width, in versions, of the window workload() operates on: it starts at this
// version, reads and writes keys derived from [version - kWindowSize, version],
// and calls setOldestVersion(version - kWindowSize) on every iteration.
constexpr int kWindowSize = 10000000;
// Builds the key for `num`: kBaseSearchDepth zero bytes followed by the eight
// bytes of `num` after a 64-bit byte swap (big-endian order when the host is
// little-endian), so the result is kBaseSearchDepth + 8 bytes long.
std::basic_string<uint8_t> numToKey(int64_t num) {
  // Every byte starts out as zero; only the trailing eight are overwritten.
  std::basic_string<uint8_t> key(kBaseSearchDepth + sizeof(int64_t),
                                 uint8_t{0});
  const int64_t swapped = __builtin_bswap64(num);
  memcpy(&key[kBaseSearchDepth], &swapped, sizeof(swapped));
  return key;
}
// Endless synthetic workload run against `cs`; never returns.
//
// The version starts at kWindowSize and grows by one per iteration. Each
// iteration checks two reads, adds writes at the current version, advances the
// oldest version to (version - kWindowSize), and then bumps the global
// `transactions` counter.
void workload(weaselab::ConflictSet *cs) {
  using CS = weaselab::ConflictSet;

  int64_t version = kWindowSize;
  cs->addWrites(nullptr, 0, version);

  while (true) {
    // Reads: one read of a random key from the window (end = {nullptr, 0}) and
    // one range read spanning the keys for [version - kWindowSize, version - 1).
    {
      auto windowFirstKey = numToKey(version - kWindowSize);
      auto windowLastKey = numToKey(version - 1);
      auto pointVersion = version - kWindowSize + rand() % kWindowSize + 1;
      auto pointKey = numToKey(pointVersion);
      CS::ReadRange reads[] = {
          {
              {pointKey.data(), int(pointKey.size())},
              {nullptr, 0},
              pointVersion,
          },
          {
              {windowFirstKey.data(), int(windowFirstKey.size())},
              {windowLastKey.data(), int(windowLastKey.size())},
              version - 2,
          },
      };
      constexpr int kNumReads = sizeof(reads) / sizeof(reads[0]);
      CS::Result results[kNumReads];
      cs->check(reads, results, kNumReads);
      // Disabled sanity check, kept for reference:
      // for (int i = 0; i < kNumReads; ++i) {
      //   if (results[i] != weaselab::ConflictSet::Commit) {
      //     fprintf(stderr, "Unexpected conflict: [%s, %s) @ %" PRId64 "\n",
      //             printable(reads[i].begin).c_str(),
      //             printable(reads[i].end).c_str(), reads[i].readVersion);
      //     abort();
      //   }
      // }
    }

    // Writes
    {
      CS::WriteRange range;
      auto key = numToKey(version);
      range.begin.p = key.data();
      range.end.len = 0;

      const bool prefixSweep = version % (kWindowSize / 2) == 0;
      if (!prefixSweep) {
        // Common case: write the full key for this version (end.len == 0) ...
        range.begin.len = key.size();
        cs->addWrites(&range, 1, version);

        // ... then a range covering 1000 consecutive numbers that starts at a
        // random position inside the window.
        int64_t rangeStart = version - kWindowSize + rand() % kWindowSize;
        auto rangeBeginKey = numToKey(rangeStart);
        auto rangeEndKey = numToKey(rangeStart + 1000);
        range.begin.p = rangeBeginKey.data();
        range.begin.len = rangeBeginKey.size();
        range.end.p = rangeEndKey.data();
        range.end.len = rangeEndKey.size();
        cs->addWrites(&range, 1, version);
      } else {
        // Every kWindowSize / 2 versions: write each prefix of the key, from
        // length 0 up to and including the full length.
        const int fullLen = int(key.size());
        for (int prefixLen = 0; prefixLen <= fullLen; ++prefixLen) {
          range.begin.len = prefixLen;
          cs->addWrites(&range, 1, version);
        }
      }
    }

    // GC
    cs->setOldestVersion(version - kWindowSize);
    ++version;

    transactions.fetch_add(1, std::memory_order_relaxed);
  }
}
// Adapted from the getaddrinfo man page.
//
// Resolves `node`/`service` for a passive (listening) stream socket, binds to
// the first returned address for which both socket(2) and bind(2) succeed, and
// puts that socket into the listening state. Returns the listening descriptor.
// Prints a diagnostic to stderr and aborts if resolution, binding, or listen(2)
// fails.
int getListenFd(const char *node, const char *service) {
  struct addrinfo hints = {};
  hints.ai_family = AF_UNSPEC;     /* Allow IPv4 or IPv6 */
  hints.ai_socktype = SOCK_STREAM; /* stream socket */
  hints.ai_flags = AI_PASSIVE;     /* For wildcard IP address */
  /* ai_protocol stays 0 (any protocol); the pointer members stay null. */

  struct addrinfo *candidates = nullptr;
  const int gaiStatus = getaddrinfo(node, service, &hints, &candidates);
  if (gaiStatus != 0) {
    fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(gaiStatus));
    abort();
  }

  /* Walk the returned address list until one address can be bound. A socket
     that was created but could not be bound is closed before moving on. */
  int listenFd = -1;
  const struct addrinfo *cand = candidates;
  while (cand != nullptr) {
    listenFd = socket(cand->ai_family, cand->ai_socktype, cand->ai_protocol);
    if (listenFd != -1) {
      int reuse = 1;
      setsockopt(listenFd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));
      if (bind(listenFd, cand->ai_addr, cand->ai_addrlen) == 0) {
        break; /* Success */
      }
      close(listenFd);
    }
    cand = cand->ai_next;
  }
  const bool bound = cand != nullptr;
  freeaddrinfo(candidates); /* No longer needed */

  if (!bound) { /* No address succeeded */
    fprintf(stderr, "Could not bind\n");
    abort();
  }

  if (listen(listenFd, SOMAXCONN) != 0) {
    perror("listen()");
    abort();
  }
  return listenFd;
}
// HTTP response. main() sends it as four pieces, in this order:
//   part1, the decimal content length, part2, the body.
//
// part1: status line and headers, up to and including "Content-Length: ".
std::string_view part1 =
"HTTP/1.1 200 OK \r\nContent-type: text/plain; version=0.0.4; "
"charset=utf-8; escaping=values\r\nContent-Length: ";
// The decimal content length (size of the body in bytes) goes here.
// part2: terminates the Content-Length line and the header section.
std::string_view part2 = "\r\n\r\n";
// The body follows part2.
// Converts a timeval to seconds as a double: tv_sec plus tv_usec scaled by
// 1e-6.
double toSeconds(timeval t) {
  const double wholeSeconds = static_cast<double>(t.tv_sec);
  const double fractionalSeconds = static_cast<double>(t.tv_usec) * 1e-6;
  return wholeSeconds + fractionalSeconds;
}
#include <linux/perf_event.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/ioctl.h>
#include <sys/syscall.h>
#include <sys/types.h>
#include <unistd.h>
#ifdef __linux__
// Move-only owner of one perf event file descriptor plus a label string.
//
// The descriptor is closed when the object is destroyed or overwritten by a
// move assignment. A counter whose event could not be opened holds a negative
// descriptor; check ok() before calling total().
struct PerfCounter {
  // Opens the event identified by (`type`, `config`) with perf_event_open,
  // passing pid = 0, cpu = -1, no group and no flags, with `inherit` set and
  // kernel/hypervisor activity excluded. `labels` is stored verbatim and
  // returned by getLabels(). Failure to open is not fatal here: it is reported
  // through ok().
  PerfCounter(int type, int config, const std::string &labels = {})
      : labels(labels) {
    struct perf_event_attr pe;
    memset(&pe, 0, sizeof(pe));
    pe.type = type;
    pe.size = sizeof(pe);
    pe.config = config;
    pe.inherit = 1;
    pe.exclude_kernel = 1;
    pe.exclude_hv = 1;
    fd = perf_event_open(&pe, 0, -1, -1, 0);
  }

  // Reads and returns the current 64-bit counter value. Prints a diagnostic
  // and aborts if the read does not return exactly eight bytes (which includes
  // the case where the counter was never opened).
  int64_t total() const {
    int64_t count;
    if (read(fd, &count, sizeof(count)) != sizeof(count)) {
      perror("read instructions from perf");
      abort();
    }
    return count;
  }

  // The descriptor has a single owner, so copying is not allowed.
  PerfCounter(const PerfCounter &) = delete;
  PerfCounter &operator=(const PerfCounter &) = delete;

  // Takes over other's descriptor and labels; `other` is left not-ok.
  PerfCounter(PerfCounter &&other) noexcept
      : fd(std::exchange(other.fd, -1)), labels(std::move(other.labels)) {}

  // Releases the descriptor currently held (if any) before taking over
  // other's, so that overwriting a live counter does not leak its descriptor.
  // Self-assignment is a no-op.
  PerfCounter &operator=(PerfCounter &&other) noexcept {
    if (this != &other) {
      if (fd >= 0) {
        close(fd);
      }
      fd = std::exchange(other.fd, -1);
      labels = std::move(other.labels);
    }
    return *this;
  }

  ~PerfCounter() {
    if (fd >= 0) {
      close(fd);
    }
  }

  // True if the event was opened successfully and is still owned by this
  // object.
  bool ok() const { return fd >= 0; }

  // The label string given at construction.
  const std::string &getLabels() const { return labels; }

private:
  int fd = -1;
  std::string labels;

  // Thin wrapper around the raw perf_event_open system call.
  static long perf_event_open(struct perf_event_attr *hw_event, pid_t pid,
                              int cpu, int group_fd, unsigned long flags) {
    return syscall(SYS_perf_event_open, hw_event, pid, cpu, group_fd, flags);
  }
};
#else
// Stand-in used when not building for Linux. It offers the same member
// functions as the Linux implementation but never opens anything: ok() is
// always false and total() is always 0.
struct PerfCounter {
  PerfCounter(int, int, const std::string &labels = {}) : labels(labels) {}
  int64_t total() const { return 0; }
  bool ok() const { return false; }
  const std::string &getLabels() const { return labels; }

private:
  std::string labels;
};
#endif
// Usage: <program> <host> <port>
//
// Listens on host:port, runs workload() on a background thread against a
// ConflictSet, and answers every accepted connection with one HTTP response
// whose plain-text body lists metrics as "# HELP" / "# TYPE" / value lines:
// process CPU time and resident memory, the transaction count, perf counters,
// and the metrics exposed by the ConflictSet. The request itself is not read.
// Returns 1 after printing a usage message when the argument count is wrong;
// otherwise it serves forever.
int main(int argc, char **argv) {
  if (argc != 3) {
    fprintf(stderr, "Expected ./%s <host> <port>\n", argv[0]);
    return 1;
  }

  const int listenFd = getListenFd(argv[1], argv[2]);

  weaselab::ConflictSet cs{0};
  weaselab::ConflictSet::MetricsV1 *metrics;
  int metricsCount;
  cs.getMetricsV1(&metrics, &metricsCount);

  // All perf counters are created before the workload thread is started.
  // NOTE(review): PerfCounter sets `inherit`, which is presumably what lets
  // them cover that thread - keep this ordering.
  PerfCounter instructions{PERF_TYPE_HARDWARE, PERF_COUNT_HW_INSTRUCTIONS};
  PerfCounter cycles{PERF_TYPE_HARDWARE, PERF_COUNT_HW_CPU_CYCLES};
  // total() aborts on a counter that failed to open, so unavailable counters
  // are reported once here and skipped when building responses.
  if (!instructions.ok()) {
    fprintf(stderr, "Could not open instructions counter\n");
  }
  if (!cycles.ok()) {
    fprintf(stderr, "Could not open cycles counter\n");
  }

  // One counter per (cache, operation, result) combination. Combinations that
  // cannot be opened are reported and dropped.
  std::vector<PerfCounter> cacheCounters;
  for (const auto &[id, idStr] :
       std::initializer_list<std::pair<int, std::string>>{
           {PERF_COUNT_HW_CACHE_L1D, "l1d"},
           {PERF_COUNT_HW_CACHE_L1I, "l1i"},
           {PERF_COUNT_HW_CACHE_LL, "ll"},
           {PERF_COUNT_HW_CACHE_DTLB, "dtlb"},
           // Somehow was showing a miss rate > 1 /shrug
           // {PERF_COUNT_HW_CACHE_ITLB, "itlb"},
           {PERF_COUNT_HW_CACHE_BPU, "bpu"},
           {PERF_COUNT_HW_CACHE_NODE, "node"},
       }) {
    for (const auto &[op, opStr] :
         std::initializer_list<std::pair<int, std::string>>{
             {PERF_COUNT_HW_CACHE_OP_READ, "read"},
             {PERF_COUNT_HW_CACHE_OP_WRITE, "write"},
             {PERF_COUNT_HW_CACHE_OP_PREFETCH, "prefetch"},
         }) {
      for (const auto &[result, resultStr] :
           std::initializer_list<std::pair<int, std::string>>{
               {PERF_COUNT_HW_CACHE_RESULT_MISS, "miss"},
               {PERF_COUNT_HW_CACHE_RESULT_ACCESS, "access"},
           }) {
        auto labels = "{id=\"" + idStr + "\", op=\"" + opStr +
                      "\", result=\"" + resultStr + "\"}";
        cacheCounters.emplace_back(PERF_TYPE_HW_CACHE,
                                   id | (op << 8) | (result << 16), labels);
        if (!cacheCounters.back().ok()) {
          fprintf(stderr, "Could not open cache event: %s\n", labels.c_str());
          cacheCounters.pop_back();
        }
      }
    }
  }

  auto w = std::thread{workload, &cs};

  for (;;) {
    struct sockaddr_storage peer_addr = {};
    socklen_t peer_addr_len = sizeof(peer_addr);
    const int connfd =
        accept(listenFd, (struct sockaddr *)&peer_addr, &peer_addr_len);
    if (connfd < 0) {
      // Nothing to answer; do not build a response or touch an invalid
      // descriptor. An interrupted call is simply retried.
      if (errno != EINTR) {
        perror("accept()");
      }
      continue;
    }

    std::string body;
    // Appends the three lines for one unlabelled metric:
    //   # HELP <name> <help>
    //   # TYPE <name> <type>
    //   <name> <value>
    const auto appendMetric = [&body](const char *name, const char *help,
                                      const char *type,
                                      const std::string &value) {
      body += "# HELP ";
      body += name;
      body += " ";
      body += help;
      body += "\n# TYPE ";
      body += name;
      body += " ";
      body += type;
      body += "\n";
      body += name;
      body += " ";
      body += value;
      body += "\n";
    };

    rusage r;
    getrusage(RUSAGE_SELF, &r);
    appendMetric(
        "process_cpu_seconds_total",
        "Total user and system CPU time spent in seconds.", "counter",
        std::to_string(toSeconds(r.ru_utime) + toSeconds(r.ru_stime)));
    appendMetric("process_resident_memory_bytes",
                 "Resident memory size in bytes.", "gauge",
                 std::to_string(getCurrentRSS()));
    appendMetric("transactions_total", "Total number of transactions",
                 "counter",
                 std::to_string(transactions.load(std::memory_order_relaxed)));
    if (instructions.ok()) {
      appendMetric("instructions_total", "Total number of instructions",
                   "counter", std::to_string(instructions.total()));
    }
    if (cycles.ok()) {
      appendMetric("cycles_total", "Total number of cycles", "counter",
                   std::to_string(cycles.total()));
    }

    body += "# HELP cache_event_total Total number of cache events\n"
            "# TYPE cache_event_total counter\n";
    for (const auto &counter : cacheCounters) {
      body += "cache_event_total" + counter.getLabels() + " " +
              std::to_string(counter.total()) + "\n";
    }

    // Metrics exposed by the ConflictSet itself.
    for (int i = 0; i < metricsCount; ++i) {
      body += "# HELP ";
      body += metrics[i].name;
      body += " ";
      body += metrics[i].help;
      body += "\n";
      body += "# TYPE ";
      body += metrics[i].name;
      body += " ";
      body += metrics[i].type == metrics[i].Counter ? "counter" : "gauge";
      body += "\n";
      body += metrics[i].name;
      body += " ";
      body += std::to_string(metrics[i].getValue());
      body += "\n";
    }

    // Response layout: part1, decimal Content-Length, part2, body.
    auto len = std::to_string(body.size());
    iovec iov[] = {
        {const_cast<char *>(part1.data()), part1.size()},
        {len.data(), len.size()},
        {const_cast<char *>(part2.data()), part2.size()},
        {body.data(), body.size()},
    };
    ssize_t written;
    do {
      written = writev(connfd, iov, sizeof(iov) / sizeof(iov[0]));
    } while (written < 0 && errno == EINTR);
    close(connfd);
  }
}