#pragma once #include "KeyCompare.h" #include "VersionedMap.h" #include #include #include struct Facade { explicit Facade(int64_t version) : oldestVersion(version), nextPurgeVersion(version), unversionedVersion(version), versioned(version) {} struct View { std::vector> rangeRead(const String &begin, const String &end, int limit, bool reverse) const { std::vector> result; if (begin >= end) { return result; } weaselab::VersionedMap::Iterator versionedIter[2]; const weaselab::VersionedMap::Key key[] = { {begin.data(), int(begin.size())}, {end.data(), int(end.size())}}; const int64_t v[] = {version, version}; facade->versioned.firstGeq(key, v, versionedIter, 2); // Make sure versionedIter[1] param1 is >= `end` if (versionedIter[1] != facade->versioned.end(version)) { auto m = *versionedIter[1]; if (String(m.param1, m.param1Len) < end) { ++versionedIter[1]; } } if (versionedIter[0] == versionedIter[1]) { // No mutations intersect [begin, end) facade->unversionedRead(begin, end, limit, reverse, result); return result; } if (reverse) { weaselab::VersionedMap::Iterator iter = versionedIter[1]; weaselab::VersionedMap::Iterator endIter = versionedIter[0]; auto unversionedIter = facade->unversioned.lower_bound(end); const auto beginIter = facade->unversioned.begin(); do { --iter; auto m = *iter; auto mBegin = weaselab::VersionedMap::Key{m.param1, m.param1Len}; auto mEnd = m.type == weaselab::VersionedMap::Set || m.param2Len == 0 ? weaselab::VersionedMap::Key{m.param1, m.param1Len + 1} : weaselab::VersionedMap::Key{m.param2, m.param2Len}; // Read from unversioned down to mEnd if (unversionedIter != beginIter) { --unversionedIter; for (; unversionedIter->first >= mEnd && limit > 0; --unversionedIter) { result.push_back(*unversionedIter); --limit; if (unversionedIter == beginIter) { break; } } } if (limit == 0) { return result; } bool pointMutation = m.type == weaselab::VersionedMap::Set || m.param2Len == 0; // Read from versioned until non-adjacent readVersionedReverse: switch (m.type) { case weaselab::VersionedMap::Set: { result.emplace_back(String(mBegin.p, mBegin.len), String(m.param2, m.param2Len)); --limit; if (limit == 0) { return result; } } break; case weaselab::VersionedMap::Clear: break; } if (iter != endIter) { --iter; auto tmpM = *iter; const auto tmpMBegin = weaselab::VersionedMap::Key{tmpM.param1, tmpM.param1Len}; const auto tmpMEnd = tmpM.type == weaselab::VersionedMap::Set || tmpM.param2Len == 0 ? weaselab::VersionedMap::Key{tmpM.param1, tmpM.param1Len + 1} : weaselab::VersionedMap::Key{tmpM.param2, tmpM.param2Len}; if (tmpMEnd >= mBegin) { // Adjacent with last (temporally) mutation mBegin = tmpMBegin; mEnd = tmpMEnd; m = tmpM; pointMutation = false; goto readVersionedReverse; } else { ++iter; } } // Advance unversioned iter if (pointMutation) { if (unversionedIter != facade->unversioned.end() && unversionedIter->first < mBegin) { ++unversionedIter; } assert(unversionedIter == facade->unversioned.lower_bound( String(mBegin.p, mBegin.len))); } else { unversionedIter = facade->unversioned.lower_bound(String(mBegin.p, mBegin.len)); } } while (iter != endIter); if (unversionedIter != beginIter) { --unversionedIter; for (; unversionedIter->first >= begin && limit > 0; --unversionedIter) { result.push_back(*unversionedIter); --limit; if (unversionedIter == beginIter) { break; } } } return result; } else { auto unversionedIter = facade->unversioned.lower_bound(begin); for (auto iter = versionedIter[0]; iter != versionedIter[1];) { auto m = *iter; auto mBegin = weaselab::VersionedMap::Key{m.param1, m.param1Len}; auto mEnd = m.type == weaselab::VersionedMap::Set || m.param2Len == 0 ? weaselab::VersionedMap::Key{m.param1, m.param1Len + 1} : weaselab::VersionedMap::Key{m.param2, m.param2Len}; // Read from unversioned up to mBegin for (; unversionedIter != facade->unversioned.end() && unversionedIter->first < mBegin && limit > 0;) { result.push_back(*unversionedIter); --limit; ++unversionedIter; } if (limit == 0) { return result; } bool pointMutation = m.type == weaselab::VersionedMap::Set || m.param2Len == 0; // Read from versioned until non-adjacent readVersionedForward: switch (m.type) { case weaselab::VersionedMap::Set: { result.emplace_back(String(mBegin.p, mBegin.len), String(m.param2, m.param2Len)); --limit; if (limit == 0) { return result; } } break; case weaselab::VersionedMap::Clear: break; } ++iter; if (iter != versionedIter[1]) { auto tmpM = *iter; if (weaselab::VersionedMap::Key{tmpM.param1, tmpM.param1Len} <= mEnd) { // Adjacent with last mutation mBegin = weaselab::VersionedMap::Key{tmpM.param1, tmpM.param1Len}; mEnd = tmpM.type == weaselab::VersionedMap::Set || tmpM.param2Len == 0 ? weaselab::VersionedMap::Key{tmpM.param1, tmpM.param1Len + 1} : weaselab::VersionedMap::Key{tmpM.param2, tmpM.param2Len}; m = tmpM; pointMutation = false; goto readVersionedForward; } } // Advance unversioned iter if (pointMutation) { // Point mutation if (unversionedIter != facade->unversioned.end() && unversionedIter->first <= mBegin) { ++unversionedIter; } assert(unversionedIter == facade->unversioned.lower_bound(String(mEnd.p, mEnd.len))); } else { // Range mutation unversionedIter = facade->unversioned.lower_bound(String(mEnd.p, mEnd.len)); } } // Finish reading from unversioned for (; unversionedIter != facade->unversioned.end() && unversionedIter->first < end && limit > 0; ++unversionedIter) { result.push_back(*unversionedIter); --limit; } return result; } } /** @private */ View(const Facade *facade, int64_t version) : facade{facade}, version{version} {} private: const Facade *facade; int64_t version; }; void addMutations(const weaselab::VersionedMap::Mutation *mutations, int numMutations, int64_t version) { versioned.addMutations(mutations, numMutations, version); } void setOldestVersion(int64_t version, bool force = false) { // Don't scan and apply mutations every time setOldestVersion is called. oldestVersion = version; if (!force) { if (version >= nextPurgeVersion) { nextPurgeVersion = versioned.getVersion(); } else { return; } } for (auto iter = versioned.begin(version), end = versioned.end(version); iter != end; ++iter) { auto m = *iter; assert(m.notModifiedSince <= version); if (m.notModifiedSince <= unversionedVersion) { continue; } switch (m.type) { case weaselab::VersionedMap::Set: unversioned[String(m.param1, m.param1Len)] = String(m.param2, m.param2Len); break; case weaselab::VersionedMap::Clear: if (m.param2Len == 0) { unversioned.erase(String(m.param1, m.param1Len)); } else { for (auto unversionedIter = unversioned.lower_bound(String(m.param1, m.param1Len)); unversionedIter != unversioned.end() && unversionedIter->first < String(m.param2, m.param2Len);) { unversionedIter = unversioned.erase(unversionedIter); } } break; } } unversionedVersion = version; versioned.setOldestVersion(version); } View viewAt(int64_t version) const { assert(version >= oldestVersion); assert(version <= versioned.getVersion()); return View{this, version}; } int64_t getVersion() const { return versioned.getVersion(); } int64_t getOldestVersion() const { return oldestVersion; } void unversionedRead(const String &begin, const String &end, int &limit, bool reverse, std::vector> &result) const { if (begin >= end) { return; } if (reverse) { auto unversionedIter = unversioned.lower_bound(end); const auto beginIter = unversioned.begin(); if (unversionedIter == beginIter) { return; } --unversionedIter; for (; unversionedIter->first >= begin && limit > 0; --unversionedIter) { result.push_back(*unversionedIter); --limit; if (unversionedIter == beginIter) { return; } } } else { for (auto iter = unversioned.lower_bound(begin), iterEnd = unversioned.lower_bound(end); iter != iterEnd && limit > 0; ++iter) { result.push_back(*iter); --limit; } } } int64_t oldestVersion; int64_t nextPurgeVersion; int64_t unversionedVersion; std::map unversioned; weaselab::VersionedMap versioned; };