Skip unversioned until non-adjacent mutation in facade

Only implemented for forward reads so far
This commit is contained in:
2024-06-17 11:57:11 -07:00
parent b2ce851d56
commit fcb881f408
2 changed files with 99 additions and 12 deletions

View File

@@ -103,7 +103,62 @@ void bulkFirstGeq() {
[&] { versionedMap.firstGeq(keys, iterators, kNumQueries); }); [&] { versionedMap.firstGeq(keys, iterators, kNumQueries); });
} }
void facadeVersionedOnlyRead() {
Facade facade{0};
constexpr int kNumKeys = 10000;
ankerl::nanobench::Bench bench;
bench.batch(kNumKeys);
const int64_t beginBytes = __builtin_bswap64(0);
const int64_t endBytes = __builtin_bswap64(kNumKeys);
const String begin{(const uint8_t *)&beginBytes, 8};
const String end{(const uint8_t *)&endBytes, 8};
// Insert clear in entire range, so that all reads are served from versioned,
// logically and hopefully physically.
{
weaselab::VersionedMap::Mutation mutations[] = {
{begin.data(), int(begin.size()), end.data(), int(end.size()),
weaselab::VersionedMap::Clear},
};
facade.addMutations(mutations, sizeof(mutations) / sizeof(mutations[0]),
facade.getVersion() + 1);
}
// Add keys
for (int i = 0; i < kNumKeys; ++i) {
const int64_t k = __builtin_bswap64(i);
weaselab::VersionedMap::Mutation mutations[] = {
{(const uint8_t *)&k, 8, (const uint8_t *)&k, 8,
weaselab::VersionedMap::Set},
};
facade.addMutations(mutations, sizeof(mutations) / sizeof(mutations[0]),
facade.getVersion() + 1);
}
// Populate the unversioned map
for (int i = 0; i < kNumKeys; ++i) {
const int64_t k = __builtin_bswap64(i);
weaselab::VersionedMap::Mutation mutations[] = {
{(const uint8_t *)&k, 8, (const uint8_t *)&k, 8,
weaselab::VersionedMap::Set},
};
facade.addMutations(mutations, sizeof(mutations) / sizeof(mutations[0]),
facade.getVersion() + 1);
}
facade.setOldestVersion(facade.getVersion() - 1, /*force*/ true);
bench.run("Facade versioned-only read forward", [&]() {
facade.viewAt(facade.getVersion()).rangeRead(begin, end, kNumKeys, false);
});
bench.run("Facade versioned-only read reverse", [&]() {
facade.viewAt(facade.getVersion()).rangeRead(begin, end, kNumKeys, true);
});
}
int main() { int main() {
monotonicallyIncreasing(); monotonicallyIncreasing();
bulkFirstGeq(); bulkFirstGeq();
facadeVersionedOnlyRead();
} }

View File

@@ -112,16 +112,17 @@ struct Facade {
return result; return result;
} else { } else {
auto unversionedIter = facade->unversioned.lower_bound(begin); auto unversionedIter = facade->unversioned.lower_bound(begin);
for (auto iter = versionedIter[0]; iter != versionedIter[1]; ++iter) { for (auto iter = versionedIter[0]; iter != versionedIter[1];) {
auto m = *iter; auto m = *iter;
const auto mBegin = auto mBegin = weaselab::VersionedMap::Key{m.param1, m.param1Len};
weaselab::VersionedMap::Key{m.param1, m.param1Len}; auto mEnd =
const auto mEnd =
m.type == weaselab::VersionedMap::Set || m.param2Len == 0 m.type == weaselab::VersionedMap::Set || m.param2Len == 0
? weaselab::VersionedMap::Key{m.param1, m.param1Len + 1} ? weaselab::VersionedMap::Key{m.param1, m.param1Len + 1}
: weaselab::VersionedMap::Key{m.param2, m.param2Len}; : weaselab::VersionedMap::Key{m.param2, m.param2Len};
// Read from unversioned up to mBegin
for (; unversionedIter != facade->unversioned.end() && for (; unversionedIter != facade->unversioned.end() &&
(unversionedIter->first <=> mBegin) < 0 && limit > 0;) { unversionedIter->first < mBegin && limit > 0;) {
result.push_back(*unversionedIter); result.push_back(*unversionedIter);
--limit; --limit;
++unversionedIter; ++unversionedIter;
@@ -129,6 +130,12 @@ struct Facade {
if (limit == 0) { if (limit == 0) {
return result; return result;
} }
bool pointMutation =
m.type == weaselab::VersionedMap::Set || m.param2Len == 0;
// Read from versioned until non-adjacent
readVersioned:
switch (m.type) { switch (m.type) {
case weaselab::VersionedMap::Set: { case weaselab::VersionedMap::Set: {
result.emplace_back(String(mBegin.p, mBegin.len), result.emplace_back(String(mBegin.p, mBegin.len),
@@ -141,19 +148,42 @@ struct Facade {
case weaselab::VersionedMap::Clear: case weaselab::VersionedMap::Clear:
break; break;
} }
if (m.type == weaselab::VersionedMap::Set || m.param2Len == 0) { ++iter;
if (iter != versionedIter[1]) {
auto tmpM = *iter;
if (weaselab::VersionedMap::Key{tmpM.param1, tmpM.param1Len} <=
mEnd) {
// Adjacent with last mutation
mBegin = weaselab::VersionedMap::Key{tmpM.param1, tmpM.param1Len};
mEnd = tmpM.type == weaselab::VersionedMap::Set ||
tmpM.param2Len == 0
? weaselab::VersionedMap::Key{tmpM.param1,
tmpM.param1Len + 1}
: weaselab::VersionedMap::Key{tmpM.param2,
tmpM.param2Len};
m = tmpM;
pointMutation = false;
goto readVersioned;
}
}
// Advance unversioned iter
if (pointMutation) {
// Point mutation
if (unversionedIter != facade->unversioned.end() && if (unversionedIter != facade->unversioned.end() &&
(unversionedIter->first <=> mBegin) == 0) { unversionedIter->first <= mBegin) {
++unversionedIter; ++unversionedIter;
} }
assert(unversionedIter == assert(unversionedIter ==
facade->unversioned.lower_bound(String(mEnd.p, mEnd.len))); facade->unversioned.lower_bound(String(mEnd.p, mEnd.len)));
} else { } else {
// Range mutation
unversionedIter = unversionedIter =
facade->unversioned.lower_bound(String(mEnd.p, mEnd.len)); facade->unversioned.lower_bound(String(mEnd.p, mEnd.len));
} }
} }
// Finish reading from unversioned
for (; unversionedIter != facade->unversioned.end() && for (; unversionedIter != facade->unversioned.end() &&
unversionedIter->first < end && limit > 0; unversionedIter->first < end && limit > 0;
++unversionedIter) { ++unversionedIter) {
@@ -178,14 +208,16 @@ struct Facade {
versioned.addMutations(mutations, numMutations, version); versioned.addMutations(mutations, numMutations, version);
} }
void setOldestVersion(int64_t version) { void setOldestVersion(int64_t version, bool force = false) {
// Don't scan and apply mutations every time setOldestVersion is called. // Don't scan and apply mutations every time setOldestVersion is called.
oldestVersion = version; oldestVersion = version;
if (version >= nextPurgeVersion) { if (!force) {
nextPurgeVersion = versioned.getVersion(); if (version >= nextPurgeVersion) {
} else { nextPurgeVersion = versioned.getVersion();
return; } else {
return;
}
} }
for (auto iter = versioned.begin(version), end = versioned.end(version); for (auto iter = versioned.begin(version), end = versioned.end(version);