#include #include #include #include #include #include #include #include #include #include #include #include #include #include "ConflictSet.h" #include "third_party/nadeau.h" std::atomic transactions; constexpr int kBaseSearchDepth = 32; constexpr int kWindowSize = 10000000; std::basic_string numToKey(int64_t num) { std::basic_string result; result.resize(kBaseSearchDepth + sizeof(int64_t)); memset(result.data(), 0, kBaseSearchDepth); int64_t be = __builtin_bswap64(num); memcpy(result.data() + kBaseSearchDepth, &be, sizeof(int64_t)); return result; } void workload(weaselab::ConflictSet *cs) { int64_t version = kWindowSize; cs->addWrites(nullptr, 0, version); for (;; transactions.fetch_add(1, std::memory_order_relaxed)) { // Reads { auto beginK = numToKey(version - kWindowSize); auto endK = numToKey(version - 1); auto pointRv = version - kWindowSize + rand() % kWindowSize + 1; auto pointK = numToKey(pointRv); weaselab::ConflictSet::ReadRange reads[] = { { {pointK.data(), int(pointK.size())}, {nullptr, 0}, pointRv, }, { {beginK.data(), int(beginK.size())}, {endK.data(), int(endK.size())}, version - 2, }, }; weaselab::ConflictSet::Result result[sizeof(reads) / sizeof(reads[0])]; cs->check(reads, result, sizeof(reads) / sizeof(reads[0])); // for (int i = 0; i < sizeof(reads) / sizeof(reads[0]); ++i) { // if (result[i] != weaselab::ConflictSet::Commit) { // fprintf(stderr, "Unexpected conflict: [%s, %s) @ %" PRId64 "\n", // printable(reads[i].begin).c_str(), // printable(reads[i].end).c_str(), reads[i].readVersion); // abort(); // } // } } // Writes { weaselab::ConflictSet::WriteRange w; auto k = numToKey(version); w.begin.p = k.data(); w.end.len = 0; if (version % (kWindowSize / 2) == 0) { for (int l = 0; l <= k.size(); ++l) { w.begin.len = l; cs->addWrites(&w, 1, version); } } else { w.begin.len = k.size(); cs->addWrites(&w, 1, version); } } // GC cs->setOldestVersion(version - kWindowSize); ++version; } } // Adapted from getaddrinfo man page int getListenFd(const char *node, const char *service) { struct addrinfo hints; struct addrinfo *result, *rp; int sfd, s; memset(&hints, 0, sizeof(hints)); hints.ai_family = AF_UNSPEC; /* Allow IPv4 or IPv6 */ hints.ai_socktype = SOCK_STREAM; /* stream socket */ hints.ai_flags = AI_PASSIVE; /* For wildcard IP address */ hints.ai_protocol = 0; /* Any protocol */ hints.ai_canonname = nullptr; hints.ai_addr = nullptr; hints.ai_next = nullptr; s = getaddrinfo(node, service, &hints, &result); if (s != 0) { fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(s)); abort(); } /* getaddrinfo() returns a list of address structures. Try each address until we successfully bind(2). If socket(2) (or bind(2)) fails, we (close the socket and) try the next address. */ for (rp = result; rp != nullptr; rp = rp->ai_next) { sfd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol); if (sfd == -1) { continue; } int val = 1; setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)); if (bind(sfd, rp->ai_addr, rp->ai_addrlen) == 0) { break; /* Success */ } close(sfd); } freeaddrinfo(result); /* No longer needed */ if (rp == nullptr) { /* No address succeeded */ fprintf(stderr, "Could not bind\n"); abort(); } int rv = listen(sfd, SOMAXCONN); if (rv) { perror("listen()"); abort(); } return sfd; } // HTTP response // std::string_view part1 = "HTTP/1.1 200 OK \r\nContent-type: text/plain; version=0.0.4; " "charset=utf-8; escaping=values\r\nContent-Length: "; // Decimal content length std::string_view part2 = "\r\n\r\n"; // Body double toSeconds(timeval t) { return double(t.tv_sec) + double(t.tv_usec) * 1e-6; } int main(int argc, char **argv) { if (argc != 3) { goto fail; } { int listenFd = getListenFd(argv[1], argv[2]); weaselab::ConflictSet cs{0}; weaselab::ConflictSet::MetricsV1 *metrics; int metricsCount; cs.getMetricsV1(&metrics, &metricsCount); auto w = std::thread{workload, &cs}; for (;;) { struct sockaddr_storage peer_addr = {}; socklen_t peer_addr_len = sizeof(peer_addr); const int connfd = accept(listenFd, (struct sockaddr *)&peer_addr, &peer_addr_len); std::string body; rusage r; getrusage(RUSAGE_SELF, &r); body += "# HELP process_cpu_seconds_total Total user and system CPU time " "spent in seconds.\n# TYPE process_cpu_seconds_total counter\n" "process_cpu_seconds_total "; body += std::to_string(toSeconds(r.ru_utime) + toSeconds(r.ru_stime)); body += "\n"; body += "# HELP process_resident_memory_bytes Resident memory size in " "bytes.\n# TYPE process_resident_memory_bytes gauge\n" "process_resident_memory_bytes "; body += std::to_string(getCurrentRSS()); body += "\n"; body += "# HELP transactions_total Total number of transactions\n" "# TYPE transactions_total counter\n" "transactions_total "; body += std::to_string(transactions.load(std::memory_order_relaxed)); body += "\n"; for (int i = 0; i < metricsCount; ++i) { body += "# HELP "; body += metrics[i].name; body += " "; body += metrics[i].help; body += "\n"; body += "# TYPE "; body += metrics[i].name; body += " "; body += metrics[i].type == metrics[i].Counter ? "counter" : "gauge"; body += "\n"; body += metrics[i].name; body += " "; body += std::to_string(metrics[i].getValue()); body += "\n"; } auto len = std::to_string(body.size()); iovec iov[] = { {(void *)part1.data(), part1.size()}, {(void *)len.data(), len.size()}, {(void *)part2.data(), part2.size()}, {(void *)body.data(), body.size()}, }; int written; do { written = writev(connfd, iov, sizeof(iov) / sizeof(iov[0])); } while (written < 0 && errno == EINTR); close(connfd); } } fail: fprintf(stderr, "Expected ./%s \n", argv[0]); return 1; }