Files
versioned-map/FdbVersionedMap.cpp
2024-05-30 18:02:05 -07:00

1450 lines
43 KiB
C++

#include "Internal.h"
#include "KeyCompare.h"
#include "VersionedMap.h"
#include <map>
#include <math.h>
#include <vector>
// FDB implementation adapted to compile outside of FDB
using Version = int64_t;
using KeyRef = weaselab::VersionedMap::Key;
using ValueRef = weaselab::VersionedMap::Key;
using StringRef = weaselab::VersionedMap::Key;
/*
* FastRef.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <atomic>
#include <cstdint>
#include <deque>
template <class Subclass> class ThreadUnsafeReferenceCounted {
public:
ThreadUnsafeReferenceCounted() : referenceCount(1) {}
// NO virtual destructor! Subclass should have a virtual destructor if it is
// not sealed.
void addref() const { ++referenceCount; }
void delref() const {
if (delref_no_destroy()) {
((Subclass *)this)->~Subclass();
safe_free((void *)this, sizeof(Subclass));
}
}
bool delref_no_destroy() const { return !--referenceCount; }
int32_t debugGetReferenceCount() const {
return referenceCount;
} // Never use in production code, only for tracing
bool isSoleOwner() const { return referenceCount == 1; }
private:
ThreadUnsafeReferenceCounted(
const ThreadUnsafeReferenceCounted &) /* = delete*/;
void operator=(const ThreadUnsafeReferenceCounted &) /* = delete*/;
mutable int32_t referenceCount;
};
#define ReferenceCounted ThreadUnsafeReferenceCounted
template <class P> void addref(P *ptr) { ptr->addref(); }
template <class P> void delref(P *ptr) { ptr->delref(); }
template <class P> class Reference {
public:
Reference() : ptr(nullptr) {}
explicit Reference(P *ptr) : ptr(ptr) {}
static Reference<P> addRef(P *ptr) {
ptr->addref();
return Reference(ptr);
}
Reference(const Reference &r) : ptr(r.getPtr()) {
if (ptr)
addref(ptr);
}
Reference(Reference &&r) noexcept : ptr(r.getPtr()) { r.ptr = nullptr; }
template <class Q> Reference(const Reference<Q> &r) : ptr(r.getPtr()) {
if (ptr)
addref(ptr);
}
template <class Q> Reference(Reference<Q> &&r) : ptr(r.getPtr()) {
r.setPtrUnsafe(nullptr);
}
~Reference() {
if (ptr)
delref(ptr);
}
Reference &operator=(const Reference &r) {
P *oldPtr = ptr;
P *newPtr = r.ptr;
if (oldPtr != newPtr) {
if (newPtr)
addref(newPtr);
ptr = newPtr;
if (oldPtr)
delref(oldPtr);
}
return *this;
}
Reference &operator=(Reference &&r) noexcept {
P *oldPtr = ptr;
P *newPtr = r.ptr;
if (oldPtr != newPtr) {
r.ptr = nullptr;
ptr = newPtr;
if (oldPtr)
delref(oldPtr);
}
return *this;
}
void clear() {
P *oldPtr = ptr;
if (oldPtr) {
ptr = nullptr;
delref(oldPtr);
}
}
P *operator->() const { return ptr; }
P &operator*() const { return *ptr; }
P *getPtr() const { return ptr; }
void setPtrUnsafe(P *p) { ptr = p; }
P *extractPtr() {
auto *p = ptr;
ptr = nullptr;
return p;
}
template <class T> Reference<T> castTo() {
return Reference<T>::addRef((T *)ptr);
}
bool isValid() const { return ptr != nullptr; }
explicit operator bool() const { return ptr != nullptr; }
private:
P *ptr;
};
template <class P, class... Args> Reference<P> makeReference(Args &&...args) {
return Reference<P>(new (safe_malloc(sizeof(P)))
P(std::forward<Args>(args)...));
}
template <class P>
bool operator==(const Reference<P> &lhs, const Reference<P> &rhs) {
return lhs.getPtr() == rhs.getPtr();
}
template <class P>
bool operator!=(const Reference<P> &lhs, const Reference<P> &rhs) {
return !(lhs == rhs);
}
/*
* IndexedSet.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
template <class Key, class Value> class MapPair {
public:
Key key;
Value value;
template <class Key_, class Value_>
MapPair(Key_ &&key, Value_ &&value)
: key(std::forward<Key_>(key)), value(std::forward<Value_>(value)) {}
void operator=(MapPair const &rhs) {
key = rhs.key;
value = rhs.value;
}
MapPair(MapPair const &rhs) : key(rhs.key), value(rhs.value) {}
MapPair(MapPair &&r) noexcept
: key(std::move(r.key)), value(std::move(r.value)) {}
void operator=(MapPair &&r) noexcept {
key = std::move(r.key);
value = std::move(r.value);
}
auto operator<=>(MapPair<Key, Value> const &r) const { return key <=> r.key; }
auto operator<=>(Key const &r) const { return key <=> r; }
};
/*
* VersionedMap.h
*
* This source file is part of the FoundationDB open source project
*
* Copyright 2013-2022 Apple Inc. and the FoundationDB project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// PTree is a persistent balanced binary tree implementation. It is based on a
// treap as a way to guarantee O(1) space for node insertion (rotating is
// asymptotically cheap), but the constant factors are very large.
//
// Each node has three pointers - the first two are its left and right children,
// respectively, and the third can be set to point to a newer version of the
// node. This third pointer allows us to maintain persistence without full path
// copying, and is employed to achieve O(1) space node insertion.
//
// PTree also supports efficient finger searches.
namespace PTreeImpl {
#ifdef _MSC_VER
#pragma warning(disable : 4800)
#endif
template <class T> struct PTree : public ReferenceCounted<PTree<T>> {
uint32_t priority;
Reference<PTree> pointer[3];
Version lastUpdateVersion;
bool updated;
bool replacedPointer;
T data;
const Reference<PTree> &child(bool which, Version at) const {
if (updated && lastUpdateVersion <= at && which == replacedPointer)
return pointer[2];
else
return pointer[which];
}
const Reference<PTree> &left(Version at) const { return child(false, at); }
const Reference<PTree> &right(Version at) const { return child(true, at); }
PTree(const T &data, Version ver)
: lastUpdateVersion(ver), updated(false), data(data) {
priority = gRandom.next();
}
PTree(uint32_t pri, T const &data, Reference<PTree> const &left,
Reference<PTree> const &right, Version ver)
: priority(pri), lastUpdateVersion(ver), updated(false), data(data) {
pointer[0] = left;
pointer[1] = right;
}
private:
PTree(PTree const &);
};
template <class T> class PTreeFinger {
using PTreeFingerEntry = PTree<T> const *;
// This finger size supports trees with up to exp(96/4.3) ~= 4,964,514,749
// entries. The number 4.3 comes from here:
// https://en.wikipedia.org/wiki/Random_binary_tree#The_longest_path see also:
// check().
static constexpr size_t N = 96;
PTreeFingerEntry entries_[N];
size_t size_ = 0;
size_t bound_sz_ = 0;
public:
PTreeFinger() {}
// Explicit copy constructors ensure we copy the live values in entries_.
PTreeFinger(PTreeFinger const &f) { *this = f; }
PTreeFinger(PTreeFinger &&f) { *this = f; }
PTreeFinger &operator=(PTreeFinger const &f) {
size_ = f.size_;
bound_sz_ = f.bound_sz_;
std::copy(f.entries_, f.entries_ + size_, entries_);
return *this;
}
PTreeFinger &operator=(PTreeFinger &&f) {
size_ = std::exchange(f.size_, 0);
bound_sz_ = f.bound_sz_;
std::copy(f.entries_, f.entries_ + size_, entries_);
return *this;
}
size_t size() const { return size_; }
PTree<T> const *back() const { return entries_[size_ - 1]; }
void pop_back() { size_--; }
void clear() { size_ = 0; }
PTree<T> const *operator[](size_t i) const { return entries_[i]; }
void resize(size_t sz) {
size_ = sz;
assert(size_ < N);
}
void push_back(PTree<T> const *node) {
entries_[size_++] = {node};
assert(size_ < N);
}
void push_for_bound(PTree<T> const *node, bool less) {
push_back(node);
bound_sz_ = less ? size_ : bound_sz_;
}
// remove the end of the finger so that the last entry is less than the probe
void trim_to_bound() { size_ = bound_sz_; }
};
template <class T>
static Reference<PTree<T>> update(Reference<PTree<T>> const &node, bool which,
Reference<PTree<T>> const &ptr, Version at) {
if (ptr.getPtr() ==
node->child(which, at).getPtr() /* && node->replacedVersion <= at*/) {
return node;
}
if (node->lastUpdateVersion == at) {
//&& (!node->updated || node->replacedPointer==which)) {
if (node->updated && node->replacedPointer != which) {
// We are going to have to copy this node, but its aux pointer will never
// be used again and should drop its reference count
Reference<PTree<T>> r;
if (which)
r = makeReference<PTree<T>>(node->priority, node->data,
node->child(0, at), ptr, at);
else
r = makeReference<PTree<T>>(node->priority, node->data, ptr,
node->child(1, at), at);
node->pointer[2].clear();
return r;
} else {
if (node->updated)
node->pointer[2] = ptr;
else
node->pointer[which] = ptr;
return node;
}
}
if (node->updated) {
if (which)
return makeReference<PTree<T>>(node->priority, node->data,
node->child(0, at), ptr, at);
else
return makeReference<PTree<T>>(node->priority, node->data, ptr,
node->child(1, at), at);
} else {
node->lastUpdateVersion = at;
node->replacedPointer = which;
node->pointer[2] = ptr;
node->updated = true;
return node;
}
}
template <class T, class X>
bool contains(const Reference<PTree<T>> &p, Version at, const X &x) {
if (!p)
return false;
auto cmp = x <=> p->data;
bool less = cmp < 0;
if (cmp == 0)
return true;
return contains(p->child(!less, at), at, x);
}
// TODO: Remove the number of invocations of operator<, and replace with
// something closer to memcmp. and same for upper_bound.
template <class T, class X>
void lower_bound(const Reference<PTree<T>> &p, Version at, const X &x,
PTreeFinger<T> &f) {
if (!p) {
f.trim_to_bound();
return;
}
auto cmp = x <=> p->data;
bool less = cmp < 0;
f.push_for_bound(p.getPtr(), less);
if (cmp == 0)
return;
lower_bound(p->child(!less, at), at, x, f);
}
template <class T, class X>
void upper_bound(const Reference<PTree<T>> &p, Version at, const X &x,
PTreeFinger<T> &f) {
if (!p) {
f.trim_to_bound();
return;
}
bool less = x < p->data;
f.push_for_bound(p.getPtr(), less);
upper_bound(p->child(!less, at), at, x, f);
}
template <class T, bool forward> void move(Version at, PTreeFinger<T> &f) {
assert(f.size());
const PTree<T> *n;
n = f.back();
if (n->child(forward, at)) {
n = n->child(forward, at).getPtr();
do {
f.push_back(n);
n = n->child(!forward, at).getPtr();
} while (n);
} else {
do {
n = f.back();
f.pop_back();
} while (f.size() && f.back()->child(forward, at).getPtr() == n);
}
}
template <class T, bool forward> int halfMove(Version at, PTreeFinger<T> &f) {
// Post: f[:return_value] is the finger that would have been returned by
// move<forward>(at,f), and f[:original_length_of_f] is unmodified
assert(f.size());
const PTree<T> *n;
n = f.back();
if (n->child(forward, at)) {
n = n->child(forward, at).getPtr();
do {
f.push_back(n);
n = n->child(!forward, at).getPtr();
} while (n);
return f.size();
} else {
int s = f.size();
do {
n = f[s - 1];
--s;
} while (s && f[s - 1]->child(forward, at).getPtr() == n);
return s;
}
}
template <class T> void next(Version at, PTreeFinger<T> &f) {
move<T, true>(at, f);
}
template <class T> void previous(Version at, PTreeFinger<T> &f) {
move<T, false>(at, f);
}
template <class T> int halfNext(Version at, PTreeFinger<T> &f) {
return halfMove<T, true>(at, f);
}
template <class T> int halfPrevious(Version at, PTreeFinger<T> &f) {
return halfMove<T, false>(at, f);
}
template <class T> T get(PTreeFinger<T> &f) {
assert(f.size());
return f.back()->data;
}
// Modifies p to point to a PTree with x inserted
template <class T> void insert(Reference<PTree<T>> &p, Version at, const T &x) {
if (!p) {
p = makeReference<PTree<T>>(x, at);
} else {
auto c = x <=> p->data;
if (c == 0) {
p = makeReference<PTree<T>>(p->priority, x, p->left(at), p->right(at),
at);
} else {
const bool direction = !(c < 0);
Reference<PTree<T>> child = p->child(direction, at);
insert(child, at, x);
p = update(p, direction, child, at);
if (p->child(direction, at)->priority > p->priority)
rotate(p, at, !direction);
}
}
}
template <class T>
Reference<PTree<T>> firstNode(const Reference<PTree<T>> &p, Version at) {
if (!p)
assert(false);
if (!p->left(at))
return p;
return firstNode(p->left(at), at);
}
template <class T>
Reference<PTree<T>> lastNode(const Reference<PTree<T>> &p, Version at) {
if (!p)
assert(false);
if (!p->right(at))
return p;
return lastNode(p->right(at), at);
}
template <class T, bool last>
void firstOrLastFinger(const Reference<PTree<T>> &p, Version at,
PTreeFinger<T> &f) {
if (!p)
return;
f.push_back(p.getPtr());
firstOrLastFinger<T, last>(p->child(last, at), at, f);
}
template <class T>
void first(const Reference<PTree<T>> &p, Version at, PTreeFinger<T> &f) {
return firstOrLastFinger<T, false>(p, at, f);
}
template <class T>
void last(const Reference<PTree<T>> &p, Version at, PTreeFinger<T> &f) {
return firstOrLastFinger<T, true>(p, at, f);
}
// modifies p to point to a PTree with the root of p removed
template <class T> void removeRoot(Reference<PTree<T>> &p, Version at) {
if (!p->right(at))
p = p->left(at);
else if (!p->left(at))
p = p->right(at);
else {
bool direction = p->right(at)->priority < p->left(at)->priority;
rotate(p, at, direction);
Reference<PTree<T>> child = p->child(direction, at);
removeRoot(child, at);
p = update(p, direction, child, at);
}
}
// changes p to point to a PTree with finger removed. p must be the root of the
// tree associated with finger.
//
// Invalidates finger.
template <class T>
void removeFinger(Reference<PTree<T>> &p, Version at, PTreeFinger<T> finger) {
assert(finger.size() > 0);
// Start at the end of the finger, remove, and propagate copies up along the
// search path (finger) as needed.
auto node =
Reference<PTree<T>>::addRef(const_cast<PTree<T> *>(finger.back()));
auto *before = node.getPtr();
removeRoot(node, at);
for (;;) {
if (before == node.getPtr()) {
// Done propagating copies
return;
}
if (finger.size() == 1) {
// Check we passed the correct root for this finger
assert(p.getPtr() == before);
// Propagate copy to root
p = node;
return;
}
finger.pop_back();
auto parent =
Reference<PTree<T>>::addRef(const_cast<PTree<T> *>(finger.back()));
bool isLeftChild = parent->left(at).getPtr() == before;
bool isRightChild = parent->right(at).getPtr() == before;
assert(isLeftChild || isRightChild); // Corrupt finger?
// Prepare for next iteration
before = parent.getPtr();
node = update(parent, isRightChild, node, at);
}
}
// changes p to point to a PTree with x removed
template <class T, class X>
void remove(Reference<PTree<T>> &p, Version at, const X &x) {
if (!p)
assert(false); // attempt to remove item not present in PTree
auto cmp = x <=> p->data;
if (cmp < 0) {
Reference<PTree<T>> child = p->child(0, at);
remove(child, at, x);
p = update(p, 0, child, at);
} else if (cmp > 0) {
Reference<PTree<T>> child = p->child(1, at);
remove(child, at, x);
p = update(p, 1, child, at);
} else {
removeRoot(p, at);
}
}
template <class T, class X>
void remove(Reference<PTree<T>> &p, Version at, const X &begin, const X &end) {
if (!p)
return;
int beginDir, endDir;
auto beginCmp = begin <=> p->data;
if (beginCmp < 0)
beginDir = -1;
else if (beginCmp > 0)
beginDir = +1;
else
beginDir = 0;
if (!(p->data < end))
endDir = -1;
else
endDir = +1;
if (beginDir == endDir) {
Reference<PTree<T>> child = p->child(beginDir == +1, at);
remove(child, at, begin, end);
p = update(p, beginDir == +1, child, at);
} else {
if (beginDir == -1) {
Reference<PTree<T>> left = p->child(0, at);
removeBeyond(left, at, begin, 1);
p = update(p, 0, left, at);
}
if (endDir == +1) {
Reference<PTree<T>> right = p->child(1, at);
removeBeyond(right, at, end, 0);
p = update(p, 1, right, at);
}
if (beginDir < endDir)
removeRoot(p, at);
}
}
template <class T, class X>
void removeBeyond(Reference<PTree<T>> &p, Version at, const X &pivot,
bool dir) {
if (!p)
return;
if ((p->data < pivot) ^ dir) {
p = p->child(!dir, at);
removeBeyond(p, at, pivot, dir);
} else {
Reference<PTree<T>> child = p->child(dir, at);
removeBeyond(child, at, pivot, dir);
p = update(p, dir, child, at);
}
}
/*template<class T, class X>
void remove(Reference<PTree<T>>& p, Version at, const X& begin, const X& end) {
Reference<PTree<T>> left, center, right;
split(p, begin, left, center, at);
split(center, end, center, right, at);
p = append(left, right, at);
}*/
// inputs a PTree with the root node potentially violating the heap property
// modifies p to point to a valid PTree
template <class T> void demoteRoot(Reference<PTree<T>> &p, Version at) {
if (!p)
assert(false);
uint32_t priority[2];
for (int i = 0; i < 2; i++)
if (p->child(i, at))
priority[i] = p->child(i, at)->priority;
else
priority[i] = 0;
bool higherDirection = priority[1] > priority[0];
if (priority[higherDirection] < p->priority)
return;
// else, child(higherDirection) is a greater priority than us and the other
// child...
rotate(p, at, !higherDirection);
Reference<PTree<T>> child = p->child(!higherDirection, at);
demoteRoot(child, at);
p = update(p, !higherDirection, child, at);
}
template <class T>
Reference<PTree<T>> append(const Reference<PTree<T>> &left,
const Reference<PTree<T>> &right, Version at) {
if (!left)
return right;
if (!right)
return left;
Reference<PTree<T>> r = makeReference<PTree<T>>(lastNode(left, at)->data, at);
Reference<PTree<T>> a = left;
remove(a, at, r->data);
r->pointer[0] = a;
r->pointer[1] = right;
demoteRoot(r, at);
return r;
}
template <class T, class X>
void split(Reference<PTree<T>> p, const X &x, Reference<PTree<T>> &left,
Reference<PTree<T>> &right, Version at) {
if (!p) {
left = Reference<PTree<T>>();
right = Reference<PTree<T>>();
return;
}
if (p->data < x) {
left = p;
Reference<PTree<T>> lr = left->right(at);
split(lr, x, lr, right, at);
left = update(left, 1, lr, at);
} else {
right = p;
Reference<PTree<T>> rl = right->left(at);
split(rl, x, left, rl, at);
right = update(right, 0, rl, at);
}
}
template <class T> void rotate(Reference<PTree<T>> &n, Version at, bool right) {
auto l = n->child(!right, at);
n = update(l, right, update(n, !right, l->child(right, at), at), at);
// Diagram for right = true
// n l
// / \
// l -> n
// \ /
// x x
}
template <class T>
void printTree(const Reference<PTree<T>> &p, Version at, int depth = 0) {
if (p->left(at))
printTree(p->left(at), at, depth + 1);
for (int i = 0; i < depth; i++)
printf(" ");
// printf(":%s\n", describe(p->data.value.first).c_str());
printf(":%s\n", describe(p->data.key).c_str());
if (p->right(at))
printTree(p->right(at), at, depth + 1);
}
template <class T>
void printTreeDetails(const Reference<PTree<T>> &p, int depth = 0) {
// printf("Node %p (depth %d): %s\n", p.getPtr(), depth,
// describe(p->data.value.first).c_str());
printf("Node %p (depth %d): %s\n", p.getPtr(), depth,
describe(p->data.key).c_str());
printf(" Left: %p\n", p->pointer[0].getPtr());
printf(" Right: %p\n", p->pointer[1].getPtr());
// if (p->pointer[2])
if (p->updated)
printf(" Version %lld %s: %p\n", p->lastUpdateVersion,
p->replacedPointer ? "Right" : "Left", p->pointer[2].getPtr());
for (int i = 0; i < 3; i++)
if (p->pointer[i])
printTreeDetails(p->pointer[i], depth + 1);
}
/*static int depth(const Reference<PTree<int>>& p, Version at) {
if (!p) return 0;
int d1 = depth(p->left(at), at) + 1;
int d2 = depth(p->right(at), at) + 1;
return d1 > d2 ? d1 : d2;
}*/
template <class T>
void validate(const Reference<PTree<T>> &p, Version at, T *min, T *max,
int &count, int &height, int depth = 0) {
if (!p) {
height = 0;
return;
}
assert((!min || *min <= p->data) && (!max || p->data <= *max));
for (int i = 0; i < 2; i++) {
if (p->child(i, at))
assert(p->child(i, at)->priority <= p->priority);
}
++count;
int h1, h2;
validate(p->left(at), at, min, &p->data, count, h1, depth + 1);
validate(p->right(at), at, &p->data, max, count, h2, depth + 1);
height = std::max(h1, h2) + 1;
}
template <class T> void check(const Reference<PTree<T>> &p) {
int count = 0, height;
validate(p, (T *)0, (T *)0, count, height);
if (count && height > 4.3 * log(double(count))) {
// printf("height %d; count %d\n", height, count);
assert(false);
}
}
// Remove pointers to any child nodes that have been updated at or before the
// given version This essentially gets rid of node versions that will never be
// read (beyond 5s worth of versions)
// TODO look into making this per-version compaction. (We could keep track of
// updated nodes at each version for example)
template <class T>
void compact(Reference<PTree<T>> &p, Version newOldestVersion) {
if (!p) {
return;
}
if (p->updated && p->lastUpdateVersion <= newOldestVersion) {
/* If the node has been updated, figure out which pointer was replaced. And
replace that pointer with the updated pointer. Then we can get rid of the
updated child pointer and then make room in the node for future updates
*/
auto which = p->replacedPointer;
p->pointer[which] = p->pointer[2];
p->updated = false;
p->pointer[2] = Reference<PTree<T>>();
// p->pointer[which] = Reference<PTree<T>>();
}
Reference<PTree<T>> left = p->left(newOldestVersion);
Reference<PTree<T>> right = p->right(newOldestVersion);
compact(left, newOldestVersion);
compact(right, newOldestVersion);
}
} // namespace PTreeImpl
class ValueOrClearToRef {
public:
static ValueOrClearToRef value(ValueRef const &v) {
return ValueOrClearToRef(v, false);
}
static ValueOrClearToRef clearTo(KeyRef const &k) {
return ValueOrClearToRef(k, true);
}
bool isValue() const { return !isClear; };
bool isClearTo() const { return isClear; }
ValueRef const &getValue() const {
assert(isValue());
return item;
};
KeyRef const &getEndKey() const {
assert(isClearTo());
return item;
};
private:
ValueOrClearToRef(StringRef item, bool isClear)
: item(item), isClear(isClear) {}
StringRef item;
bool isClear;
};
// VersionedMap provides an interface to a partially persistent tree, allowing
// you to read the values at a particular version, create new versions, modify
// the current version of the tree, and forget versions prior to a specific
// version.
template <class K, class T> class VersionedMap {
// private:
public:
typedef PTreeImpl::PTree<MapPair<K, std::pair<T, Version>>> PTreeT;
typedef PTreeImpl::PTreeFinger<MapPair<K, std::pair<T, Version>>>
PTreeFingerT;
typedef Reference<PTreeT> Tree;
Version oldestVersion, latestVersion;
// This deque keeps track of PTree root nodes at various versions. Since the
// versions increase monotonically, the deque is implicitly sorted and hence
// binary-searchable.
std::deque<std::pair<Version, Tree>> roots;
struct rootsComparator {
bool operator()(const std::pair<Version, Tree> &value, const Version &key) {
return (value.first < key);
}
bool operator()(const Version &key, const std::pair<Version, Tree> &value) {
return (key < value.first);
}
};
Tree const &getRoot(Version v) const {
auto r = upper_bound(roots.begin(), roots.end(), v, rootsComparator());
--r;
return r->second;
}
struct iterator;
VersionedMap() : oldestVersion(0), latestVersion(0) {
roots.emplace_back(0, Tree());
}
VersionedMap(VersionedMap &&v) noexcept
: oldestVersion(v.oldestVersion), latestVersion(v.latestVersion),
roots(std::move(v.roots)) {}
void operator=(VersionedMap &&v) noexcept {
oldestVersion = v.oldestVersion;
latestVersion = v.latestVersion;
roots = std::move(v.roots);
}
Version getLatestVersion() const { return latestVersion; }
Version getOldestVersion() const { return oldestVersion; }
// front element should be the oldest version in the deque, hence the next
// oldest should be at index 1
Version getNextOldestVersion() const { return roots[1]->first; }
void forgetVersionsBefore(Version newOldestVersion) {
assert(newOldestVersion <= latestVersion);
auto r = upper_bound(roots.begin(), roots.end(), newOldestVersion,
rootsComparator());
auto upper = r;
--r;
// if the specified newOldestVersion does not exist, insert a new
// entry-pair with newOldestVersion and the root from next lower version
if (r->first != newOldestVersion) {
r = roots.emplace(upper, newOldestVersion, getRoot(newOldestVersion));
}
assert(r->first == newOldestVersion);
roots.erase(roots.begin(), r);
oldestVersion = newOldestVersion;
}
public:
void createNewVersion(
Version version) { // following sets and erases are into the given
// version, which may now be passed to at(). Must be
// called in monotonically increasing order.
if (version > latestVersion) {
latestVersion = version;
Tree r = getRoot(version);
roots.emplace_back(version, r);
} else
assert(version == latestVersion);
}
// insert() and erase() invalidate atLatest() and all iterators into it
void insert(const K &k, const T &t) { insert(k, t, latestVersion); }
void insert(const K &k, const T &t, Version insertAt) {
PTreeImpl::insert(
roots.back().second, latestVersion,
MapPair<K, std::pair<T, Version>>(k, std::make_pair(t, insertAt)));
}
void erase(const K &begin, const K &end) {
PTreeImpl::remove(roots.back().second, latestVersion, begin, end);
}
void erase(const K &key) { // key must be present
PTreeImpl::remove(roots.back().second, latestVersion, key);
}
void erase(iterator const &item) { // iterator must be in latest version!
assert(item.at == latestVersion);
PTreeImpl::removeFinger(roots.back().second, latestVersion, item.finger);
}
void printDetail() { PTreeImpl::printTreeDetails(roots.back().second, 0); }
void printTree(Version at) {
PTreeImpl::printTree(roots.back().second, at, 0);
}
void compact(Version newOldestVersion) {
assert(newOldestVersion <= latestVersion);
// auto newBegin = roots.lower_bound(newOldestVersion);
auto newBegin = lower_bound(roots.begin(), roots.end(), newOldestVersion,
rootsComparator());
for (auto root = roots.begin(); root != newBegin; ++root) {
if (root->second)
PTreeImpl::compact(root->second, newOldestVersion);
}
// printf("\nPrinting the tree at latest version after compaction.\n");
// PTreeImpl::printTreeDetails(roots.back().second(), 0);
}
// for(auto i = vm.at(version).lower_bound(range.begin); i < range.end; ++i)
struct iterator {
iterator() = default;
explicit iterator(Tree const &root, Version at) : root(root), at(at) {}
K const &key() const { return finger.back()->data.key; }
Version insertVersion() const {
return finger.back()->data.value.second;
} // Returns the version at which the current item was inserted
operator bool() const { return finger.size() != 0; }
bool operator<(const K &key) const { return this->key() < key; }
T const &operator*() { return finger.back()->data.value.first; }
T const *operator->() { return &finger.back()->data.value.first; }
void operator++() {
if (finger.size())
PTreeImpl::next(at, finger);
else
PTreeImpl::first(root, at, finger);
}
void operator--() {
if (finger.size())
PTreeImpl::previous(at, finger);
else
PTreeImpl::last(root, at, finger);
}
bool operator==(const iterator &r) const {
if (finger.size() && r.finger.size())
return finger.back() == r.finger.back();
else
return finger.size() == r.finger.size();
}
bool operator!=(const iterator &r) const {
if (finger.size() && r.finger.size())
return finger.back() != r.finger.back();
else
return finger.size() != r.finger.size();
}
private:
friend class VersionedMap<K, T>;
Tree root;
Version at;
PTreeFingerT finger;
};
class ViewAtVersion {
public:
ViewAtVersion(Tree const &root, Version at) : root(root), at(at) {}
iterator begin() const {
iterator i(root, at);
PTreeImpl::first(root, at, i.finger);
return i;
}
iterator end() const { return iterator(root, at); }
// Returns x such that key==*x, or end()
template <class X> iterator find(const X &key) const {
iterator i(root, at);
PTreeImpl::lower_bound(root, at, key, i.finger);
if (i && i.key() == key)
return i;
else
return end();
}
// Returns the smallest x such that *x>=key, or end()
template <class X> iterator lower_bound(const X &key) const {
iterator i(root, at);
PTreeImpl::lower_bound(root, at, key, i.finger);
return i;
}
// Returns the smallest x such that *x>key, or end()
template <class X> iterator upper_bound(const X &key) const {
iterator i(root, at);
PTreeImpl::upper_bound(root, at, key, i.finger);
return i;
}
// Returns the largest x such that *x<=key, or end()
template <class X> iterator lastLessOrEqual(const X &key) const {
iterator i(root, at);
PTreeImpl::upper_bound(root, at, key, i.finger);
--i;
return i;
}
// Returns the largest x such that *x<key, or end()
template <class X> iterator lastLess(const X &key) const {
iterator i(root, at);
PTreeImpl::lower_bound(root, at, key, i.finger);
--i;
return i;
}
private:
Tree root;
Version at;
};
ViewAtVersion at(Version v) const {
if (v < 0) {
return atLatest();
}
return ViewAtVersion(getRoot(v), v);
}
ViewAtVersion atLatest() const {
return ViewAtVersion(roots.back().second, latestVersion);
}
bool isClearContaining(ViewAtVersion const &view, KeyRef key) {
auto i = view.lastLessOrEqual(key);
return i && i->isClearTo() && i->getEndKey() > key;
}
// TODO: getHistory?
};
KeyRef copy(Arena &arena, KeyRef k) {
auto *data = new (arena) uint8_t[k.len];
memcpy(data, k.p, k.len);
return {data, k.len};
}
inline KeyRef keyAfter(const KeyRef &key, Arena &arena) {
uint8_t *t = new (arena) uint8_t[key.len + 1];
memcpy(t, key.p, key.len);
t[key.len] = 0;
return KeyRef{t, key.len + 1};
}
namespace weaselab {
struct __attribute__((__visibility__("hidden"))) VersionedMap::Impl {
using VersionedData =
::VersionedMap<weaselab::VersionedMap::Key, ValueOrClearToRef>;
struct StandaloneVerUpdateRef {
Arena arena;
Version version;
std::vector<Mutation> mutations;
};
VersionedData versionedData;
std::map<Version, StandaloneVerUpdateRef>
mutationLog; // versions (durableVersion, version]
std::map<Version, std::vector<Arena>>
freeable; // for each version, arena's that must be held until that
// version is < oldestVersion
StandaloneVerUpdateRef &addVersionToMutationLog(Version v) {
// return existing version...
auto m = mutationLog.find(v);
if (m != mutationLog.end())
return m->second;
// ...or create a new one
auto &u = mutationLog[v];
u.version = v;
return u;
}
void expandClear(Key &begin, Key &end, VersionedData const &data,
Arena &arena) {
// Expand the clear
const auto &d = data.atLatest();
// If another clear overlaps the beginning of this one, engulf it
auto i = d.lastLess(begin);
if (i && i->isClearTo() && i->getEndKey() >= begin)
begin = copy(arena, i.key());
// If another clear overlaps the end of this one, engulf it; otherwise
// expand
i = d.lastLessOrEqual(end);
if (i && i->isClearTo() && i->getEndKey() >= end) {
end = copy(arena, i->getEndKey());
}
}
void addMutations(const Mutation *mutations, int numMutations,
int64_t version) {
versionedData.createNewVersion(version);
auto &verUpdateRef = addVersionToMutationLog(version);
auto &arena = verUpdateRef.arena;
for (int i = 0; i < numMutations; ++i) {
const auto &m = mutations[i];
// Trailing zero byte as required by VersionedMutation
auto param1 = keyAfter(Key{m.param1, m.param1Len}, arena);
--param1.len;
auto param2 = m.type == weaselab::VersionedMap::Set || m.param2Len > 0
? copy(arena, Key{m.param2, m.param2Len})
: keyAfter(param1, arena);
if (m.type == weaselab::VersionedMap::Set) {
// VersionedMap (data) is bookkeeping all empty ranges. If the key to be
// set is new, it is supposed to be in a range what was empty. Break the
// empty range into halves.
auto prev = versionedData.atLatest().lastLessOrEqual(param1);
if (prev && prev->isClearTo() && prev->getEndKey() > param1) {
assert(prev.key() <= param1);
KeyRef end = prev->getEndKey();
// the insert version of the previous clear is preserved for
// the "left half", because in changeDurableVersion() the
// previous clear is still responsible for removing it
// insert() invalidates prev, so prev.key() is not safe to
// pass to it by reference
versionedData.insert(KeyRef(prev.key()),
ValueOrClearToRef::clearTo(param1),
prev.insertVersion()); // overwritten by below
// insert if empty
KeyRef nextKey = keyAfter(param1, arena);
if (end != nextKey) {
assert(end > nextKey);
// the insert version of the "right half" is not preserved,
// because in changeDurableVersion() this set is responsible
// for removing it
// FIXME: This copy is technically an asymptotic problem,
// definitely a waste of memory (copy of keyAfter is a
// waste, but not asymptotic)
versionedData.insert(nextKey,
ValueOrClearToRef::clearTo(copy(arena, end)));
}
}
versionedData.insert(param1, ValueOrClearToRef::value(param2));
} else if (m.type == VersionedMap::Clear) {
if (param2.len == 0) {
param2 = keyAfter(param1, arena);
}
expandClear(param1, param2, versionedData, arena);
versionedData.erase(param1, param2);
assert(param2 > param1);
assert(
!versionedData.isClearContaining(versionedData.atLatest(), param1));
versionedData.insert(param1, ValueOrClearToRef::clearTo(param2));
}
verUpdateRef.mutations.push_back(
Mutation{param1.p, param2.p, param1.len, param2.len, m.type});
}
}
void setOldestVersion(int64_t oldestVersion) {
auto &freeVec = freeable[versionedData.getLatestVersion()];
auto iter = mutationLog.begin();
while (iter != mutationLog.end() && iter->first <= oldestVersion) {
for (const auto &m : iter->second.mutations) {
auto param1 = Key{m.param1, m.param1Len};
auto i = versionedData.atLatest().find(param1);
if (i) {
assert(i.key() == param1);
if (i.insertVersion() <= oldestVersion)
versionedData.erase(i);
}
if (m.type == Set) {
// A set can split a clear, so there might be another entry
// immediately after this one that should also be cleaned up
i = versionedData.atLatest().upper_bound(param1);
if (i) {
if (i.insertVersion() <= oldestVersion)
versionedData.erase(i);
}
}
}
freeVec.push_back(std::move(iter->second.arena));
iter = mutationLog.erase(iter);
}
versionedData.forgetVersionsBefore(oldestVersion);
freeable.erase(freeable.begin(), freeable.lower_bound(oldestVersion));
}
};
VersionedMap::Impl *internal_makeImpl(int64_t version) {
return new (safe_malloc(sizeof(VersionedMap::Impl))) VersionedMap::Impl();
}
VersionedMap::VersionedMap(int64_t version)
: impl(internal_makeImpl(version)) {}
VersionedMap::~VersionedMap() {
if (impl != nullptr) {
impl->~Impl();
safe_free(impl, sizeof(*impl));
}
}
VersionedMap::VersionedMap(VersionedMap &&other) noexcept {
impl = std::exchange(other.impl, nullptr);
}
VersionedMap &VersionedMap::operator=(VersionedMap &&other) noexcept {
impl = std::exchange(other.impl, nullptr);
return *this;
}
void VersionedMap::addMutations(const Mutation *mutations, int numMutations,
int64_t version) {
impl->addMutations(mutations, numMutations, version);
}
struct VersionedMap::Iterator::Impl {
VersionedMap::Impl::VersionedData::iterator iter;
};
VersionedMap::Iterator::~Iterator() {
if (impl != nullptr) {
impl->~Impl();
safe_free(impl, sizeof(*impl));
}
}
VersionedMap::Iterator::Iterator(const Iterator &other)
: impl(new(safe_malloc(sizeof(Impl))) Impl{other.impl->iter}) {}
VersionedMap::Iterator &
VersionedMap::Iterator::operator=(const Iterator &other) {
if (impl != nullptr) {
impl->~Impl();
safe_free(impl, sizeof(*impl));
}
impl = new (safe_malloc(sizeof(Impl))) Impl{other.impl->iter};
return *this;
}
VersionedMap::Iterator::Iterator(Iterator &&other) noexcept
: impl(std::exchange(other.impl, nullptr)) {}
VersionedMap::Iterator &
VersionedMap::Iterator::operator=(Iterator &&other) noexcept {
if (impl != nullptr) {
impl->~Impl();
safe_free(impl, sizeof(*impl));
}
impl = std::exchange(other.impl, nullptr);
return *this;
}
VersionedMap::Iterator::VersionedMutation
VersionedMap::Iterator::operator*() const {
assert(impl->iter);
VersionedMap::Iterator::VersionedMutation result;
result.param1 = impl->iter.key().p;
result.param1Len = impl->iter.key().len;
if (impl->iter->isClearTo()) {
result.type = VersionedMap::Clear;
result.param2 = impl->iter->getEndKey().p;
result.param2Len = impl->iter->getEndKey().len;
} else {
result.type = VersionedMap::Set;
result.param2 = impl->iter->getValue().p;
result.param2Len = impl->iter->getValue().len;
}
result.notModifiedSince = impl->iter.insertVersion();
return result;
}
VersionedMap::Iterator &VersionedMap::Iterator::operator++() {
++impl->iter;
return *this;
}
VersionedMap::Iterator VersionedMap::Iterator::operator++(int) {
// TODO interposable call
auto result = *this;
++impl->iter;
return result;
}
VersionedMap::Iterator &VersionedMap::Iterator::operator--() {
--impl->iter;
return *this;
}
VersionedMap::Iterator VersionedMap::Iterator::operator--(int) {
// TODO interposable call
auto result = *this;
--impl->iter;
return result;
}
bool VersionedMap::Iterator::operator==(const Iterator &other) const {
assert(impl != nullptr && other.impl != nullptr);
return impl->iter == other.impl->iter;
}
bool VersionedMap::Iterator::operator!=(const Iterator &other) const {
assert(impl != nullptr && other.impl != nullptr);
return impl->iter != other.impl->iter;
}
void VersionedMap::firstGeq(const Key *key, const int64_t *version,
Iterator *iterator, int count) const {
for (int i = 0; i < count; i++) {
if (iterator[i].impl != nullptr) {
iterator[i].impl->~Impl();
new (iterator[i].impl) Iterator::Impl();
} else {
// TODO re-use root if version if matches
iterator[i].impl =
new (safe_malloc(sizeof(Iterator::Impl))) Iterator::Impl();
}
auto view = impl->versionedData.at(version[i]);
iterator[i].impl->iter = view.lastLessOrEqual(key[i]);
// Increment if the mutation is < key[i], and doesn't intersect it
if (iterator[i].impl->iter) {
if (iterator[i].impl->iter->isValue() &&
iterator[i].impl->iter.key() < key[i]) {
++iterator[i].impl->iter;
} else if (iterator[i].impl->iter->isClearTo() &&
iterator[i].impl->iter->getEndKey() <= key[i]) {
++iterator[i].impl->iter;
}
} else {
iterator[i].impl->iter = view.begin();
}
}
}
VersionedMap::Iterator VersionedMap::begin(int64_t version) const {
Iterator result;
result.impl = new (safe_malloc(sizeof(Iterator::Impl))) Iterator::Impl();
result.impl->iter = impl->versionedData.at(version).begin();
return result;
}
VersionedMap::Iterator VersionedMap::end(int64_t version) const {
Iterator result;
result.impl = new (safe_malloc(sizeof(Iterator::Impl))) Iterator::Impl();
result.impl->iter = impl->versionedData.at(version).end();
return result;
}
int64_t VersionedMap::getVersion() const {
return impl->versionedData.latestVersion;
}
int64_t VersionedMap::getOldestVersion() const {
return impl->versionedData.oldestVersion;
}
void VersionedMap::setOldestVersion(int64_t oldestVersion) {
impl->setOldestVersion(oldestVersion);
}
int64_t VersionedMap::getBytes() const {
// TODO
return -1;
}
} // namespace weaselab