514 lines
16 KiB
C++
514 lines
16 KiB
C++
#include <algorithm>
|
|
#include <atomic>
|
|
#include <cstdint>
|
|
#include <cstdlib>
|
|
#include <errno.h>
|
|
#include <netdb.h>
|
|
#include <stdio.h>
|
|
#include <stdlib.h>
|
|
#include <string.h>
|
|
#include <string>
|
|
#include <string_view>
|
|
#include <sys/ioctl.h>
|
|
#include <sys/resource.h>
|
|
#include <sys/socket.h>
|
|
#include <sys/types.h>
|
|
#include <sys/uio.h>
|
|
#include <thread>
|
|
#include <unistd.h>
|
|
#include <utility>
|
|
#include <vector>
|
|
|
|
#include "ConflictSet.h"
|
|
#include "Internal.h"
|
|
#include "third_party/nadeau.h"
|
|
|
|
// Alignment, in bytes, applied to TxQueue's producer-side and consumer-side
// state structs so that each starts on its own boundary.
constexpr int kCacheLine = 64; // TODO mac m1 is 128
|
|
|
|
template <class T> struct TxQueue {
|
|
|
|
explicit TxQueue(int lgSlotCount)
|
|
: slotCount(1 << lgSlotCount), slotCountMask(slotCount - 1),
|
|
slots(new T[slotCount]) {
|
|
// Otherwise we can't tell the difference between full and empty.
|
|
assert(!(slotCountMask & 0x80000000));
|
|
}
|
|
|
|
/// Call from producer thread, after ensuring consumer is no longer accessing
|
|
/// it somehow
|
|
~TxQueue() { delete[] slots; }
|
|
|
|
/// Must be called from the producer thread
|
|
void push(T t) {
|
|
if (wouldBlock()) {
|
|
// Wait for pops to change and try again
|
|
consumer.pops.wait(producer.lastPopRead, std::memory_order_relaxed);
|
|
producer.lastPopRead = consumer.pops.load(std::memory_order_acquire);
|
|
}
|
|
slots[producer.pushesNonAtomic++ & slotCountMask] = std::move(t);
|
|
// seq_cst so that the notify can't be ordered before the store
|
|
producer.pushes.store(producer.pushesNonAtomic, std::memory_order_seq_cst);
|
|
// We have to notify every time, since we don't know if this is the last
|
|
// push ever
|
|
producer.pushes.notify_one();
|
|
}
|
|
|
|
/// Must be called from the producer thread
|
|
uint32_t outstanding() {
|
|
return producer.pushesNonAtomic -
|
|
consumer.pops.load(std::memory_order_relaxed);
|
|
}
|
|
|
|
/// Returns true if a call to push might block. Must be called from the
|
|
/// producer thread.
|
|
bool wouldBlock() {
|
|
// See if we can determine that overflow won't happen entirely from state
|
|
// local to the producer
|
|
if (producer.pushesNonAtomic - producer.lastPopRead == slotCount - 1) {
|
|
// Re-read pops with memory order
|
|
producer.lastPopRead = consumer.pops.load(std::memory_order_acquire);
|
|
return producer.pushesNonAtomic - producer.lastPopRead == slotCount - 1;
|
|
}
|
|
return false;
|
|
}
|
|
|
|
/// Valid until the next pop, or until this queue is destroyed.
|
|
T *pop() {
|
|
// See if we can determine that there's an entry we can pop entirely from
|
|
// state local to the consumer
|
|
if (consumer.lastPushRead - consumer.popsNonAtomic == 0) {
|
|
// Re-read pushes with memory order and try again
|
|
consumer.lastPushRead = producer.pushes.load(std::memory_order_acquire);
|
|
if (consumer.lastPushRead - consumer.popsNonAtomic == 0) {
|
|
// Wait for pushes to change and try again
|
|
producer.pushes.wait(consumer.lastPushRead, std::memory_order_relaxed);
|
|
consumer.lastPushRead = producer.pushes.load(std::memory_order_acquire);
|
|
}
|
|
}
|
|
auto result = &slots[consumer.popsNonAtomic++ & slotCountMask];
|
|
// We only have to write pops with memory order if we've run out of items.
|
|
// We know that we'll eventually run out.
|
|
if (consumer.lastPushRead - consumer.popsNonAtomic == 0) {
|
|
// seq_cst so that the notify can't be ordered before the store
|
|
consumer.pops.store(consumer.popsNonAtomic, std::memory_order_seq_cst);
|
|
consumer.pops.notify_one();
|
|
}
|
|
return result;
|
|
}
|
|
|
|
private:
|
|
const uint32_t slotCount;
|
|
const uint32_t slotCountMask;
|
|
T *slots;
|
|
struct alignas(kCacheLine) ProducerState {
|
|
std::atomic<uint32_t> pushes{0};
|
|
uint32_t pushesNonAtomic{0};
|
|
uint32_t lastPopRead{0};
|
|
};
|
|
struct alignas(kCacheLine) ConsumerState {
|
|
std::atomic<uint32_t> pops{0};
|
|
uint32_t popsNonAtomic{0};
|
|
uint32_t lastPushRead{0};
|
|
};
|
|
ProducerState producer;
|
|
ConsumerState consumer;
|
|
};
|
|
|
|
// Running count of transactions. Incremented by main's workload thread after
// each queue push and reported by main as the `transactions_total` metric.
std::atomic<int64_t> transactions;
|
|
|
|
/// Negates `x`, except that the minimum int64_t value (which has no positive
/// counterpart) is returned unchanged instead of overflowing.
int64_t safeUnaryMinus(int64_t x) {
  if (x == std::numeric_limits<int64_t>::min()) {
    return x;
  }
  return -x;
}
|
|
|
|
void tupleAppend(std::string &output, int64_t value) {
|
|
if (value == 0) {
|
|
output.push_back(0x14);
|
|
return;
|
|
}
|
|
uint32_t size = 8 - __builtin_clrsbll(value) / 8;
|
|
int typeCode = 0x14 + (value < 0 ? -1 : 1) * size;
|
|
output.push_back(typeCode);
|
|
if (value < 0) {
|
|
value = ~safeUnaryMinus(value);
|
|
}
|
|
uint64_t swap = __builtin_bswap64(value);
|
|
output.insert(output.end(), (uint8_t *)&swap + 8 - size,
|
|
(uint8_t *)&swap + 8);
|
|
}
|
|
|
|
/// Appends an encoding of the byte string `value` to `output`: the byte 0x02,
/// then the bytes of `value` with every 0x00 followed by an extra 0xFF, then
/// a terminating 0x00.
void tupleAppend(std::string &output, std::string_view value) {
  output.push_back('\x02');
  if (value.find('\x00') == std::string_view::npos) {
    // Fast path: nothing to escape, copy the bytes in one go.
    output.append(value);
  } else {
    for (const char c : value) {
      output.push_back(c);
      if (c == '\x00') {
        // Escape embedded NULs so they can't be mistaken for the terminator.
        output.push_back('\xff');
      }
    }
  }
  output.push_back('\x00');
}
|
|
|
|
/// Builds a key by calling tupleAppend on each argument in order, appending
/// to one string, and returns that string. With no arguments the result is
/// empty.
template <class... Ts> std::string tupleKey(const Ts &...ts) {
  std::string key;
  // Comma fold: elements are appended left to right.
  (tupleAppend(key, ts), ...);
  return key;
}
|
|
|
|
// Transaction draws each key index as rand() % kTotalKeyRange.
constexpr int kTotalKeyRange = 1'000'000'000;
|
|
// Distance between a transaction's version and the oldest version it asks the
// conflict set to retain; also the first version the workload generates.
constexpr int kWindowSize = 1'000'000;
|
|
// Number of keys consumed by a transaction's reads: the first two form one
// range, each remaining key gets its own read entry. Must be at least 3.
constexpr int kNumReadKeysPerTx = 5;
|
|
// Number of write entries (one per key) in each transaction.
constexpr int kNumWriteKeysPerTx = 10;
|
|
|
|
struct Transaction {
|
|
std::vector<std::string> keys;
|
|
std::vector<weaselab::ConflictSet::ReadRange> reads;
|
|
std::vector<weaselab::ConflictSet::WriteRange> writes;
|
|
int64_t version;
|
|
int64_t oldestVersion;
|
|
Transaction() = default;
|
|
explicit Transaction(int64_t version)
|
|
: version(version), oldestVersion(version - kWindowSize) {
|
|
std::vector<int64_t> keyIndices;
|
|
for (int i = 0; i < std::max(kNumReadKeysPerTx, kNumWriteKeysPerTx); ++i) {
|
|
keyIndices.push_back(rand() % kTotalKeyRange);
|
|
}
|
|
std::sort(keyIndices.begin(), keyIndices.end());
|
|
constexpr std::string_view fullString =
|
|
"this is a string, where a prefix of it is used as an element of the "
|
|
"tuple forming the key";
|
|
for (int i = 0; i < int(keyIndices.size()); ++i) {
|
|
keys.push_back(
|
|
tupleKey(0x100, keyIndices[i] / fullString.size(),
|
|
fullString.substr(0, keyIndices[i] % fullString.size())));
|
|
// printf("%s\n", printable(keys.back()).c_str());
|
|
}
|
|
for (int i = 0; i < kNumWriteKeysPerTx; ++i) {
|
|
writes.push_back({{(const uint8_t *)keys[i].data(), int(keys[i].size())},
|
|
{nullptr, 0}});
|
|
}
|
|
reads.push_back({{(const uint8_t *)keys[0].data(), int(keys[0].size())},
|
|
{(const uint8_t *)keys[1].data(), int(keys[1].size())},
|
|
version - std::min(10, kWindowSize)});
|
|
static_assert(kNumReadKeysPerTx >= 3);
|
|
for (int i = 2; i < kNumReadKeysPerTx; ++i) {
|
|
reads.push_back({{(const uint8_t *)keys[i].data(), int(keys[i].size())},
|
|
{nullptr, 0},
|
|
version - kWindowSize});
|
|
}
|
|
}
|
|
|
|
Transaction(Transaction &&) = default;
|
|
Transaction &operator=(Transaction &&) = default;
|
|
Transaction(Transaction const &) = delete;
|
|
Transaction const &operator=(Transaction const &) = delete;
|
|
};
|
|
|
|
struct Resolver {
|
|
|
|
void resolve(const weaselab::ConflictSet::ReadRange *reads, int readCount,
|
|
const weaselab::ConflictSet::WriteRange *writes, int writeCount,
|
|
int64_t newVersion, int64_t newOldestVersion) {
|
|
results.resize(readCount);
|
|
cs.check(reads, results.data(), readCount);
|
|
cs.addWrites(writes, writeCount, newVersion);
|
|
cs.setOldestVersion(newOldestVersion);
|
|
}
|
|
|
|
ConflictSet cs{0};
|
|
|
|
private:
|
|
std::vector<weaselab::ConflictSet::Result> results;
|
|
};
|
|
|
|
// Adapted from getaddrinfo man page
/// Resolves `node`/`service` to stream-socket addresses, binds a socket to the
/// first address that works (with SO_REUSEADDR set), starts listening on it,
/// and returns its file descriptor. Prints a message to stderr and aborts if
/// resolution fails, if no address can be bound, or if listen fails.
int getListenFd(const char *node, const char *service) {
  struct addrinfo hints;
  memset(&hints, 0, sizeof(hints));
  hints.ai_family = AF_UNSPEC;     /* Allow IPv4 or IPv6 */
  hints.ai_socktype = SOCK_STREAM; /* stream socket */
  hints.ai_flags = AI_PASSIVE;     /* For wildcard IP address */
  hints.ai_protocol = 0;           /* Any protocol */
  hints.ai_canonname = nullptr;
  hints.ai_addr = nullptr;
  hints.ai_next = nullptr;

  struct addrinfo *candidates = nullptr;
  const int gaiStatus = getaddrinfo(node, service, &hints, &candidates);
  if (gaiStatus != 0) {
    fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(gaiStatus));
    abort();
  }

  /* getaddrinfo() returns a list of address structures. Walk it until one
     address can be bound; a socket that fails to bind is closed before
     moving on to the next address. */
  int listenFd = -1;
  for (struct addrinfo *candidate = candidates; candidate != nullptr;
       candidate = candidate->ai_next) {
    const int fd = socket(candidate->ai_family, candidate->ai_socktype,
                          candidate->ai_protocol);
    if (fd == -1) {
      continue;
    }

    // Best effort: the result of setsockopt is intentionally not checked.
    int reuseAddr = 1;
    setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &reuseAddr, sizeof(reuseAddr));

    if (bind(fd, candidate->ai_addr, candidate->ai_addrlen) == 0) {
      listenFd = fd; /* Success */
      break;
    }

    close(fd);
  }

  freeaddrinfo(candidates); /* No longer needed */

  if (listenFd == -1) { /* No address succeeded */
    fprintf(stderr, "Could not bind\n");
    abort();
  }

  if (listen(listenFd, SOMAXCONN) != 0) {
    perror("listen()");
    abort();
  }

  return listenFd;
}
|
|
|
|
// HTTP response
|
|
//
|
|
std::string_view part1 =
|
|
"HTTP/1.1 200 OK \r\nContent-type: text/plain; version=0.0.4; "
|
|
"charset=utf-8; escaping=values\r\nContent-Length: ";
|
|
// Decimal content length
|
|
std::string_view part2 = "\r\n\r\n";
|
|
// Body
|
|
|
|
/// Converts a timeval (seconds plus microseconds) to seconds as a double.
double toSeconds(timeval t) {
  const double wholeSeconds = double(t.tv_sec);
  const double fractionalSeconds = double(t.tv_usec) * 1e-6;
  return wholeSeconds + fractionalSeconds;
}
|
|
|
|
#ifdef __linux__
#include <linux/perf_event.h>
#include <sys/syscall.h>

