Isolate conflict-set on one thread in server_bench
Some checks reported errors
Tests / 64 bit versions total: 8097, passed: 8097
Tests / Debug total: 8095, passed: 8095
Tests / SIMD fallback total: 8097, passed: 8097
Tests / Release [clang] total: 8097, passed: 8097
Clang |Total|New|Outstanding|Fixed|Trend |:-:|:-:|:-:|:-:|:-: |0|0|0|0|:clap:
Tests / gcc total: 8097, passed: 8097
GNU C Compiler (gcc) |Total|New|Outstanding|Fixed|Trend |:-:|:-:|:-:|:-:|:-: |0|0|0|0|:clap:
Tests / Release [clang,aarch64] total: 5366, passed: 5366
Tests / Coverage total: 5414, passed: 5414
weaselab/conflict-set/pipeline/head Something is wrong with the build of this commit

This commit is contained in:
2024-11-13 22:46:11 -08:00
parent c5ef843f9e
commit dcc5275ec9
2 changed files with 143 additions and 18 deletions

View File

@@ -23,6 +23,97 @@
#include "Internal.h"
#include "third_party/nadeau.h"
constexpr int kCacheLine = 64; // TODO mac m1 is 128
template <class T> struct TxQueue {
explicit TxQueue(int lgSlotCount)
: slotCount(1 << lgSlotCount), slotCountMask(slotCount - 1),
slots(new T[slotCount]) {
// Otherwise we can't tell the difference between full and empty.
assert(!(slotCountMask & 0x80000000));
}
/// Call from producer thread, after ensuring consumer is no longer accessing
/// it somehow
~TxQueue() { delete[] slots; }
/// Must be called from the producer thread
void push(T t) {
if (wouldBlock()) {
// Wait for pops to change and try again
consumer.pops.wait(producer.lastPopRead, std::memory_order_relaxed);
producer.lastPopRead = consumer.pops.load(std::memory_order_acquire);
}
slots[producer.pushesNonAtomic++ & slotCountMask] = std::move(t);
// seq_cst so that the notify can't be ordered before the store
producer.pushes.store(producer.pushesNonAtomic, std::memory_order_seq_cst);
// We have to notify every time, since we don't know if this is the last
// push ever
producer.pushes.notify_one();
}
/// Must be called from the producer thread
uint32_t outstanding() {
return producer.pushesNonAtomic -
consumer.pops.load(std::memory_order_relaxed);
}
/// Returns true if a call to push might block. Must be called from the
/// producer thread.
bool wouldBlock() {
// See if we can determine that overflow won't happen entirely from state
// local to the producer
if (producer.pushesNonAtomic - producer.lastPopRead == slotCount - 1) {
// Re-read pops with memory order
producer.lastPopRead = consumer.pops.load(std::memory_order_acquire);
return producer.pushesNonAtomic - producer.lastPopRead == slotCount - 1;
}
return false;
}
/// Valid until the next pop, or until this queue is destroyed.
T *pop() {
// See if we can determine that there's an entry we can pop entirely from
// state local to the consumer
if (consumer.lastPushRead - consumer.popsNonAtomic == 0) {
// Re-read pushes with memory order and try again
consumer.lastPushRead = producer.pushes.load(std::memory_order_acquire);
if (consumer.lastPushRead - consumer.popsNonAtomic == 0) {
// Wait for pushes to change and try again
producer.pushes.wait(consumer.lastPushRead, std::memory_order_relaxed);
consumer.lastPushRead = producer.pushes.load(std::memory_order_acquire);
}
}
auto result = &slots[consumer.popsNonAtomic++ & slotCountMask];
// We only have to write pops with memory order if we've run out of items.
// We know that we'll eventually run out.
if (consumer.lastPushRead - consumer.popsNonAtomic == 0) {
// seq_cst so that the notify can't be ordered before the store
consumer.pops.store(consumer.popsNonAtomic, std::memory_order_seq_cst);
consumer.pops.notify_one();
}
return result;
}
private:
const uint32_t slotCount;
const uint32_t slotCountMask;
T *slots;
struct alignas(kCacheLine) ProducerState {
std::atomic<uint32_t> pushes{0};
uint32_t pushesNonAtomic{0};
uint32_t lastPopRead{0};
};
struct alignas(kCacheLine) ConsumerState {
std::atomic<uint32_t> pops{0};
uint32_t popsNonAtomic{0};
uint32_t lastPushRead{0};
};
ProducerState producer;
ConsumerState consumer;
};
std::atomic<int64_t> transactions;
int64_t safeUnaryMinus(int64_t x) {
@@ -69,15 +160,20 @@ constexpr int kWindowSize = 1'000'000;
constexpr int kNumReadKeysPerTx = 10;
constexpr int kNumWriteKeysPerTx = 5;
void workload(weaselab::ConflictSet *cs) {
int64_t version = kWindowSize;
for (;; transactions.fetch_add(1, std::memory_order_relaxed)) {
struct Transaction {
std::vector<std::string> keys;
std::vector<weaselab::ConflictSet::ReadRange> reads;
std::vector<weaselab::ConflictSet::WriteRange> writes;
int64_t version;
int64_t oldestVersion;
Transaction() = default;
explicit Transaction(int64_t version)
: version(version), oldestVersion(version - kWindowSize) {
std::vector<int64_t> keyIndices;
for (int i = 0; i < std::max(kNumReadKeysPerTx, kNumWriteKeysPerTx); ++i) {
keyIndices.push_back(rand() % kTotalKeyRange);
}
std::sort(keyIndices.begin(), keyIndices.end());
std::vector<std::string> keys;
constexpr std::string_view fullString =
"this is a string, where a prefix of it is used as an element of the "
"tuple forming the key";
@@ -87,10 +183,6 @@ void workload(weaselab::ConflictSet *cs) {
fullString.substr(0, keyIndices[i] % fullString.size())));
// printf("%s\n", printable(keys.back()).c_str());
}
std::vector<weaselab::ConflictSet::ReadRange> reads;
std::vector<weaselab::ConflictSet::WriteRange> writes;
std::vector<weaselab::ConflictSet::Result> results;
for (int i = 0; i < kNumWriteKeysPerTx; ++i) {
writes.push_back({{(const uint8_t *)keys[i].data(), int(keys[i].size())},
{nullptr, 0}});
@@ -104,13 +196,30 @@ void workload(weaselab::ConflictSet *cs) {
{nullptr, 0},
version - kWindowSize});
}
results.resize(reads.size());
cs->check(reads.data(), results.data(), reads.size());
cs->addWrites(writes.data(), writes.size(), version);
cs->setOldestVersion(version - kWindowSize);
++version;
}
}
Transaction(Transaction &&) = default;
Transaction &operator=(Transaction &&) = default;
Transaction(Transaction const &) = delete;
Transaction const &operator=(Transaction const &) = delete;
};
struct Resolver {
void resolve(const weaselab::ConflictSet::ReadRange *reads, int readCount,
const weaselab::ConflictSet::WriteRange *writes, int writeCount,
int64_t newVersion, int64_t newOldestVersion) {
results.resize(readCount);
cs.check(reads, results.data(), readCount);
cs.addWrites(writes, writeCount, newVersion);
cs.setOldestVersion(newOldestVersion);
}
ConflictSet cs{0};
private:
std::vector<weaselab::ConflictSet::Result> results;
};
// Adapted from getaddrinfo man page
int getListenFd(const char *node, const char *service) {
@@ -253,7 +362,8 @@ int main(int argc, char **argv) {
{
int listenFd = getListenFd(argv[1], argv[2]);
weaselab::ConflictSet cs{0};
Resolver resolver;
auto &cs = resolver.cs;
weaselab::ConflictSet::MetricsV1 *metrics;
int metricsCount;
cs.getMetricsV1(&metrics, &metricsCount);
@@ -302,7 +412,23 @@ int main(int argc, char **argv) {
}
#endif
auto w = std::thread{workload, &cs};
TxQueue<std::unique_ptr<Transaction>> queue{10};
auto workloadThread = std::thread{[&]() {
for (int64_t version = kWindowSize;;
++version, transactions.fetch_add(1, std::memory_order_relaxed)) {
auto tx = std::make_unique<Transaction>(version);
queue.push(std::move(tx));
}
}};
auto resolverThread = std::thread{[&]() {
for (;;) {
auto tx = queue.pop()->get();
resolver.resolve(tx->reads.data(), tx->reads.size(), tx->writes.data(),
tx->writes.size(), tx->version, tx->oldestVersion);
}
}};
for (;;) {
struct sockaddr_storage peer_addr = {};