Add efficient firstGeq overload for latestVersion
This commit is contained in:
@@ -97,6 +97,9 @@ void bulkFirstGeq() {
|
|||||||
bench.run("bulkFirstGeq", [&] {
|
bench.run("bulkFirstGeq", [&] {
|
||||||
versionedMap.firstGeq(keys, versions, iterators, kNumQueries);
|
versionedMap.firstGeq(keys, versions, iterators, kNumQueries);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
bench.run("bulkFirstGeq (latest version)",
|
||||||
|
[&] { versionedMap.firstGeq(keys, iterators, kNumQueries); });
|
||||||
}
|
}
|
||||||
|
|
||||||
int main() {
|
int main() {
|
||||||
|
@@ -1416,6 +1416,35 @@ void VersionedMap::firstGeq(const Key *key, const int64_t *version,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void VersionedMap::firstGeq(const Key *key, Iterator *iterator,
|
||||||
|
int count) const {
|
||||||
|
for (int i = 0; i < count; i++) {
|
||||||
|
if (iterator[i].impl != nullptr) {
|
||||||
|
iterator[i].impl->~Impl();
|
||||||
|
new (iterator[i].impl) Iterator::Impl();
|
||||||
|
} else {
|
||||||
|
// TODO re-use root if version if matches
|
||||||
|
iterator[i].impl =
|
||||||
|
new (safe_malloc(sizeof(Iterator::Impl))) Iterator::Impl();
|
||||||
|
}
|
||||||
|
|
||||||
|
auto view = impl->versionedData.atLatest();
|
||||||
|
iterator[i].impl->iter = view.lastLessOrEqual(key[i]);
|
||||||
|
// Increment if the mutation is < key[i], and doesn't intersect it
|
||||||
|
if (iterator[i].impl->iter) {
|
||||||
|
if (iterator[i].impl->iter->isValue() &&
|
||||||
|
iterator[i].impl->iter.key() < key[i]) {
|
||||||
|
++iterator[i].impl->iter;
|
||||||
|
} else if (iterator[i].impl->iter->isClearTo() &&
|
||||||
|
iterator[i].impl->iter->getEndKey() <= key[i]) {
|
||||||
|
++iterator[i].impl->iter;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
iterator[i].impl->iter = view.begin();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
VersionedMap::Iterator VersionedMap::begin(int64_t version) const {
|
VersionedMap::Iterator VersionedMap::begin(int64_t version) const {
|
||||||
Iterator result;
|
Iterator result;
|
||||||
result.impl = new (safe_malloc(sizeof(Iterator::Impl))) Iterator::Impl();
|
result.impl = new (safe_malloc(sizeof(Iterator::Impl))) Iterator::Impl();
|
||||||
|
111
VersionedMap.cpp
111
VersionedMap.cpp
@@ -949,9 +949,23 @@ struct __attribute__((__visibility__("hidden"))) VersionedMap::Impl {
|
|||||||
totalMallocBytes += mallocBytesDelta;
|
totalMallocBytes += mallocBytesDelta;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
struct StepwiseFirstGeq {
|
||||||
|
|
||||||
|
const VersionedMap::Impl *map;
|
||||||
|
const weaselab::VersionedMap::Key *key;
|
||||||
|
int64_t version;
|
||||||
|
weaselab::VersionedMap::Iterator *iterator;
|
||||||
|
|
||||||
|
void begin(uint32_t root);
|
||||||
|
bool step();
|
||||||
|
void end();
|
||||||
|
};
|
||||||
|
|
||||||
void firstGeq(const Key *key, const int64_t *version, Iterator *iterator,
|
void firstGeq(const Key *key, const int64_t *version, Iterator *iterator,
|
||||||
int count) const;
|
int count) const;
|
||||||
|
|
||||||
|
void firstGeq(const Key *key, Iterator *iterator, int count) const;
|
||||||
|
|
||||||
// State used to resume scanning and removing old entries in `addMutations`
|
// State used to resume scanning and removing old entries in `addMutations`
|
||||||
Key continueKey;
|
Key continueKey;
|
||||||
Arena continueArena;
|
Arena continueArena;
|
||||||
@@ -1222,36 +1236,20 @@ bool geq(const VersionedMap::Iterator::VersionedMutation &m,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void VersionedMap::Impl::firstGeq(const weaselab::VersionedMap::Key *key,
|
void VersionedMap::Impl::StepwiseFirstGeq::begin(uint32_t root) {
|
||||||
const int64_t *version, Iterator *iterator,
|
|
||||||
int count) const {
|
|
||||||
if (count == 0) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
struct StepwiseFirstGeq {
|
|
||||||
|
|
||||||
const VersionedMap::Impl *map;
|
|
||||||
const weaselab::VersionedMap::Key *key;
|
|
||||||
int64_t version;
|
|
||||||
weaselab::VersionedMap::Iterator *iterator;
|
|
||||||
|
|
||||||
void begin(RootSet::ThreadSafeHandle handle) {
|
|
||||||
if (iterator->impl != nullptr) {
|
if (iterator->impl != nullptr) {
|
||||||
iterator->impl->~Impl();
|
iterator->impl->~Impl();
|
||||||
new (iterator->impl) Iterator::Impl();
|
new (iterator->impl) Iterator::Impl();
|
||||||
} else {
|
} else {
|
||||||
iterator->impl =
|
iterator->impl = new (safe_malloc(sizeof(Iterator::Impl))) Iterator::Impl();
|
||||||
new (safe_malloc(sizeof(Iterator::Impl))) Iterator::Impl();
|
|
||||||
}
|
}
|
||||||
uint32_t root = handle.rootForVersion(version);
|
|
||||||
Finger &finger = iterator->impl->finger;
|
Finger &finger = iterator->impl->finger;
|
||||||
finger.clear();
|
finger.clear();
|
||||||
bool ignored = false;
|
bool ignored = false;
|
||||||
finger.push(root, ignored);
|
finger.push(root, ignored);
|
||||||
}
|
}
|
||||||
|
|
||||||
bool step() {
|
bool VersionedMap::Impl::StepwiseFirstGeq::step() {
|
||||||
Finger &finger = iterator->impl->finger;
|
Finger &finger = iterator->impl->finger;
|
||||||
|
|
||||||
auto n = finger.backNode();
|
auto n = finger.backNode();
|
||||||
@@ -1263,12 +1261,11 @@ void VersionedMap::Impl::firstGeq(const weaselab::VersionedMap::Key *key,
|
|||||||
// No duplicates
|
// No duplicates
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
finger.push(map->child<std::memory_order_acquire>(n, c > 0, version),
|
finger.push(map->child<std::memory_order_acquire>(n, c > 0, version), c > 0);
|
||||||
c > 0);
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
void end() {
|
void VersionedMap::Impl::StepwiseFirstGeq::end() {
|
||||||
Finger &finger = iterator->impl->finger;
|
Finger &finger = iterator->impl->finger;
|
||||||
if (finger.searchPathSize() > 0 && finger.backNode() == 0) {
|
if (finger.searchPathSize() > 0 && finger.backNode() == 0) {
|
||||||
map->move<std::memory_order_acquire, true>(finger, version);
|
map->move<std::memory_order_acquire, true>(finger, version);
|
||||||
@@ -1297,9 +1294,15 @@ void VersionedMap::Impl::firstGeq(const weaselab::VersionedMap::Key *key,
|
|||||||
iterator->impl->map->move<std::memory_order_acquire, true>(
|
iterator->impl->map->move<std::memory_order_acquire, true>(
|
||||||
finger, iterator->impl->version);
|
finger, iterator->impl->version);
|
||||||
}
|
}
|
||||||
loopEnd:;
|
loopEnd:;
|
||||||
|
}
|
||||||
|
|
||||||
|
void VersionedMap::Impl::firstGeq(const weaselab::VersionedMap::Key *key,
|
||||||
|
const int64_t *version, Iterator *iterator,
|
||||||
|
int count) const {
|
||||||
|
if (count == 0) {
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
};
|
|
||||||
|
|
||||||
// Use stack allocation for small count
|
// Use stack allocation for small count
|
||||||
Arena arena;
|
Arena arena;
|
||||||
@@ -1322,7 +1325,56 @@ void VersionedMap::Impl::firstGeq(const weaselab::VersionedMap::Key *key,
|
|||||||
stepwise[i].key = &key[i];
|
stepwise[i].key = &key[i];
|
||||||
stepwise[i].version = version[i];
|
stepwise[i].version = version[i];
|
||||||
stepwise[i].iterator = &iterator[i];
|
stepwise[i].iterator = &iterator[i];
|
||||||
stepwise[i].begin(handle);
|
stepwise[i].begin(handle.rootForVersion(version[i]));
|
||||||
|
nextJob[i] = i + 1;
|
||||||
|
}
|
||||||
|
nextJob[count - 1] = 0;
|
||||||
|
int prevJob = count - 1;
|
||||||
|
int job = 0;
|
||||||
|
|
||||||
|
for (;;) {
|
||||||
|
if (stepwise[job].step()) {
|
||||||
|
stepwise[job].end();
|
||||||
|
if (job == prevJob) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
nextJob[prevJob] = nextJob[job];
|
||||||
|
job = prevJob;
|
||||||
|
}
|
||||||
|
prevJob = job;
|
||||||
|
job = nextJob[job];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void VersionedMap::Impl::firstGeq(const weaselab::VersionedMap::Key *key,
|
||||||
|
Iterator *iterator, int count) const {
|
||||||
|
if (count == 0) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Use stack allocation for small count
|
||||||
|
Arena arena;
|
||||||
|
constexpr int kStackAllocThreshold = 2;
|
||||||
|
StepwiseFirstGeq stepwiseStackAlloc[kStackAllocThreshold];
|
||||||
|
int nextJobStackAllocation[kStackAllocThreshold];
|
||||||
|
StepwiseFirstGeq *stepwise;
|
||||||
|
int *nextJob;
|
||||||
|
if (count <= kStackAllocThreshold) {
|
||||||
|
stepwise = stepwiseStackAlloc;
|
||||||
|
nextJob = nextJobStackAllocation;
|
||||||
|
} else {
|
||||||
|
stepwise = new (arena) StepwiseFirstGeq[count];
|
||||||
|
nextJob = new (arena) int[count];
|
||||||
|
}
|
||||||
|
|
||||||
|
const uint32_t root = roots.roots()[roots.rootCount() - 1];
|
||||||
|
assert(root == roots.getThreadSafeHandle().rootForVersion(latestVersion));
|
||||||
|
for (int i = 0; i < count; ++i) {
|
||||||
|
stepwise[i].map = this;
|
||||||
|
stepwise[i].key = &key[i];
|
||||||
|
stepwise[i].version = latestVersion;
|
||||||
|
stepwise[i].iterator = &iterator[i];
|
||||||
|
stepwise[i].begin(root);
|
||||||
nextJob[i] = i + 1;
|
nextJob[i] = i + 1;
|
||||||
}
|
}
|
||||||
nextJob[count - 1] = 0;
|
nextJob[count - 1] = 0;
|
||||||
@@ -1355,6 +1407,11 @@ void VersionedMap::firstGeq(const Key *key, const int64_t *version,
|
|||||||
impl->firstGeq(key, version, iterator, count);
|
impl->firstGeq(key, version, iterator, count);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void VersionedMap::firstGeq(const Key *key, Iterator *iterator,
|
||||||
|
int count) const {
|
||||||
|
impl->firstGeq(key, iterator, count);
|
||||||
|
}
|
||||||
|
|
||||||
VersionedMap::Iterator VersionedMap::begin(int64_t version) const {
|
VersionedMap::Iterator VersionedMap::begin(int64_t version) const {
|
||||||
VersionedMap::Iterator result;
|
VersionedMap::Iterator result;
|
||||||
result.impl = new (safe_malloc(sizeof(Iterator::Impl))) Iterator::Impl();
|
result.impl = new (safe_malloc(sizeof(Iterator::Impl))) Iterator::Impl();
|
||||||
|
@@ -27,4 +27,5 @@ __ZNK8weaselab12VersionedMap8IteratordeEv
|
|||||||
__ZNK8weaselab12VersionedMap8IteratoreqERKS1_
|
__ZNK8weaselab12VersionedMap8IteratoreqERKS1_
|
||||||
__ZNK8weaselab12VersionedMap8IteratorneERKS1_
|
__ZNK8weaselab12VersionedMap8IteratorneERKS1_
|
||||||
__ZNK8weaselab12VersionedMap8firstGeqEPKNS0_3KeyEPKxPNS0_8IteratorEi
|
__ZNK8weaselab12VersionedMap8firstGeqEPKNS0_3KeyEPKxPNS0_8IteratorEi
|
||||||
|
__ZNK8weaselab12VersionedMap8firstGeqEPKNS0_3KeyEPNS0_8IteratorEi
|
||||||
__ZNK8weaselab12VersionedMap8getBytesEv
|
__ZNK8weaselab12VersionedMap8getBytesEv
|
@@ -158,6 +158,11 @@ struct __attribute__((__visibility__("default"))) VersionedMap {
|
|||||||
void firstGeq(const Key *key, const int64_t *version, Iterator *iterator,
|
void firstGeq(const Key *key, const int64_t *version, Iterator *iterator,
|
||||||
int count) const;
|
int count) const;
|
||||||
|
|
||||||
|
/** Equivalent to `firstGeq(key, version, iterator, 1)` where each element in
|
||||||
|
* `version` is the latest version, but more efficient.
|
||||||
|
*/
|
||||||
|
void firstGeq(const Key *key, Iterator *iterator, int count) const;
|
||||||
|
|
||||||
/** Returns an iterator to the first mutation visible in the view at
|
/** Returns an iterator to the first mutation visible in the view at
|
||||||
* `version`, or `end()` if none exists. Thread-safe as long as `version` is
|
* `version`, or `end()` if none exists. Thread-safe as long as `version` is
|
||||||
* not concurrently invalidated by `setOldestVersion`. */
|
* not concurrently invalidated by `setOldestVersion`. */
|
||||||
|
Reference in New Issue
Block a user