Let fdbserver's ConflictSet keep its version history in weaselab::ConflictSet
(radix tree) instead of SkipList. The SkipList code paths stay in the file
behind USE_RADIX_TREE.

Review notes (text before the first file header is ignored when applying):
* find_package(conflict-set) is now REQUIRED, because the link line below it
  is unconditional; a missing package should stop the configure step.
* USE_RADIX_TREE is wrapped in #ifndef so a build can pass -DUSE_RADIX_TREE=0
  and get the SkipList paths this patch preserves.
* skipListTest keeps its entry-count printf for SkipList builds instead of
  dropping it for both backends.
* NOTE(review): the header name of the added #include was missing from the
  copy under review; <ConflictSet.h> is assumed - confirm against the
  conflict-set package.
* NOTE(review): context of the two include-area hunks is trimmed to lines
  that were fully legible; the result is the same file content.
* NOTE(review): indentation was restored as tabs - TODO confirm against the
  tree, or apply with --ignore-whitespace.
* New-side hunk offsets are recomputed for the added lines; the post-image
  blob ids on the index lines are stale.

diff --git a/fdbserver/CMakeLists.txt b/fdbserver/CMakeLists.txt
index 3f353c2ef..074a18628 100644
--- a/fdbserver/CMakeLists.txt
+++ b/fdbserver/CMakeLists.txt
@@ -22,6 +22,9 @@ file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/workloads)
 
 add_flow_target(EXECUTABLE NAME fdbserver SRCS ${FDBSERVER_SRCS})
 
+find_package(conflict-set REQUIRED)
+target_link_libraries(fdbserver PRIVATE conflict-set-static)
+
 if (WITH_SWIFT)
   # Setup the Swift sources in FDBServer.
   include(FindSwiftLibs)
diff --git a/fdbserver/SkipList.cpp b/fdbserver/SkipList.cpp
index b48d32c6b..c83ae4f7a 100644
--- a/fdbserver/SkipList.cpp
+++ b/fdbserver/SkipList.cpp
@@ -28,3 +28,4 @@
+#include <ConflictSet.h>
 
 #include "flow/Platform.h"
 #include "fdbrpc/fdbrpc.h"
@@ -34,3 +35,9 @@
 #include "fdbclient/SystemData.h"
 #include "fdbserver/ConflictSet.h"
+
+// Selects the version-history backend: nonzero uses weaselab::ConflictSet,
+// zero keeps the SkipList. May be overridden from the build command line.
+#ifndef USE_RADIX_TREE
+#define USE_RADIX_TREE 1
+#endif
 
@@ -783,10 +790,14 @@ private:
 };
 
 struct ConflictSet {
-	ConflictSet() : removalKey(makeString(0)), oldestVersion(0) {}
+	ConflictSet() : versionHistory(0), removalKey(makeString(0)), oldestVersion(0) {}
 	~ConflictSet() {}
 
+#if USE_RADIX_TREE
+	weaselab::ConflictSet versionHistory;
+#else
 	SkipList versionHistory;
+#endif
 	Key removalKey;
 	Version oldestVersion;
 };