/// Owns one counter file descriptor obtained from the Linux perf_event_open
/// system call. Move-only; the descriptor is closed on destruction.
struct PerfCounter {
  /// Opens a counter for event (`type`, `config`).
  ///
  /// The counter is opened with pid = 0 and cpu = -1, with `inherit` set and
  /// with kernel and hypervisor events excluded. `groupLeaderFd` is passed as
  /// the group fd (-1 means no group leader is given). `labels` is stored for
  /// getLabels() and is also the prefix of the error message.
  ///
  /// A failure does not throw or abort: ok() reports it. Failures with ENOENT
  /// or EINVAL are silent (NOTE(review): presumably "event not supported on
  /// this machine" - confirm); any other errno is reported via perror.
  PerfCounter(int type, int config, const std::string &labels = {},
              int groupLeaderFd = -1)
      : labels(labels) {
    struct perf_event_attr pe;

    memset(&pe, 0, sizeof(pe));
    pe.type = type;
    pe.size = sizeof(pe);
    pe.config = config;
    pe.inherit = 1;
    pe.exclude_kernel = 1;
    pe.exclude_hv = 1;

    fd = perf_event_open(&pe, 0, -1, groupLeaderFd, 0);
    if (fd < 0 && errno != ENOENT && errno != EINVAL) {
      perror(labels.c_str());
    }
  }

