giff --git a/fdbserver/CMakeLists.txt b/fdbserver/CMakeLists.txt index 3f353c2ef..cd0834761 100644 --- a/fdbserver/CMakeLists.txt +++ b/fdbserver/CMakeLists.txt @@ -22,6 +22,9 @@ file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/workloads) add_flow_target(EXECUTABLE NAME fdbserver SRCS ${FDBSERVER_SRCS}) +find_package(ConflictSet) +target_link_libraries(fdbserver PRIVATE conflict_set_static) + if (WITH_SWIFT) # Setup the Swift sources in FDBServer. include(FindSwiftLibs) diff --git a/fdbserver/Resolver.actor.cpp b/fdbserver/Resolver.actor.cpp index bf4118f5f..d3b4eaad8 100644 --- a/fdbserver/Resolver.actor.cpp +++ b/fdbserver/Resolver.actor.cpp @@ -132,7 +132,7 @@ struct Resolver : ReferenceCounted { AsyncVar totalStateBytes; AsyncTrigger checkNeededVersion; std::map proxyInfoMap; - ConflictSet* conflictSet; + ConflictSet2* conflictSet; TransientStorageMetricSample iopsSample; // Use LogSystem as backend for txnStateStore. However, the real commit diff --git a/fdbserver/SkipList.cpp b/fdbserver/SkipList.cpp index b48d32c6b..da106b5d2 100644 --- a/fdbserver/SkipList.cpp +++ b/fdbserver/SkipList.cpp @@ -25,6 +25,7 @@ #include #include #include +#include #include "flow/Platform.h" #include "fdbrpc/fdbrpc.h" @@ -34,6 +35,8 @@ #include "fdbclient/SystemData.h" #include "fdbserver/ConflictSet.h" +#define USE_RADIX_TREE 1 + static std::vector skc; static thread_local uint32_t g_seed = 0; @@ -782,26 +785,34 @@ private: } }; -struct ConflictSet { - ConflictSet() : removalKey(makeString(0)), oldestVersion(0) {} - ~ConflictSet() {} +struct ConflictSet2 { + ConflictSet2() : versionHistory(0), removalKey(makeString(0)), oldestVersion(0) {} + ~ConflictSet2() {} +#if USE_RADIX_TREE + ConflictSet versionHistory; +#else SkipList versionHistory; +#endif Key removalKey; Version oldestVersion; }; -ConflictSet* newConflictSet() { - return new ConflictSet; +ConflictSet2* newConflictSet() { + return new ConflictSet2; } -void clearConflictSet(ConflictSet* cs, Version v) { - SkipList(v).swap(cs->versionHistory); +void clearConflictSet(ConflictSet2* cs, Version v) { +#if USE_RADIX_TREE + cs->versionHistory = ConflictSet{ 0 }; +#else + SkipList().swap(cs->versionHistory); +#endif } -void destroyConflictSet(ConflictSet* cs) { +void destroyConflictSet(ConflictSet2* cs) { delete cs; } -ConflictBatch::ConflictBatch(ConflictSet* cs, +ConflictBatch::ConflictBatch(ConflictSet2* cs, std::map>* conflictingKeyRangeMap, Arena* resolveBatchReplyArena) : cs(cs), transactionCount(0), conflictingKeyRangeMap(conflictingKeyRangeMap), @@ -971,11 +982,15 @@ void ConflictBatch::detectConflicts(Version now, t = timer(); if (newOldestVersion > cs->oldestVersion) { cs->oldestVersion = newOldestVersion; +#if USE_RADIX_TREE + cs->versionHistory.setOldestVersion(newOldestVersion); +#else SkipList::Finger finger; int temp; cs->versionHistory.find(&cs->removalKey, &finger, &temp, 1); cs->versionHistory.removeBefore(cs->oldestVersion, finger, combinedWriteConflictRanges.size() * 3 + 10); cs->removalKey = finger.getValue(); +#endif } g_removeBefore += timer() - t; } @@ -984,8 +999,34 @@ void ConflictBatch::checkReadConflictRanges() { if (combinedReadConflictRanges.empty()) return; +#if USE_RADIX_TREE + Arena arena; + auto* reads = new (arena) ConflictSet::ReadRange[combinedReadConflictRanges.size()]; + + for (int i = 0; i < combinedReadConflictRanges.size(); ++i) { + auto& read = reads[i]; + read.readVersion = combinedReadConflictRanges[i].version; + read.begin.p = combinedReadConflictRanges[i].begin.begin(); + read.begin.len = combinedReadConflictRanges[i].begin.size(); + read.end.p = combinedReadConflictRanges[i].end.begin(); + read.end.len = combinedReadConflictRanges[i].end.size(); + } + auto* results = new (arena) ConflictSet::Result[combinedReadConflictRanges.size()]; + cs->versionHistory.check(reads, results, combinedReadConflictRanges.size()); + + for (int i = 0; i < combinedReadConflictRanges.size(); ++i) { + if (results[i] == ConflictSet::Conflict) { + transactionConflictStatus[combinedReadConflictRanges[i].transaction] = true; + if (combinedReadConflictRanges[i].conflictingKeyRange != nullptr) { + combinedReadConflictRanges[i].conflictingKeyRange->push_back(*combinedReadConflictRanges[i].cKRArena, + combinedReadConflictRanges[i].indexInTx); + } + } + } +#else cs->versionHistory.detectConflicts( &combinedReadConflictRanges[0], combinedReadConflictRanges.size(), transactionConflictStatus); +#endif } void ConflictBatch::addConflictRanges(Version now, @@ -1015,7 +1056,22 @@ void ConflictBatch::mergeWriteConflictRanges(Version now) { if (combinedWriteConflictRanges.empty()) return; +#if USE_RADIX_TREE + Arena arena; + auto* writes = new (arena) ConflictSet::WriteRange[combinedWriteConflictRanges.size()]; + + for (int i = 0; i < combinedWriteConflictRanges.size(); ++i) { + auto& write = writes[i]; + write.begin.p = combinedWriteConflictRanges[i].first.begin(); + write.begin.len = combinedWriteConflictRanges[i].first.size(); + write.end.p = combinedWriteConflictRanges[i].second.begin(); + write.end.len = combinedWriteConflictRanges[i].second.size(); + } + + cs->versionHistory.addWrites(writes, combinedWriteConflictRanges.size(), now); +#else addConflictRanges(now, combinedWriteConflictRanges.begin(), combinedWriteConflictRanges.end(), &cs->versionHistory); +#endif } void ConflictBatch::combineWriteConflictRanges() { @@ -1115,7 +1171,7 @@ void skipListTest() { double start; - ConflictSet* cs = newConflictSet(); + ConflictSet2* cs = newConflictSet(); Arena testDataArena; VectorRef> testData; @@ -1197,6 +1253,4 @@ void skipListTest() { for (const auto& counter : skc) { printf("%20s: %s\n", counter->getMetric().name().c_str(), counter->getMetric().formatted().c_str()); } - - printf("%d entries in version history\n", cs->versionHistory.count()); } diff --git a/fdbserver/include/fdbserver/ConflictSet.h b/fdbserver/include/fdbserver/ConflictSet.h index 90ed2c406..b7e31217c 100644 --- a/fdbserver/include/fdbserver/ConflictSet.h +++ b/fdbserver/include/fdbserver/ConflictSet.h @@ -28,13 +28,13 @@ #include "fdbclient/CommitTransaction.h" #include "fdbserver/ResolverBug.h" -struct ConflictSet; -ConflictSet* newConflictSet(); -void clearConflictSet(ConflictSet*, Version); -void destroyConflictSet(ConflictSet*); +struct ConflictSet2; +ConflictSet2* newConflictSet(); +void clearConflictSet(ConflictSet2*, Version); +void destroyConflictSet(ConflictSet2*); struct ConflictBatch { - explicit ConflictBatch(ConflictSet*, + explicit ConflictBatch(ConflictSet2*, std::map>* conflictingKeyRangeMap = nullptr, Arena* resolveBatchReplyArena = nullptr); ~ConflictBatch(); @@ -54,7 +54,7 @@ struct ConflictBatch { void GetTooOldTransactions(std::vector& tooOldTransactions); private: - ConflictSet* cs; + ConflictSet2* cs; Standalone> transactionInfo; std::vector points; int transactionCount;