@@ -795,7 +806,11 @@ ConflictSet* newConflictSet() {
 	return new ConflictSet;
 }
 void clearConflictSet(ConflictSet* cs, Version v) {
+#if USE_RADIX_TREE
+	cs->versionHistory = weaselab::ConflictSet{ v };
+#else
 	SkipList(v).swap(cs->versionHistory);
+#endif
 }
 void destroyConflictSet(ConflictSet* cs) {
 	delete cs;
@@ -971,11 +986,15 @@ void ConflictBatch::detectConflicts(Version now,
 	t = timer();
 	if (newOldestVersion > cs->oldestVersion) {
 		cs->oldestVersion = newOldestVersion;
+#if USE_RADIX_TREE
+		cs->versionHistory.setOldestVersion(newOldestVersion);
+#else
 		SkipList::Finger finger;
 		int temp;
 		cs->versionHistory.find(&cs->removalKey, &finger, &temp, 1);
 		cs->versionHistory.removeBefore(cs->oldestVersion, finger, combinedWriteConflictRanges.size() * 3 + 10);
 		cs->removalKey = finger.getValue();
+#endif
 	}
 	g_removeBefore += timer() - t;
 }
@@ -984,8 +1003,39 @@ void ConflictBatch::checkReadConflictRanges() {
 	if (combinedReadConflictRanges.empty())
 		return;
 
+#if USE_RADIX_TREE
+	// Convert every combined read range into the radix tree's ReadRange form so the
+	// whole batch is checked with a single call.
+	Arena arena;
+	auto* reads = new (arena) weaselab::ConflictSet::ReadRange[combinedReadConflictRanges.size()];
+
+	for (int i = 0; i < combinedReadConflictRanges.size(); ++i) {
+		auto& read = reads[i];
+		read.readVersion = combinedReadConflictRanges[i].version;
+		read.begin.p = combinedReadConflictRanges[i].begin.begin();
+		read.begin.len = combinedReadConflictRanges[i].begin.size();
+		read.end.p = combinedReadConflictRanges[i].end.begin();
+		read.end.len = combinedReadConflictRanges[i].end.size();
+	}
+	auto* results = new (arena) weaselab::ConflictSet::Result[combinedReadConflictRanges.size()];
+	cs->versionHistory.check(reads, results, combinedReadConflictRanges.size());
+
+	// Only a Conflict result marks the owning transaction; every other result leaves
+	// its status untouched.
+	// NOTE(review): confirm too-old reads are rejected before they reach this check.
+	for (int i = 0; i < combinedReadConflictRanges.size(); ++i) {
+		if (results[i] == weaselab::ConflictSet::Conflict) {
+			transactionConflictStatus[combinedReadConflictRanges[i].transaction] = true;
+			if (combinedReadConflictRanges[i].conflictingKeyRange != nullptr) {
+				combinedReadConflictRanges[i].conflictingKeyRange->push_back(*combinedReadConflictRanges[i].cKRArena,
+				                                                             combinedReadConflictRanges[i].indexInTx);
+			}
+		}
+	}
+#else
 	cs->versionHistory.detectConflicts(
 	    &combinedReadConflictRanges[0], combinedReadConflictRanges.size(), transactionConflictStatus);
+#endif
 }
 
 void ConflictBatch::addConflictRanges(Version now,
@@ -1015,7 +1065,24 @@ void ConflictBatch::mergeWriteConflictRanges(Version now) {
 	if (combinedWriteConflictRanges.empty())
 		return;
 
+#if USE_RADIX_TREE
+	// Convert every combined write range into the radix tree's WriteRange form and
+	// record the whole batch at version `now` with a single call.
+	Arena arena;
+	auto* writes = new (arena) weaselab::ConflictSet::WriteRange[combinedWriteConflictRanges.size()];
+
+	for (int i = 0; i < combinedWriteConflictRanges.size(); ++i) {
+		auto& write = writes[i];
+		write.begin.p = combinedWriteConflictRanges[i].first.begin();
+		write.begin.len = combinedWriteConflictRanges[i].first.size();
+		write.end.p = combinedWriteConflictRanges[i].second.begin();
+		write.end.len = combinedWriteConflictRanges[i].second.size();
+	}
+
+	cs->versionHistory.addWrites(writes, combinedWriteConflictRanges.size(), now);
+#else
 	addConflictRanges(now, combinedWriteConflictRanges.begin(), combinedWriteConflictRanges.end(), &cs->versionHistory);
+#endif
 }
 
 void ConflictBatch::combineWriteConflictRanges() {
@@ -1197,6 +1264,9 @@ void skipListTest() {
 	for (const auto& counter : skc) {
 		printf("%20s: %s\n", counter->getMetric().name().c_str(), counter->getMetric().formatted().c_str());
 	}
 
+#if !USE_RADIX_TREE
+	// Entry count is reported for the SkipList backend only.
 	printf("%d entries in version history\n", cs->versionHistory.count());
+#endif
 }