Process acceptable subranges interleaved

2024-10-29 14:41:42 -07:00
parent c06afeb81e
commit 7aac73ee80
3 changed files with 82 additions and 28 deletions

View File

@@ -91,6 +91,15 @@ static_assert(kNominalVersionWindow <= kMaxCorrectVersionWindow);
 #define USE_64_BIT 0
 #endif
+auto operator<(const ConflictSet::WriteRange &lhs,
+               const ConflictSet::WriteRange &rhs) {
+  if (lhs.end.len == 0) {
+    return lhs.begin < rhs.begin;
+  } else {
+    return lhs.end < rhs.begin;
+  }
+}
 struct InternalVersionT {
   constexpr InternalVersionT() = default;
   constexpr explicit InternalVersionT(int64_t value) : value(value) {}
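The ordering added above treats a point write (end.len == 0) as comparing by its begin key, while a range write must end strictly before the next write begins, so a batch that is sorted under this predicate is also pairwise non-overlapping. A minimal standalone sketch of the same idea, using an illustrative ToyWrite type and toyLess predicate in place of ConflictSet::WriteRange and its operator<:

```cpp
// Sketch only: ToyWrite and toyLess are illustrative stand-ins, not the
// commit's ConflictSet::WriteRange or its operator<.
#include <cassert>
#include <string>
#include <vector>

struct ToyWrite {
  std::string begin;
  std::string end; // empty => point write, otherwise the range is [begin, end)
};

// Mirrors the predicate above: point writes order by key; a range write must
// end before the next write's begin key.
bool toyLess(const ToyWrite &lhs, const ToyWrite &rhs) {
  if (lhs.end.empty()) {
    return lhs.begin < rhs.begin;
  }
  return lhs.end < rhs.begin;
}

int main() {
  std::vector<ToyWrite> batch = {
      {"apple", ""},        // point write
      {"banana", "cherry"}, // range write [banana, cherry)
      {"date", ""},         // point write
  };
  // Sorted under toyLess implies the writes are disjoint and in key order.
  for (size_t i = 1; i < batch.size(); ++i) {
    assert(toyLess(batch[i - 1], batch[i]));
  }
}
```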
@@ -4691,8 +4700,8 @@ struct __attribute__((visibility("hidden"))) ConflictSet::Impl {
     check_bytes_total.add(check_byte_accum);
   }
-  void interleavedPointWrites(const WriteRange *writes, int count,
-                              InternalVersionT writeVersion) {
+  void interleavedWrites(const WriteRange *writes, int count,
+                         InternalVersionT writeVersion) {
     // Phase 1: Search for insertion points concurrently, without modifying the
     // structure of the tree.
@@ -4754,6 +4763,44 @@ struct __attribute__((visibility("hidden"))) ConflictSet::Impl {
     }
   }
+  void insertPointWritesOrSorted(const WriteRange *writes, int count,
+                                 InternalVersionT writeVersion) {
+#ifndef NDEBUG
+    bool allPointWrites = true;
+    for (int i = 0; i < count; ++i) {
+      allPointWrites = allPointWrites && writes[i].end.len == 0;
+    }
+    bool sorted = true;
+    for (int i = 1; i < count; ++i) {
+      sorted = sorted && writes[i - 1] < writes[i];
+    }
+    assert(allPointWrites || sorted);
+#endif
+#if __has_attribute(preserve_none) && __has_attribute(musttail)
+    // TODO make this work for sorted range writes
+    constexpr bool kEnableInterleaved = false;
+#else
+    constexpr bool kEnableInterleaved = false;
+#endif
+    if (kEnableInterleaved && count > 1) {
+      interleavedWrites(writes, count, InternalVersionT(writeVersion));
+    } else {
+      for (int i = 0; i < count; ++i) {
+        const auto &w = writes[i];
+        auto begin = std::span<const uint8_t>(w.begin.p, w.begin.len);
+        auto end = std::span<const uint8_t>(w.end.p, w.end.len);
+        if (w.end.len > 0) {
+          addWriteRange(root, begin, end, InternalVersionT(writeVersion),
+                        &writeContext, this);
+        } else {
+          addPointWrite(root, begin, InternalVersionT(writeVersion),
+                        &writeContext);
+        }
+      }
+    }
+  }
   void addWrites(const WriteRange *writes, int count, int64_t writeVersion) {
 #if !USE_64_BIT
     // There could be other conflict sets in the same thread. We need
@@ -4796,36 +4843,31 @@ struct __attribute__((visibility("hidden"))) ConflictSet::Impl {
     }
     for (int i = 0; i < count; ++i) {
-      const auto &w = writes[i];
-      writeContext.accum.write_bytes += w.begin.len + w.end.len;
+      writeContext.accum.write_bytes += writes[i].begin.len + writes[i].end.len;
     }
-#if __has_attribute(preserve_none) && __has_attribute(musttail)
-    bool allPointWrites = true;
-    for (int i = 0; i < count; ++i) {
-      if (writes[i].end.len > 0) {
-        allPointWrites = false;
-        break;
-      }
-    }
-#else
-    bool allPointWrites = false;
-#endif
-    if (allPointWrites && count > 1) {
-      interleavedPointWrites(writes, count, InternalVersionT(writeVersion));
-    } else {
-      for (int i = 0; i < count; ++i) {
-        const auto &w = writes[i];
-        auto begin = std::span<const uint8_t>(w.begin.p, w.begin.len);
-        auto end = std::span<const uint8_t>(w.end.p, w.end.len);
-        if (w.end.len > 0) {
-          addWriteRange(root, begin, end, InternalVersionT(writeVersion),
-                        &writeContext, this);
-        } else {
-          addPointWrite(root, begin, InternalVersionT(writeVersion),
-                        &writeContext);
-        }
-      }
+    if (count > 0) {
+      int firstNotInserted = 0;
+      bool batchHasOnlyPointWrites = writes[0].end.len == 0;
+      bool batchIsSorted = true;
+      for (int i = 1; i < count; ++i) {
+        batchIsSorted = batchIsSorted && writes[i - 1] < writes[i];
+        batchHasOnlyPointWrites =
+            batchHasOnlyPointWrites && writes[i].end.len == 0;
+        if (!(batchIsSorted || batchHasOnlyPointWrites)) {
+          insertPointWritesOrSorted(writes + firstNotInserted,
+                                    i - firstNotInserted,
+                                    InternalVersionT(writeVersion));
+          firstNotInserted = i;
+          batchHasOnlyPointWrites = writes[i].end.len == 0;
+          batchIsSorted = true;
+        }
+      }
+      assert(batchIsSorted || batchHasOnlyPointWrites);
+      insertPointWritesOrSorted(writes + firstNotInserted,
+                                count - firstNotInserted,
+                                InternalVersionT(writeVersion));
     }
     writeContext.releaseDeferred();
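With this change, addWrites flushes the batch in maximal runs that keep insertPointWritesOrSorted's precondition: a run grows while it is still either all point writes or sorted under the WriteRange ordering, and is flushed as soon as the next write would break both properties. A standalone sketch of the same run-splitting logic, with illustrative ToyWrite/toyLess/flush stand-ins rather than the commit's types:

```cpp
// Sketch only: ToyWrite, toyLess, and flush are illustrative, not the
// commit's WriteRange / insertPointWritesOrSorted.
#include <cstdio>
#include <string>
#include <vector>

struct ToyWrite {
  std::string begin;
  std::string end; // empty => point write
};

bool toyLess(const ToyWrite &a, const ToyWrite &b) {
  return a.end.empty() ? a.begin < b.begin : a.end < b.begin;
}

// Stand-in for insertPointWritesOrSorted: just report the run it was given.
void flush(const std::vector<ToyWrite> &writes, int first, int count) {
  std::printf("flush run [%d, %d)\n", first, first + count);
}

void processInRuns(const std::vector<ToyWrite> &writes) {
  int count = static_cast<int>(writes.size());
  if (count == 0)
    return;
  int firstNotInserted = 0;
  bool onlyPoints = writes[0].end.empty();
  bool sorted = true;
  for (int i = 1; i < count; ++i) {
    sorted = sorted && toyLess(writes[i - 1], writes[i]);
    onlyPoints = onlyPoints && writes[i].end.empty();
    if (!(sorted || onlyPoints)) {
      // The run [firstNotInserted, i) still satisfies the precondition;
      // flush it and start a new run at i.
      flush(writes, firstNotInserted, i - firstNotInserted);
      firstNotInserted = i;
      onlyPoints = writes[i].end.empty();
      sorted = true;
    }
  }
  flush(writes, firstNotInserted, count - firstNotInserted);
}

int main() {
  // "c"/"a" point writes are unsorted but form one all-point run; the
  // range write ["b", "d") then starts a second run.
  processInRuns({{"c", ""}, {"a", ""}, {"b", "d"}});
  // Prints: flush run [0, 2) and flush run [2, 3)
}
```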

View File

@@ -49,6 +49,17 @@ operator<=>(const std::span<const uint8_t> &lhs,
   return lhs.size() <=> size_t(rhs.len);
 }
+[[nodiscard]] inline auto operator<=>(const ConflictSet::Key &lhs,
+                                      const ConflictSet::Key &rhs) noexcept {
+  int cl = std::min<int>(lhs.len, rhs.len);
+  if (cl > 0) {
+    if (auto c = memcmp(lhs.p, rhs.p, cl) <=> 0; c != 0) {
+      return c;
+    }
+  }
+  return lhs.len <=> rhs.len;
+}
 // This header contains code that we want to reuse outside of ConflictSet.cpp or
 // want to exclude from coverage since it's only testing related.
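The new ConflictSet::Key comparison is plain lexicographic byte order: memcmp over the shared prefix, with the shorter key sorting first on a tie. (Calling memcmp directly is presumably why memcmp@GLIBC_2.2.5 is added to the symbol list in the third file.) A small self-contained sketch of the same semantics, with ToyKey standing in for ConflictSet::Key:

```cpp
// Sketch only: ToyKey and compareKeys mirror the shape of the new
// operator<=> above; they are not the commit's ConflictSet::Key.
#include <algorithm>
#include <cassert>
#include <compare>
#include <cstdint>
#include <cstring>

struct ToyKey {
  const uint8_t *p;
  int len;
};

std::strong_ordering compareKeys(const ToyKey &lhs, const ToyKey &rhs) {
  int cl = std::min(lhs.len, rhs.len);
  if (cl > 0) {
    // Compare the shared prefix byte-wise first.
    if (auto c = memcmp(lhs.p, rhs.p, cl) <=> 0; c != 0) {
      return c;
    }
  }
  // Equal prefixes: the shorter key sorts first.
  return lhs.len <=> rhs.len;
}

int main() {
  const uint8_t ab[] = {'a', 'b'};
  const uint8_t abc[] = {'a', 'b', 'c'};
  assert(compareKeys({ab, 2}, {abc, 3}) == std::strong_ordering::less);
  assert(compareKeys({abc, 3}, {ab, 2}) == std::strong_ordering::greater);
  assert(compareKeys({abc, 3}, {abc, 3}) == std::strong_ordering::equal);
}
```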

View File

@@ -5,6 +5,7 @@ __stack_chk_fail@GLIBC_2.4
 __tls_get_addr@GLIBC_2.3
 abort@GLIBC_2.2.5
 free@GLIBC_2.2.5
+memcmp@GLIBC_2.2.5
 malloc@GLIBC_2.2.5
 memcpy@GLIBC_2.14
 memmove@GLIBC_2.2.5