Scan and remove old entries in addMutations

This commit is contained in:
2024-05-22 16:04:01 -07:00
parent 82c8f95dbe
commit 262c9cd10c
2 changed files with 77 additions and 26 deletions

View File

@@ -703,21 +703,10 @@ struct __attribute__((__visibility__("hidden"))) VersionedMap::Impl {
}
}
// Removes `finger` from the tree, and leaves `finger` pointing to the next
// entry.
// Removes `finger` from the tree, and leaves `finger` pointing to insertion
// point of its former entry.
void remove(Finger &finger) {
#ifndef NDEBUG
Entry *expected;
{
Finger copy;
finger.copyTo(copy);
move<std::memory_order_relaxed>(copy, latestVersion, true);
expected =
copy.searchPathSize() > 0 ? mm.base[copy.backNode()].entry : nullptr;
}
#endif
// True if finger is pointing to an entry > than the entry we're removing
// after we rotate it down
@@ -762,17 +751,6 @@ struct __attribute__((__visibility__("hidden"))) VersionedMap::Impl {
node = parent;
}
finger.setSearchPathSizeUnsafe(oldSize);
// finger now points to the insertion point of the node we're removing
move<std::memory_order_relaxed>(finger, latestVersion, true);
#ifndef NDEBUG
if (finger.searchPathSize() > 0) {
assert(mm.base[finger.backNode()].entry == expected);
} else {
assert(expected == nullptr);
}
#endif
}
uint32_t newNode(int64_t version, int64_t rangeVersion, const uint8_t *key,
@@ -803,13 +781,78 @@ struct __attribute__((__visibility__("hidden"))) VersionedMap::Impl {
void printInOrderHelper(int64_t version, uint32_t node, int depth);
void scanAndRemoveOldEntries(int fuel) {
// Get a finger to the first entry > continueKey, or the last entry if
// continueKey is the empty string
if (latestRoot == 0) {
// Tree is empty
return;
}
Finger finger;
if (continueKey.len == 0) {
// Set finger to last entry in tree
bool ignored;
finger.push(latestRoot, ignored);
uint32_t c;
while ((c = child<std::memory_order_relaxed>(finger.backNode(), true,
latestVersion)) != 0) {
finger.push(c, true);
}
} else {
search<std::memory_order_relaxed>(continueKey, latestRoot, latestVersion,
finger);
move<std::memory_order_relaxed>(finger, latestVersion, true);
if (finger.searchPathSize() == 0) {
continueKey = {nullptr, 0};
return;
}
}
assert(finger.backNode() != 0);
int64_t rangeVersion = mm.base[finger.backNode()].entry->rangeVersion;
move<std::memory_order_relaxed>(finger, latestVersion, false);
if (finger.searchPathSize() == 0) {
continueKey = {nullptr, 0};
return;
}
// Phew. Ok now we have a finger to the next entry to consider removing, and
// the range version terminated at this entry.
for (; fuel > 0; --fuel) {
const auto &n = mm.base[finger.backNode()];
if (rangeVersion < 0 && std::max(n.entry->pointVersion,
n.entry->rangeVersion) < oldestVersion) {
remove(finger);
if (latestRoot == 0) {
return;
}
} else {
rangeVersion = n.entry->rangeVersion;
}
move<std::memory_order_relaxed>(finger, latestVersion, false);
if (finger.searchPathSize() == 0) {
continueKey = {nullptr, 0};
return;
}
}
continueArena = Arena();
const auto &n = mm.base[finger.backNode()];
uint8_t *data = new (continueArena) uint8_t[n.entry->keyLen];
memcpy(data, n.entry->getKey(), n.entry->keyLen);
continueKey = {data, n.entry->keyLen};
}
void addMutations(const Mutation *mutations, int numMutations,
int64_t version) {
mallocBytesDelta = 0;
// TODO scan to remove mutations older than oldestVersion
assert(latestVersion < version);
latestVersion = version;
latestRoot = roots.roots()[roots.rootCount() - 1];
// TODO tune?
scanAndRemoveOldEntries(2 * numMutations + 10);
// TODO Improve ILP?
for (int i = 0; i < numMutations; ++i) {
const auto &m = mutations[i];
@@ -827,6 +870,7 @@ struct __attribute__((__visibility__("hidden"))) VersionedMap::Impl {
while (iter.searchPathSize() > 0 &&
mm.base[iter.backNode()] < Key{m.param2, m.param2Len}) {
remove(iter);
move<std::memory_order_relaxed>(iter, latestVersion, true);
}
insert({m.param2, m.param2Len}, {});
}
@@ -843,6 +887,10 @@ struct __attribute__((__visibility__("hidden"))) VersionedMap::Impl {
void firstGeq(const Key *key, const int64_t *version, Iterator *iterator,
int count) const;
// State used to resume scanning and removing old entries in `addMutations`
Key continueKey;
Arena continueArena;
MemManager mm;
RootSet roots;
// Only meaningful within the callstack of `addMutations`