All checks were successful
Tests / Release [gcc] total: 583, passed: 583
GNU C Compiler (gcc)

|Total|New|Outstanding|Fixed|Trend|
|:-:|:-:|:-:|:-:|:-:|
|0|0|0|0|:clap:|
Reference build: <a href="https://jenkins.weaselab.dev/job/weaselab/job/conflict-set/job/main/21//gcc">weaselab » conflict-set » main #21</a>
Tests / Coverage total: 581, passed: 581
weaselab/conflict-set/pipeline/head This commit looks good
216 lines
7.3 KiB
Plaintext
216 lines
7.3 KiB
Plaintext
diff --git a/fdbserver/CMakeLists.txt b/fdbserver/CMakeLists.txt
|
|
index 3f353c2ef..cd0834761 100644
|
|
--- a/fdbserver/CMakeLists.txt
|
|
+++ b/fdbserver/CMakeLists.txt
|
|
@@ -22,6 +22,9 @@ file(MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/workloads)
|
|
|
|
add_flow_target(EXECUTABLE NAME fdbserver SRCS ${FDBSERVER_SRCS})
|
|
|
|
+find_package(ConflictSet)
|
|
+target_link_libraries(fdbserver PRIVATE conflict_set_static)
|
|
+
|
|
if (WITH_SWIFT)
|
|
# Setup the Swift sources in FDBServer.
|
|
include(FindSwiftLibs)
|
|
diff --git a/fdbserver/Resolver.actor.cpp b/fdbserver/Resolver.actor.cpp
|
|
index bf4118f5f..d3b4eaad8 100644
|
|
--- a/fdbserver/Resolver.actor.cpp
|
|
+++ b/fdbserver/Resolver.actor.cpp
|
|
@@ -132,7 +132,7 @@ struct Resolver : ReferenceCounted<Resolver> {
|
|
AsyncVar<int64_t> totalStateBytes;
|
|
AsyncTrigger checkNeededVersion;
|
|
std::map<NetworkAddress, ProxyRequestsInfo> proxyInfoMap;
|
|
- ConflictSet* conflictSet;
|
|
+ ConflictSet2* conflictSet;
|
|
TransientStorageMetricSample iopsSample;
|
|
|
|
// Use LogSystem as backend for txnStateStore. However, the real commit
|
|
diff --git a/fdbserver/SkipList.cpp b/fdbserver/SkipList.cpp
|
|
index b48d32c6b..d75734e61 100644
|
|
--- a/fdbserver/SkipList.cpp
|
|
+++ b/fdbserver/SkipList.cpp
|
|
@@ -25,6 +25,7 @@
|
|
#include <numeric>
|
|
#include <string>
|
|
#include <vector>
|
|
+#include <conflict_set/ConflictSet.h>
|
|
|
|
#include "flow/Platform.h"
|
|
#include "fdbrpc/fdbrpc.h"
|
|
@@ -34,6 +35,8 @@
|
|
#include "fdbclient/SystemData.h"
|
|
#include "fdbserver/ConflictSet.h"
|
|
|
|
+#define USE_RADIX_TREE 1
|
|
+
|
|
static std::vector<PerfDoubleCounter*> skc;
|
|
|
|
static thread_local uint32_t g_seed = 0;
|
|
@@ -782,26 +785,34 @@ private:
|
|
}
|
|
};
|
|
|
|
-struct ConflictSet {
|
|
- ConflictSet() : removalKey(makeString(0)), oldestVersion(0) {}
|
|
- ~ConflictSet() {}
|
|
+struct ConflictSet2 {
|
|
+ ConflictSet2() : versionHistory(0), removalKey(makeString(0)), oldestVersion(0) {}
|
|
+ ~ConflictSet2() {}
|
|
|
|
+#if USE_RADIX_TREE
|
|
+ ConflictSet versionHistory;
|
|
+#else
|
|
SkipList versionHistory;
|
|
+#endif
|
|
Key removalKey;
|
|
Version oldestVersion;
|
|
};
|
|
|
|
-ConflictSet* newConflictSet() {
|
|
- return new ConflictSet;
|
|
+ConflictSet2* newConflictSet() {
|
|
+ return new ConflictSet2;
|
|
}
|
|
-void clearConflictSet(ConflictSet* cs, Version v) {
|
|
- SkipList(v).swap(cs->versionHistory);
|
|
+void clearConflictSet(ConflictSet2* cs, Version v) {
|
|
+#if USE_RADIX_TREE
|
|
+ cs->versionHistory = ConflictSet{ 0 };
|
|
+#else
|
|
+ SkipList().swap(cs->versionHistory);
|
|
+#endif
|
|
}
|
|
-void destroyConflictSet(ConflictSet* cs) {
|
|
+void destroyConflictSet(ConflictSet2* cs) {
|
|
delete cs;
|
|
}
|
|
|
|
-ConflictBatch::ConflictBatch(ConflictSet* cs,
|
|
+ConflictBatch::ConflictBatch(ConflictSet2* cs,
|
|
std::map<int, VectorRef<int>>* conflictingKeyRangeMap,
|
|
Arena* resolveBatchReplyArena)
|
|
: cs(cs), transactionCount(0), conflictingKeyRangeMap(conflictingKeyRangeMap),
|
|
@@ -971,11 +982,15 @@ void ConflictBatch::detectConflicts(Version now,
|
|
t = timer();
|
|
if (newOldestVersion > cs->oldestVersion) {
|
|
cs->oldestVersion = newOldestVersion;
|
|
+#if USE_RADIX_TREE
|
|
+ cs->versionHistory.setOldestVersion(newOldestVersion);
|
|
+#else
|
|
SkipList::Finger finger;
|
|
int temp;
|
|
cs->versionHistory.find(&cs->removalKey, &finger, &temp, 1);
|
|
cs->versionHistory.removeBefore(cs->oldestVersion, finger, combinedWriteConflictRanges.size() * 3 + 10);
|
|
cs->removalKey = finger.getValue();
|
|
+#endif
|
|
}
|
|
g_removeBefore += timer() - t;
|
|
}
|
|
@@ -984,8 +999,34 @@ void ConflictBatch::checkReadConflictRanges() {
|
|
if (combinedReadConflictRanges.empty())
|
|
return;
|
|
|
|
+#if USE_RADIX_TREE
|
|
+ Arena arena;
|
|
+ auto* reads = new (arena) ConflictSet::ReadRange[combinedReadConflictRanges.size()];
|
|
+
|
|
+ for (int i = 0; i < combinedReadConflictRanges.size(); ++i) {
|
|
+ auto& read = reads[i];
|
|
+ read.readVersion = combinedReadConflictRanges[i].version;
|
|
+ read.begin.p = combinedReadConflictRanges[i].begin.begin();
|
|
+ read.begin.len = combinedReadConflictRanges[i].begin.size();
|
|
+ read.end.p = combinedReadConflictRanges[i].end.begin();
|
|
+ read.end.len = combinedReadConflictRanges[i].end.size();
|
|
+ }
|
|
+ auto* results = new (arena) ConflictSet::Result[combinedReadConflictRanges.size()];
|
|
+ cs->versionHistory.check(reads, results, combinedReadConflictRanges.size());
|
|
+
|
|
+ for (int i = 0; i < combinedReadConflictRanges.size(); ++i) {
|
|
+ if (results[i] == ConflictSet::Conflict) {
|
|
+ transactionConflictStatus[combinedReadConflictRanges[i].transaction] = true;
|
|
+ if (combinedReadConflictRanges[i].conflictingKeyRange != nullptr) {
|
|
+ combinedReadConflictRanges[i].conflictingKeyRange->push_back(*combinedReadConflictRanges[i].cKRArena,
|
|
+ combinedReadConflictRanges[i].indexInTx);
|
|
+ }
|
|
+ }
|
|
+ }
|
|
+#else
|
|
cs->versionHistory.detectConflicts(
|
|
&combinedReadConflictRanges[0], combinedReadConflictRanges.size(), transactionConflictStatus);
|
|
+#endif
|
|
}
|
|
|
|
void ConflictBatch::addConflictRanges(Version now,
|
|
@@ -1015,7 +1056,23 @@ void ConflictBatch::mergeWriteConflictRanges(Version now) {
|
|
if (combinedWriteConflictRanges.empty())
|
|
return;
|
|
|
|
+#if USE_RADIX_TREE
|
|
+ Arena arena;
|
|
+ auto* writes = new (arena) ConflictSet::WriteRange[combinedWriteConflictRanges.size()];
|
|
+
|
|
+ for (int i = 0; i < combinedWriteConflictRanges.size(); ++i) {
|
|
+ auto& write = writes[i];
|
|
+ write.writeVersion = now;
|
|
+ write.begin.p = combinedWriteConflictRanges[i].first.begin();
|
|
+ write.begin.len = combinedWriteConflictRanges[i].first.size();
|
|
+ write.end.p = combinedWriteConflictRanges[i].second.begin();
|
|
+ write.end.len = combinedWriteConflictRanges[i].second.size();
|
|
+ }
|
|
+
|
|
+ cs->versionHistory.addWrites(writes, combinedWriteConflictRanges.size());
|
|
+#else
|
|
addConflictRanges(now, combinedWriteConflictRanges.begin(), combinedWriteConflictRanges.end(), &cs->versionHistory);
|
|
+#endif
|
|
}
|
|
|
|
void ConflictBatch::combineWriteConflictRanges() {
|
|
@@ -1115,7 +1172,7 @@ void skipListTest() {
|
|
|
|
double start;
|
|
|
|
- ConflictSet* cs = newConflictSet();
|
|
+ ConflictSet2* cs = newConflictSet();
|
|
|
|
Arena testDataArena;
|
|
VectorRef<VectorRef<KeyRangeRef>> testData;
|
|
@@ -1197,6 +1254,4 @@ void skipListTest() {
|
|
for (const auto& counter : skc) {
|
|
printf("%20s: %s\n", counter->getMetric().name().c_str(), counter->getMetric().formatted().c_str());
|
|
}
|
|
-
|
|
- printf("%d entries in version history\n", cs->versionHistory.count());
|
|
}
|
|
diff --git a/fdbserver/include/fdbserver/ConflictSet.h b/fdbserver/include/fdbserver/ConflictSet.h
|
|
index 90ed2c406..b7e31217c 100644
|
|
--- a/fdbserver/include/fdbserver/ConflictSet.h
|
|
+++ b/fdbserver/include/fdbserver/ConflictSet.h
|
|
@@ -28,13 +28,13 @@
|
|
#include "fdbclient/CommitTransaction.h"
|
|
#include "fdbserver/ResolverBug.h"
|
|
|
|
-struct ConflictSet;
|
|
-ConflictSet* newConflictSet();
|
|
-void clearConflictSet(ConflictSet*, Version);
|
|
-void destroyConflictSet(ConflictSet*);
|
|
+struct ConflictSet2;
|
|
+ConflictSet2* newConflictSet();
|
|
+void clearConflictSet(ConflictSet2*, Version);
|
|
+void destroyConflictSet(ConflictSet2*);
|
|
|
|
struct ConflictBatch {
|
|
- explicit ConflictBatch(ConflictSet*,
|
|
+ explicit ConflictBatch(ConflictSet2*,
|
|
std::map<int, VectorRef<int>>* conflictingKeyRangeMap = nullptr,
|
|
Arena* resolveBatchReplyArena = nullptr);
|
|
~ConflictBatch();
|
|
@@ -54,7 +54,7 @@ struct ConflictBatch {
|
|
void GetTooOldTransactions(std::vector<int>& tooOldTransactions);
|
|
|
|
private:
|
|
- ConflictSet* cs;
|
|
+ ConflictSet2* cs;
|
|
Standalone<VectorRef<struct TransactionInfo*>> transactionInfo;
|
|
std::vector<struct KeyInfo> points;
|
|
int transactionCount;
|