// versioned-map/VersionedMap.cpp
#include "VersionedMap.h"
#include <assert.h>
#include <atomic>
#include <stdio.h>
#include <sys/mman.h>
#include <sys/types.h>
#include <unistd.h>
#include <unordered_set>
#include <xxhash.h>
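
// In-memory versioned map support code. Nodes live in one large mmap'd arena
// and are addressed by 32-bit indices rather than pointers; entries are
// refcounted, variable-length key/value records whose `priority` (a hash of
// the key) keeps the node tree balanced, treap-style. The mmap/mprotect/munmap
// wrappers below abort on failure, so no caller has to handle a partially
// mapped arena.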
void *mmapSafe(void *addr, size_t len, int prot, int flags, int fd,
               off_t offset) {
  void *result = mmap(addr, len, prot, flags, fd, offset);
  if (result == MAP_FAILED) {
    int err = errno; // GCOVR_EXCL_LINE
    fprintf( // GCOVR_EXCL_LINE
        stderr, // GCOVR_EXCL_LINE
        "Error calling mmap(%p, %zu, %d, %d, %d, %jd): %d %s\n", // GCOVR_EXCL_LINE
        addr, len, prot, flags, fd, (intmax_t)offset, err, // GCOVR_EXCL_LINE
        strerror(err)); // GCOVR_EXCL_LINE
    fflush(stderr); // GCOVR_EXCL_LINE
    abort(); // GCOVR_EXCL_LINE
  }
  return result;
}

void mprotectSafe(void *p, size_t s, int prot) {
  if (mprotect(p, s, prot) != 0) {
    int err = errno; // GCOVR_EXCL_LINE
    fprintf(stderr, // GCOVR_EXCL_LINE
            "Error calling mprotect(%p, %zu, %d): %s\n", // GCOVR_EXCL_LINE
            p, // GCOVR_EXCL_LINE
            s, // GCOVR_EXCL_LINE
            prot, // GCOVR_EXCL_LINE
            strerror(err)); // GCOVR_EXCL_LINE
    fflush(stderr); // GCOVR_EXCL_LINE
    abort(); // GCOVR_EXCL_LINE
  }
}

void munmapSafe(void *ptr, size_t size) {
  if (munmap(ptr, size) != 0) {
    int err = errno; // GCOVR_EXCL_LINE
    fprintf(stderr, "Error calling munmap(%p, %zu): %s\n", // GCOVR_EXCL_LINE
            ptr, // GCOVR_EXCL_LINE
            size, // GCOVR_EXCL_LINE
            strerror(err)); // GCOVR_EXCL_LINE
    fflush(stderr); // GCOVR_EXCL_LINE
    abort(); // GCOVR_EXCL_LINE
  }
}
namespace weaselab {
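
// A refcounted key/value record. The key and value bytes live inline,
// immediately after the struct, in a single malloc:
//
//   [Entry][key bytes][0][value bytes]
//
// so getKey()/getVal() are just pointer arithmetic past `this`.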
struct Entry {
  int64_t insertVersion;
  int keyLen;
  // Negative if this key is cleared
  int valLen;
  mutable int refCount;
  uint32_t priority;
  // True if mutations in (pred, this) are cleared. If false, (pred, this)
  // should be read through to the underlying data structure.
  bool clearTo;
  // There's an extra zero byte past the end of getKey, used for
  // reconstructing logical mutations without copies.
  const uint8_t *getKey() const { return (const uint8_t *)(this + 1); }
  const uint8_t *getVal() const {
    return (const uint8_t *)(this + 1) + 1 + keyLen;
  }
  void addref() const { ++refCount; }
  void delref() const {
    if (--refCount == 0) {
      free((void *)this);
    }
  }
  static Entry *make(int64_t insertVersion, const uint8_t *key, int keyLen,
                     const uint8_t *val, int valLen, bool clearTo) {
    auto e = (Entry *)malloc(sizeof(Entry) + keyLen + 1 + std::max(valLen, 0));
    e->insertVersion = insertVersion;
    e->keyLen = keyLen;
    e->valLen = valLen;
    e->refCount = 1;
    // XXH3_64bits returns 64 bits; truncating to 32 is fine since priority
    // only needs to be well distributed, not unique.
    e->priority = (uint32_t)XXH3_64bits(key, keyLen);
    e->clearTo = clearTo;
    // Guard the copies: memcpy with a null source is undefined behavior even
    // for a zero length, and callers do pass (nullptr, 0).
    if (keyLen > 0) {
      memcpy((uint8_t *)e->getKey(), key, keyLen);
    }
    ((uint8_t *)e->getKey())[keyLen] = 0;
    if (valLen > 0) {
      memcpy((uint8_t *)e->getVal(), val, valLen);
    }
    return e;
  }
};
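
// A tree node. Rather than copying a node every time a child pointer changes,
// each node carries one spare slot (pointers[2]) and can absorb a single
// in-place update: `replacePointer` records which child was replaced,
// `updateVersion` records when, and readers at versions before the update
// follow the original child while newer readers follow pointers[2]. This
// looks like the classic "fat node" persistence trick; the interpretation
// here is inferred from how gc() below walks updated nodes.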
struct Node {
  union {
    int64_t updateVersion;
    uint32_t nextFree;
  };
  Entry *entry;
  uint32_t pointers[3];
  bool replacePointer;
  std::atomic<bool> updated;
};
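
// Node indices below kMinAddressable are never handed out, so index 0 can act
// as a null sentinel. The first page of the arena also stays PROT_NONE, which
// turns an accidental dereference of a "null" index into a fault.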
// Limit mmap to 32 GiB so valgrind doesn't complain.
// https://bugs.kde.org/show_bug.cgi?id=229500
constexpr size_t kMapSize = size_t(32) * (1 << 30);
const size_t kPageSize = sysconf(_SC_PAGESIZE);
const uint32_t kNodesPerPage = kPageSize / sizeof(Node);
const uint32_t kMinAddressable = kNodesPerPage;
constexpr uint32_t kUpsizeBytes = 1 << 20;
constexpr uint32_t kUpsizeNodes = kUpsizeBytes / sizeof(Node);
static_assert(kUpsizeNodes * sizeof(Node) == kUpsizeBytes);
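
// Fixed-capacity bitset used by gc() below to mark reachable node indices.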
struct BitSet {
  // calloc so all bits start cleared, rounded up to a whole number of words
  // so the last partial word is addressable.
  explicit BitSet(uint32_t size)
      : words((uint64_t *)calloc(size / 64 + 1, sizeof(uint64_t))) {}
  bool test(uint32_t i) const {
    return words[i >> 6] & (uint64_t(1) << (i & 63));
  }
  // Returns former value
  bool set(uint32_t i) {
    const auto prev = words[i >> 6];
    const auto mask = uint64_t(1) << (i & 63);
    words[i >> 6] |= mask;
    max_ = std::max(i, max_);
    return prev & mask;
  }
  // Returns 0 if set is empty
  uint32_t max() const { return max_; }
  template <class F>
  void iterateAbsentApproxBackwards(F f, uint32_t begin, uint32_t end) const {
    // TODO can this be improved? We can do something with a word at a time
    // instead of a bit at a time. The first attempt at doing so benchmarked as
    // slower.
    assert(begin != 0);
    for (uint32_t i = end - 1; i >= begin; --i) {
      if (!test(i)) {
        f(i);
      }
    }
  }
  ~BitSet() { free(words); }

private:
  uint32_t max_ = 0;
  uint64_t *const words;
};
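
// Owns the node arena. Reserves kMapSize of virtual address space up front
// (PROT_NONE, so it costs nothing physical) and commits it one kUpsizeBytes
// chunk at a time as allocation advances; gc() later returns whole pages at
// the top of the arena to PROT_NONE. Freed interior nodes are chained into an
// intrusive free list through Node::nextFree.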
struct MemManager {
  MemManager()
      : base((Node *)mmapSafe(nullptr, kMapSize, PROT_NONE,
                              MAP_PRIVATE | MAP_ANONYMOUS, -1, 0)) {
    if (kPageSize % sizeof(Node) != 0) {
      fprintf(stderr, // GCOVR_EXCL_LINE
              "kPageSize not a multiple of Node size\n"); // GCOVR_EXCL_LINE
      abort(); // GCOVR_EXCL_LINE
    }
    if (kUpsizeBytes % kPageSize != 0) {
      fprintf(stderr, // GCOVR_EXCL_LINE
              "kUpsizeBytes not a multiple of kPageSize\n"); // GCOVR_EXCL_LINE
      abort(); // GCOVR_EXCL_LINE
    }
  }
  ~MemManager() {
    gc(nullptr, 0, 0);
    munmapSafe(base, kMapSize);
  }
  Node *const base;
  uint32_t allocate() {
    if (freeList != 0) {
      uint32_t result = freeList;
      freeList = base[result].nextFree;
      assert(base[result].entry == nullptr);
      return result;
    }
    if (next == firstUnaddressable) {
      mprotectSafe(base + firstUnaddressable, kUpsizeBytes,
                   PROT_READ | PROT_WRITE);
      firstUnaddressable += kUpsizeNodes;
      if (firstUnaddressable > kMapSize / sizeof(Node)) {
        fprintf( // GCOVR_EXCL_LINE
            stderr, // GCOVR_EXCL_LINE
            "Out of memory: firstUnaddressable > kMapSize / " // GCOVR_EXCL_LINE
            "sizeof(Node)\n"); // GCOVR_EXCL_LINE
        abort(); // GCOVR_EXCL_LINE
      }
    }
    return next++;
  }
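
  // Mark-and-sweep collection. Marks every node reachable from `roots`
  // (following an updated node's replaced child only while some root older
  // than its updateVersion could still read it), then releases whole pages
  // past the high-water mark back to PROT_NONE and threads the remaining
  // unreachable nodes onto the free list, dropping their entry references.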
  void gc(const uint32_t *roots, int numRoots, int64_t oldestVersion) {
    // Calculate reachable set
    BitSet reachable{next};
    uint32_t stack[1000]; // Much more than bound imposed by max height of tree
    int stackIndex = 0;
    auto tryPush = [&](uint32_t p) {
      if (!reachable.set(p)) {
        assert(stackIndex < int(sizeof(stack) / sizeof(stack[0])));
        stack[stackIndex++] = p;
      }
    };
    for (int i = 0; i < numRoots; ++i) {
      if (roots[i] == 0) {
        continue;
      }
      tryPush(roots[i]);
      while (stackIndex > 0) {
        uint32_t p = stack[--stackIndex];
        auto &node = base[p];
        if (node.updated.load(std::memory_order_relaxed)) {
          if (node.pointers[!node.replacePointer] != 0) {
            tryPush(node.pointers[!node.replacePointer]);
          }
          if (oldestVersion < node.updateVersion) {
            if (node.pointers[node.replacePointer] != 0) {
              tryPush(node.pointers[node.replacePointer]);
            }
          }
          tryPush(node.pointers[2]);
        } else {
          if (node.pointers[0] != 0) {
            tryPush(node.pointers[0]);
          }
          if (node.pointers[1] != 0) {
            tryPush(node.pointers[1]);
          }
        }
      }
    }
    // Reclaim memory on the right side
    uint32_t max = reachable.max();
    if (max == 0) {
      max = kMinAddressable - 1;
    }
    assert(max < next);
    uint32_t newFirstUnaddressable = (max / kNodesPerPage + 1) * kNodesPerPage;
    if (newFirstUnaddressable < firstUnaddressable) {
      for (uint32_t i = newFirstUnaddressable; i < firstUnaddressable; ++i) {
        if (base[i].entry != nullptr) {
          base[i].entry->delref();
        }
      }
      mprotectSafe(base + newFirstUnaddressable,
                   (firstUnaddressable - newFirstUnaddressable) * sizeof(Node),
                   PROT_NONE);
      firstUnaddressable = newFirstUnaddressable;
    }
    next = max + 1;
    // Rebuild free list and delref entries
    freeList = 0;
    reachable.iterateAbsentApproxBackwards(
        [&](uint32_t i) {
          if (base[i].entry != nullptr) {
            base[i].entry->delref();
            base[i].entry = nullptr;
          }
          base[i].nextFree = freeList;
          freeList = i;
        },
        kMinAddressable, next);
  }

private:
  uint32_t next = kMinAddressable;
  uint32_t firstUnaddressable = kMinAddressable;
  uint32_t freeList = 0;
};
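
// Maps versions to root node indices. add() appends one root per committed
// version (deduplicating consecutive identical roots), setOldestVersion()
// drops the prefix no reader can request anymore, and rootForVersion()
// binary-searches for the newest root at or below a version.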
struct RootSet {
  /// Register `node` as the root for `version`, after all mutations for that
  /// version have been added
  void add(uint32_t node, int64_t version) {
    if (end == 0) {
      nodes[end] = node;
      versions[end] = version;
      ++end;
      return;
    }
    if (nodes[end - 1] == node) {
      return;
    }
    if (end == capacity) {
      capacity *= 2;
      nodes = (uint32_t *)realloc(nodes, capacity * sizeof(uint32_t));
      versions = (int64_t *)realloc(versions, capacity * sizeof(int64_t));
    }
    nodes[end] = node;
    versions[end] = version;
    ++end;
  }
  /// Inform that there will be no calls to rootForVersion with a version less
  /// than `oldestVersion`
  void setOldestVersion(int64_t oldestVersion) {
    const uint32_t firstToKeep = rootForVersion(oldestVersion);
    if (firstToKeep != 0) {
      memmove(nodes, nodes + firstToKeep,
              (end - firstToKeep) * sizeof(uint32_t));
      memmove(versions, versions + firstToKeep,
              (end - firstToKeep) * sizeof(int64_t));
      end -= firstToKeep;
    }
    assert(end > 0);
    assert(versions[0] <= oldestVersion);
  }
  /// Get the index (into roots()) of the root that can correctly be used for
  /// `version`
  uint32_t rootForVersion(int64_t version) const {
    assert(end > 0);
    assert(versions[0] <= version);
    // Find the last index whose version is <= `version`. right starts at
    // end - 1 so mid never indexes past the last element.
    int left = 0;
    int right = (int)end - 1;
    int result = 0;
    while (left <= right) {
      int mid = left + (right - left) / 2;
      if (versions[mid] <= version) {
        result = mid;
        left = mid + 1;
      } else {
        right = mid - 1;
      }
    }
    assert(result < (int)end);
    return result;
  }
  const uint32_t *roots() const { return nodes; }
  int rootCount() const { return end; }
  RootSet() {
    nodes = (uint32_t *)malloc(kMinCapacity * sizeof(uint32_t));
    versions = (int64_t *)malloc(kMinCapacity * sizeof(int64_t));
    capacity = kMinCapacity;
    nodes[0] = 0;
    versions[0] = 0;
    end = 1;
  }
  ~RootSet() {
    free(versions);
    free(nodes);
  }

private:
  uint32_t *nodes;
  // versions[i] is the version of nodes[i]
  int64_t *versions;
  constexpr static uint32_t kMinCapacity = 16;
  uint32_t capacity;
  uint32_t end;
};
struct VersionedMap::Impl {
  MemManager mm;
  RootSet roots;
};
} // namespace weaselab
#ifdef ENABLE_MAIN
#include <nanobench.h>
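
// Microbenchmarks (built only when ENABLE_MAIN is defined): raw node
// allocation, a gc pass over a small live set, and the RootSet operations.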
int main() {
  ankerl::nanobench::Bench bench;
  bench.minEpochIterations(5000);
  weaselab::MemManager mm;
  bench.run("allocate", [&]() {
    auto x = mm.allocate();
    mm.base[x].pointers[0] = 0;
    mm.base[x].pointers[1] = 0;
    mm.base[x].updated.store(false, std::memory_order_relaxed);
  });
  mm.gc(nullptr, 0, 0);
  for (int i = 0; i < 10000; ++i) {
    auto x = mm.allocate();
    mm.base[x].pointers[0] = 0;
    mm.base[x].pointers[1] = 0;
    mm.base[x].updated.store(false, std::memory_order_relaxed);
  }
  auto root = mm.allocate();
  mm.base[root].entry = weaselab::Entry::make(0, nullptr, 0, nullptr, 0,
                                              weaselab::VersionedMap::Set);
  mm.base[root].pointers[0] = 0;
  mm.base[root].pointers[1] = 0;
  mm.base[root].updated.store(false, std::memory_order_relaxed);
  bench.run("gc", [&]() { mm.gc(&root, 1, 0); });
  {
    int i = 0;
    constexpr int kNumVersions = 1000;
    weaselab::RootSet roots;
    for (; i < kNumVersions; i += 2) {
      roots.add(i, i);
      roots.add(i, i + 1);
    }
    bench.run("roots - setOldestVersion", [&]() {
      roots.add(i, i);
      roots.setOldestVersion(i - kNumVersions);
      ++i;
    });
    bench.run("roots - rootForVersion", [&]() {
      bench.doNotOptimizeAway(roots.rootForVersion(i - kNumVersions / 2));
    });
  }
}
#endif