  /// Reads and returns the counter's current value. Aborts if the read does
  /// not return a full 64-bit value, which includes the case where ok() is
  /// false - check ok() first.
  int64_t total() const {
    int64_t count;
    if (read(fd, &count, sizeof(count)) != sizeof(count)) {
      perror("read instructions from perf");
      abort();
    }
    return count;
  }

  /// Takes over `other`'s descriptor and labels; `other` is left not ok().
  PerfCounter(PerfCounter &&other) noexcept
      : fd(std::exchange(other.fd, -1)), labels(std::move(other.labels)) {}

  /// Closes the descriptor currently held (if any), then takes over `other`'s
  /// descriptor and labels. Self-assignment is a no-op.
  PerfCounter &operator=(PerfCounter &&other) noexcept {
    if (this != &other) {
      // Release our own descriptor first; overwriting it would leak it.
      if (fd >= 0) {
        close(fd);
      }
      fd = std::exchange(other.fd, -1);
      labels = std::move(other.labels);
    }
    return *this;
  }

  // Copying would make two objects close the same descriptor.
  PerfCounter(const PerfCounter &) = delete;
  PerfCounter &operator=(const PerfCounter &) = delete;

  ~PerfCounter() {
    if (fd >= 0) {
      close(fd);
    }
  }

