Compare commits
18 Commits
cf-integri
...
f403c78410
| Author | SHA1 | Date | |
|---|---|---|---|
| f403c78410 | |||
| 08958d4109 | |||
| dcc5275ec9 | |||
| c5ef843f9e | |||
| b78e817e24 | |||
| 9c82f17e20 | |||
| 665a9313a4 | |||
| 6e66202d5e | |||
| a92271a205 | |||
| 0dbfb4deae | |||
| 6e229b6b36 | |||
| 2200de11c8 | |||
| b37feb58dd | |||
| 94a4802824 | |||
| 707dbdb391 | |||
| bdd343bb57 | |||
| 7b31bd5efe | |||
| e255e1a926 |
@@ -31,11 +31,24 @@ if(NOT CMAKE_BUILD_TYPE AND NOT CMAKE_CONFIGURATION_TYPES)
|
||||
"MinSizeRel" "RelWithDebInfo")
|
||||
endif()
|
||||
|
||||
add_compile_options(-fdata-sections -ffunction-sections -Wswitch-enum
|
||||
-Werror=switch-enum -fPIC)
|
||||
add_compile_options(
|
||||
-Werror=switch-enum
|
||||
-Wswitch-enum
|
||||
-Wunused-variable
|
||||
-fPIC
|
||||
-fdata-sections
|
||||
-ffunction-sections
|
||||
-fno-jump-tables # https://github.com/llvm/llvm-project/issues/54247
|
||||
)
|
||||
|
||||
if(CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
|
||||
add_link_options("-Wno-unused-command-line-argument")
|
||||
find_program(LLVM_OBJCOPY llvm-objcopy)
|
||||
if(LLVM_OBJCOPY)
|
||||
set(CMAKE_OBJCOPY
|
||||
${LLVM_OBJCOPY}
|
||||
CACHE FILEPATH "path to objcopy binary" FORCE)
|
||||
endif()
|
||||
endif()
|
||||
|
||||
if(CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
|
||||
@@ -56,6 +69,21 @@ if(HAS_FULL_RELRO)
|
||||
endif()
|
||||
cmake_pop_check_state()
|
||||
|
||||
if(CMAKE_SYSTEM_PROCESSOR STREQUAL aarch64)
|
||||
add_compile_options(-mbranch-protection=standard)
|
||||
else()
|
||||
add_compile_options(-fcf-protection)
|
||||
set(rewrite_endbr_flags "-fuse-ld=mold;LINKER:-z,rewrite-endbr")
|
||||
cmake_push_check_state()
|
||||
list(APPEND CMAKE_REQUIRED_LINK_OPTIONS ${rewrite_endbr_flags})
|
||||
check_cxx_source_compiles("int main(){}" HAS_REWRITE_ENDBR FAIL_REGEX
|
||||
"warning:")
|
||||
if(HAS_REWRITE_ENDBR)
|
||||
add_link_options(${rewrite_endbr_flags})
|
||||
endif()
|
||||
cmake_pop_check_state()
|
||||
endif()
|
||||
|
||||
set(version_script_flags
|
||||
LINKER:--version-script=${CMAKE_CURRENT_SOURCE_DIR}/linker.map)
|
||||
cmake_push_check_state()
|
||||
@@ -81,19 +109,11 @@ else()
|
||||
add_link_options(-Wl,--gc-sections)
|
||||
endif()
|
||||
|
||||
if(NOT USE_SIMD_FALLBACK)
|
||||
cmake_push_check_state()
|
||||
list(APPEND CMAKE_REQUIRED_FLAGS -mavx)
|
||||
check_include_file_cxx("immintrin.h" HAS_AVX)
|
||||
if(HAS_AVX)
|
||||
if(USE_SIMD_FALLBACK)
|
||||
add_compile_definitions(USE_SIMD_FALLBACK)
|
||||
else()
|
||||
if(CMAKE_SYSTEM_PROCESSOR STREQUAL x86_64)
|
||||
add_compile_options(-mavx)
|
||||
add_compile_definitions(HAS_AVX)
|
||||
endif()
|
||||
cmake_pop_check_state()
|
||||
|
||||
check_include_file_cxx("arm_neon.h" HAS_ARM_NEON)
|
||||
if(HAS_ARM_NEON)
|
||||
add_compile_definitions(HAS_ARM_NEON)
|
||||
endif()
|
||||
endif()
|
||||
|
||||
@@ -356,6 +376,15 @@ if(CMAKE_SOURCE_DIR STREQUAL CMAKE_CURRENT_SOURCE_DIR AND BUILD_TESTING)
|
||||
${symbol_imports})
|
||||
endif()
|
||||
|
||||
if(NOT CMAKE_CROSSCOMPILING)
|
||||
find_program(HARDENING_CHECK hardening-check)
|
||||
if(HARDENING_CHECK)
|
||||
add_test(NAME hardening_check
|
||||
COMMAND ${HARDENING_CHECK} $<TARGET_FILE:${PROJECT_NAME}>
|
||||
--nofortify --nostackprotector)
|
||||
endif()
|
||||
endif()
|
||||
|
||||
# bench
|
||||
add_executable(conflict_set_bench Bench.cpp)
|
||||
target_link_libraries(conflict_set_bench PRIVATE ${PROJECT_NAME} nanobench)
|
||||
|
||||
182
ConflictSet.cpp
182
ConflictSet.cpp
@@ -14,6 +14,16 @@ See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
#if !defined(USE_SIMD_FALLBACK) && defined(__has_include)
|
||||
#if defined(__x86_64__) && __has_include("immintrin.h")
|
||||
#define HAS_AVX 1
|
||||
#include <immintrin.h>
|
||||
#elif __has_include("arm_neon.h")
|
||||
#define HAS_ARM_NEON 1
|
||||
#include <arm_neon.h>
|
||||
#endif
|
||||
#endif
|
||||
|
||||
#include "ConflictSet.h"
|
||||
#include "Internal.h"
|
||||
#include "LongestCommonPrefix.h"
|
||||
@@ -34,12 +44,6 @@ limitations under the License.
|
||||
#include <type_traits>
|
||||
#include <utility>
|
||||
|
||||
#ifdef HAS_AVX
|
||||
#include <immintrin.h>
|
||||
#elif defined(HAS_ARM_NEON)
|
||||
#include <arm_neon.h>
|
||||
#endif
|
||||
|
||||
#ifndef __SANITIZE_THREAD__
|
||||
#if defined(__has_feature)
|
||||
#if __has_feature(thread_sanitizer)
|
||||
@@ -696,8 +700,6 @@ constexpr int64_t kMaxFreeListBytes = 1 << 20;
|
||||
// doesn't meet the capacity constraints, it's freed and a new node is allocated
|
||||
// with the minimum capacity. The hope is that "unfit" nodes don't get stuck in
|
||||
// the free list.
|
||||
//
|
||||
// TODO valgrind annotations
|
||||
template <class T> struct NodeAllocator {
|
||||
|
||||
static_assert(std::derived_from<T, Node>);
|
||||
@@ -734,6 +736,7 @@ template <class T> struct NodeAllocator {
|
||||
p->parent = freeList;
|
||||
freeList = p;
|
||||
freeListSize += sizeof(T) + p->partialKeyCapacity;
|
||||
VALGRIND_MAKE_MEM_NOACCESS(p, sizeof(T) + p->partialKeyCapacity);
|
||||
}
|
||||
|
||||
void deferRelease(T *p, Node *forwardTo) {
|
||||
@@ -755,6 +758,13 @@ template <class T> struct NodeAllocator {
|
||||
void releaseDeferred() {
|
||||
if (deferredList != nullptr) {
|
||||
deferredListFront->parent = freeList;
|
||||
#ifndef NVALGRIND
|
||||
for (auto *iter = deferredList; iter != freeList;) {
|
||||
auto *tmp = iter;
|
||||
iter = (T *)iter->parent;
|
||||
VALGRIND_MAKE_MEM_NOACCESS(tmp, sizeof(T) + tmp->partialKeyCapacity);
|
||||
}
|
||||
#endif
|
||||
freeList = std::exchange(deferredList, nullptr);
|
||||
}
|
||||
for (T *n = std::exchange(deferredListOverflow, nullptr); n != nullptr;) {
|
||||
@@ -775,6 +785,7 @@ template <class T> struct NodeAllocator {
|
||||
assert(deferredList == nullptr);
|
||||
assert(deferredListOverflow == nullptr);
|
||||
for (T *iter = freeList; iter != nullptr;) {
|
||||
VALGRIND_MAKE_MEM_DEFINED(iter, sizeof(T));
|
||||
auto *tmp = iter;
|
||||
iter = (T *)iter->parent;
|
||||
removeNode(tmp);
|
||||
@@ -792,6 +803,7 @@ private:
|
||||
|
||||
T *allocate_helper(int minCapacity, int maxCapacity) {
|
||||
if (freeList != nullptr) {
|
||||
VALGRIND_MAKE_MEM_DEFINED(freeList, sizeof(T));
|
||||
freeListSize -= sizeof(T) + freeList->partialKeyCapacity;
|
||||
assume(freeList->partialKeyCapacity >= 0);
|
||||
assume(minCapacity >= 0);
|
||||
@@ -800,6 +812,11 @@ private:
|
||||
freeList->partialKeyCapacity <= maxCapacity) {
|
||||
auto *result = freeList;
|
||||
freeList = (T *)freeList->parent;
|
||||
VALGRIND_MAKE_MEM_UNDEFINED(result,
|
||||
sizeof(T) + result->partialKeyCapacity);
|
||||
VALGRIND_MAKE_MEM_DEFINED(&result->partialKeyCapacity,
|
||||
sizeof(result->partialKeyCapacity));
|
||||
VALGRIND_MAKE_MEM_DEFINED(&result->type, sizeof(result->type));
|
||||
return result;
|
||||
} else {
|
||||
auto *p = freeList;
|
||||
@@ -3183,6 +3200,12 @@ Node *firstGeqPhysical(Node *n, const TrivialSpan key) {
|
||||
#define PRESERVE_NONE
|
||||
#endif
|
||||
|
||||
#if __has_attribute(musttail) && __has_attribute(preserve_none)
|
||||
constexpr bool kEnableInterleaved = true;
|
||||
#else
|
||||
constexpr bool kEnableInterleaved = false;
|
||||
#endif
|
||||
|
||||
namespace check {
|
||||
|
||||
typedef PRESERVE_NONE void (*Continuation)(struct Job *, struct Context *);
|
||||
@@ -4969,51 +4992,50 @@ struct __attribute__((visibility("hidden"))) ConflictSet::Impl {
|
||||
check::Context context;
|
||||
context.readContext.impl = this;
|
||||
|
||||
#if __has_attribute(musttail)
|
||||
if (count == 1) {
|
||||
useSequential(reads, result, count, context);
|
||||
} else {
|
||||
constexpr int kConcurrent = 16;
|
||||
check::Job inProgress[kConcurrent];
|
||||
context.count = count;
|
||||
context.oldestVersionFullPrecision = oldestVersionFullPrecision;
|
||||
context.root = root;
|
||||
context.queries = reads;
|
||||
context.results = result;
|
||||
int64_t started = std::min(kConcurrent, count);
|
||||
context.started = started;
|
||||
for (int i = 0; i < started; i++) {
|
||||
inProgress[i].init(reads + i, result + i, root,
|
||||
oldestVersionFullPrecision);
|
||||
}
|
||||
for (int i = 0; i < started - 1; i++) {
|
||||
inProgress[i].next = inProgress + i + 1;
|
||||
}
|
||||
for (int i = 1; i < started; i++) {
|
||||
inProgress[i].prev = inProgress + i - 1;
|
||||
}
|
||||
inProgress[0].prev = inProgress + started - 1;
|
||||
inProgress[started - 1].next = inProgress;
|
||||
if constexpr (kEnableInterleaved) {
|
||||
if (count == 1) {
|
||||
useSequential(reads, result, count, context);
|
||||
} else {
|
||||
constexpr int kConcurrent = 16;
|
||||
check::Job inProgress[kConcurrent];
|
||||
context.count = count;
|
||||
context.oldestVersionFullPrecision = oldestVersionFullPrecision;
|
||||
context.root = root;
|
||||
context.queries = reads;
|
||||
context.results = result;
|
||||
int64_t started = std::min(kConcurrent, count);
|
||||
context.started = started;
|
||||
for (int i = 0; i < started; i++) {
|
||||
inProgress[i].init(reads + i, result + i, root,
|
||||
oldestVersionFullPrecision);
|
||||
}
|
||||
for (int i = 0; i < started - 1; i++) {
|
||||
inProgress[i].next = inProgress + i + 1;
|
||||
}
|
||||
for (int i = 1; i < started; i++) {
|
||||
inProgress[i].prev = inProgress + i - 1;
|
||||
}
|
||||
inProgress[0].prev = inProgress + started - 1;
|
||||
inProgress[started - 1].next = inProgress;
|
||||
|
||||
// Kick off the sequence of tail calls that finally returns once all jobs
|
||||
// are done
|
||||
inProgress->continuation(inProgress, &context);
|
||||
// Kick off the sequence of tail calls that finally returns once all
|
||||
// jobs are done
|
||||
inProgress->continuation(inProgress, &context);
|
||||
|
||||
#ifndef NDEBUG
|
||||
Arena arena;
|
||||
auto *results2 = new (arena) Result[count];
|
||||
check::Context context2;
|
||||
context2.readContext.impl = this;
|
||||
useSequential(reads, results2, count, context2);
|
||||
assert(memcmp(result, results2, count) == 0);
|
||||
assert(context.readContext == context2.readContext);
|
||||
Arena arena;
|
||||
auto *results2 = new (arena) Result[count];
|
||||
check::Context context2;
|
||||
context2.readContext.impl = this;
|
||||
useSequential(reads, results2, count, context2);
|
||||
assert(memcmp(result, results2, count) == 0);
|
||||
assert(context.readContext == context2.readContext);
|
||||
#endif
|
||||
}
|
||||
} else {
|
||||
useSequential(reads, result, count, context);
|
||||
}
|
||||
|
||||
#else
|
||||
useSequential(reads, result, count, context);
|
||||
#endif
|
||||
|
||||
for (int i = 0; i < count; ++i) {
|
||||
assert(reads[i].readVersion >= 0);
|
||||
assert(reads[i].readVersion <= newestVersionFullPrecision);
|
||||
@@ -5186,11 +5208,6 @@ struct __attribute__((visibility("hidden"))) ConflictSet::Impl {
|
||||
assert(allPointWrites || sorted);
|
||||
#endif
|
||||
|
||||
#if __has_attribute(musttail)
|
||||
constexpr bool kEnableInterleaved = true;
|
||||
#else
|
||||
constexpr bool kEnableInterleaved = false;
|
||||
#endif
|
||||
if (kEnableInterleaved && count > 1) {
|
||||
interleavedWrites(writes, count, InternalVersionT(writeVersion));
|
||||
} else {
|
||||
@@ -5330,6 +5347,11 @@ struct __attribute__((visibility("hidden"))) ConflictSet::Impl {
|
||||
gc_iterations_total.add(set_oldest_iterations_accum);
|
||||
if (n == nullptr) {
|
||||
removalKey = {};
|
||||
if (removalBufferSize > kMaxRemovalBufferSize) {
|
||||
safe_free(removalBuffer, removalBufferSize);
|
||||
removalBufferSize = kMinRemovalBufferSize;
|
||||
removalBuffer = (uint8_t *)safe_malloc(removalBufferSize);
|
||||
}
|
||||
oldestExtantVersion = oldestVersionAtGcBegin;
|
||||
oldest_extant_version.set(oldestExtantVersion);
|
||||
oldestVersionAtGcBegin = oldestVersionFullPrecision;
|
||||
@@ -5340,12 +5362,47 @@ struct __attribute__((visibility("hidden"))) ConflictSet::Impl {
|
||||
oldestExtantVersion, oldestVersionAtGcBegin);
|
||||
#endif
|
||||
} else {
|
||||
removalKeyArena = Arena();
|
||||
removalKey = getSearchPath(removalKeyArena, n);
|
||||
// Store the current search path to resume the scan later
|
||||
saveRemovalKey(n);
|
||||
}
|
||||
return fuel;
|
||||
}
|
||||
|
||||
void saveRemovalKey(Node *n) {
|
||||
uint8_t *cursor = removalBuffer + removalBufferSize;
|
||||
int size = 0;
|
||||
auto reserve = [&](int delta) {
|
||||
if (size + delta > removalBufferSize) [[unlikely]] {
|
||||
int newBufSize = std::max(removalBufferSize * 2, size + delta);
|
||||
uint8_t *newBuf = (uint8_t *)safe_malloc(newBufSize);
|
||||
memcpy(newBuf + newBufSize - size, cursor, size);
|
||||
safe_free(removalBuffer, removalBufferSize);
|
||||
removalBuffer = newBuf;
|
||||
removalBufferSize = newBufSize;
|
||||
cursor = newBuf + newBufSize - size;
|
||||
}
|
||||
};
|
||||
for (;;) {
|
||||
auto partialKey = TrivialSpan{n->partialKey(), n->partialKeyLen};
|
||||
reserve(partialKey.size());
|
||||
size += partialKey.size();
|
||||
cursor -= partialKey.size();
|
||||
memcpy(cursor, partialKey.data(), partialKey.size());
|
||||
|
||||
if (n->parent == nullptr) {
|
||||
break;
|
||||
}
|
||||
|
||||
reserve(1);
|
||||
++size;
|
||||
--cursor;
|
||||
*cursor = n->parentsIndex;
|
||||
|
||||
n = n->parent;
|
||||
}
|
||||
removalKey = {cursor, size};
|
||||
}
|
||||
|
||||
void setOldestVersion(int64_t newOldestVersion) {
|
||||
assert(newOldestVersion >= 0);
|
||||
assert(newOldestVersion <= newestVersionFullPrecision);
|
||||
@@ -5396,7 +5453,7 @@ struct __attribute__((visibility("hidden"))) ConflictSet::Impl {
|
||||
writeContext.~WriteContext();
|
||||
new (&writeContext) WriteContext();
|
||||
|
||||
removalKeyArena = Arena{};
|
||||
// Leave removalBuffer as is
|
||||
removalKey = {};
|
||||
keyUpdates = 10;
|
||||
|
||||
@@ -5428,11 +5485,16 @@ struct __attribute__((visibility("hidden"))) ConflictSet::Impl {
|
||||
~Impl() {
|
||||
eraseTree(root, &writeContext);
|
||||
safe_free(metrics, metricsCount * sizeof(metrics[0]));
|
||||
safe_free(removalBuffer, removalBufferSize);
|
||||
}
|
||||
|
||||
WriteContext writeContext;
|
||||
|
||||
Arena removalKeyArena;
|
||||
static constexpr int kMinRemovalBufferSize = 1 << 10;
|
||||
// Eventually downsize if larger than this value
|
||||
static constexpr int kMaxRemovalBufferSize = 1 << 16;
|
||||
uint8_t *removalBuffer = (uint8_t *)safe_malloc(kMinRemovalBufferSize);
|
||||
int removalBufferSize = kMinRemovalBufferSize;
|
||||
TrivialSpan removalKey;
|
||||
int64_t keyUpdates;
|
||||
|
||||
@@ -5858,13 +5920,13 @@ void checkVersionsGeqOldestExtant(Node *n,
|
||||
case Type_Node0: {
|
||||
} break;
|
||||
case Type_Node3: {
|
||||
auto *self = static_cast<Node3 *>(n);
|
||||
[[maybe_unused]] auto *self = static_cast<Node3 *>(n);
|
||||
for (int i = 0; i < 3; ++i) {
|
||||
assert(self->childMaxVersion[i] >= oldestExtantVersion);
|
||||
}
|
||||
} break;
|
||||
case Type_Node16: {
|
||||
auto *self = static_cast<Node16 *>(n);
|
||||
[[maybe_unused]] auto *self = static_cast<Node16 *>(n);
|
||||
for (int i = 0; i < 16; ++i) {
|
||||
assert(self->childMaxVersion[i] >= oldestExtantVersion);
|
||||
}
|
||||
@@ -5874,7 +5936,7 @@ void checkVersionsGeqOldestExtant(Node *n,
|
||||
for (int i = 0; i < 48; ++i) {
|
||||
assert(self->childMaxVersion[i] >= oldestExtantVersion);
|
||||
}
|
||||
for (auto m : self->maxOfMax) {
|
||||
for ([[maybe_unused]] auto m : self->maxOfMax) {
|
||||
assert(m >= oldestExtantVersion);
|
||||
}
|
||||
} break;
|
||||
@@ -5883,7 +5945,7 @@ void checkVersionsGeqOldestExtant(Node *n,
|
||||
for (int i = 0; i < 256; ++i) {
|
||||
assert(self->childMaxVersion[i] >= oldestExtantVersion);
|
||||
}
|
||||
for (auto m : self->maxOfMax) {
|
||||
for ([[maybe_unused]] auto m : self->maxOfMax) {
|
||||
assert(m >= oldestExtantVersion);
|
||||
}
|
||||
} break;
|
||||
|
||||
@@ -13,12 +13,14 @@ RUN TZ=America/Los_Angeles DEBIAN_FRONTEND=noninteractive apt-get install -y \
|
||||
ccache \
|
||||
cmake \
|
||||
curl \
|
||||
devscripts \
|
||||
g++-aarch64-linux-gnu \
|
||||
gcovr \
|
||||
git \
|
||||
gnupg \
|
||||
libc6-dbg \
|
||||
lsb-release \
|
||||
mold \
|
||||
ninja-build \
|
||||
pre-commit \
|
||||
python3-requests \
|
||||
|
||||
4
Jenkinsfile
vendored
4
Jenkinsfile
vendored
@@ -91,7 +91,7 @@ pipeline {
|
||||
minio bucket: 'jenkins', credentialsId: 'jenkins-minio', excludes: '', host: 'minio.weaselab.dev', includes: 'build/*.deb,build/*.rpm,paper/*.pdf', targetFolder: '${JOB_NAME}/${BUILD_NUMBER}/${STAGE_NAME}/'
|
||||
}
|
||||
}
|
||||
stage('Release [gcc]') {
|
||||
stage('gcc') {
|
||||
agent {
|
||||
dockerfile {
|
||||
args '-v /home/jenkins/ccache:/ccache'
|
||||
@@ -99,7 +99,7 @@ pipeline {
|
||||
}
|
||||
}
|
||||
steps {
|
||||
CleanBuildAndTest("-DCMAKE_C_COMPILER=gcc -DCMAKE_CXX_COMPILER=g++ -DCMAKE_CXX_FLAGS=-DNVALGRIND")
|
||||
CleanBuildAndTest("-DCMAKE_C_COMPILER=gcc -DCMAKE_CXX_COMPILER=g++")
|
||||
recordIssues(tools: [gcc()])
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,7 +5,7 @@
|
||||
#include <cstdio>
|
||||
#include <cstring>
|
||||
#include <fcntl.h>
|
||||
#include <string_view>
|
||||
#include <span>
|
||||
#include <sys/mman.h>
|
||||
#include <sys/stat.h>
|
||||
#include <unistd.h>
|
||||
@@ -64,7 +64,7 @@ int main(int argc, const char **argv) {
|
||||
auto *const mapOriginal = begin;
|
||||
const auto sizeOriginal = size;
|
||||
|
||||
using StringView = std::basic_string_view<uint8_t>;
|
||||
using StringView = std::span<const uint8_t>;
|
||||
|
||||
StringView write;
|
||||
std::vector<StringView> reads;
|
||||
@@ -78,9 +78,9 @@ int main(int argc, const char **argv) {
|
||||
end = (uint8_t *)memchr(begin, '\n', size);
|
||||
|
||||
if (line.size() > 0 && line[0] == 'P') {
|
||||
write = line.substr(2, line.size());
|
||||
write = line.subspan(2, line.size());
|
||||
} else if (line.size() > 0 && line[0] == 'L') {
|
||||
reads.push_back(line.substr(2, line.size()));
|
||||
reads.push_back(line.subspan(2, line.size()));
|
||||
} else if (line.empty()) {
|
||||
{
|
||||
readRanges.resize(reads.size());
|
||||
|
||||
186
ServerBench.cpp
186
ServerBench.cpp
@@ -23,6 +23,97 @@
|
||||
#include "Internal.h"
|
||||
#include "third_party/nadeau.h"
|
||||
|
||||
constexpr int kCacheLine = 64; // TODO mac m1 is 128
|
||||
|
||||
template <class T> struct TxQueue {
|
||||
|
||||
explicit TxQueue(int lgSlotCount)
|
||||
: slotCount(1 << lgSlotCount), slotCountMask(slotCount - 1),
|
||||
slots(new T[slotCount]) {
|
||||
// Otherwise we can't tell the difference between full and empty.
|
||||
assert(!(slotCountMask & 0x80000000));
|
||||
}
|
||||
|
||||
/// Call from producer thread, after ensuring consumer is no longer accessing
|
||||
/// it somehow
|
||||
~TxQueue() { delete[] slots; }
|
||||
|
||||
/// Must be called from the producer thread
|
||||
void push(T t) {
|
||||
if (wouldBlock()) {
|
||||
// Wait for pops to change and try again
|
||||
consumer.pops.wait(producer.lastPopRead, std::memory_order_relaxed);
|
||||
producer.lastPopRead = consumer.pops.load(std::memory_order_acquire);
|
||||
}
|
||||
slots[producer.pushesNonAtomic++ & slotCountMask] = std::move(t);
|
||||
// seq_cst so that the notify can't be ordered before the store
|
||||
producer.pushes.store(producer.pushesNonAtomic, std::memory_order_seq_cst);
|
||||
// We have to notify every time, since we don't know if this is the last
|
||||
// push ever
|
||||
producer.pushes.notify_one();
|
||||
}
|
||||
|
||||
/// Must be called from the producer thread
|
||||
uint32_t outstanding() {
|
||||
return producer.pushesNonAtomic -
|
||||
consumer.pops.load(std::memory_order_relaxed);
|
||||
}
|
||||
|
||||
/// Returns true if a call to push might block. Must be called from the
|
||||
/// producer thread.
|
||||
bool wouldBlock() {
|
||||
// See if we can determine that overflow won't happen entirely from state
|
||||
// local to the producer
|
||||
if (producer.pushesNonAtomic - producer.lastPopRead == slotCount - 1) {
|
||||
// Re-read pops with memory order
|
||||
producer.lastPopRead = consumer.pops.load(std::memory_order_acquire);
|
||||
return producer.pushesNonAtomic - producer.lastPopRead == slotCount - 1;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/// Valid until the next pop, or until this queue is destroyed.
|
||||
T *pop() {
|
||||
// See if we can determine that there's an entry we can pop entirely from
|
||||
// state local to the consumer
|
||||
if (consumer.lastPushRead - consumer.popsNonAtomic == 0) {
|
||||
// Re-read pushes with memory order and try again
|
||||
consumer.lastPushRead = producer.pushes.load(std::memory_order_acquire);
|
||||
if (consumer.lastPushRead - consumer.popsNonAtomic == 0) {
|
||||
// Wait for pushes to change and try again
|
||||
producer.pushes.wait(consumer.lastPushRead, std::memory_order_relaxed);
|
||||
consumer.lastPushRead = producer.pushes.load(std::memory_order_acquire);
|
||||
}
|
||||
}
|
||||
auto result = &slots[consumer.popsNonAtomic++ & slotCountMask];
|
||||
// We only have to write pops with memory order if we've run out of items.
|
||||
// We know that we'll eventually run out.
|
||||
if (consumer.lastPushRead - consumer.popsNonAtomic == 0) {
|
||||
// seq_cst so that the notify can't be ordered before the store
|
||||
consumer.pops.store(consumer.popsNonAtomic, std::memory_order_seq_cst);
|
||||
consumer.pops.notify_one();
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
private:
|
||||
const uint32_t slotCount;
|
||||
const uint32_t slotCountMask;
|
||||
T *slots;
|
||||
struct alignas(kCacheLine) ProducerState {
|
||||
std::atomic<uint32_t> pushes{0};
|
||||
uint32_t pushesNonAtomic{0};
|
||||
uint32_t lastPopRead{0};
|
||||
};
|
||||
struct alignas(kCacheLine) ConsumerState {
|
||||
std::atomic<uint32_t> pops{0};
|
||||
uint32_t popsNonAtomic{0};
|
||||
uint32_t lastPushRead{0};
|
||||
};
|
||||
ProducerState producer;
|
||||
ConsumerState consumer;
|
||||
};
|
||||
|
||||
std::atomic<int64_t> transactions;
|
||||
|
||||
int64_t safeUnaryMinus(int64_t x) {
|
||||
@@ -64,35 +155,71 @@ template <class... Ts> std::string tupleKey(const Ts &...ts) {
|
||||
return result;
|
||||
}
|
||||
|
||||
constexpr int kWindowSize = 300000;
|
||||
constexpr int kTotalKeyRange = 1'000'000'000;
|
||||
constexpr int kWindowSize = 1'000'000;
|
||||
constexpr int kNumReadKeysPerTx = 10;
|
||||
constexpr int kNumWriteKeysPerTx = 5;
|
||||
|
||||
void workload(weaselab::ConflictSet *cs) {
|
||||
int64_t version = kWindowSize;
|
||||
constexpr int kNumWrites = 16;
|
||||
for (;; transactions.fetch_add(1, std::memory_order_relaxed)) {
|
||||
struct Transaction {
|
||||
std::vector<std::string> keys;
|
||||
std::vector<weaselab::ConflictSet::ReadRange> reads;
|
||||
std::vector<weaselab::ConflictSet::WriteRange> writes;
|
||||
int64_t version;
|
||||
int64_t oldestVersion;
|
||||
Transaction() = default;
|
||||
explicit Transaction(int64_t version)
|
||||
: version(version), oldestVersion(version - kWindowSize) {
|
||||
std::vector<int64_t> keyIndices;
|
||||
for (int i = 0; i < kNumWrites; ++i) {
|
||||
keyIndices.push_back(rand() % 100'000'000);
|
||||
for (int i = 0; i < std::max(kNumReadKeysPerTx, kNumWriteKeysPerTx); ++i) {
|
||||
keyIndices.push_back(rand() % kTotalKeyRange);
|
||||
}
|
||||
std::sort(keyIndices.begin(), keyIndices.end());
|
||||
std::vector<std::string> keys;
|
||||
std::vector<weaselab::ConflictSet::WriteRange> writes;
|
||||
constexpr std::string_view suffix = "this is a suffix";
|
||||
for (int i = 0; i < kNumWrites; ++i) {
|
||||
keys.push_back(tupleKey(0x100, i, keyIndices[i],
|
||||
suffix.substr(0, rand() % suffix.size()),
|
||||
rand()));
|
||||
constexpr std::string_view fullString =
|
||||
"this is a string, where a prefix of it is used as an element of the "
|
||||
"tuple forming the key";
|
||||
for (int i = 0; i < int(keyIndices.size()); ++i) {
|
||||
keys.push_back(
|
||||
tupleKey(0x100, keyIndices[i] / fullString.size(),
|
||||
fullString.substr(0, keyIndices[i] % fullString.size())));
|
||||
// printf("%s\n", printable(keys.back()).c_str());
|
||||
}
|
||||
for (int i = 0; i < kNumWrites; ++i) {
|
||||
for (int i = 0; i < kNumWriteKeysPerTx; ++i) {
|
||||
writes.push_back({{(const uint8_t *)keys[i].data(), int(keys[i].size())},
|
||||
{nullptr, 0}});
|
||||
}
|
||||
cs->addWrites(writes.data(), writes.size(), version);
|
||||
cs->setOldestVersion(version - kWindowSize);
|
||||
++version;
|
||||
reads.push_back({{(const uint8_t *)keys[0].data(), int(keys[0].size())},
|
||||
{(const uint8_t *)keys[1].data(), int(keys[1].size())},
|
||||
version - kWindowSize});
|
||||
static_assert(kNumReadKeysPerTx >= 3);
|
||||
for (int i = 2; i < kNumReadKeysPerTx; ++i) {
|
||||
reads.push_back({{(const uint8_t *)keys[i].data(), int(keys[i].size())},
|
||||
{nullptr, 0},
|
||||
version - kWindowSize});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Transaction(Transaction &&) = default;
|
||||
Transaction &operator=(Transaction &&) = default;
|
||||
Transaction(Transaction const &) = delete;
|
||||
Transaction const &operator=(Transaction const &) = delete;
|
||||
};
|
||||
|
||||
struct Resolver {
|
||||
|
||||
void resolve(const weaselab::ConflictSet::ReadRange *reads, int readCount,
|
||||
const weaselab::ConflictSet::WriteRange *writes, int writeCount,
|
||||
int64_t newVersion, int64_t newOldestVersion) {
|
||||
results.resize(readCount);
|
||||
cs.check(reads, results.data(), readCount);
|
||||
cs.addWrites(writes, writeCount, newVersion);
|
||||
cs.setOldestVersion(newOldestVersion);
|
||||
}
|
||||
|
||||
ConflictSet cs{0};
|
||||
|
||||
private:
|
||||
std::vector<weaselab::ConflictSet::Result> results;
|
||||
};
|
||||
|
||||
// Adapted from getaddrinfo man page
|
||||
int getListenFd(const char *node, const char *service) {
|
||||
@@ -235,7 +362,8 @@ int main(int argc, char **argv) {
|
||||
{
|
||||
int listenFd = getListenFd(argv[1], argv[2]);
|
||||
|
||||
weaselab::ConflictSet cs{0};
|
||||
Resolver resolver;
|
||||
auto &cs = resolver.cs;
|
||||
weaselab::ConflictSet::MetricsV1 *metrics;
|
||||
int metricsCount;
|
||||
cs.getMetricsV1(&metrics, &metricsCount);
|
||||
@@ -284,7 +412,23 @@ int main(int argc, char **argv) {
|
||||
}
|
||||
#endif
|
||||
|
||||
auto w = std::thread{workload, &cs};
|
||||
TxQueue<std::unique_ptr<Transaction>> queue{10};
|
||||
|
||||
auto workloadThread = std::thread{[&]() {
|
||||
for (int64_t version = kWindowSize;;
|
||||
++version, transactions.fetch_add(1, std::memory_order_relaxed)) {
|
||||
auto tx = std::make_unique<Transaction>(version);
|
||||
queue.push(std::move(tx));
|
||||
}
|
||||
}};
|
||||
|
||||
auto resolverThread = std::thread{[&]() {
|
||||
for (;;) {
|
||||
auto tx = queue.pop()->get();
|
||||
resolver.resolve(tx->reads.data(), tx->reads.size(), tx->writes.data(),
|
||||
tx->writes.size(), tx->version, tx->oldestVersion);
|
||||
}
|
||||
}};
|
||||
|
||||
for (;;) {
|
||||
struct sockaddr_storage peer_addr = {};
|
||||
|
||||
@@ -767,7 +767,9 @@ struct __attribute__((visibility("hidden"))) ConflictSet::Impl {
|
||||
false, true);
|
||||
}
|
||||
|
||||
sortPoints(points);
|
||||
if (!std::is_sorted(points.begin(), points.end())) {
|
||||
sortPoints(points);
|
||||
}
|
||||
|
||||
int activeWriteCount = 0;
|
||||
std::vector<std::pair<StringRef, StringRef>> combinedWriteConflictRanges;
|
||||
@@ -794,7 +796,6 @@ struct __attribute__((visibility("hidden"))) ConflictSet::Impl {
|
||||
int temp[stripeSize];
|
||||
int stripes = (stringCount + stripeSize - 1) / stripeSize;
|
||||
StringRef values[stripeSize];
|
||||
int64_t writeVersions[stripeSize / 2];
|
||||
int ss = stringCount - (stripes - 1) * stripeSize;
|
||||
int64_t entryDelta = 0;
|
||||
for (int s = stripes - 1; s >= 0; s--) {
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
___chkstk_darwin
|
||||
___stack_chk_fail
|
||||
___stack_chk_guard
|
||||
__tlv_bootstrap
|
||||
@@ -5,6 +6,7 @@ _abort
|
||||
_bzero
|
||||
_free
|
||||
_malloc
|
||||
_memcmp
|
||||
_memcpy
|
||||
_memmove
|
||||
dyld_stub_binder
|
||||
BIN
corpus/03d3918b737a86ed38fbeae6dff198d6913b90b2
Normal file
BIN
corpus/03d3918b737a86ed38fbeae6dff198d6913b90b2
Normal file
Binary file not shown.
BIN
corpus/0d35b148f45f7e3f722f4ac298558ba0dd545b48
Normal file
BIN
corpus/0d35b148f45f7e3f722f4ac298558ba0dd545b48
Normal file
Binary file not shown.
BIN
corpus/1e7a4ca606559d15183818a6c3d93b2f132b2fff
Normal file
BIN
corpus/1e7a4ca606559d15183818a6c3d93b2f132b2fff
Normal file
Binary file not shown.
BIN
corpus/25046edfa84c50539e352d8a5c14d5a38cbccbc9
Normal file
BIN
corpus/25046edfa84c50539e352d8a5c14d5a38cbccbc9
Normal file
Binary file not shown.
BIN
corpus/3388003130da408556ca4ebbe6f1e3cc3e110b33
Normal file
BIN
corpus/3388003130da408556ca4ebbe6f1e3cc3e110b33
Normal file
Binary file not shown.
BIN
corpus/342740d94427af6509e3332d46d99e1091a5c065
Normal file
BIN
corpus/342740d94427af6509e3332d46d99e1091a5c065
Normal file
Binary file not shown.
BIN
corpus/3571178de127e769d6229057b205f9df36506cbd
Normal file
BIN
corpus/3571178de127e769d6229057b205f9df36506cbd
Normal file
Binary file not shown.
BIN
corpus/42620ad6039a83a92d5f5c5c8f764c59149a0852
Normal file
BIN
corpus/42620ad6039a83a92d5f5c5c8f764c59149a0852
Normal file
Binary file not shown.
BIN
corpus/486d0d68d44e9eb5ecfc9d64d23ae561dfc150ad
Normal file
BIN
corpus/486d0d68d44e9eb5ecfc9d64d23ae561dfc150ad
Normal file
Binary file not shown.
BIN
corpus/4a2e895a63fd0487d9115aac49305cfad276d901
Normal file
BIN
corpus/4a2e895a63fd0487d9115aac49305cfad276d901
Normal file
Binary file not shown.
BIN
corpus/557f000e67b09ad7647e480e5ca51c39c6fb56fd
Normal file
BIN
corpus/557f000e67b09ad7647e480e5ca51c39c6fb56fd
Normal file
Binary file not shown.
BIN
corpus/59eb125178004fa691678fa4aeab2acf9eb20d92
Normal file
BIN
corpus/59eb125178004fa691678fa4aeab2acf9eb20d92
Normal file
Binary file not shown.
BIN
corpus/5c9267b106f81105ccd6ff67115a494ad967d31c
Normal file
BIN
corpus/5c9267b106f81105ccd6ff67115a494ad967d31c
Normal file
Binary file not shown.
BIN
corpus/613b3f22e9c850e88e43d4a08ab3f4aa690db94a
Normal file
BIN
corpus/613b3f22e9c850e88e43d4a08ab3f4aa690db94a
Normal file
Binary file not shown.
BIN
corpus/638968595f06cd59f8654d24f043d0f80c30f2ea
Normal file
BIN
corpus/638968595f06cd59f8654d24f043d0f80c30f2ea
Normal file
Binary file not shown.
BIN
corpus/6ca1fb9210b8478a854ec5406201b582c3c45dac
Normal file
BIN
corpus/6ca1fb9210b8478a854ec5406201b582c3c45dac
Normal file
Binary file not shown.
BIN
corpus/72e3ccd7785f7ee34adbc41ad0fe37a4e3997fa8
Normal file
BIN
corpus/72e3ccd7785f7ee34adbc41ad0fe37a4e3997fa8
Normal file
Binary file not shown.
BIN
corpus/7343aeff21ad270485906be46e6c2398c6af46cd
Normal file
BIN
corpus/7343aeff21ad270485906be46e6c2398c6af46cd
Normal file
Binary file not shown.
BIN
corpus/76f4456c1817bdfd54d6a6731f41b5c7951df797
Normal file
BIN
corpus/76f4456c1817bdfd54d6a6731f41b5c7951df797
Normal file
Binary file not shown.
BIN
corpus/7bf8c8d06451c512507d251d477c3ec61bb869f2
Normal file
BIN
corpus/7bf8c8d06451c512507d251d477c3ec61bb869f2
Normal file
Binary file not shown.
BIN
corpus/80fb897a23d1bbce326a8ae8ed257559d1dee707
Normal file
BIN
corpus/80fb897a23d1bbce326a8ae8ed257559d1dee707
Normal file
Binary file not shown.
BIN
corpus/853694cf1e93f28aca05f2e96798ece03061d8a9
Normal file
BIN
corpus/853694cf1e93f28aca05f2e96798ece03061d8a9
Normal file
Binary file not shown.
BIN
corpus/9078ea2ea49ecbfda8dd547b82175f4f2cec85d0
Normal file
BIN
corpus/9078ea2ea49ecbfda8dd547b82175f4f2cec85d0
Normal file
Binary file not shown.
BIN
corpus/96e2a3c9f8d6cc448d67502664f737560b422671
Normal file
BIN
corpus/96e2a3c9f8d6cc448d67502664f737560b422671
Normal file
Binary file not shown.
BIN
corpus/9deef5d7e40663e48c50d5049014e84c3a8ec72f
Normal file
BIN
corpus/9deef5d7e40663e48c50d5049014e84c3a8ec72f
Normal file
Binary file not shown.
BIN
corpus/9f390812bd1313d3727cf6c3502178d35f8f091d
Normal file
BIN
corpus/9f390812bd1313d3727cf6c3502178d35f8f091d
Normal file
Binary file not shown.
BIN
corpus/a0d2aadd536eabbf2f847b8a35e7d2bec90eff43
Normal file
BIN
corpus/a0d2aadd536eabbf2f847b8a35e7d2bec90eff43
Normal file
Binary file not shown.
BIN
corpus/afab94e11f2b96751dd648bb294847901787a203
Normal file
BIN
corpus/afab94e11f2b96751dd648bb294847901787a203
Normal file
Binary file not shown.
BIN
corpus/b6c7adfbb014456cc17d18e945c2b0a92299bece
Normal file
BIN
corpus/b6c7adfbb014456cc17d18e945c2b0a92299bece
Normal file
Binary file not shown.
BIN
corpus/b78c3ce2010ea9e0447d31be62ca126f2445f2e4
Normal file
BIN
corpus/b78c3ce2010ea9e0447d31be62ca126f2445f2e4
Normal file
Binary file not shown.
BIN
corpus/c4eb7f9dbf6b1b3f36a61b4f89a3ba204a793f1a
Normal file
BIN
corpus/c4eb7f9dbf6b1b3f36a61b4f89a3ba204a793f1a
Normal file
Binary file not shown.
BIN
corpus/c68b79c358de72172c32a73e8c694a379d622424
Normal file
BIN
corpus/c68b79c358de72172c32a73e8c694a379d622424
Normal file
Binary file not shown.
BIN
corpus/cca06ae98268952fd1b77f6e2abd97eb297b1a56
Normal file
BIN
corpus/cca06ae98268952fd1b77f6e2abd97eb297b1a56
Normal file
Binary file not shown.
BIN
corpus/cd7b5b4c8fc745842394a39e261490cd2123f903
Normal file
BIN
corpus/cd7b5b4c8fc745842394a39e261490cd2123f903
Normal file
Binary file not shown.
BIN
corpus/cfb0a0258f9ecdb58e1773352784543819bc947d
Normal file
BIN
corpus/cfb0a0258f9ecdb58e1773352784543819bc947d
Normal file
Binary file not shown.
BIN
corpus/d1ea327a3ee50b28de31bc8d169bacfffaaf209a
Normal file
BIN
corpus/d1ea327a3ee50b28de31bc8d169bacfffaaf209a
Normal file
Binary file not shown.
BIN
corpus/d575af201f168a680ba0a5d6a76dc02e6b130ab8
Normal file
BIN
corpus/d575af201f168a680ba0a5d6a76dc02e6b130ab8
Normal file
Binary file not shown.
BIN
corpus/d74eceff242d2281c29b65030c7e6a9844d6016c
Normal file
BIN
corpus/d74eceff242d2281c29b65030c7e6a9844d6016c
Normal file
Binary file not shown.
BIN
corpus/d7d22a5670f5ac18870856171c4555a46998bb49
Normal file
BIN
corpus/d7d22a5670f5ac18870856171c4555a46998bb49
Normal file
Binary file not shown.
BIN
corpus/d8d0c3eb0e132b0b3eefd11f1ff6660267485168
Normal file
BIN
corpus/d8d0c3eb0e132b0b3eefd11f1ff6660267485168
Normal file
Binary file not shown.
BIN
corpus/d9bb47bd8cd24b80cb11704a30f219a8a0bb3e24
Normal file
BIN
corpus/d9bb47bd8cd24b80cb11704a30f219a8a0bb3e24
Normal file
Binary file not shown.
BIN
corpus/de1f029e16846e29376769b00f3441722dcb5cf5
Normal file
BIN
corpus/de1f029e16846e29376769b00f3441722dcb5cf5
Normal file
Binary file not shown.
BIN
corpus/e4429de0453e9a6a4798c09a152e8229e4c06d56
Normal file
BIN
corpus/e4429de0453e9a6a4798c09a152e8229e4c06d56
Normal file
Binary file not shown.
BIN
corpus/e7bb8ea46d657cb03bae68f19ec8727b1ad6e6bd
Normal file
BIN
corpus/e7bb8ea46d657cb03bae68f19ec8727b1ad6e6bd
Normal file
Binary file not shown.
BIN
corpus/e9d5413739467acb5d40c2ece14427cff7846196
Normal file
BIN
corpus/e9d5413739467acb5d40c2ece14427cff7846196
Normal file
Binary file not shown.
BIN
corpus/edda099a9ee07d78923e61f1db04de1ef46d2d4a
Normal file
BIN
corpus/edda099a9ee07d78923e61f1db04de1ef46d2d4a
Normal file
Binary file not shown.
BIN
corpus/f2dcdd4e3a67d7d1ca2ab43300750cb9c95fc069
Normal file
BIN
corpus/f2dcdd4e3a67d7d1ca2ab43300750cb9c95fc069
Normal file
Binary file not shown.
BIN
corpus/f63507ee01e5d9c3177050bb57367996c4c62e4b
Normal file
BIN
corpus/f63507ee01e5d9c3177050bb57367996c4c62e4b
Normal file
Binary file not shown.
BIN
corpus/fbae9c0c23d75abddc753fda7cd5d106906d889f
Normal file
BIN
corpus/fbae9c0c23d75abddc753fda7cd5d106906d889f
Normal file
Binary file not shown.
@@ -8,7 +8,7 @@ SRC_DIR="${0%/*}"
|
||||
BUILD_ARM="$(mktemp -d -t conflict-set-arm)"
|
||||
BUILD_X86="$(mktemp -d -t conflict-set-x86)"
|
||||
|
||||
cmake_args=(-DCMAKE_CXX_FLAGS=-DNVALGRIND -DCPACK_PACKAGING_INSTALL_PREFIX=/usr/local)
|
||||
cmake_args=(-DCMAKE_CXX_FLAGS=-DNVALGRIND -DCPACK_PACKAGING_INSTALL_PREFIX=/usr/local -DCMAKE_CXX_COMPILER=/opt/homebrew/opt/llvm/bin/clang++)
|
||||
|
||||
cmake -S"$SRC_DIR" -B"$BUILD_ARM" -DCMAKE_OSX_ARCHITECTURES=arm64 "${cmake_args[@]}"
|
||||
cmake --build "$BUILD_ARM" --target conflict-set --target conflict-set-static
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
import struct
|
||||
from conflict_set import *
|
||||
|
||||
|
||||
@@ -164,6 +165,16 @@ def test_fixup_256():
|
||||
cs.check(read(0, bytes([1]), bytes([2])))
|
||||
|
||||
|
||||
def test_large_removal_buffer():
|
||||
with DebugConflictSet() as cs:
|
||||
for i in range(1000):
|
||||
# create extra gc work
|
||||
for j in range(100):
|
||||
cs.addWrites(1000 + i)
|
||||
cs.addWrites(1000 + i, write(struct.pack(">l", i) + bytes([0] * 100000)))
|
||||
cs.setOldestVersion(i)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
# budget "pytest" for ctest integration without pulling in a dependency. You can of course still use pytest in local development.
|
||||
import argparse
|
||||
|
||||
Reference in New Issue
Block a user