#pragma once #include "KeyCompare.h" #include "VersionedMap.h" #include #include #include struct Facade { explicit Facade(int64_t version) : oldestVersion(version), nextPurgeVersion(version), unversionedVersion(version), versioned(version) {} struct View { std::vector> rangeRead(const String &begin, const String &end, int limit, bool reverse) const { std::vector> result; if (begin >= end) { return result; } weaselab::VersionedMap::Iterator versionedIter[2]; const weaselab::VersionedMap::Key key[] = { {begin.data(), int(begin.size())}, {end.data(), int(end.size())}}; const int64_t v[] = {version, version}; facade->versioned.firstGeq(key, v, versionedIter, 2); // Make sure versionedIter[1] param1 is >= `end` if (versionedIter[1] != facade->versioned.end(version)) { auto m = *versionedIter[1]; if (String(m.param1, m.param1Len) < end) { ++versionedIter[1]; } } if (versionedIter[0] == versionedIter[1]) { // No mutations intersect [begin, end) facade->unversionedRead(begin, end, limit, reverse, result); return result; } static const uint8_t zero_[] = {0}; static const String zero{zero_, 1}; if (reverse) { weaselab::VersionedMap::Iterator iter = versionedIter[1]; weaselab::VersionedMap::Iterator endIter = versionedIter[0]; auto unversionedIter = facade->unversioned.lower_bound(end); const auto beginIter = facade->unversioned.begin(); do { --iter; auto m = *iter; const auto mBegin = weaselab::VersionedMap::Key{m.param1, m.param1Len}; const auto mEnd = m.type == weaselab::VersionedMap::Set || m.param2Len == 0 ? weaselab::VersionedMap::Key{m.param1, m.param1Len + 1} : weaselab::VersionedMap::Key{m.param2, m.param2Len}; if (unversionedIter != beginIter) { --unversionedIter; for (; unversionedIter->first >= mEnd && limit > 0; --unversionedIter) { result.push_back(*unversionedIter); --limit; if (unversionedIter == beginIter) { break; } } } if (limit == 0) { return result; } switch (m.type) { case weaselab::VersionedMap::Set: { result.emplace_back(String(mBegin.p, mBegin.len), String(m.param2, m.param2Len)); --limit; if (limit == 0) { return result; } } break; case weaselab::VersionedMap::Clear: break; } if (m.type == weaselab::VersionedMap::Set || m.param2Len == 0) { if (unversionedIter != facade->unversioned.end() && unversionedIter->first < mBegin) { ++unversionedIter; } assert(unversionedIter == facade->unversioned.lower_bound( String(mBegin.p, mBegin.len))); } else { unversionedIter = facade->unversioned.lower_bound(String(mBegin.p, mBegin.len)); } } while (iter != endIter); if (unversionedIter != beginIter) { --unversionedIter; for (; unversionedIter->first >= begin && limit > 0; --unversionedIter) { result.push_back(*unversionedIter); --limit; if (unversionedIter == beginIter) { break; } } } return result; } else { auto unversionedIter = facade->unversioned.lower_bound(begin); for (auto iter = versionedIter[0]; iter != versionedIter[1]; ++iter) { auto m = *iter; const auto mBegin = weaselab::VersionedMap::Key{m.param1, m.param1Len}; const auto mEnd = m.type == weaselab::VersionedMap::Set || m.param2Len == 0 ? weaselab::VersionedMap::Key{m.param1, m.param1Len + 1} : weaselab::VersionedMap::Key{m.param2, m.param2Len}; auto c = unversionedIter->first <=> mBegin; for (; unversionedIter != facade->unversioned.end() && c < 0 && limit > 0;) { result.push_back(*unversionedIter); --limit; ++unversionedIter; c = unversionedIter->first <=> mBegin; } if (limit == 0) { return result; } switch (m.type) { case weaselab::VersionedMap::Set: { result.emplace_back(String(mBegin.p, mBegin.len), String(m.param2, m.param2Len)); --limit; if (limit == 0) { return result; } } break; case weaselab::VersionedMap::Clear: break; } if (m.type == weaselab::VersionedMap::Set || m.param2Len == 0) { if (unversionedIter != facade->unversioned.end() && c == 0) { ++unversionedIter; } assert(unversionedIter == facade->unversioned.lower_bound(String(mEnd.p, mEnd.len))); } else { unversionedIter = facade->unversioned.lower_bound(String(mEnd.p, mEnd.len)); } } for (; unversionedIter != facade->unversioned.end() && unversionedIter->first < end && limit > 0; ++unversionedIter) { result.push_back(*unversionedIter); --limit; } return result; } } /** @private */ View(const Facade *facade, int64_t version) : facade{facade}, version{version} {} private: const Facade *facade; int64_t version; }; void addMutations(const weaselab::VersionedMap::Mutation *mutations, int numMutations, int64_t version) { versioned.addMutations(mutations, numMutations, version); } void setOldestVersion(int64_t version) { // Don't scan and apply mutations every time setOldestVersion is called. oldestVersion = version; if (version >= nextPurgeVersion) { nextPurgeVersion = versioned.getVersion(); } else { return; } for (auto iter = versioned.begin(version), end = versioned.end(version); iter != end; ++iter) { auto m = *iter; assert(m.version <= version); if (m.version <= unversionedVersion) { continue; } switch (m.type) { case weaselab::VersionedMap::Set: unversioned[String(m.param1, m.param1Len)] = String(m.param2, m.param2Len); break; case weaselab::VersionedMap::Clear: if (m.param2Len == 0) { unversioned.erase(String(m.param1, m.param1Len)); } else { for (auto unversionedIter = unversioned.lower_bound(String(m.param1, m.param1Len)); unversionedIter != unversioned.end() && unversionedIter->first < String(m.param2, m.param2Len);) { unversionedIter = unversioned.erase(unversionedIter); } } break; } } unversionedVersion = version; versioned.setOldestVersion(version); } View viewAt(int64_t version) const { assert(version >= oldestVersion); assert(version <= versioned.getVersion()); return View{this, version}; } int64_t getVersion() const { return versioned.getVersion(); } int64_t getOldestVersion() const { return oldestVersion; } void unversionedRead(const String &begin, const String &end, int &limit, bool reverse, std::vector> &result) const { if (begin >= end) { return; } if (reverse) { auto unversionedIter = unversioned.lower_bound(end); const auto beginIter = unversioned.begin(); if (unversionedIter == beginIter) { return; } --unversionedIter; for (; unversionedIter->first >= begin && limit > 0; --unversionedIter) { result.push_back(*unversionedIter); --limit; if (unversionedIter == beginIter) { return; } } } else { for (auto iter = unversioned.lower_bound(begin), iterEnd = unversioned.lower_bound(end); iter != iterEnd && limit > 0; ++iter) { result.push_back(*iter); --limit; } } } int64_t oldestVersion; int64_t nextPurgeVersion; int64_t unversionedVersion; std::map unversioned; weaselab::VersionedMap versioned; };