  /// True if the counter was opened successfully and has not been moved from.
  bool ok() const { return fd >= 0; }
  const std::string &getLabels() const { return labels; }
  /// The underlying descriptor, or a negative value if !ok(). Ownership stays
  /// with this object.
  int getFd() const { return fd; }

private:
  int fd;
  std::string labels;

  /// Thin wrapper around the raw system call.
  static long perf_event_open(struct perf_event_attr *hw_event, pid_t pid,
                              int cpu, int group_fd, unsigned long flags) {
    int ret;

    ret = syscall(SYS_perf_event_open, hw_event, pid, cpu, group_fd, flags);
    return ret;
  }
};
#endif
|
|
|
|
int main(int argc, char **argv) {
|
|
if (argc != 3) {
|
|
goto fail;
|
|
}
|
|
{
|
|
int listenFd = getListenFd(argv[1], argv[2]);
|
|
|
|
Resolver resolver;
|
|
auto &cs = resolver.cs;
|
|
weaselab::ConflictSet::MetricsV1 *metrics;
|
|
int metricsCount;
|
|
cs.getMetricsV1(&metrics, &metricsCount);
|
|
|
|
#ifdef __linux__
|
|
PerfCounter instructions{PERF_TYPE_HARDWARE, PERF_COUNT_HW_INSTRUCTIONS};
|
|
PerfCounter cycles{PERF_TYPE_HARDWARE, PERF_COUNT_HW_CPU_CYCLES, "",
|
|
instructions.getFd()};
|
|
|
|
std::vector<PerfCounter> cacheCounters;
|
|
for (auto [id, idStr] : std::initializer_list<std::pair<int, std::string>>{
|
|
{PERF_COUNT_HW_CACHE_L1D, "l1d"},
|
|
{PERF_COUNT_HW_CACHE_L1I, "l1i"},
|
|
{PERF_COUNT_HW_CACHE_LL, "ll"},
|
|
{PERF_COUNT_HW_CACHE_DTLB, "dtlb"},
|
|
{PERF_COUNT_HW_CACHE_ITLB, "itlb"},
|
|
{PERF_COUNT_HW_CACHE_BPU, "bpu"},
|
|
{PERF_COUNT_HW_CACHE_NODE, "node"},
|
|
}) {
|
|
for (auto [op, opStr] :
|
|
std::initializer_list<std::pair<int, std::string>>{
|
|
{PERF_COUNT_HW_CACHE_OP_READ, "read"},
|
|
{PERF_COUNT_HW_CACHE_OP_WRITE, "write"},
|
|
{PERF_COUNT_HW_CACHE_OP_PREFETCH, "prefetch"},
|
|
}) {
|
|
int groupLeaderFd = -1;
|
|
for (auto [result, resultStr] :
|
|
std::initializer_list<std::pair<int, std::string>>{
|
|
{PERF_COUNT_HW_CACHE_RESULT_MISS, "miss"},
|
|
{PERF_COUNT_HW_CACHE_RESULT_ACCESS, "access"},
|
|
}) {
|
|
auto labels = "{id=\"" + idStr + "\", op=\"" + opStr +
|
|
"\", result=\"" + resultStr + "\"}";
|
|
cacheCounters.emplace_back(PERF_TYPE_HW_CACHE,
|
|
id | (op << 8) | (result << 16), labels,
|
|
groupLeaderFd);
|
|
if (!cacheCounters.back().ok()) {
|
|
cacheCounters.pop_back();
|
|
} else {
|
|
if (groupLeaderFd == -1) {
|
|
groupLeaderFd = cacheCounters.back().getFd();
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
#endif
|
|
|
|
TxQueue<Transaction> queue{10};
|
|
|
|
auto workloadThread = std::thread{[&]() {
|
|
for (int64_t version = kWindowSize;;
|
|
++version, transactions.fetch_add(1, std::memory_order_relaxed)) {
|
|
queue.push(Transaction(version));
|
|
}
|
|
}};
|
|
|
|
auto resolverThread = std::thread{[&]() {
|
|
for (;;) {
|
|
auto tx = queue.pop();
|
|
resolver.resolve(tx->reads.data(), tx->reads.size(), tx->writes.data(),
|
|
tx->writes.size(), tx->version, tx->oldestVersion);
|
|
}
|
|
}};
|
|
|
|
for (;;) {
|
|
struct sockaddr_storage peer_addr = {};
|
|
socklen_t peer_addr_len = sizeof(peer_addr);
|
|
const int connfd =
|
|
accept(listenFd, (struct sockaddr *)&peer_addr, &peer_addr_len);
|
|
|
|
std::string body;
|
|
|
|
rusage r;
|
|
getrusage(RUSAGE_SELF, &r);
|
|
body += "# HELP process_cpu_seconds_total Total user and system CPU time "
|
|
"spent in seconds.\n# TYPE process_cpu_seconds_total counter\n"
|
|
"process_cpu_seconds_total ";
|
|
body += std::to_string(toSeconds(r.ru_utime) + toSeconds(r.ru_stime));
|
|
body += "\n";
|
|
body += "# HELP process_resident_memory_bytes Resident memory size in "
|
|
"bytes.\n# TYPE process_resident_memory_bytes gauge\n"
|
|
"process_resident_memory_bytes ";
|
|
body += std::to_string(getCurrentRSS());
|
|
body += "\n";
|
|
body += "# HELP transactions_total Total number of transactions\n"
|
|
"# TYPE transactions_total counter\n"
|
|
"transactions_total ";
|
|
body += std::to_string(transactions.load(std::memory_order_relaxed));
|
|
body += "\n";
|
|
#ifdef __linux__
|
|
body += "# HELP instructions_total Total number of instructions\n"
|
|
"# TYPE instructions_total counter\n"
|
|
"instructions_total ";
|
|
body += std::to_string(instructions.total());
|
|
body += "\n";
|
|
body += "# HELP cycles_total Total number of cycles\n"
|
|
"# TYPE cycles_total counter\n"
|
|
"cycles_total ";
|
|
body += std::to_string(cycles.total());
|
|
body += "\n";
|
|
body += "# HELP cache_event_total Total number of cache events\n"
|
|
"# TYPE cache_event_total counter\n";
|
|
for (const auto &counter : cacheCounters) {
|
|
body += "cache_event_total" + counter.getLabels() + " " +
|
|
std::to_string(counter.total()) + "\n";
|
|
}
|
|
#endif
|
|
|
|
for (int i = 0; i < metricsCount; ++i) {
|
|
body += "# HELP ";
|
|
body += metrics[i].name;
|
|
body += " ";
|
|
body += metrics[i].help;
|
|
body += "\n";
|
|
body += "# TYPE ";
|
|
body += metrics[i].name;
|
|
body += " ";
|
|
body += metrics[i].type == metrics[i].Counter ? "counter" : "gauge";
|
|
body += "\n";
|
|
body += metrics[i].name;
|
|
body += " ";
|
|
body += std::to_string(metrics[i].getValue());
|
|
body += "\n";
|
|
}
|
|
|
|
auto len = std::to_string(body.size());
|
|
iovec iov[] = {
|
|
{(void *)part1.data(), part1.size()},
|
|
{(void *)len.data(), len.size()},
|
|
{(void *)part2.data(), part2.size()},
|
|
{(void *)body.data(), body.size()},
|
|
};
|
|
int written;
|
|
do {
|
|
written = writev(connfd, iov, sizeof(iov) / sizeof(iov[0]));
|
|
} while (written < 0 && errno == EINTR);
|
|
close(connfd);
|
|
}
|
|
}
|
|
fail:
|
|
fprintf(stderr, "Expected ./%s <host> <port>\n", argv[0]);
|
|
return 1;
|
|
} |