#include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "ConflictSet.h" #include "Internal.h" #include "third_party/nadeau.h" constexpr int kCacheLine = 64; // TODO mac m1 is 128 template struct TxQueue { explicit TxQueue(int lgSlotCount) : slotCount(1 << lgSlotCount), slotCountMask(slotCount - 1), slots(new T[slotCount]) { // Otherwise we can't tell the difference between full and empty. assert(!(slotCountMask & 0x80000000)); } /// Call from producer thread, after ensuring consumer is no longer accessing /// it somehow ~TxQueue() { delete[] slots; } /// Must be called from the producer thread void push(T t) { if (wouldBlock()) { // Wait for pops to change and try again consumer.pops.wait(producer.lastPopRead, std::memory_order_relaxed); producer.lastPopRead = consumer.pops.load(std::memory_order_acquire); } slots[producer.pushesNonAtomic++ & slotCountMask] = std::move(t); // seq_cst so that the notify can't be ordered before the store producer.pushes.store(producer.pushesNonAtomic, std::memory_order_seq_cst); // We have to notify every time, since we don't know if this is the last // push ever producer.pushes.notify_one(); } /// Must be called from the producer thread uint32_t outstanding() { return producer.pushesNonAtomic - consumer.pops.load(std::memory_order_relaxed); } /// Returns true if a call to push might block. Must be called from the /// producer thread. bool wouldBlock() { // See if we can determine that overflow won't happen entirely from state // local to the producer if (producer.pushesNonAtomic - producer.lastPopRead == slotCount - 1) { // Re-read pops with memory order producer.lastPopRead = consumer.pops.load(std::memory_order_acquire); return producer.pushesNonAtomic - producer.lastPopRead == slotCount - 1; } return false; } /// Valid until the next pop, or until this queue is destroyed. T *pop() { // See if we can determine that there's an entry we can pop entirely from // state local to the consumer if (consumer.lastPushRead - consumer.popsNonAtomic == 0) { // Re-read pushes with memory order and try again consumer.lastPushRead = producer.pushes.load(std::memory_order_acquire); if (consumer.lastPushRead - consumer.popsNonAtomic == 0) { // Wait for pushes to change and try again producer.pushes.wait(consumer.lastPushRead, std::memory_order_relaxed); consumer.lastPushRead = producer.pushes.load(std::memory_order_acquire); } } auto result = &slots[consumer.popsNonAtomic++ & slotCountMask]; // We only have to write pops with memory order if we've run out of items. // We know that we'll eventually run out. if (consumer.lastPushRead - consumer.popsNonAtomic == 0) { // seq_cst so that the notify can't be ordered before the store consumer.pops.store(consumer.popsNonAtomic, std::memory_order_seq_cst); consumer.pops.notify_one(); } return result; } private: const uint32_t slotCount; const uint32_t slotCountMask; T *slots; struct alignas(kCacheLine) ProducerState { std::atomic pushes{0}; uint32_t pushesNonAtomic{0}; uint32_t lastPopRead{0}; }; struct alignas(kCacheLine) ConsumerState { std::atomic pops{0}; uint32_t popsNonAtomic{0}; uint32_t lastPushRead{0}; }; ProducerState producer; ConsumerState consumer; }; std::atomic transactions; int64_t safeUnaryMinus(int64_t x) { return x == std::numeric_limits::min() ? x : -x; } void tupleAppend(std::string &output, int64_t value) { if (value == 0) { output.push_back(0x14); return; } uint32_t size = 8 - __builtin_clrsbll(value) / 8; int typeCode = 0x14 + (value < 0 ? -1 : 1) * size; output.push_back(typeCode); if (value < 0) { value = ~safeUnaryMinus(value); } uint64_t swap = __builtin_bswap64(value); output.insert(output.end(), (uint8_t *)&swap + 8 - size, (uint8_t *)&swap + 8); } void tupleAppend(std::string &output, std::string_view value) { output.push_back('\x02'); if (memchr(value.data(), '\x00', value.size()) != nullptr) { for (auto c : value) { if (c == '\x00') { output.push_back('\x00'); output.push_back('\xff'); } else { output.push_back(c); } } } else { output.insert(output.end(), value.begin(), value.end()); } output.push_back('\x00'); } template std::string tupleKey(const Ts &...ts) { std::string result; (tupleAppend(result, ts), ...); return result; } constexpr int kTotalKeyRange = 1'000'000'000; constexpr int kWindowSize = 1'000'000; constexpr int kNumReadKeysPerTx = 5; constexpr int kNumWriteKeysPerTx = 10; struct Transaction { std::vector keys; std::vector reads; std::vector writes; int64_t version; int64_t oldestVersion; Transaction() = default; explicit Transaction(int64_t version) : version(version), oldestVersion(version - kWindowSize) { std::vector keyIndices; for (int i = 0; i < std::max(kNumReadKeysPerTx, kNumWriteKeysPerTx); ++i) { keyIndices.push_back(rand() % kTotalKeyRange); } std::sort(keyIndices.begin(), keyIndices.end()); constexpr std::string_view fullString = "this is a string, where a prefix of it is used as an element of the " "tuple forming the key"; for (int i = 0; i < int(keyIndices.size()); ++i) { keys.push_back( tupleKey(0x100, keyIndices[i] / fullString.size(), fullString.substr(0, keyIndices[i] % fullString.size()))); // printf("%s\n", printable(keys.back()).c_str()); } for (int i = 0; i < kNumWriteKeysPerTx; ++i) { writes.push_back({{(const uint8_t *)keys[i].data(), int(keys[i].size())}, {nullptr, 0}}); } reads.push_back({{(const uint8_t *)keys[0].data(), int(keys[0].size())}, {(const uint8_t *)keys[1].data(), int(keys[1].size())}, version - std::min(10, kWindowSize)}); static_assert(kNumReadKeysPerTx >= 3); for (int i = 2; i < kNumReadKeysPerTx; ++i) { reads.push_back({{(const uint8_t *)keys[i].data(), int(keys[i].size())}, {nullptr, 0}, version - kWindowSize}); } } Transaction(Transaction &&) = default; Transaction &operator=(Transaction &&) = default; Transaction(Transaction const &) = delete; Transaction const &operator=(Transaction const &) = delete; }; struct Resolver { void resolve(const weaselab::ConflictSet::ReadRange *reads, int readCount, const weaselab::ConflictSet::WriteRange *writes, int writeCount, int64_t newVersion, int64_t newOldestVersion) { results.resize(readCount); cs.check(reads, results.data(), readCount); cs.addWrites(writes, writeCount, newVersion); cs.setOldestVersion(newOldestVersion); } ConflictSet cs{0}; private: std::vector results; }; // Adapted from getaddrinfo man page int getListenFd(const char *node, const char *service) { struct addrinfo hints; struct addrinfo *result, *rp; int sfd, s; memset(&hints, 0, sizeof(hints)); hints.ai_family = AF_UNSPEC; /* Allow IPv4 or IPv6 */ hints.ai_socktype = SOCK_STREAM; /* stream socket */ hints.ai_flags = AI_PASSIVE; /* For wildcard IP address */ hints.ai_protocol = 0; /* Any protocol */ hints.ai_canonname = nullptr; hints.ai_addr = nullptr; hints.ai_next = nullptr; s = getaddrinfo(node, service, &hints, &result); if (s != 0) { fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(s)); abort(); } /* getaddrinfo() returns a list of address structures. Try each address until we successfully bind(2). If socket(2) (or bind(2)) fails, we (close the socket and) try the next address. */ for (rp = result; rp != nullptr; rp = rp->ai_next) { sfd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol); if (sfd == -1) { continue; } int val = 1; setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)); if (bind(sfd, rp->ai_addr, rp->ai_addrlen) == 0) { break; /* Success */ } close(sfd); } freeaddrinfo(result); /* No longer needed */ if (rp == nullptr) { /* No address succeeded */ fprintf(stderr, "Could not bind\n"); abort(); } int rv = listen(sfd, SOMAXCONN); if (rv) { perror("listen()"); abort(); } return sfd; } // HTTP response // std::string_view part1 = "HTTP/1.1 200 OK \r\nContent-type: text/plain; version=0.0.4; " "charset=utf-8; escaping=values\r\nContent-Length: "; // Decimal content length std::string_view part2 = "\r\n\r\n"; // Body double toSeconds(timeval t) { return double(t.tv_sec) + double(t.tv_usec) * 1e-6; } #ifdef __linux__ #include struct PerfCounter { PerfCounter(int type, int config, const std::string &labels = {}, int groupLeaderFd = -1) : labels(labels) { struct perf_event_attr pe; memset(&pe, 0, sizeof(pe)); pe.type = type; pe.size = sizeof(pe); pe.config = config; pe.inherit = 1; pe.exclude_kernel = 1; pe.exclude_hv = 1; fd = perf_event_open(&pe, 0, -1, groupLeaderFd, 0); if (fd < 0 && errno != ENOENT && errno != EINVAL) { perror(labels.c_str()); } } int64_t total() const { int64_t count; if (read(fd, &count, sizeof(count)) != sizeof(count)) { perror("read instructions from perf"); abort(); } return count; } PerfCounter(PerfCounter &&other) : fd(std::exchange(other.fd, -1)), labels(std::move(other.labels)) {} PerfCounter &operator=(PerfCounter &&other) { fd = std::exchange(other.fd, -1); labels = std::move(other.labels); return *this; } ~PerfCounter() { if (fd >= 0) { close(fd); } } bool ok() const { return fd >= 0; } const std::string &getLabels() const { return labels; } int getFd() const { return fd; } private: int fd; std::string labels; static long perf_event_open(struct perf_event_attr *hw_event, pid_t pid, int cpu, int group_fd, unsigned long flags) { int ret; ret = syscall(SYS_perf_event_open, hw_event, pid, cpu, group_fd, flags); return ret; } }; #endif int main(int argc, char **argv) { if (argc != 3) { goto fail; } { int listenFd = getListenFd(argv[1], argv[2]); Resolver resolver; auto &cs = resolver.cs; weaselab::ConflictSet::MetricsV1 *metrics; int metricsCount; cs.getMetricsV1(&metrics, &metricsCount); #ifdef __linux__ PerfCounter instructions{PERF_TYPE_HARDWARE, PERF_COUNT_HW_INSTRUCTIONS}; PerfCounter cycles{PERF_TYPE_HARDWARE, PERF_COUNT_HW_CPU_CYCLES, "", instructions.getFd()}; std::vector cacheCounters; for (auto [id, idStr] : std::initializer_list>{ {PERF_COUNT_HW_CACHE_L1D, "l1d"}, {PERF_COUNT_HW_CACHE_L1I, "l1i"}, {PERF_COUNT_HW_CACHE_LL, "ll"}, {PERF_COUNT_HW_CACHE_DTLB, "dtlb"}, {PERF_COUNT_HW_CACHE_ITLB, "itlb"}, {PERF_COUNT_HW_CACHE_BPU, "bpu"}, {PERF_COUNT_HW_CACHE_NODE, "node"}, }) { for (auto [op, opStr] : std::initializer_list>{ {PERF_COUNT_HW_CACHE_OP_READ, "read"}, {PERF_COUNT_HW_CACHE_OP_WRITE, "write"}, {PERF_COUNT_HW_CACHE_OP_PREFETCH, "prefetch"}, }) { int groupLeaderFd = -1; for (auto [result, resultStr] : std::initializer_list>{ {PERF_COUNT_HW_CACHE_RESULT_MISS, "miss"}, {PERF_COUNT_HW_CACHE_RESULT_ACCESS, "access"}, }) { auto labels = "{id=\"" + idStr + "\", op=\"" + opStr + "\", result=\"" + resultStr + "\"}"; cacheCounters.emplace_back(PERF_TYPE_HW_CACHE, id | (op << 8) | (result << 16), labels, groupLeaderFd); if (!cacheCounters.back().ok()) { cacheCounters.pop_back(); } else { if (groupLeaderFd == -1) { groupLeaderFd = cacheCounters.back().getFd(); } } } } } #endif TxQueue queue{10}; auto workloadThread = std::thread{[&]() { for (int64_t version = kWindowSize;; ++version, transactions.fetch_add(1, std::memory_order_relaxed)) { queue.push(Transaction(version)); } }}; auto resolverThread = std::thread{[&]() { for (;;) { auto tx = queue.pop(); resolver.resolve(tx->reads.data(), tx->reads.size(), tx->writes.data(), tx->writes.size(), tx->version, tx->oldestVersion); } }}; for (;;) { struct sockaddr_storage peer_addr = {}; socklen_t peer_addr_len = sizeof(peer_addr); const int connfd = accept(listenFd, (struct sockaddr *)&peer_addr, &peer_addr_len); std::string body; rusage r; getrusage(RUSAGE_SELF, &r); body += "# HELP process_cpu_seconds_total Total user and system CPU time " "spent in seconds.\n# TYPE process_cpu_seconds_total counter\n" "process_cpu_seconds_total "; body += std::to_string(toSeconds(r.ru_utime) + toSeconds(r.ru_stime)); body += "\n"; body += "# HELP process_resident_memory_bytes Resident memory size in " "bytes.\n# TYPE process_resident_memory_bytes gauge\n" "process_resident_memory_bytes "; body += std::to_string(getCurrentRSS()); body += "\n"; body += "# HELP transactions_total Total number of transactions\n" "# TYPE transactions_total counter\n" "transactions_total "; body += std::to_string(transactions.load(std::memory_order_relaxed)); body += "\n"; #ifdef __linux__ body += "# HELP instructions_total Total number of instructions\n" "# TYPE instructions_total counter\n" "instructions_total "; body += std::to_string(instructions.total()); body += "\n"; body += "# HELP cycles_total Total number of cycles\n" "# TYPE cycles_total counter\n" "cycles_total "; body += std::to_string(cycles.total()); body += "\n"; body += "# HELP cache_event_total Total number of cache events\n" "# TYPE cache_event_total counter\n"; for (const auto &counter : cacheCounters) { body += "cache_event_total" + counter.getLabels() + " " + std::to_string(counter.total()) + "\n"; } #endif for (int i = 0; i < metricsCount; ++i) { body += "# HELP "; body += metrics[i].name; body += " "; body += metrics[i].help; body += "\n"; body += "# TYPE "; body += metrics[i].name; body += " "; body += metrics[i].type == metrics[i].Counter ? "counter" : "gauge"; body += "\n"; body += metrics[i].name; body += " "; body += std::to_string(metrics[i].getValue()); body += "\n"; } auto len = std::to_string(body.size()); iovec iov[] = { {(void *)part1.data(), part1.size()}, {(void *)len.data(), len.size()}, {(void *)part2.data(), part2.size()}, {(void *)body.data(), body.size()}, }; int written; do { written = writev(connfd, iov, sizeof(iov) / sizeof(iov[0])); } while (written < 0 && errno == EINTR); close(connfd); } } fail: fprintf(stderr, "Expected ./%s \n", argv[0]); return 1; }