3 Commits

Author SHA1 Message Date
andrew baddea7f57 Update and freeze pre-commit hooks
weaselab/conflict-set/pipeline/head There was a failure building this commit
2024-04-03 12:20:26 -07:00
andrew 2d3e7b9004 Add shellcheck to pre-commit
Closes #22
2024-04-03 12:19:55 -07:00
andrew c4862fee9b Add symbol tests for apple
closes #21
2024-04-03 12:15:08 -07:00
1120 changed files with 2843 additions and 5516 deletions
-1
View File
@@ -1,3 +1,2 @@
.cache
__pycache__
build
+5 -16
View File
@@ -13,7 +13,7 @@ repos:
- id: debug verbose check
name: disallow checking in DEBUG_VERBOSE=1
description: disallow checking in DEBUG_VERBOSE=1
entry: "^#define DEBUG_VERBOSE 1$"
entry: '^#define DEBUG_VERBOSE 1$'
language: pygrep
types: [c++]
- repo: local
@@ -21,22 +21,11 @@ repos:
- id: debug verbose check
name: disallow checking in SHOW_MEMORY=1
description: disallow checking in SHOW_MEMORY=1
entry: "^#define SHOW_MEMORY 1$"
entry: '^#define SHOW_MEMORY 1$'
language: pygrep
types: [c++]
- repo: local
hooks:
- id: sim cache misses check
name: disallow checking in SIM_CACHE_MISSES=1
description: disallow checking in SIM_CACHE_MISSES=1
entry: "^#define SIM_CACHE_MISSES 1$"
language: pygrep
types: [c++]
- repo: https://github.com/shellcheck-py/shellcheck-py
rev: a23f6b85d0fdd5bb9d564e2579e678033debbdff # frozen: v0.10.0.1
- repo: https://github.com/koalaman/shellcheck-precommit
rev: 2491238703a5d3415bb2b7ff11388bf775372f29 # frozen: v0.10.0
hooks:
- id: shellcheck
- repo: https://github.com/psf/black
rev: 552baf822992936134cbd31a38f69c8cfe7c0f05 # frozen: 24.3.0
hooks:
- id: black
# args: ["--severity=warning"] # Optionally only show errors and warnings
+2 -111
View File
@@ -7,6 +7,7 @@
void showMemory(const ConflictSet &cs);
#endif
#define ANKERL_NANOBENCH_IMPLEMENT
#include "third_party/nanobench.h"
constexpr int kNumKeys = 1000000;
@@ -257,114 +258,4 @@ void benchConflictSet() {
}
}
constexpr int kKeyLenForWorstCase = 50;
ConflictSet worstCaseConflictSetForRadixRangeRead(int cardinality) {
ConflictSet cs{0};
for (int i = 0; i < kKeyLenForWorstCase; ++i) {
for (int j = 0; j < cardinality; ++j) {
auto b = std::vector<uint8_t>(i, 0);
b.push_back(j);
auto e = std::vector<uint8_t>(i, 255);
e.push_back(255 - j);
weaselab::ConflictSet::WriteRange w[] = {{
{b.data(), int(b.size())},
{nullptr, 0},
},
{
{e.data(), int(e.size())},
{nullptr, 0},
}};
std::sort(std::begin(w), std::end(w),
[](const auto &lhs, const auto &rhs) {
int cl = std::min(lhs.begin.len, rhs.begin.len);
if (cl > 0) {
int c = memcmp(lhs.begin.p, rhs.begin.p, cl);
if (c != 0) {
return c < 0;
}
}
return lhs.begin.len < rhs.begin.len;
});
cs.addWrites(w, sizeof(w) / sizeof(w[0]), 0);
}
}
// Defeat short-circuiting on the left
{
auto k = std::vector<uint8_t>(kKeyLenForWorstCase, 0);
weaselab::ConflictSet::WriteRange w[] = {
{
{k.data(), int(k.size())},
{nullptr, 0},
},
};
cs.addWrites(w, sizeof(w) / sizeof(w[0]), 1);
}
// Defeat short-circuiting on the right
{
auto k = std::vector<uint8_t>(kKeyLenForWorstCase, 255);
weaselab::ConflictSet::WriteRange w[] = {
{
{k.data(), int(k.size())},
{nullptr, 0},
},
};
cs.addWrites(w, sizeof(w) / sizeof(w[0]), 1);
}
return cs;
}
void benchWorstCaseForRadixRangeRead() {
ankerl::nanobench::Bench bench;
std::unique_ptr<ConflictSet> cs[256];
for (int i = 0; i < 256; ++i) {
cs[i] =
std::make_unique<ConflictSet>(worstCaseConflictSetForRadixRangeRead(i));
}
auto begin = std::vector<uint8_t>(kKeyLenForWorstCase - 1, 0);
begin.push_back(1);
auto end = std::vector<uint8_t>(kKeyLenForWorstCase - 1, 255);
end.push_back(254);
weaselab::ConflictSet::Result result;
weaselab::ConflictSet::ReadRange r{
{begin.data(), int(begin.size())}, {end.data(), int(end.size())}, 0};
bench.run("worst case for radix tree", [&]() {
for (int i = 0; i < 256; ++i) {
result = weaselab::ConflictSet::TooOld;
cs[i]->check(&r, &result, 1);
if (result != weaselab::ConflictSet::Commit) {
abort();
}
}
});
// for (int i = 0; i < 256; ++i) {
// bench.run("worst case for radix tree, span " + std::to_string(i), [&]() {
// result = weaselab::ConflictSet::TooOld;
// cs[i]->check(&r, &result, 1);
// if (result != weaselab::ConflictSet::Commit) {
// abort();
// }
// });
// }
}
void benchCreateAndDestroy() {
ankerl::nanobench::Bench bench;
bench.run("create and destroy", [&]() { ConflictSet cs{0}; });
}
int main(void) {
benchConflictSet();
benchWorstCaseForRadixRangeRead();
benchCreateAndDestroy();
}
int main(void) { benchConflictSet(); }
+69 -126
View File
@@ -1,17 +1,13 @@
cmake_minimum_required(VERSION 3.18)
project(
conflict-set
VERSION 0.0.14
VERSION 0.0.2
DESCRIPTION
"A data structure for optimistic concurrency control on ranges of bitwise-lexicographically-ordered keys."
HOMEPAGE_URL "https://git.weaselab.dev/weaselab/conflict-set"
LANGUAGES C CXX)
set(CMAKE_CXX_STANDARD 20)
file(WRITE ${CMAKE_CURRENT_BINARY_DIR}/version.txt ${PROJECT_VERSION})
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/version.txt.in
${CMAKE_CURRENT_SOURCE_DIR}/paper/version.txt)
include(CMakePushCheckState)
include(CheckCXXCompilerFlag)
include(CheckIncludeFileCXX)
@@ -19,6 +15,12 @@ include(CheckCXXSourceCompiles)
set(DEFAULT_BUILD_TYPE "Release")
if(EMSCRIPTEN OR CMAKE_SYSTEM_NAME STREQUAL WASI)
set(WASM ON)
else()
set(WASM OFF)
endif()
if(NOT CMAKE_BUILD_TYPE AND NOT CMAKE_CONFIGURATION_TYPES)
message(
STATUS
@@ -33,10 +35,6 @@ endif()
add_compile_options(-fdata-sections -ffunction-sections -Wswitch-enum
-Werror=switch-enum -fPIC)
if(NOT APPLE)
# This causes some versions of clang to crash on macos
add_compile_options(-g -fno-omit-frame-pointer)
endif()
set(full_relro_flags "-pie;LINKER:-z,relro,-z,now,-z,noexecstack")
cmake_push_check_state()
@@ -47,8 +45,7 @@ if(HAS_FULL_RELRO)
endif()
cmake_pop_check_state()
set(version_script_flags
LINKER:--version-script=${CMAKE_CURRENT_SOURCE_DIR}/linker.map)
set(version_script_flags LINKER:--version-script=${CMAKE_SOURCE_DIR}/linker.map)
cmake_push_check_state()
list(APPEND CMAKE_REQUIRED_LINK_OPTIONS ${version_script_flags})
check_cxx_source_compiles("int main(){}" HAS_VERSION_SCRIPT FAIL_REGEX
@@ -58,11 +55,9 @@ cmake_pop_check_state()
option(USE_SIMD_FALLBACK
"Use fallback implementations of functions that use SIMD" OFF)
option(DISABLE_TSAN "Disable TSAN" OFF)
# This is encouraged according to
# https://valgrind.org/docs/manual/manual-core-adv.html#manual-core-adv.clientreq
include_directories(SYSTEM ${CMAKE_CURRENT_SOURCE_DIR}/third_party/valgrind)
include_directories(SYSTEM ${CMAKE_SOURCE_DIR}/third_party/valgrind)
add_compile_options($<$<COMPILE_LANGUAGE:CXX>:-Wno-invalid-offsetof>)
@@ -72,7 +67,22 @@ else()
add_link_options(-Wl,--gc-sections)
endif()
if(EMSCRIPTEN)
# https://github.com/emscripten-core/emscripten/issues/15377#issuecomment-1285167486
add_link_options(-lnodefs.js -lnoderawfs.js)
add_link_options(-s ALLOW_MEMORY_GROWTH)
endif()
if(NOT USE_SIMD_FALLBACK)
cmake_push_check_state()
list(APPEND CMAKE_REQUIRED_FLAGS -msimd128)
check_include_file_cxx("wasm_simd128.h" HAS_WASM_SIMD)
if(HAS_WASM_SIMD)
add_compile_options(-msimd128)
add_compile_definitions(HAS_WASM_SIMD)
endif()
cmake_pop_check_state()
cmake_push_check_state()
list(APPEND CMAKE_REQUIRED_FLAGS -mavx)
check_include_file_cxx("immintrin.h" HAS_AVX)
@@ -94,20 +104,19 @@ add_library(${PROJECT_NAME}-object OBJECT ConflictSet.cpp)
target_compile_options(${PROJECT_NAME}-object PRIVATE -fno-exceptions
-fvisibility=hidden)
target_include_directories(${PROJECT_NAME}-object
PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include)
PRIVATE ${CMAKE_SOURCE_DIR}/include)
add_library(${PROJECT_NAME} SHARED $<TARGET_OBJECTS:${PROJECT_NAME}-object>)
set_target_properties(
${PROJECT_NAME} PROPERTIES LIBRARY_OUTPUT_DIRECTORY
"${CMAKE_CURRENT_BINARY_DIR}/radix_tree")
"${CMAKE_BINARY_DIR}/radix_tree")
if(NOT CMAKE_BUILD_TYPE STREQUAL Debug)
set_target_properties(${PROJECT_NAME} PROPERTIES LINKER_LANGUAGE C)
endif()
if(HAS_VERSION_SCRIPT)
target_link_options(
${PROJECT_NAME} PRIVATE
LINKER:--version-script=${CMAKE_CURRENT_SOURCE_DIR}/linker.map)
target_link_options(${PROJECT_NAME} PRIVATE
LINKER:--version-script=${CMAKE_SOURCE_DIR}/linker.map)
endif()
add_library(${PROJECT_NAME}-static STATIC
@@ -119,55 +128,54 @@ endif()
if(APPLE)
add_custom_command(
TARGET ${PROJECT_NAME}-static
PRE_LINK
COMMAND ${CMAKE_CURRENT_SOURCE_DIR}/privatize_symbols_macos.sh
PRE_BUILD
COMMAND ${CMAKE_SOURCE_DIR}/privatize_symbols_macos.sh
$<TARGET_OBJECTS:${PROJECT_NAME}-object>)
else()
add_custom_command(
TARGET ${PROJECT_NAME}-static
POST_BUILD
COMMAND
${CMAKE_OBJCOPY}
--keep-global-symbols=${CMAKE_CURRENT_SOURCE_DIR}/symbol-exports.txt
${CMAKE_OBJCOPY} --keep-global-symbols=${CMAKE_SOURCE_DIR}/symbols.txt
$<TARGET_FILE:${PROJECT_NAME}-static> || echo
"Proceeding with all symbols global in static library")
endif()
set(TEST_FLAGS -Wall -Wextra -Wunreachable-code -Wpedantic -UNDEBUG)
include(CTest)
# disable tests if this is being used through e.g. FetchContent
if(CMAKE_SOURCE_DIR STREQUAL CMAKE_CURRENT_SOURCE_DIR AND BUILD_TESTING)
add_library(nanobench ${CMAKE_CURRENT_SOURCE_DIR}/nanobench.cpp)
set(TEST_FLAGS -Wall -Wextra -Wunreachable-code -Wpedantic -UNDEBUG)
if(BUILD_TESTING)
# corpus tests, which are tests curated by libfuzzer. The goal is to get broad
# coverage with a small number of tests.
file(GLOB CORPUS_TESTS ${CMAKE_CURRENT_SOURCE_DIR}/corpus/*)
file(GLOB CORPUS_TESTS ${CMAKE_SOURCE_DIR}/corpus/*)
# extra testing that relies on shared libraries, which aren't available with
# wasm
if(NOT WASM)
# Shared library version of FoundationDB's skip list implementation
add_library(skip_list SHARED SkipList.cpp)
target_compile_options(skip_list PRIVATE -fno-exceptions -fvisibility=hidden)
target_include_directories(skip_list
PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/include)
set_target_properties(
skip_list PROPERTIES LIBRARY_OUTPUT_DIRECTORY
"${CMAKE_CURRENT_BINARY_DIR}/skip_list")
target_compile_options(skip_list PRIVATE -fno-exceptions
-fvisibility=hidden)
target_include_directories(skip_list PUBLIC ${CMAKE_SOURCE_DIR}/include)
set_target_properties(skip_list PROPERTIES LIBRARY_OUTPUT_DIRECTORY
"${CMAKE_BINARY_DIR}/skip_list")
set_target_properties(skip_list PROPERTIES OUTPUT_NAME ${PROJECT_NAME})
set_target_properties(skip_list PROPERTIES VERSION ${PROJECT_VERSION}
SOVERSION ${PROJECT_VERSION_MAJOR})
set_target_properties(
skip_list PROPERTIES VERSION ${PROJECT_VERSION} SOVERSION
${PROJECT_VERSION_MAJOR})
# Shared library version of a std::unordered_map-based conflict set (point
# queries only)
add_library(hash_table SHARED HashTable.cpp)
target_compile_options(hash_table PRIVATE -fno-exceptions -fvisibility=hidden)
target_include_directories(hash_table
PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/include)
target_compile_options(hash_table PRIVATE -fno-exceptions
-fvisibility=hidden)
target_include_directories(hash_table PUBLIC ${CMAKE_SOURCE_DIR}/include)
set_target_properties(
hash_table PROPERTIES LIBRARY_OUTPUT_DIRECTORY
"${CMAKE_CURRENT_BINARY_DIR}/hash_table")
"${CMAKE_BINARY_DIR}/hash_table")
set_target_properties(hash_table PROPERTIES OUTPUT_NAME ${PROJECT_NAME})
set_target_properties(
hash_table PROPERTIES VERSION ${PROJECT_VERSION} SOVERSION
@@ -181,13 +189,13 @@ if(CMAKE_SOURCE_DIR STREQUAL CMAKE_CURRENT_SOURCE_DIR AND BUILD_TESTING)
get_filename_component(hash ${TEST} NAME)
add_test(NAME skip_list_${hash} COMMAND driver_skip_list ${TEST})
endforeach()
endif()
# ad hoc testing
add_executable(conflict_set_main ConflictSet.cpp)
target_include_directories(conflict_set_main
PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/include)
target_compile_definitions(conflict_set_main PRIVATE ENABLE_MAIN)
target_link_libraries(conflict_set_main PRIVATE nanobench)
if(NOT APPLE)
# libfuzzer target, to generate/manage corpus
@@ -226,7 +234,7 @@ if(CMAKE_SOURCE_DIR STREQUAL CMAKE_CURRENT_SOURCE_DIR AND BUILD_TESTING)
endforeach()
# tsan tests
if(NOT CMAKE_CROSSCOMPILING AND NOT DISABLE_TSAN)
if(NOT CMAKE_CROSSCOMPILING AND NOT CMAKE_BUILD_TYPE STREQUAL Debug)
add_executable(tsan_driver ConflictSet.cpp FuzzTestDriver.cpp)
target_compile_options(tsan_driver PRIVATE ${TEST_FLAGS} -fsanitize=thread)
target_link_options(tsan_driver PRIVATE -fsanitize=thread)
@@ -250,38 +258,20 @@ if(CMAKE_SOURCE_DIR STREQUAL CMAKE_CURRENT_SOURCE_DIR AND BUILD_TESTING)
# scripted tests. Written manually to fill in anything libfuzzer couldn't
# find.
if(NOT CMAKE_CROSSCOMPILING)
find_package(Python3 REQUIRED COMPONENTS Interpreter)
set_property(
DIRECTORY
APPEND
PROPERTY CMAKE_CONFIGURE_DEPENDS
${CMAKE_CURRENT_SOURCE_DIR}/test_conflict_set.py)
execute_process(
COMMAND ${Python3_EXECUTABLE}
${CMAKE_CURRENT_SOURCE_DIR}/test_conflict_set.py list
OUTPUT_VARIABLE SCRIPT_TESTS)
add_executable(script_test ScriptTest.cpp)
target_compile_options(script_test PRIVATE ${TEST_FLAGS})
target_link_libraries(script_test PRIVATE ${PROJECT_NAME})
file(GLOB SCRIPT_TESTS ${CMAKE_SOURCE_DIR}/script_tests/*)
foreach(TEST ${SCRIPT_TESTS})
add_test(
NAME script_test_${TEST}
COMMAND
${Python3_EXECUTABLE}
${CMAKE_CURRENT_SOURCE_DIR}/test_conflict_set.py test ${TEST}
--build-dir ${CMAKE_CURRENT_BINARY_DIR})
get_filename_component(name ${TEST} NAME)
add_test(NAME conflict_set_script_${name} COMMAND script_test ${TEST})
endforeach()
endif()
find_program(VALGRIND_EXE valgrind)
if(VALGRIND_EXE AND NOT CMAKE_CROSSCOMPILING)
list(LENGTH CORPUS_TESTS len)
math(EXPR last "${len} - 1")
set(partition_size 100)
foreach(i RANGE 0 ${last} ${partition_size})
list(SUBLIST CORPUS_TESTS ${i} ${partition_size} partition)
add_test(NAME conflict_set_blackbox_valgrind_${i}
add_test(NAME conflict_set_blackbox_valgrind
COMMAND ${VALGRIND_EXE} --error-exitcode=99 --
$<TARGET_FILE:driver> ${partition})
endforeach()
$<TARGET_FILE:driver> ${CORPUS_TESTS})
endif()
# api smoke tests
@@ -305,63 +295,39 @@ if(CMAKE_SOURCE_DIR STREQUAL CMAKE_CURRENT_SOURCE_DIR AND BUILD_TESTING)
add_test(NAME conflict_set_cxx_api_test COMMAND conflict_set_cxx_api_test)
# symbol visibility tests
if(NOT CMAKE_BUILD_TYPE STREQUAL Debug)
if(NOT WASM AND NOT CMAKE_BUILD_TYPE STREQUAL Debug)
if(APPLE)
set(symbol_exports ${CMAKE_CURRENT_SOURCE_DIR}/apple-symbol-exports.txt)
set(symbol_imports ${CMAKE_CURRENT_SOURCE_DIR}/apple-symbol-imports.txt)
set(symbol_exports ${CMAKE_SOURCE_DIR}/apple-symbol-exports.txt)
set(symbol_imports ${CMAKE_SOURCE_DIR}/apple-symbol-imports.txt)
else()
set(symbol_exports ${CMAKE_CURRENT_SOURCE_DIR}/symbol-exports.txt)
if(CMAKE_SYSTEM_PROCESSOR STREQUAL aarch64)
set(symbol_imports
${CMAKE_CURRENT_SOURCE_DIR}/aarch64-symbol-imports.txt)
else()
set(symbol_imports ${CMAKE_CURRENT_SOURCE_DIR}/symbol-imports.txt)
endif()
set(symbol_exports ${CMAKE_SOURCE_DIR}/symbol-exports.txt)
set(symbol_imports ${CMAKE_SOURCE_DIR}/symbol-imports.txt)
endif()
add_test(
NAME conflict_set_shared_symbols
COMMAND
${CMAKE_CURRENT_SOURCE_DIR}/test_symbols.sh
$<TARGET_FILE:${PROJECT_NAME}> ${symbol_exports} ${symbol_imports})
${CMAKE_SOURCE_DIR}/test_symbols.sh $<TARGET_FILE:${PROJECT_NAME}>
${symbol_exports} ${symbol_imports})
add_test(
NAME conflict_set_static_symbols
COMMAND
${CMAKE_CURRENT_SOURCE_DIR}/test_symbols.sh
${CMAKE_SOURCE_DIR}/test_symbols.sh
$<TARGET_FILE:${PROJECT_NAME}-static> ${symbol_exports}
${symbol_imports})
endif()
# bench
add_executable(conflict_set_bench Bench.cpp)
target_link_libraries(conflict_set_bench PRIVATE ${PROJECT_NAME} nanobench)
target_link_libraries(conflict_set_bench PRIVATE ${PROJECT_NAME})
set_target_properties(conflict_set_bench PROPERTIES SKIP_BUILD_RPATH ON)
add_executable(real_data_bench RealDataBench.cpp)
target_link_libraries(real_data_bench PRIVATE ${PROJECT_NAME})
set_target_properties(real_data_bench PROPERTIES SKIP_BUILD_RPATH ON)
# fuzzer-based perf
add_executable(driver_perf TestDriver.cpp)
target_compile_definitions(driver_perf PRIVATE PERF_TEST=1)
target_link_libraries(driver_perf PRIVATE ${PROJECT_NAME})
# server bench
add_executable(server_bench ServerBench.cpp)
target_link_libraries(server_bench PRIVATE ${PROJECT_NAME})
set_target_properties(server_bench PROPERTIES SKIP_BUILD_RPATH ON)
add_executable(interleaving_test InterleavingTest.cpp)
# work around lack of musttail for gcc
if(CMAKE_CXX_COMPILER_ID STREQUAL "GNU" AND CMAKE_BUILD_TYPE STREQUAL "Debug")
target_compile_options(interleaving_test PRIVATE -Og
-foptimize-sibling-calls)
endif()
target_link_libraries(interleaving_test PRIVATE nanobench)
endif()
# packaging
set(CPACK_PACKAGE_CONTACT andrew@weaselab.dev)
set(CMAKE_INSTALL_DEFAULT_COMPONENT_NAME all)
set(CPACK_PACKAGE_VENDOR "Weaselab")
set(CPACK_RESOURCE_FILE_LICENSE "${CMAKE_CURRENT_SOURCE_DIR}/LICENSE")
@@ -375,26 +341,6 @@ set(CPACK_RPM_FILE_NAME RPM-DEFAULT)
# deb
set(CPACK_DEBIAN_FILE_NAME DEB-DEFAULT)
# see *-imports.txt - dependency versions need to be synced with symbol versions
if(CMAKE_SYSTEM_PROCESSOR STREQUAL aarch64)
set(CPACK_DEBIAN_PACKAGE_DEPENDS "libc6 (>= 2.17)")
else()
set(CPACK_DEBIAN_PACKAGE_DEPENDS "libc6 (>= 2.14)")
endif()
# macos
set(CMAKE_OSX_DEPLOYMENT_TARGET 11.0)
if(APPLE)
find_program(PANDOC_EXE pandoc)
if(PANDOC_EXE)
execute_process(COMMAND ${PANDOC_EXE} ${CMAKE_CURRENT_SOURCE_DIR}/README.md
-o ${CMAKE_CURRENT_BINARY_DIR}/README.txt)
set(CPACK_RESOURCE_FILE_README ${CMAKE_CURRENT_BINARY_DIR}/README.txt)
endif()
configure_file(${CMAKE_CURRENT_SOURCE_DIR}/LICENSE
${CMAKE_CURRENT_BINARY_DIR}/LICENSE.txt COPYONLY)
set(CPACK_RESOURCE_FILE_LICENSE ${CMAKE_CURRENT_BINARY_DIR}/LICENSE.txt)
endif()
include(CPack)
@@ -420,11 +366,8 @@ install(
ARCHIVE DESTINATION ${CMAKE_INSTALL_LIBDIR}
LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR}
RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR})
install(DIRECTORY include/
DESTINATION ${CMAKE_INSTALL_INCLUDEDIR}/${PROJECT_NAME})
install(EXPORT ${PROJECT_NAME}Config
DESTINATION ${CMAKE_INSTALL_DATAROOTDIR}/${PROJECT_NAME}/cmake)
cpack_add_component(all)
+1389 -2675
View File
File diff suppressed because it is too large Load Diff
-7
View File
@@ -98,13 +98,6 @@ void ConflictSet::setOldestVersion(int64_t oldestVersion) {
int64_t ConflictSet::getBytes() const { return -1; }
void ConflictSet::getMetricsV1(MetricsV1 **metrics, int *count) const {
*metrics = nullptr;
*count = 0;
}
double ConflictSet::MetricsV1::getValue() const { return 0; }
ConflictSet::ConflictSet(int64_t oldestVersion)
: impl(new(safe_malloc(sizeof(Impl))) Impl{oldestVersion}) {}
-256
View File
@@ -1,256 +0,0 @@
#include <alloca.h>
#include <cassert>
#ifdef __x86_64__
#include <immintrin.h>
#endif
#include "third_party/nanobench.h"
struct Job {
int *input;
// Returned void* is a function pointer to the next continuation. We have to
// use void* because otherwise the type would be recursive.
typedef void *(*continuation)(Job *);
continuation next;
};
void *stepJob(Job *j) {
auto done = --(*j->input) == 0;
#ifdef __x86_64__
_mm_clflush(j->input);
#endif
return done ? nullptr : (void *)stepJob;
}
void sequential(Job **jobs, int count) {
for (int i = 0; i < count; ++i) {
do {
jobs[i]->next = (Job::continuation)jobs[i]->next(jobs[i]);
} while (jobs[i]->next);
}
}
void sequentialNoFuncPtr(Job **jobs, int count) {
for (int i = 0; i < count; ++i) {
while (stepJob(jobs[i]))
;
}
}
void interleaveSwapping(Job **jobs, int remaining) {
int current = 0;
while (remaining > 0) {
auto next = (Job::continuation)jobs[current]->next(jobs[current]);
jobs[current]->next = next;
if (next == nullptr) {
jobs[current] = jobs[remaining - 1];
--remaining;
} else {
++current;
}
if (current == remaining) {
current = 0;
}
}
}
void interleaveBoundedCyclicList(Job **jobs, int count) {
if (count == 0) {
return;
}
constexpr int kConcurrent = 32;
Job *inProgress[kConcurrent];
int nextJob[kConcurrent];
int started = std::min(kConcurrent, count);
for (int i = 0; i < started; i++) {
inProgress[i] = jobs[i];
nextJob[i] = i + 1;
}
nextJob[started - 1] = 0;
int prevJob = started - 1;
int job = 0;
for (;;) {
auto next = (Job::continuation)inProgress[job]->next(inProgress[job]);
inProgress[job]->next = next;
if (next == nullptr) {
if (started == count) {
if (prevJob == job)
break;
nextJob[prevJob] = nextJob[job];
job = prevJob;
} else {
int temp = started++;
inProgress[job] = jobs[temp];
}
}
prevJob = job;
job = nextJob[job];
}
}
#ifndef __has_attribute
#define __has_attribute(x) 0
#endif
#if __has_attribute(musttail)
#define MUSTTAIL __attribute__((musttail))
#else
#define MUSTTAIL
#endif
struct Context {
constexpr static int kConcurrent = 32;
Job **jobs;
Job *inProgress[kConcurrent];
void (*continuation[kConcurrent])(Context *, int64_t prevJob, int64_t job,
int64_t started, int64_t count);
int nextJob[kConcurrent];
};
void keepGoing(Context *context, int64_t prevJob, int64_t job, int64_t started,
int64_t count) {
prevJob = job;
job = context->nextJob[job];
MUSTTAIL return context->continuation[job](context, prevJob, job, started,
count);
}
void stepJobTailCall(Context *context, int64_t prevJob, int64_t job,
int64_t started, int64_t count);
void complete(Context *context, int64_t prevJob, int64_t job, int64_t started,
int64_t count) {
if (started == count) {
if (prevJob == job) {
return;
}
context->nextJob[prevJob] = context->nextJob[job];
job = prevJob;
} else {
context->inProgress[job] = context->jobs[started++];
context->continuation[job] = stepJobTailCall;
}
prevJob = job;
job = context->nextJob[job];
MUSTTAIL return context->continuation[job](context, prevJob, job, started,
count);
}
void stepJobTailCall(Context *context, int64_t prevJob, int64_t job,
int64_t started, int64_t count) {
auto *j = context->inProgress[job];
auto done = --(*j->input) == 0;
#ifdef __x86_64__
_mm_clflush(j->input);
#endif
if (done) {
MUSTTAIL return complete(context, prevJob, job, started, count);
} else {
context->continuation[job] = stepJobTailCall;
MUSTTAIL return keepGoing(context, prevJob, job, started, count);
}
}
void useTailCalls(Job **jobs, int count) {
if (count == 0) {
return;
}
Context context;
context.jobs = jobs;
int64_t started = std::min(Context::kConcurrent, count);
for (int i = 0; i < started; i++) {
context.inProgress[i] = jobs[i];
context.nextJob[i] = i + 1;
context.continuation[i] = stepJobTailCall;
}
context.nextJob[started - 1] = 0;
int prevJob = started - 1;
int job = 0;
return context.continuation[job](&context, prevJob, job, started, count);
}
void interleaveCyclicList(Job **jobs, int count) {
auto *nextJob = (int *)alloca(sizeof(int) * count);
for (int i = 0; i < count - 1; ++i) {
nextJob[i] = i + 1;
}
nextJob[count - 1] = 0;
int prevJob = count - 1;
int job = 0;
for (;;) {
auto next = (Job::continuation)jobs[job]->next(jobs[job]);
jobs[job]->next = next;
if (next == nullptr) {
if (prevJob == job)
break;
nextJob[prevJob] = nextJob[job];
job = prevJob;
}
prevJob = job;
job = nextJob[job];
}
}
int main() {
ankerl::nanobench::Bench bench;
constexpr int kNumJobs = 10000;
bench.relative(true);
Job jobs[kNumJobs];
Job jobsCopy[kNumJobs];
int iters = 0;
int originalInput[kNumJobs];
for (int i = 0; i < kNumJobs; ++i) {
originalInput[i] = rand() % 5 + 3;
jobs[i].input = new int{originalInput[i]};
jobs[i].next = stepJob;
iters += *jobs[i].input;
}
bench.batch(iters);
for (auto [scheduler, name] :
{std::make_pair(sequentialNoFuncPtr, "sequentialNoFuncPtr"),
std::make_pair(sequential, "sequential"),
std::make_pair(useTailCalls, "useTailCalls"),
std::make_pair(interleaveSwapping, "interleavingSwapping"),
std::make_pair(interleaveBoundedCyclicList,
"interleaveBoundedCyclicList"),
std::make_pair(interleaveCyclicList, "interleaveCyclicList")}) {
for (int i = 0; i < kNumJobs; ++i) {
*jobs[i].input = originalInput[i];
}
memcpy(jobsCopy, jobs, sizeof(jobs));
Job *ps[kNumJobs];
for (int i = 0; i < kNumJobs; ++i) {
ps[i] = jobsCopy + i;
}
scheduler(ps, kNumJobs);
for (int i = 0; i < kNumJobs; ++i) {
if (*jobsCopy[i].input != 0) {
fprintf(stderr, "%s failed\n", name);
abort();
}
}
bench.run(name, [&]() {
for (int i = 0; i < kNumJobs; ++i) {
*jobs[i].input = originalInput[i];
}
memcpy(jobsCopy, jobs, sizeof(jobs));
Job *ps[kNumJobs];
for (int i = 0; i < kNumJobs; ++i) {
ps[i] = jobsCopy + i;
}
scheduler(ps, kNumJobs);
});
}
for (int i = 0; i < kNumJobs; ++i) {
delete jobs[i].input;
}
}
+59 -240
View File
@@ -20,6 +20,7 @@ using namespace weaselab;
#include <thread>
#include <unordered_set>
#include <utility>
#include <vector>
#include <callgrind.h>
@@ -98,7 +99,8 @@ __attribute__((always_inline)) inline void safe_free(void *p, size_t s) {
mallocBytesDelta -= s;
#if SHOW_MEMORY
mallocBytes -= s;
#endif
free(p);
#else
#ifndef NDEBUG
(char *&)p -= kMallocHeaderSize;
size_t expected;
@@ -106,6 +108,7 @@ __attribute__((always_inline)) inline void safe_free(void *p, size_t s) {
assert(s == expected);
#endif
free(p);
#endif
}
// ==================== BEGIN ARENA IMPL ====================
@@ -254,75 +257,10 @@ template <class T> struct ArenaAlloc {
void deallocate(T *, size_t) noexcept {}
};
template <class T> struct Vector {
static_assert(std::is_trivially_destructible_v<T>);
static_assert(std::is_trivially_copyable_v<T>);
explicit Vector(Arena *arena)
: arena(arena), t(nullptr), size_(0), capacity(0) {}
void append(std::span<const T> slice) {
if (size_ + int(slice.size()) > capacity) {
grow(std::max<int>(size_ + slice.size(), capacity * 2));
}
if (slice.size() > 0) {
memcpy(const_cast<std::remove_const_t<T> *>(t) + size_, slice.data(),
slice.size() * sizeof(T));
}
size_ += slice.size();
}
// Caller must write to the returned slice
std::span<T> unsafePrepareAppend(int appendSize) {
if (size_ + appendSize > capacity) {
grow(std::max<int>(size_ + appendSize, capacity * 2));
}
auto result = std::span<T>(t + size_, appendSize);
size_ += appendSize;
return result;
}
void push_back(const T &t) { append(std::span<const T>(&t, 1)); }
T *begin() { return t; }
T *end() { return t + size_; }
T *data() { return t; }
T &back() {
assert(size_ > 0);
return t[size_ - 1];
}
T &operator[](int i) {
assert(i >= 0 && i < size_);
return t[i];
}
void pop_back() {
assert(size_ > 0);
--size_;
}
int size() const { return size_; }
operator std::span<const T>() const { return std::span(t, size_); }
private:
void grow(int newCapacity) {
capacity = newCapacity;
auto old = std::span<const T>(*this);
t = (T *)new (std::align_val_t(alignof(T)), *arena)
uint8_t[capacity * sizeof(T)];
size_ = 0;
append(old);
}
Arena *arena;
T *t;
int size_;
int capacity;
};
template <class T> auto vector(Arena &arena) { return Vector<T>(&arena); }
template <class T> using Vector = std::vector<T, ArenaAlloc<T>>;
template <class T> auto vector(Arena &arena) {
return Vector<T>(ArenaAlloc<T>(&arena));
}
template <class T, class C> using Set = std::set<T, C, ArenaAlloc<T>>;
template <class T, class C = std::less<T>> auto set(Arena &arena) {
return Set<T, C>(ArenaAlloc<T>(&arena));
@@ -476,15 +414,13 @@ inline uint32_t Arbitrary::bounded(uint32_t s) {
// ==================== END ARBITRARY IMPL ====================
struct ReferenceImpl {
explicit ReferenceImpl(int64_t oldestVersion)
: oldestVersion(oldestVersion), newestVersion(oldestVersion) {
explicit ReferenceImpl(int64_t oldestVersion) : oldestVersion(oldestVersion) {
writeVersionMap[""] = oldestVersion;
}
void check(const ConflictSet::ReadRange *reads, ConflictSet::Result *results,
int count) const {
for (int i = 0; i < count; ++i) {
if (reads[i].readVersion < oldestVersion ||
reads[i].readVersion < newestVersion - 2e9) {
if (reads[i].readVersion < oldestVersion) {
results[i] = ConflictSet::TooOld;
continue;
}
@@ -506,8 +442,6 @@ struct ReferenceImpl {
}
void addWrites(const ConflictSet::WriteRange *writes, int count,
int64_t writeVersion) {
assert(writeVersion >= newestVersion);
newestVersion = writeVersion;
for (int i = 0; i < count; ++i) {
auto begin =
std::string((const char *)writes[i].begin.p, writes[i].begin.len);
@@ -527,12 +461,11 @@ struct ReferenceImpl {
}
void setOldestVersion(int64_t oldestVersion) {
assert(oldestVersion >= this->oldestVersion);
assert(oldestVersion >= oldestVersion);
this->oldestVersion = oldestVersion;
}
int64_t oldestVersion;
int64_t newestVersion;
std::map<std::string, int64_t> writeVersionMap;
};
@@ -587,52 +520,41 @@ inline const char *resultToStr(ConflictSet::Result r) {
namespace {
template <class ConflictSetImpl, bool kEnableAssertions = true>
struct TestDriver {
Arbitrary *arbitrary;
explicit TestDriver(Arbitrary &a) : arbitrary(&a) {
#if DEBUG_VERBOSE && !defined(NDEBUG)
fprintf(stderr, "%p Initial version: {%" PRId64 "}\n", this, writeVersion);
#endif
}
template <class ConflictSetImpl> struct TestDriver {
Arbitrary arbitrary;
explicit TestDriver(const uint8_t *data, size_t size)
: arbitrary({data, size}) {}
int64_t oldestVersion = arbitrary->next();
int64_t writeVersion = oldestVersion;
int64_t writeVersion = 0;
int64_t oldestVersion = 0;
ConflictSetImpl cs{oldestVersion};
ReferenceImpl refImpl{oldestVersion};
constexpr static auto kMaxKeySuffixLen = 8;
constexpr static auto kMaxKeyLen = 8;
bool ok = true;
const int prefixLen = arbitrary->bounded(512);
const int prefixByte = arbitrary->randT<uint8_t>();
// Call until it returns true, for "done". Check internal invariants etc
// between calls to next.
bool next() {
assert(cs.getBytes() >= 0);
if (!arbitrary->hasEntropy()) {
if (!arbitrary.hasEntropy()) {
return true;
}
Arena arena;
{
int numPointWrites = arbitrary->bounded(100);
int numRangeWrites = arbitrary->bounded(100);
int64_t v =
(writeVersion +=
arbitrary->bounded(10) ? arbitrary->bounded(10) : arbitrary->next());
int numPointWrites = arbitrary.bounded(100);
int numRangeWrites = arbitrary.bounded(100);
int64_t v = ++writeVersion;
auto *writes =
new (arena) ConflictSet::WriteRange[numPointWrites + numRangeWrites];
auto keys = set<std::string_view>(arena);
while (int(keys.size()) < numPointWrites + numRangeWrites * 2) {
if (!arbitrary->hasEntropy()) {
if (!arbitrary.hasEntropy()) {
return true;
}
int keyLen = prefixLen + arbitrary->bounded(kMaxKeySuffixLen);
int keyLen = arbitrary.bounded(kMaxKeyLen);
auto *begin = new (arena) uint8_t[keyLen];
memset(begin, prefixByte, prefixLen);
arbitrary->randomBytes(begin + prefixLen, keyLen - prefixLen);
arbitrary.randomBytes(begin, keyLen);
keys.insert(std::string_view((const char *)begin, keyLen));
}
@@ -642,7 +564,7 @@ struct TestDriver {
rangesRemaining = numRangeWrites;
pointsRemaining > 0 || rangesRemaining > 0; ++i) {
bool pointRead = pointsRemaining > 0 && rangesRemaining > 0
? bool(arbitrary->bounded(2))
? bool(arbitrary.bounded(2))
: pointsRemaining > 0;
if (pointRead) {
assert(pointsRemaining > 0);
@@ -661,123 +583,45 @@ struct TestDriver {
++iter;
--rangesRemaining;
}
#if DEBUG_VERBOSE && !defined(NDEBUG)
if (writes[i].end.len == 0) {
fprintf(stderr, "Write: {%s} -> %" PRId64 "\n",
printable(writes[i].begin).c_str(), writeVersion);
} else {
fprintf(stderr, "Write: [%s, %s) -> %" PRId64 "\n",
printable(writes[i].begin).c_str(),
printable(writes[i].end).c_str(), writeVersion);
}
#endif
}
assert(iter == keys.end());
assert(i == numPointWrites + numRangeWrites);
// Test non-canonical writes
if (numPointWrites > 0) {
int overlaps = arbitrary->bounded(numPointWrites);
for (int i = 0; i < numPointWrites + numRangeWrites && overlaps > 0;
++i) {
if (writes[i].end.len == 0) {
int keyLen = prefixLen + arbitrary->bounded(kMaxKeySuffixLen);
auto *begin = new (arena) uint8_t[keyLen];
memset(begin, prefixByte, prefixLen);
arbitrary->randomBytes(begin + prefixLen, keyLen - prefixLen);
writes[i].end.len = keyLen;
writes[i].end.p = begin;
auto c =
std::span<const uint8_t>(writes[i].begin.p,
writes[i].begin.len) <=>
std::span<const uint8_t>(writes[i].end.p, writes[i].end.len);
if (c > 0) {
using std::swap;
swap(writes[i].begin, writes[i].end);
} else if (c == 0) {
// It's a point write after all, I guess
writes[i].end.len = 0;
}
--overlaps;
}
}
}
if (arbitrary->bounded(2)) {
// Shuffle writes
for (int i = numPointWrites + numRangeWrites - 1; i > 0; --i) {
int j = arbitrary->bounded(i + 1);
if (i != j) {
using std::swap;
swap(writes[i], writes[j]);
}
}
}
oldestVersion +=
arbitrary->bounded(10) ? arbitrary->bounded(10) : arbitrary->next();
oldestVersion = std::min(oldestVersion, writeVersion);
#ifdef THREAD_TEST
std::latch ready{1};
std::thread thread2{[&]() {
ready.count_down();
ConflictSet::MetricsV1 *m;
int count;
cs.getMetricsV1(&m, &count);
for (int i = 0; i < count; ++i) {
m[i].getValue();
}
}};
ready.wait();
#endif
#if DEBUG_VERBOSE && !defined(NDEBUG)
for (int i = 0; i < numPointWrites + numRangeWrites; ++i) {
if (writes[i].end.len == 0) {
fprintf(stderr, "%p Write: {%s}\n", this,
printable(writes[i].begin).c_str());
} else {
fprintf(stderr, "%p Write: [%s, %s)\n", this,
printable(writes[i].begin).c_str(),
printable(writes[i].end).c_str());
}
}
fprintf(stderr, "%p Write @ %" PRId64 "\n", this, v);
#endif
CALLGRIND_START_INSTRUMENTATION;
cs.addWrites(writes, numPointWrites + numRangeWrites, v);
CALLGRIND_STOP_INSTRUMENTATION;
if constexpr (kEnableAssertions) {
refImpl.addWrites(writes, numPointWrites + numRangeWrites, v);
}
#if DEBUG_VERBOSE && !defined(NDEBUG)
fprintf(stderr, "%p Set oldest version: %" PRId64 "\n", this,
oldestVersion = std::max<int64_t>(writeVersion - arbitrary.bounded(10),
oldestVersion);
#endif
CALLGRIND_START_INSTRUMENTATION;
cs.setOldestVersion(oldestVersion);
CALLGRIND_STOP_INSTRUMENTATION;
if constexpr (kEnableAssertions) {
refImpl.setOldestVersion(oldestVersion);
}
#ifdef THREAD_TEST
thread2.join();
#endif
}
{
int numPointReads = arbitrary->bounded(100);
int numRangeReads = arbitrary->bounded(100);
int64_t v = std::max<int64_t>(writeVersion - (arbitrary->bounded(10)
? arbitrary->bounded(10)
: arbitrary->next()),
0);
int numPointReads = arbitrary.bounded(100);
int numRangeReads = arbitrary.bounded(100);
int64_t v = std::max<int64_t>(writeVersion - arbitrary.bounded(10), 0);
auto *reads =
new (arena) ConflictSet::ReadRange[numPointReads + numRangeReads];
auto keys = set<std::string_view>(arena);
while (int(keys.size()) < numPointReads + numRangeReads * 2) {
if (!arbitrary->hasEntropy()) {
if (!arbitrary.hasEntropy()) {
return true;
}
int keyLen = prefixLen + arbitrary->bounded(kMaxKeySuffixLen);
int keyLen = arbitrary.bounded(kMaxKeyLen);
auto *begin = new (arena) uint8_t[keyLen];
memset(begin, prefixByte, prefixLen);
arbitrary->randomBytes(begin + prefixLen, keyLen - prefixLen);
arbitrary.randomBytes(begin, keyLen);
keys.insert(std::string_view((const char *)begin, keyLen));
}
@@ -786,7 +630,7 @@ struct TestDriver {
for (int pointsRemaining = numPointReads, rangesRemaining = numRangeReads;
pointsRemaining > 0 || rangesRemaining > 0; ++i) {
bool pointRead = pointsRemaining > 0 && rangesRemaining > 0
? bool(arbitrary->bounded(2))
? bool(arbitrary.bounded(2))
: pointsRemaining > 0;
if (pointRead) {
assert(pointsRemaining > 0);
@@ -808,12 +652,12 @@ struct TestDriver {
reads[i].readVersion = v;
#if DEBUG_VERBOSE && !defined(NDEBUG)
if (reads[i].end.len == 0) {
fprintf(stderr, "%p Read: {%s} @ %" PRId64 "\n", this,
printable(reads[i].begin).c_str(), reads[i].readVersion);
fprintf(stderr, "Read: {%s} @ %d\n",
printable(reads[i].begin).c_str(), int(reads[i].readVersion));
} else {
fprintf(stderr, "%p Read: [%s, %s) @ %" PRId64 "\n", this,
fprintf(stderr, "Read: [%s, %s) @ %d\n",
printable(reads[i].begin).c_str(),
printable(reads[i].end).c_str(), reads[i].readVersion);
printable(reads[i].end).c_str(), int(reads[i].readVersion));
}
#endif
}
@@ -830,15 +674,7 @@ struct TestDriver {
std::latch ready{1};
std::thread thread2{[&]() {
ready.count_down();
// Call all const methods
cs.check(reads, results3, numPointReads + numRangeReads);
cs.getBytes();
ConflictSet::MetricsV1 *m;
int count;
cs.getMetricsV1(&m, &count);
for (int i = 0; i < count; ++i) {
m[i].getValue();
}
}};
ready.wait();
#endif
@@ -847,38 +683,24 @@ struct TestDriver {
cs.check(reads, results1, numPointReads + numRangeReads);
CALLGRIND_STOP_INSTRUMENTATION;
if constexpr (kEnableAssertions) {
// Call remaining const methods
cs.getBytes();
ConflictSet::MetricsV1 *m;
int count;
cs.getMetricsV1(&m, &count);
for (int i = 0; i < count; ++i) {
m[i].getValue();
}
refImpl.check(reads, results2, numPointReads + numRangeReads);
}
auto compareResults = [reads, this](ConflictSet::Result *results1,
ConflictSet::Result *results2,
int count) {
auto compareResults = [reads](ConflictSet::Result *results1,
ConflictSet::Result *results2, int count) {
for (int i = 0; i < count; ++i) {
if (results1[i] != results2[i]) {
if (reads[i].end.len == 0) {
fprintf(stderr,
"Expected %s, got %s for read of {%s} at version %" PRId64
"\n",
resultToStr(results2[i]), resultToStr(results1[i]),
printable(reads[i].begin).c_str(), reads[i].readVersion);
} else {
fprintf(
stderr,
"%p Expected %s, got %s for read of {%s} at version %" PRId64
"Expected %s, got %s for read of [%s, %s) at version %" PRId64
"\n",
(void *)this, resultToStr(results2[i]),
resultToStr(results1[i]), printable(reads[i].begin).c_str(),
reads[i].readVersion);
} else {
fprintf(stderr,
"%p Expected %s, got %s for read of [%s, %s) at version "
"%" PRId64 "\n",
(void *)this, resultToStr(results2[i]),
resultToStr(results1[i]),
resultToStr(results2[i]), resultToStr(results1[i]),
printable(reads[i].begin).c_str(),
printable(reads[i].end).c_str(), reads[i].readVersion);
}
@@ -888,13 +710,10 @@ struct TestDriver {
return true;
};
if constexpr (kEnableAssertions) {
if (!compareResults(results1, results2,
numPointReads + numRangeReads)) {
if (!compareResults(results1, results2, numPointReads + numRangeReads)) {
ok = false;
return true;
}
}
#ifdef THREAD_TEST
thread2.join();
Vendored
+5 -34
View File
@@ -48,28 +48,6 @@ pipeline {
recordIssues(tools: [clang()])
}
}
stage('64 bit versions') {
agent {
dockerfile {
args '-v /home/jenkins/ccache:/ccache'
reuseNode true
}
}
steps {
CleanBuildAndTest("-DCMAKE_CXX_FLAGS=-DUSE_64_BIT=1")
}
}
stage('Debug') {
agent {
dockerfile {
args '-v /home/jenkins/ccache:/ccache'
reuseNode true
}
}
steps {
CleanBuildAndTest("-DCMAKE_BUILD_TYPE=Debug")
}
}
stage('SIMD fallback') {
agent {
dockerfile {
@@ -128,18 +106,11 @@ pipeline {
}
}
steps {
script {
filter_args = "-f ConflictSet.cpp -f LongestCommonPrefix.h -f Metrics.h"
}
CleanBuildAndTest("-DCMAKE_C_COMPILER=gcc -DCMAKE_CXX_COMPILER=g++ -DCMAKE_C_FLAGS=--coverage -DCMAKE_CXX_FLAGS=--coverage -DCMAKE_BUILD_TYPE=Debug -DDISABLE_TSAN=ON")
sh """
gcovr ${filter_args} --cobertura > build/coverage.xml
"""
recordCoverage qualityGates: [[criticality: 'NOTE', metric: 'MODULE']], tools: [[parser: 'COBERTURA', pattern: 'build/coverage.xml']]
sh """
gcovr ${filter_args}
gcovr ${filter_args} --fail-under-line 100 > /dev/null
"""
CleanBuildAndTest("-DCMAKE_C_COMPILER=gcc -DCMAKE_CXX_COMPILER=g++ -DCMAKE_C_FLAGS=--coverage -DCMAKE_CXX_FLAGS=--coverage -DCMAKE_BUILD_TYPE=Debug")
sh '''
gcovr -f ConflictSet.cpp --cobertura > build/coverage.xml
'''
cobertura autoUpdateHealth: false, autoUpdateStability: false, coberturaReportFile: 'build/coverage.xml', conditionalCoverageTargets: '70, 0, 0', failUnhealthy: false, failUnstable: false, lineCoverageTargets: '80, 0, 0', maxNumberOfBuilds: 0, methodCoverageTargets: '80, 0, 0', onlyStable: false, sourceEncoding: 'ASCII', zoomCoverageChart: false
}
}
}
-185
View File
@@ -1,185 +0,0 @@
#pragma once
#include <assert.h>
#include <bit>
#include <stdint.h>
#include <string.h>
#ifdef HAS_AVX
#include <immintrin.h>
#elif HAS_ARM_NEON
#include <arm_neon.h>
#endif
#ifndef __SANITIZE_THREAD__
#if defined(__has_feature)
#if __has_feature(thread_sanitizer)
#define __SANITIZE_THREAD__
#endif
#endif
#endif
#if defined(HAS_AVX) || defined(HAS_ARM_NEON)
constexpr int kStride = 64;
#else
constexpr int kStride = 16;
#endif
constexpr int kUnrollFactor = 4;
inline bool compareStride(const uint8_t *ap, const uint8_t *bp) {
#if defined(HAS_ARM_NEON)
static_assert(kStride == 64);
uint8x16_t x[4]; // GCOVR_EXCL_LINE
for (int i = 0; i < 4; ++i) {
x[i] = vceqq_u8(vld1q_u8(ap + i * 16), vld1q_u8(bp + i * 16));
}
auto results = vreinterpretq_u16_u8(
vandq_u8(vandq_u8(x[0], x[1]), vandq_u8(x[2], x[3])));
bool eq = vget_lane_u64(vreinterpret_u64_u8(vshrn_n_u16(results, 4)), 0) ==
uint64_t(-1);
#elif defined(HAS_AVX)
static_assert(kStride == 64);
__m128i x[4]; // GCOVR_EXCL_LINE
for (int i = 0; i < 4; ++i) {
x[i] = _mm_cmpeq_epi8(_mm_loadu_si128((__m128i *)(ap + i * 16)),
_mm_loadu_si128((__m128i *)(bp + i * 16)));
}
auto eq =
_mm_movemask_epi8(_mm_and_si128(_mm_and_si128(x[0], x[1]),
_mm_and_si128(x[2], x[3]))) == 0xffff;
#else
// Hope it gets vectorized
auto eq = memcmp(ap, bp, kStride) == 0;
#endif
return eq;
}
// Precondition: ap[:kStride] != bp[:kStride]
inline int firstNeqStride(const uint8_t *ap, const uint8_t *bp) {
#if defined(HAS_AVX)
static_assert(kStride == 64);
uint64_t c[kStride / 16]; // GCOVR_EXCL_LINE
for (int i = 0; i < kStride; i += 16) {
const auto a = _mm_loadu_si128((__m128i *)(ap + i));
const auto b = _mm_loadu_si128((__m128i *)(bp + i));
const auto compared = _mm_cmpeq_epi8(a, b);
c[i / 16] = _mm_movemask_epi8(compared) & 0xffff;
}
return std::countr_zero(~(c[0] | c[1] << 16 | c[2] << 32 | c[3] << 48));
#elif defined(HAS_ARM_NEON)
static_assert(kStride == 64);
for (int i = 0; i < kStride; i += 16) {
// 0xff for each match
uint16x8_t results =
vreinterpretq_u16_u8(vceqq_u8(vld1q_u8(ap + i), vld1q_u8(bp + i)));
// 0xf for each mismatch
uint64_t bitfield =
~vget_lane_u64(vreinterpret_u64_u8(vshrn_n_u16(results, 4)), 0);
if (bitfield) {
return i + (std::countr_zero(bitfield) >> 2);
}
}
__builtin_unreachable(); // GCOVR_EXCL_LINE
#else
int i = 0;
for (; i < kStride - 1; ++i) {
if (*ap++ != *bp++) {
break;
}
}
return i;
#endif
}
// This gets covered in local development
// GCOVR_EXCL_START
#if defined(HAS_AVX) && !defined(__SANITIZE_THREAD__)
__attribute__((target("avx512f,avx512bw"))) inline int
longestCommonPrefix(const uint8_t *ap, const uint8_t *bp, int cl) {
int i = 0;
int end = cl & ~63;
while (i < end) {
const uint64_t eq =
_mm512_cmpeq_epi8_mask(_mm512_loadu_epi8(ap), _mm512_loadu_epi8(bp));
if (eq != uint64_t(-1)) {
return i + std::countr_one(eq);
}
i += 64;
ap += 64;
bp += 64;
}
if (i < cl) {
const uint64_t mask = (uint64_t(1) << (cl - i)) - 1;
const uint64_t eq = _mm512_cmpeq_epi8_mask(
_mm512_maskz_loadu_epi8(mask, ap), _mm512_maskz_loadu_epi8(mask, bp));
return i + std::countr_one(eq & mask);
}
assert(i == cl);
return i;
}
__attribute__((target("default")))
#endif
// GCOVR_EXCL_STOP
inline int
longestCommonPrefix(const uint8_t *ap, const uint8_t *bp, int cl) {
if (!(cl >= 0)) {
__builtin_unreachable(); // GCOVR_EXCL_LINE
}
int i = 0;
int end;
// kStride * kUnrollCount at a time
end = cl & ~(kStride * kUnrollFactor - 1);
while (i < end) {
for (int j = 0; j < kUnrollFactor; ++j) {
if (!compareStride(ap, bp)) {
return i + firstNeqStride(ap, bp);
}
i += kStride;
ap += kStride;
bp += kStride;
}
}
// kStride at a time
end = cl & ~(kStride - 1);
while (i < end) {
if (!compareStride(ap, bp)) {
return i + firstNeqStride(ap, bp);
}
i += kStride;
ap += kStride;
bp += kStride;
}
// word at a time
end = cl & ~(sizeof(uint64_t) - 1);
while (i < end) {
uint64_t a; // GCOVR_EXCL_LINE
uint64_t b; // GCOVR_EXCL_LINE
memcpy(&a, ap, 8);
memcpy(&b, bp, 8);
const auto mismatched = a ^ b;
if (mismatched) {
return i + std::countr_zero(mismatched) / 8;
}
i += 8;
ap += 8;
bp += 8;
}
// byte at a time
while (i < cl) {
if (*ap != *bp) {
break;
}
++ap;
++bp;
++i;
}
return i;
}
-64
View File
@@ -1,64 +0,0 @@
#pragma once
#include "ConflictSet.h"
#include "Internal.h"
#include <assert.h>
#include <atomic>
#include <tuple>
struct Metric {
Metric *prev;
const char *name;
const char *help;
weaselab::ConflictSet::MetricsV1::Type type;
std::atomic<int64_t> value;
protected:
Metric(Metric *&metricList, int &metricsCount, const char *name,
const char *help, weaselab::ConflictSet::MetricsV1::Type type)
: prev(std::exchange(metricList, this)), name(name), help(help),
type(type), value(0) {
++metricsCount;
}
};
struct Gauge : private Metric {
Gauge(Metric *&metricList, int &metricsCount, const char *name,
const char *help)
: Metric(metricList, metricsCount, name, help,
weaselab::ConflictSet::MetricsV1::Gauge) {}
void set(int64_t value) {
this->value.store(value, std::memory_order_relaxed);
}
};
struct Counter : private Metric {
Counter(Metric *&metricList, int &metricsCount, const char *name,
const char *help)
: Metric(metricList, metricsCount, name, help,
weaselab::ConflictSet::MetricsV1::Counter) {}
// Expensive. Accumulate locally and then call add instead of repeatedly
// calling add.
void add(int64_t value) {
assert(value >= 0);
static_assert(std::atomic<int64_t>::is_always_lock_free);
this->value.fetch_add(value, std::memory_order_relaxed);
}
};
inline weaselab::ConflictSet::MetricsV1 *initMetrics(Metric *metricsList,
int metricsCount) {
weaselab::ConflictSet::MetricsV1 *metrics =
(weaselab::ConflictSet::MetricsV1 *)safe_malloc(metricsCount *
sizeof(metrics[0]));
for (auto [i, m] = std::make_tuple(metricsCount - 1, metricsList); i >= 0;
--i, m = m->prev) {
metrics[i].name = m->name;
metrics[i].help = m->help;
metrics[i].p = m;
metrics[i].type = m->type;
}
return metrics;
}
+74 -28
View File
@@ -1,38 +1,84 @@
A data structure for optimistic concurrency control on ranges of bitwise-lexicographically-ordered keys.
Intended as an alternative to FoundationDB's skip list.
Intended to replace FoundationDB's skip list.
Hardware for all benchmarks is an AMD Ryzen 9 7900 with (2x32GB) 5600MT/s CL28-34-34-89 1.35V RAM
Hardware for all benchmarks is a mac m1 2020.
# Microbenchmark
# FoundationDB's benchmark
## Skip list
| ns/op | op/s | err% | ins/op | cyc/op | IPC | bra/op | miss% | total | benchmark
|--------------------:|--------------------:|--------:|----------------:|----------------:|-------:|---------------:|--------:|----------:|:----------
| 172.03 | 5,812,791.77 | 0.4% | 3,130.62 | 879.00 | 3.562 | 509.23 | 0.0% | 0.01 | `point reads`
| 167.44 | 5,972,130.71 | 0.2% | 3,065.14 | 862.27 | 3.555 | 494.30 | 0.0% | 0.01 | `prefix reads`
| 238.77 | 4,188,130.84 | 0.9% | 3,589.93 | 1,259.30 | 2.851 | 637.12 | 0.0% | 0.01 | `range reads`
| 424.01 | 2,358,426.70 | 0.2% | 5,620.05 | 2,242.35 | 2.506 | 854.80 | 1.7% | 0.01 | `point writes`
| 418.45 | 2,389,780.56 | 0.4% | 5,525.07 | 2,211.05 | 2.499 | 831.71 | 1.7% | 0.01 | `prefix writes`
| 254.87 | 3,923,568.88 | 2.6% | 3,187.01 | 1,366.50 | 2.332 | 529.11 | 2.7% | 0.02 | `range writes`
| 675.96 | 1,479,374.50 | 3.3% | 7,735.41 | 3,468.60 | 2.230 | 1,386.02 | 1.8% | 0.01 | `monotonic increasing point writes`
| 137,986.20 | 7,247.10 | 0.6% | 789,752.33 | 699,462.00 | 1.129 | 144,824.14 | 0.0% | 0.01 | `worst case for radix tree`
| 21.63 | 46,231,564.03 | 1.0% | 448.00 | 107.14 | 4.181 | 84.00 | 0.0% | 0.01 | `create and destroy`
```
New conflict set: 1.957 sec
0.639 Mtransactions/sec
2.555 Mkeys/sec
Detect only: 1.845 sec
0.678 Mtransactions/sec
2.710 Mkeys/sec
Skiplist only: 1.263 sec
0.990 Mtransactions/sec
3.960 Mkeys/sec
Performance counters:
Build: 0.0546
Add: 0.0563
Detect: 1.84
D.Sort: 0.412
D.Combine: 0.0141
D.CheckRead: 0.671
D.CheckIntraBatch: 0.0068
D.MergeWrite: 0.592
D.RemoveBefore: 0.146
```
## Radix tree (this implementation)
| ns/op | op/s | err% | ins/op | cyc/op | IPC | bra/op | miss% | total | benchmark
|--------------------:|--------------------:|--------:|----------------:|----------------:|-------:|---------------:|--------:|----------:|:----------
| 12.88 | 77,653,350.77 | 0.5% | 185.37 | 64.45 | 2.876 | 41.51 | 0.4% | 0.01 | `point reads`
| 14.67 | 68,179,354.49 | 0.1% | 271.44 | 73.40 | 3.698 | 53.70 | 0.3% | 0.01 | `prefix reads`
| 34.84 | 28,701,444.36 | 0.3% | 715.74 | 175.27 | 4.084 | 127.30 | 0.2% | 0.01 | `range reads`
| 17.12 | 58,422,988.28 | 0.2% | 314.30 | 86.11 | 3.650 | 39.82 | 0.4% | 0.01 | `point writes`
| 31.42 | 31,830,804.65 | 0.1% | 591.06 | 158.07 | 3.739 | 82.67 | 0.2% | 0.01 | `prefix writes`
| 37.37 | 26,759,432.70 | 2.2% | 681.98 | 188.95 | 3.609 | 96.10 | 0.1% | 0.01 | `range writes`
| 76.72 | 13,035,140.63 | 2.3% | 1,421.28 | 387.17 | 3.671 | 257.76 | 0.1% | 0.01 | `monotonic increasing point writes`
| 297,452.00 | 3,361.89 | 0.9% | 3,508,083.00 | 1,500,834.67 | 2.337 | 727,525.33 | 0.1% | 0.01 | `worst case for radix tree`
| 87.70 | 11,402,490.60 | 1.0% | 1,795.00 | 442.09 | 4.060 | 297.00 | 0.0% | 0.01 | `create and destroy`
```
New conflict set: 1.366 sec
0.915 Mtransactions/sec
3.660 Mkeys/sec
Detect only: 1.248 sec
1.002 Mtransactions/sec
4.007 Mkeys/sec
Skiplist only: 0.573 sec
2.182 Mtransactions/sec
8.730 Mkeys/sec
Performance counters:
Build: 0.0594
Add: 0.0572
Detect: 1.25
D.Sort: 0.418
D.Combine: 0.0149
D.CheckRead: 0.232
D.CheckIntraBatch: 0.0067
D.MergeWrite: 0.341
D.RemoveBefore: 0.232
```
# Our benchmark
## Skip list
| ns/op | op/s | err% | total | benchmark
|--------------------:|--------------------:|--------:|----------:|:----------
| 246.99 | 4,048,700.59 | 0.2% | 0.01 | `point reads`
| 260.16 | 3,843,784.65 | 0.1% | 0.01 | `prefix reads`
| 493.35 | 2,026,953.19 | 0.1% | 0.01 | `range reads`
| 462.05 | 2,164,289.23 | 0.6% | 0.01 | `point writes`
| 448.19 | 2,231,205.25 | 0.9% | 0.01 | `prefix writes`
| 255.83 | 3,908,845.72 | 1.5% | 0.02 | `range writes`
| 582.63 | 1,716,349.02 | 1.3% | 0.01 | `monotonic increasing point writes`
## Radix tree (this implementation)
| ns/op | op/s | err% | total | benchmark
|--------------------:|--------------------:|--------:|----------:|:----------
| 19.42 | 51,483,206.67 | 0.3% | 0.01 | `point reads`
| 58.43 | 17,115,612.57 | 0.1% | 0.01 | `prefix reads`
| 216.09 | 4,627,766.60 | 0.2% | 0.01 | `range reads`
| 28.35 | 35,267,567.72 | 0.2% | 0.01 | `point writes`
| 43.43 | 23,026,226.17 | 0.2% | 0.01 | `prefix writes`
| 50.00 | 20,000,000.00 | 0.0% | 0.01 | `range writes`
| 92.38 | 10,824,863.69 | 4.1% | 0.01 | `monotonic increasing point writes`
# "Real data" test
@@ -41,13 +87,13 @@ Point queries only, best of three runs. Gc ratio is the ratio of time spent doin
## skip list
```
Check: 4.47891 seconds, 364.05 MB/s, Add: 4.55599 seconds, 123.058 MB/s, Gc ratio: 37.1145%
Check: 11.3385 seconds, 329.718 MB/s, Add: 5.35612 seconds, 131.072 MB/s, Gc ratio: 45.7173%
```
## radix tree
```
Check: 0.953012 seconds, 1710.94 MB/s, Add: 1.30025 seconds, 431.188 MB/s, Gc ratio: 43.9816%, Peak idle memory: 2.28375e+06
Check: 2.48583 seconds, 1503.93 MB/s, Add: 2.12768 seconds, 329.954 MB/s, Gc ratio: 41.7943%
```
## hash table
@@ -55,5 +101,5 @@ Check: 0.953012 seconds, 1710.94 MB/s, Add: 1.30025 seconds, 431.188 MB/s, Gc ra
(The hash table implementation doesn't work on range queries, and its purpose is to provide an idea of how fast point queries can be)
```
Check: 0.804094 seconds, 2027.81 MB/s, Add: 0.652952 seconds, 858.645 MB/s, Gc ratio: 35.3885%
Check: 1.83386 seconds, 2038.6 MB/s, Add: 0.601411 seconds, 1167.32 MB/s, Gc ratio: 48.9776%
```
-10
View File
@@ -129,16 +129,6 @@ int main(int argc, const char **argv) {
close(fd);
}
ConflictSet::MetricsV1 *metrics;
int metricsCount;
cs.getMetricsV1(&metrics, &metricsCount);
for (int i = 0; i < metricsCount; ++i) {
printf("# HELP %s %s\n", metrics[i].name, metrics[i].help);
printf("# TYPE %s %s\n", metrics[i].name,
metrics[i].type == metrics[i].Counter ? "counter" : "gauge");
printf("%s %g\n", metrics[i].name, metrics[i].getValue());
}
printf("Check: %g seconds, %g MB/s, Add: %g seconds, %g MB/s, Gc ratio: "
"%g%%, Peak idle memory: %g\n",
checkTime, checkBytes / checkTime * 1e-6, addTime,
+155
View File
@@ -0,0 +1,155 @@
#include "Internal.h"
#include <ConflictSet.h>
#include <chrono>
#include <cstring>
#include <fcntl.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <unistd.h>
#include <vector>
inline size_t getPageSize() {
static size_t kPageSize = sysconf(_SC_PAGESIZE);
return kPageSize;
}
/// Helper for rounding up to page size (or some other alignment)
constexpr inline size_t rightAlign(size_t offset, size_t alignment) {
return offset % alignment == 0 ? offset
: ((offset / alignment) + 1) * alignment;
}
using StringView = std::basic_string_view<uint8_t>;
inline StringView operator"" _v(const char *str, size_t size) {
return {reinterpret_cast<const uint8_t *>(str), size};
}
int main(int argc, const char **argv) {
ConflictSet cs{0};
ReferenceImpl ref{0};
for (int i = 1; i < argc; ++i) {
int fd = open(argv[i], O_RDONLY);
struct stat st;
if (fstat(fd, &st) == -1) {
int err = errno;
fprintf(stderr, "stat error %s - %s\n", argv[i], strerror(err));
fflush(stderr);
abort();
}
int64_t size = rightAlign(st.st_size, getPageSize());
const uint8_t *begin =
(uint8_t *)mmap(0, size, PROT_READ, MAP_PRIVATE, fd, 0);
madvise((void *)begin, size, MADV_SEQUENTIAL);
auto *const mapOriginal = begin;
const auto sizeOriginal = size;
StringView b;
StringView e;
int64_t v = 0;
int64_t lastWriteVersion = 0;
int64_t lastOldestVersion = 0;
std::vector<ConflictSet::WriteRange> writeRanges;
std::vector<ConflictSet::ReadRange> readRanges;
std::vector<ConflictSet::Result> results;
for (uint8_t *end = (uint8_t *)memchr(begin, '\n', size); end != nullptr;) {
StringView line{begin, static_cast<size_t>(end - begin)};
size -= end - begin + 1;
begin = end + 1;
end = (uint8_t *)memchr(begin, '\n', size);
if (line.starts_with("begin"_v)) {
b = line.substr("begin "_v.size(), line.size());
printf("b <- %.*s\n", int(b.size()), b.data());
} else if (line.starts_with("end"_v)) {
e = line.substr("end "_v.size(), line.size());
printf("e <- %.*s\n", int(e.size()), e.data());
} else if (line.starts_with("version"_v)) {
line = line.substr("version "_v.size(), line.size());
v = 0;
for (auto c : line) {
v = v * 10 + int(c) - int('0');
}
printf("v <- %" PRId64 "\n", v);
} else if (line.starts_with("pointread"_v)) {
printf("pointread\n");
ConflictSet::ReadRange r;
r.begin.p = b.data();
r.begin.len = b.size();
r.end.len = 0;
r.readVersion = v;
readRanges.push_back(r);
} else if (line.starts_with("pointwrite"_v)) {
printf("pointwrite\n");
assert(writeRanges.empty() ||
(writeRanges.back().end.len == 0 ? writeRanges.back().begin
: writeRanges.back().end) < b);
ConflictSet::WriteRange w;
w.begin.p = b.data();
w.begin.len = b.size();
w.end.len = 0;
writeRanges.push_back(w);
} else if (line.starts_with("rangeread"_v)) {
printf("rangeread\n");
ConflictSet::ReadRange r;
r.begin.p = b.data();
r.begin.len = b.size();
r.end.p = e.data();
r.end.len = e.size();
r.readVersion = v;
readRanges.push_back(r);
} else if (line.starts_with("rangewrite"_v)) {
printf("rangewrite\n");
assert(b < e);
assert(writeRanges.empty() ||
(writeRanges.back().end.len == 0 ? writeRanges.back().begin
: writeRanges.back().end) < b);
ConflictSet::WriteRange w;
w.begin.p = b.data();
w.begin.len = b.size();
w.end.p = e.data();
w.end.len = e.size();
writeRanges.push_back(w);
} else if (line.starts_with("check"_v)) {
printf("check\n");
Arena arena;
auto *expected = new (arena) ConflictSet::Result[readRanges.size()];
auto *actual = new (arena) ConflictSet::Result[readRanges.size()];
ref.check(readRanges.data(), expected, readRanges.size());
cs.check(readRanges.data(), actual, readRanges.size());
for (int i = 0; i < int(readRanges.size()); ++i) {
if (expected[i] != actual[i]) {
fprintf(stderr, "Expected %s, got %s at index %d\n",
resultToStr(expected[i]), resultToStr(actual[i]), i);
return 1;
}
}
readRanges = {};
} else if (line.starts_with("addwrites"_v)) {
printf("addwrites\n");
assert(v > lastWriteVersion);
lastWriteVersion = v;
cs.addWrites(writeRanges.data(), writeRanges.size(), v);
ref.addWrites(writeRanges.data(), writeRanges.size(), v);
writeRanges = {};
} else if (line.starts_with("setoldest"_v)) {
printf("setoldest\n");
assert(v > lastOldestVersion);
lastOldestVersion = v;
cs.setOldestVersion(v);
ref.setOldestVersion(v);
} else if (line.empty() || line.starts_with(";"_v)) {
// skip
} else {
printf("Unrecognized line: %.*s\n", int(line.size()), line.data());
}
}
munmap((void *)mapOriginal, sizeOriginal);
close(fd);
}
}
-347
View File
@@ -1,347 +0,0 @@
#include <atomic>
#include <cstdint>
#include <errno.h>
#include <netdb.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <string>
#include <string_view>
#include <sys/ioctl.h>
#include <sys/resource.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/uio.h>
#include <thread>
#include <unistd.h>
#include <utility>
#include <vector>
#include "ConflictSet.h"
#include "third_party/nadeau.h"
std::atomic<int64_t> transactions;
constexpr int kWindowSize = 10000000;
constexpr int kNumPrefixes = 250000;
std::string makeKey(int64_t num, int suffixLen) {
std::string result;
result.resize(sizeof(int64_t) + suffixLen);
int64_t be = __builtin_bswap64(num);
memcpy(result.data(), &be, sizeof(int64_t));
memset(result.data() + sizeof(int64_t), 0, suffixLen);
return result;
}
void workload(weaselab::ConflictSet *cs) {
int64_t version = kWindowSize;
for (int i = 0; i < kNumPrefixes; ++i) {
for (int j = 0; j < 50; ++j) {
weaselab::ConflictSet::WriteRange wr;
auto k = makeKey(i, j);
wr.begin.p = (const uint8_t *)k.data();
wr.begin.len = k.size();
wr.end.len = 0;
cs->addWrites(&wr, 1, version);
}
}
++version;
for (int i = 0; i < kNumPrefixes; ++i) {
weaselab::ConflictSet::WriteRange wr;
auto k = makeKey(i, 50);
wr.begin.p = (const uint8_t *)k.data();
wr.begin.len = k.size();
wr.end.len = 0;
cs->addWrites(&wr, 1, version);
}
std::vector<weaselab::ConflictSet::Result> results(10);
for (;; transactions.fetch_add(1, std::memory_order_relaxed)) {
std::vector<std::string> keys(10);
for (auto &k : keys) {
k = makeKey(rand() % kNumPrefixes, 49);
}
std::vector<weaselab::ConflictSet::ReadRange> reads(10);
for (int i = 0; i < reads.size(); ++i) {
reads[i].begin.p = (const uint8_t *)(keys[i].data());
reads[i].begin.len = keys[i].size();
reads[i].end.len = 0;
reads[i].readVersion = version - 1;
}
cs->check(reads.data(), results.data(), 10);
}
}
// Adapted from getaddrinfo man page
int getListenFd(const char *node, const char *service) {
struct addrinfo hints;
struct addrinfo *result, *rp;
int sfd, s;
memset(&hints, 0, sizeof(hints));
hints.ai_family = AF_UNSPEC; /* Allow IPv4 or IPv6 */
hints.ai_socktype = SOCK_STREAM; /* stream socket */
hints.ai_flags = AI_PASSIVE; /* For wildcard IP address */
hints.ai_protocol = 0; /* Any protocol */
hints.ai_canonname = nullptr;
hints.ai_addr = nullptr;
hints.ai_next = nullptr;
s = getaddrinfo(node, service, &hints, &result);
if (s != 0) {
fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(s));
abort();
}
/* getaddrinfo() returns a list of address structures.
Try each address until we successfully bind(2).
If socket(2) (or bind(2)) fails, we (close the socket
and) try the next address. */
for (rp = result; rp != nullptr; rp = rp->ai_next) {
sfd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
if (sfd == -1) {
continue;
}
int val = 1;
setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val));
if (bind(sfd, rp->ai_addr, rp->ai_addrlen) == 0) {
break; /* Success */
}
close(sfd);
}
freeaddrinfo(result); /* No longer needed */
if (rp == nullptr) { /* No address succeeded */
fprintf(stderr, "Could not bind\n");
abort();
}
int rv = listen(sfd, SOMAXCONN);
if (rv) {
perror("listen()");
abort();
}
return sfd;
}
// HTTP response
//
std::string_view part1 =
"HTTP/1.1 200 OK \r\nContent-type: text/plain; version=0.0.4; "
"charset=utf-8; escaping=values\r\nContent-Length: ";
// Decimal content length
std::string_view part2 = "\r\n\r\n";
// Body
double toSeconds(timeval t) {
return double(t.tv_sec) + double(t.tv_usec) * 1e-6;
}
#ifdef __linux__
#include <linux/perf_event.h>
struct PerfCounter {
PerfCounter(int type, int config, const std::string &labels = {},
int groupLeaderFd = -1)
: labels(labels) {
struct perf_event_attr pe;
memset(&pe, 0, sizeof(pe));
pe.type = type;
pe.size = sizeof(pe);
pe.config = config;
pe.inherit = 1;
pe.exclude_kernel = 1;
pe.exclude_hv = 1;
fd = perf_event_open(&pe, 0, -1, groupLeaderFd, 0);
if (fd < 0 && errno != ENOENT && errno != EINVAL) {
perror(labels.c_str());
}
}
int64_t total() const {
int64_t count;
if (read(fd, &count, sizeof(count)) != sizeof(count)) {
perror("read instructions from perf");
abort();
}
return count;
}
PerfCounter(PerfCounter &&other)
: fd(std::exchange(other.fd, -1)), labels(std::move(other.labels)) {}
PerfCounter &operator=(PerfCounter &&other) {
fd = std::exchange(other.fd, -1);
labels = std::move(other.labels);
return *this;
}
~PerfCounter() {
if (fd >= 0) {
close(fd);
}
}
bool ok() const { return fd >= 0; }
const std::string &getLabels() const { return labels; }
int getFd() const { return fd; }
private:
int fd;
std::string labels;
static long perf_event_open(struct perf_event_attr *hw_event, pid_t pid,
int cpu, int group_fd, unsigned long flags) {
int ret;
ret = syscall(SYS_perf_event_open, hw_event, pid, cpu, group_fd, flags);
return ret;
}
};
#endif
int main(int argc, char **argv) {
if (argc != 3) {
goto fail;
}
{
int listenFd = getListenFd(argv[1], argv[2]);
weaselab::ConflictSet cs{0};
weaselab::ConflictSet::MetricsV1 *metrics;
int metricsCount;
cs.getMetricsV1(&metrics, &metricsCount);
#ifdef __linux__
PerfCounter instructions{PERF_TYPE_HARDWARE, PERF_COUNT_HW_INSTRUCTIONS};
PerfCounter cycles{PERF_TYPE_HARDWARE, PERF_COUNT_HW_CPU_CYCLES, "",
instructions.getFd()};
std::vector<PerfCounter> cacheCounters;
for (auto [id, idStr] : std::initializer_list<std::pair<int, std::string>>{
{PERF_COUNT_HW_CACHE_L1D, "l1d"},
{PERF_COUNT_HW_CACHE_L1I, "l1i"},
{PERF_COUNT_HW_CACHE_LL, "ll"},
{PERF_COUNT_HW_CACHE_DTLB, "dtlb"},
{PERF_COUNT_HW_CACHE_ITLB, "itlb"},
{PERF_COUNT_HW_CACHE_BPU, "bpu"},
{PERF_COUNT_HW_CACHE_NODE, "node"},
}) {
for (auto [op, opStr] :
std::initializer_list<std::pair<int, std::string>>{
{PERF_COUNT_HW_CACHE_OP_READ, "read"},
{PERF_COUNT_HW_CACHE_OP_WRITE, "write"},
{PERF_COUNT_HW_CACHE_OP_PREFETCH, "prefetch"},
}) {
int groupLeaderFd = -1;
for (auto [result, resultStr] :
std::initializer_list<std::pair<int, std::string>>{
{PERF_COUNT_HW_CACHE_RESULT_MISS, "miss"},
{PERF_COUNT_HW_CACHE_RESULT_ACCESS, "access"},
}) {
auto labels = "{id=\"" + idStr + "\", op=\"" + opStr +
"\", result=\"" + resultStr + "\"}";
cacheCounters.emplace_back(PERF_TYPE_HW_CACHE,
id | (op << 8) | (result << 16), labels,
groupLeaderFd);
if (!cacheCounters.back().ok()) {
cacheCounters.pop_back();
} else {
if (groupLeaderFd == -1) {
groupLeaderFd = cacheCounters.back().getFd();
}
}
}
}
}
#endif
auto w = std::thread{workload, &cs};
for (;;) {
struct sockaddr_storage peer_addr = {};
socklen_t peer_addr_len = sizeof(peer_addr);
const int connfd =
accept(listenFd, (struct sockaddr *)&peer_addr, &peer_addr_len);
std::string body;
rusage r;
getrusage(RUSAGE_SELF, &r);
body += "# HELP process_cpu_seconds_total Total user and system CPU time "
"spent in seconds.\n# TYPE process_cpu_seconds_total counter\n"
"process_cpu_seconds_total ";
body += std::to_string(toSeconds(r.ru_utime) + toSeconds(r.ru_stime));
body += "\n";
body += "# HELP process_resident_memory_bytes Resident memory size in "
"bytes.\n# TYPE process_resident_memory_bytes gauge\n"
"process_resident_memory_bytes ";
body += std::to_string(getCurrentRSS());
body += "\n";
body += "# HELP transactions_total Total number of transactions\n"
"# TYPE transactions_total counter\n"
"transactions_total ";
body += std::to_string(transactions.load(std::memory_order_relaxed));
body += "\n";
#ifdef __linux__
body += "# HELP instructions_total Total number of instructions\n"
"# TYPE instructions_total counter\n"
"instructions_total ";
body += std::to_string(instructions.total());
body += "\n";
body += "# HELP cycles_total Total number of cycles\n"
"# TYPE cycles_total counter\n"
"cycles_total ";
body += std::to_string(cycles.total());
body += "\n";
body += "# HELP cache_event_total Total number of cache events\n"
"# TYPE cache_event_total counter\n";
for (const auto &counter : cacheCounters) {
body += "cache_event_total" + counter.getLabels() + " " +
std::to_string(counter.total()) + "\n";
}
#endif
for (int i = 0; i < metricsCount; ++i) {
body += "# HELP ";
body += metrics[i].name;
body += " ";
body += metrics[i].help;
body += "\n";
body += "# TYPE ";
body += metrics[i].name;
body += " ";
body += metrics[i].type == metrics[i].Counter ? "counter" : "gauge";
body += "\n";
body += metrics[i].name;
body += " ";
body += std::to_string(metrics[i].getValue());
body += "\n";
}
auto len = std::to_string(body.size());
iovec iov[] = {
{(void *)part1.data(), part1.size()},
{(void *)len.data(), len.size()},
{(void *)part2.data(), part2.size()},
{(void *)body.data(), body.size()},
};
int written;
do {
written = writev(connfd, iov, sizeof(iov) / sizeof(iov[0]));
} while (written < 0 && errno == EINTR);
close(connfd);
}
}
fail:
fprintf(stderr, "Expected ./%s <host> <port>\n", argv[0]);
return 1;
}
+76 -369
View File
@@ -16,17 +16,11 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* This source code is modified to compile outside of FoundationDB
*/
#include "ConflictSet.h"
#include "Internal.h"
#include "Metrics.h"
#include <algorithm>
#include <span>
#include <vector>
std::span<const uint8_t> keyAfter(Arena &arena, std::span<const uint8_t> key) {
auto result =
@@ -44,7 +38,7 @@ std::span<const uint8_t> copyToArena(Arena &arena,
}
using Version = int64_t;
#define force_inline inline __attribute__((always_inline))
#define force_inline __attribute__((always_inline))
using StringRef = std::span<const uint8_t>;
struct KeyRangeRef {
@@ -56,124 +50,11 @@ struct KeyRangeRef {
: begin(begin), end(keyAfter(arena, begin)) {}
};
struct KeyInfo {
StringRef key;
bool begin;
bool write;
static thread_local uint32_t g_seed = 0;
KeyInfo() = default;
KeyInfo(StringRef key, bool begin, bool write)
: key(key), begin(begin), write(write) {}
};
force_inline int extra_ordering(const KeyInfo &ki) {
return ki.begin * 2 + (ki.write ^ ki.begin);
}
// returns true if done with string
force_inline bool getCharacter(const KeyInfo &ki, int character,
int &outputCharacter) {
// normal case
if (character < ki.key.size()) {
outputCharacter = 5 + ki.key.begin()[character];
return false;
}
// termination
if (character == ki.key.size()) {
outputCharacter = 0;
return false;
}
if (character == ki.key.size() + 1) {
// end/begin+read/write relative sorting
outputCharacter = extra_ordering(ki);
return false;
}
outputCharacter = 0;
return true;
}
bool operator<(const KeyInfo &lhs, const KeyInfo &rhs) {
int i = std::min(lhs.key.size(), rhs.key.size());
int c = memcmp(lhs.key.data(), rhs.key.data(), i);
if (c != 0)
return c < 0;
// Always sort shorter keys before longer keys.
if (lhs.key.size() < rhs.key.size()) {
return true;
}
if (lhs.key.size() > rhs.key.size()) {
return false;
}
// When the keys are the same length, use the extra ordering constraint.
return extra_ordering(lhs) < extra_ordering(rhs);
}
bool operator==(const KeyInfo &lhs, const KeyInfo &rhs) {
return !(lhs < rhs || rhs < lhs);
}
struct SortTask {
int begin;
int size;
int character;
SortTask(int begin, int size, int character)
: begin(begin), size(size), character(character) {}
};
void sortPoints(std::vector<KeyInfo> &points) {
std::vector<SortTask> tasks;
std::vector<KeyInfo> newPoints;
std::vector<int> counts;
tasks.emplace_back(0, points.size(), 0);
while (tasks.size()) {
SortTask st = tasks.back();
tasks.pop_back();
if (st.size < 10) {
std::sort(points.begin() + st.begin, points.begin() + st.begin + st.size);
continue;
}
newPoints.resize(st.size);
counts.assign(256 + 5, 0);
// get counts
int c;
bool allDone = true;
for (int i = st.begin; i < st.begin + st.size; i++) {
allDone &= getCharacter(points[i], st.character, c);
counts[c]++;
}
if (allDone)
continue;
// calculate offsets from counts and build next level of tasks
int total = 0;
for (int i = 0; i < counts.size(); i++) {
int temp = counts[i];
if (temp > 1)
tasks.emplace_back(st.begin + total, temp, st.character + 1);
counts[i] = total;
total += temp;
}
// put in their places
for (int i = st.begin; i < st.begin + st.size; i++) {
getCharacter(points[i], st.character, c);
newPoints[counts[c]++] = points[i];
}
// copy back into original points array
for (int i = 0; i < st.size; i++)
points[st.begin + i] = newPoints[i];
}
static inline int skfastrand() {
g_seed = g_seed * 1664525L + 1013904223L;
return g_seed;
}
static int compare(const StringRef &a, const StringRef &b) {
@@ -201,24 +82,20 @@ struct ReadConflictRange {
}
};
static constexpr int MaxLevels = 26;
struct RandomLevel {
explicit RandomLevel(uint32_t seed) : seed(seed) {}
int next() {
int result = __builtin_clz(seed | (uint32_t(-1) >> (MaxLevels - 1)));
seed = seed * 1664525L + 1013904223L;
return result;
}
private:
uint32_t seed;
};
class SkipList {
private:
RandomLevel randomLevel{0};
static constexpr int MaxLevels = 26;
int randomLevel() const {
uint32_t i = uint32_t(skfastrand()) >> (32 - (MaxLevels - 1));
int level = 0;
while (i & 1) {
i >>= 1;
level++;
}
assert(level < MaxLevels);
return level;
}
// Represent a node in the SkipList. The node has multiple (i.e., level)
// pointers to other nodes, and keeps a record of the max versions for each
@@ -391,21 +268,13 @@ public:
}
explicit SkipList(Version version = 0) {
#if DEBUG_VERBOSE
fprintf(stderr, "skip_list: create\n");
#endif
header = Node::create(StringRef(), MaxLevels - 1);
for (int l = 0; l < MaxLevels; l++) {
header->setNext(l, nullptr);
header->setMaxVersion(l, version);
}
}
~SkipList() {
#if DEBUG_VERBOSE
fprintf(stderr, "skip_list: destroy\n");
#endif
destroy();
}
~SkipList() { destroy(); }
SkipList(SkipList &&other) noexcept : header(other.header) {
other.header = nullptr;
}
@@ -416,33 +285,27 @@ public:
}
void swap(SkipList &other) { std::swap(header, other.header); }
// Returns the change in the number of entries
int64_t addConflictRanges(const Finger *fingers, int rangeCount,
void addConflictRanges(const Finger *fingers, int rangeCount,
Version version) {
int64_t result = rangeCount;
for (int r = rangeCount - 1; r >= 0; r--) {
const Finger &startF = fingers[r * 2];
const Finger &endF = fingers[r * 2 + 1];
if (endF.found() == nullptr) {
++result;
if (endF.found() == nullptr)
insert(endF, endF.finger[0]->getMaxVersion(0));
}
result -= remove(startF, endF);
remove(startF, endF);
insert(startF, version);
}
return result;
}
// Return number of iterations of main loop
int detectConflicts(ReadConflictRange *ranges, int count,
void detectConflicts(ReadConflictRange *ranges, int count,
ConflictSet::Result *transactionConflictStatus) const {
const int M = 16;
int nextJob[M];
CheckMax inProgress[M];
if (!count)
return 0;
return;
int started = std::min(M, count);
for (int i = 0; i < started; i++) {
@@ -453,9 +316,8 @@ public:
int prevJob = started - 1;
int job = 0;
int iters = 0;
// vtune: 340 parts
for (;; ++iters) {
while (true) {
if (inProgress[job].advance()) {
if (started == count) {
if (prevJob == job)
@@ -471,7 +333,6 @@ public:
prevJob = job;
job = nextJob[job];
}
return iters;
}
void find(const StringRef *values, Finger *results, int *temp, int count) {
@@ -565,10 +426,9 @@ public:
}
private:
// Returns the number of entries removed
int64_t remove(const Finger &start, const Finger &end) {
void remove(const Finger &start, const Finger &end) {
if (start.finger[0] == end.finger[0])
return 0;
return;
Node *x = start.finger[0]->getNext(0);
@@ -577,20 +437,17 @@ private:
if (start.finger[i] != end.finger[i])
start.finger[i]->setNext(i, end.finger[i]->getNext(i));
int64_t result = 0;
while (true) {
Node *next = x->getNext(0);
x->destroy();
++result;
if (x == end.finger[0])
break;
x = next;
}
return result;
}
void insert(const Finger &f, Version version) {
int level = randomLevel.next();
int level = randomLevel();
// std::cout << std::string((const char*)value,length) << " level: " <<
// level << std::endl;
Node *x = Node::create(f.value, level);
@@ -706,27 +563,16 @@ private:
};
};
struct ReadContext {
int64_t commits_accum = 0;
int64_t conflicts_accum = 0;
int64_t too_olds_accum = 0;
int64_t check_bytes_accum = 0;
};
struct SkipListConflictSet {};
struct __attribute__((visibility("hidden"))) ConflictSet::Impl {
Impl(int64_t oldestVersion)
: oldestVersion(oldestVersion), newestVersion(oldestVersion),
skipList(oldestVersion) {
metrics = initMetrics(metricsList, metricsCount);
}
~Impl() { safe_free(metrics, metricsCount * sizeof(metrics[0])); }
: oldestVersion(oldestVersion), skipList(oldestVersion) {}
void check(const ConflictSet::ReadRange *reads, ConflictSet::Result *results,
int count) {
ReadContext tls;
int count) const {
Arena arena;
auto *ranges = new (arena) ReadConflictRange[count];
for (int i = 0; i < count; ++i) {
tls.check_bytes_accum += reads[i].begin.len + reads[i].end.len;
ranges[i].begin = {reads[i].begin.p, size_t(reads[i].begin.len)};
ranges[i].end = reads[i].end.len > 0
? StringRef{reads[i].end.p, size_t(reads[i].end.len)}
@@ -734,60 +580,18 @@ struct __attribute__((visibility("hidden"))) ConflictSet::Impl {
ranges[i].version = reads[i].readVersion;
results[i] = ConflictSet::Commit;
}
int iters = skipList.detectConflicts(ranges, count, results);
skipList.detectConflicts(ranges, count, results);
for (int i = 0; i < count; ++i) {
if (reads[i].readVersion < oldestVersion ||
reads[i].readVersion < newestVersion - 2e9) {
if (reads[i].readVersion < oldestVersion) {
results[i] = TooOld;
}
tls.commits_accum += results[i] == Commit;
tls.conflicts_accum += results[i] == Conflict;
tls.too_olds_accum += results[i] == TooOld;
}
range_read_iterations_total.add(iters);
range_read_total.add(count);
commits_total.add(tls.commits_accum);
conflicts_total.add(tls.conflicts_accum);
too_olds_total.add(tls.too_olds_accum);
check_bytes_total.add(tls.check_bytes_accum);
}
void addWrites(const ConflictSet::WriteRange *writes, int count,
int64_t writeVersion) {
auto points = std::vector<KeyInfo>(count * 2);
Arena arena;
for (int r = 0; r < count; r++) {
points.emplace_back(StringRef(writes[r].begin.p, writes[r].begin.len),
true, true);
points.emplace_back(
writes[r].end.len > 0
? StringRef{writes[r].end.p, size_t(writes[r].end.len)}
: keyAfter(arena, points.back().key),
false, true);
}
sortPoints(points);
int activeWriteCount = 0;
std::vector<std::pair<StringRef, StringRef>> combinedWriteConflictRanges;
for (const KeyInfo &point : points) {
if (point.write) {
if (point.begin) {
activeWriteCount++;
if (activeWriteCount == 1)
combinedWriteConflictRanges.emplace_back(point.key, StringRef());
} else /*if (point.end)*/ {
activeWriteCount--;
if (activeWriteCount == 0)
combinedWriteConflictRanges.back().second = point.key;
}
}
}
assert(writeVersion >= newestVersion);
newestVersion = writeVersion;
const int stringCount = combinedWriteConflictRanges.size() * 2;
const int stringCount = count * 2;
const int stripeSize = 16;
SkipList::Finger fingers[stripeSize];
@@ -796,33 +600,28 @@ struct __attribute__((visibility("hidden"))) ConflictSet::Impl {
StringRef values[stripeSize];
int64_t writeVersions[stripeSize / 2];
int ss = stringCount - (stripes - 1) * stripeSize;
int64_t entryDelta = 0;
for (int s = stripes - 1; s >= 0; s--) {
for (int i = 0; i * 2 < ss; ++i) {
const auto &w = combinedWriteConflictRanges[s * stripeSize / 2 + i];
values[i * 2] = w.first;
values[i * 2 + 1] = w.second;
const auto &w = writes[s * stripeSize / 2 + i];
values[i * 2] = {w.begin.p, size_t(w.begin.len)};
values[i * 2 + 1] = w.end.len > 0
? StringRef{w.end.p, size_t(w.end.len)}
: keyAfter(arena, values[i * 2]);
keyUpdates += 3;
}
skipList.find(values, fingers, temp, ss);
entryDelta += skipList.addConflictRanges(fingers, ss / 2, writeVersion);
skipList.addConflictRanges(fingers, ss / 2, writeVersion);
ss = stripeSize;
}
// Run gc at least 200% the rate we're inserting entries
keyUpdates += std::max<int64_t>(entryDelta, 0) * 2;
}
void setOldestVersion(int64_t oldestVersion) {
// This isn't 100% accurate. It overcounts if you hit the end
gc_iterations_total.add(keyUpdates);
assert(oldestVersion >= this->oldestVersion);
this->oldestVersion = oldestVersion;
SkipList::Finger finger;
int temp;
std::span<const uint8_t> key = removalKey;
skipList.find(&key, &finger, &temp, 1);
skipList.removeBefore(oldestVersion, finger, std::exchange(keyUpdates, 0));
skipList.removeBefore(oldestVersion, finger, std::exchange(keyUpdates, 10));
removalArena = Arena();
removalKey = copyToArena(
removalArena, {finger.getValue().data(), finger.getValue().size()});
@@ -830,149 +629,54 @@ struct __attribute__((visibility("hidden"))) ConflictSet::Impl {
int64_t totalBytes = 0;
MetricsV1 *metrics;
int metricsCount = 0;
Metric *metricsList = nullptr;
#define GAUGE(name, help) \
Gauge name { metricsList, metricsCount, #name, help }
#define COUNTER(name, help) \
Counter name { metricsList, metricsCount, #name, help }
// ==================== METRICS DEFINITIONS ====================
COUNTER(range_read_total, "Total number of range reads checked");
COUNTER(range_read_iterations_total,
"Total number of iterations of the main loops for range read checks");
COUNTER(commits_total,
"Total number of checks where the result is \"commit\"");
COUNTER(conflicts_total,
"Total number of checks where the result is \"conflict\"");
COUNTER(too_olds_total,
"Total number of checks where the result is \"too old\"");
COUNTER(check_bytes_total, "Total number of key bytes checked");
GAUGE(memory_bytes, "Total number of bytes in use");
COUNTER(nodes_allocated_total,
"The total number of physical tree nodes allocated");
COUNTER(nodes_released_total,
"The total number of physical tree nodes released");
COUNTER(insert_iterations_total,
"The total number of iterations of the main loop for insertion. "
"Includes searches where the entry already existed, and so insertion "
"did not take place");
COUNTER(entries_inserted_total,
"The total number of entries inserted in the tree");
COUNTER(entries_erased_total,
"The total number of entries erased from the tree");
COUNTER(
gc_iterations_total,
"The total number of iterations of the main loop for garbage collection");
COUNTER(write_bytes_total, "Total number of key bytes in calls to addWrites");
GAUGE(oldest_version,
"The lowest version that doesn't result in \"TooOld\" for checks");
GAUGE(newest_version, "The version of the most recent call to addWrites");
// ==================== END METRICS DEFINITIONS ====================
#undef GAUGE
#undef COUNTER
void getMetricsV1(MetricsV1 **metrics, int *count) {
*metrics = this->metrics;
*count = metricsCount;
}
private:
int64_t keyUpdates = 0;
int64_t keyUpdates = 10;
Arena removalArena;
std::span<const uint8_t> removalKey;
int64_t oldestVersion;
int64_t newestVersion;
SkipList skipList;
};
// Internal entry points. Public entry points should just delegate to these
void internal_check(ConflictSet::Impl *impl,
const ConflictSet::ReadRange *reads,
ConflictSet::Result *results, int count) {
impl->check(reads, results, count);
}
void internal_addWrites(ConflictSet::Impl *impl,
const ConflictSet::WriteRange *writes, int count,
int64_t writeVersion) {
mallocBytesDelta = 0;
impl->addWrites(writes, count, writeVersion);
impl->totalBytes += mallocBytesDelta;
impl->memory_bytes.set(impl->totalBytes);
#if SHOW_MEMORY
if (impl->totalBytes != mallocBytes) {
abort();
}
#endif
}
void internal_setOldestVersion(ConflictSet::Impl *impl, int64_t oldestVersion) {
mallocBytesDelta = 0;
impl->setOldestVersion(oldestVersion);
impl->totalBytes += mallocBytesDelta;
impl->memory_bytes.set(impl->totalBytes);
#if SHOW_MEMORY
if (impl->totalBytes != mallocBytes) {
abort();
}
#endif
}
ConflictSet::Impl *internal_create(int64_t oldestVersion) {
mallocBytesDelta = 0;
auto *result = new (safe_malloc(sizeof(ConflictSet::Impl)))
ConflictSet::Impl{oldestVersion};
result->totalBytes += mallocBytesDelta;
return result;
}
void internal_destroy(ConflictSet::Impl *impl) {
impl->~Impl();
safe_free(impl, sizeof(ConflictSet::Impl));
}
int64_t internal_getBytes(ConflictSet::Impl *impl) { return impl->totalBytes; }
void internal_getMetricsV1(ConflictSet::Impl *impl,
ConflictSet::MetricsV1 **metrics, int *count) {
return impl->getMetricsV1(metrics, count);
}
double internal_getMetricValue(const ConflictSet::MetricsV1 *metric) {
return ((Metric *)metric->p)->value.load(std::memory_order_relaxed);
}
void ConflictSet::check(const ReadRange *reads, Result *results,
int count) const {
internal_check(impl, reads, results, count);
impl->check(reads, results, count);
}
void ConflictSet::addWrites(const WriteRange *writes, int count,
int64_t writeVersion) {
internal_addWrites(impl, writes, count, writeVersion);
mallocBytesDelta = 0;
impl->addWrites(writes, count, writeVersion);
impl->totalBytes += mallocBytesDelta;
#if SHOW_MEMORY
if (impl->totalBytes != mallocBytes) {
abort();
}
#endif
}
void ConflictSet::setOldestVersion(int64_t oldestVersion) {
internal_setOldestVersion(impl, oldestVersion);
mallocBytesDelta = 0;
impl->setOldestVersion(oldestVersion);
impl->totalBytes += mallocBytesDelta;
#if SHOW_MEMORY
if (impl->totalBytes != mallocBytes) {
abort();
}
#endif
}
int64_t ConflictSet::getBytes() const { return internal_getBytes(impl); }
void ConflictSet::getMetricsV1(MetricsV1 **metrics, int *count) const {
return internal_getMetricsV1(impl, metrics, count);
}
double ConflictSet::MetricsV1::getValue() const {
return internal_getMetricValue(this);
}
int64_t ConflictSet::getBytes() const { return impl->totalBytes; }
ConflictSet::ConflictSet(int64_t oldestVersion)
: impl(internal_create(oldestVersion)) {}
: impl((mallocBytesDelta = 0,
new(safe_malloc(sizeof(Impl))) Impl{oldestVersion})) {
impl->totalBytes += mallocBytesDelta;
}
ConflictSet::~ConflictSet() {
if (impl) {
internal_destroy(impl);
impl->~Impl();
safe_free(impl, sizeof(Impl));
}
}
@@ -993,34 +697,37 @@ extern "C" {
__attribute__((__visibility__("default"))) void
ConflictSet_check(void *cs, const ConflictSet_ReadRange *reads,
ConflictSet_Result *results, int count) {
internal_check((ConflictSet::Impl *)cs, reads, results, count);
((ConflictSet::Impl *)cs)->check(reads, results, count);
}
__attribute__((__visibility__("default"))) void
ConflictSet_addWrites(void *cs, const ConflictSet_WriteRange *writes, int count,
int64_t writeVersion) {
internal_addWrites((ConflictSet::Impl *)cs, writes, count, writeVersion);
((ConflictSet::Impl *)cs)->addWrites(writes, count, writeVersion);
}
__attribute__((__visibility__("default"))) void
ConflictSet_setOldestVersion(void *cs, int64_t oldestVersion) {
internal_setOldestVersion((ConflictSet::Impl *)cs, oldestVersion);
((ConflictSet::Impl *)cs)->setOldestVersion(oldestVersion);
}
__attribute__((__visibility__("default"))) void *
ConflictSet_create(int64_t oldestVersion) {
return internal_create(oldestVersion);
return new (safe_malloc(sizeof(ConflictSet::Impl)))
ConflictSet::Impl{oldestVersion};
}
__attribute__((__visibility__("default"))) void ConflictSet_destroy(void *cs) {
internal_destroy((ConflictSet::Impl *)cs);
using Impl = ConflictSet::Impl;
((Impl *)cs)->~Impl();
safe_free(cs, sizeof(Impl));
}
__attribute__((__visibility__("default"))) int64_t
ConflictSet_getBytes(void *cs) {
return internal_getBytes((ConflictSet::Impl *)cs);
using Impl = ConflictSet::Impl;
return ((Impl *)cs)->totalBytes;
}
}
#if SHOW_MEMORY
struct __attribute__((visibility("default"))) PeakPrinter {
~PeakPrinter() {
printf("--- skip_list ---\n");
printf("malloc bytes: %g\n", double(mallocBytes));
printf("Peak malloc bytes: %g\n", double(peakMallocBytes));
}
+4 -55
View File
@@ -3,68 +3,17 @@
#include <fstream>
#include <sstream>
#ifndef PERF_TEST
#define PERF_TEST 0
#endif
int main(int argc, char **argv) {
for (int i = 1; i < argc; ++i) {
std::ifstream t(argv[i], std::ios::binary);
std::stringstream buffer;
buffer << t.rdbuf();
auto str = buffer.str();
Arbitrary arbitrary({(const uint8_t *)str.data(), str.size()});
TestDriver<ConflictSet, !PERF_TEST> driver1{arbitrary};
TestDriver<ConflictSet, !PERF_TEST> driver2{arbitrary};
bool done1 = false;
bool done2 = false;
for (;;) {
if (!done1) {
done1 = driver1.next();
if (!driver1.ok) {
TestDriver<ConflictSet> driver{(const uint8_t *)str.data(), str.size()};
while (!driver.next())
;
if (!driver.ok) {
abort();
}
}
if (!done2) {
done2 = driver2.next();
if (!driver2.ok) {
abort();
}
}
if (done1 && done2) {
break;
}
}
{
ConflictSet::MetricsV1 *metrics;
int metricsCount;
driver1.cs.getMetricsV1(&metrics, &metricsCount);
printf("#################### METRICS for ConflictSet 1 for %s "
"####################\n",
argv[i]);
for (int i = 0; i < metricsCount; ++i) {
printf("# HELP %s %s\n", metrics[i].name, metrics[i].help);
printf("# TYPE %s %s\n", metrics[i].name,
metrics[i].type == metrics[i].Counter ? "counter" : "gauge");
printf("%s %g\n", metrics[i].name, metrics[i].getValue());
}
puts("");
}
{
ConflictSet::MetricsV1 *metrics;
int metricsCount;
driver2.cs.getMetricsV1(&metrics, &metricsCount);
printf("#################### METRICS for ConflictSet 2 for %s "
"####################\n",
argv[i]);
for (int i = 0; i < metricsCount; ++i) {
printf("# HELP %s %s\n", metrics[i].name, metrics[i].help);
printf("# TYPE %s %s\n", metrics[i].name,
metrics[i].type == metrics[i].Counter ? "counter" : "gauge");
printf("%s %g\n", metrics[i].name, metrics[i].getValue());
}
puts("");
}
}
}
-10
View File
@@ -1,10 +0,0 @@
__aarch64_ldadd8_relax
__getauxval@GLIBC_2.17
__stack_chk_fail@GLIBC_2.17
__stack_chk_guard@GLIBC_2.17
abort@GLIBC_2.17
free@GLIBC_2.17
malloc@GLIBC_2.17
memcpy@GLIBC_2.17
memmove@GLIBC_2.17
memset@GLIBC_2.17
-2
View File
@@ -13,7 +13,5 @@ __ZN8weaselab11ConflictSetC2Ex
__ZN8weaselab11ConflictSetD1Ev
__ZN8weaselab11ConflictSetD2Ev
__ZN8weaselab11ConflictSetaSEOS0_
__ZNK8weaselab11ConflictSet12getMetricsV1EPPNS0_9MetricsV1EPi
__ZNK8weaselab11ConflictSet5checkEPKNS0_9ReadRangeEPNS0_6ResultEi
__ZNK8weaselab11ConflictSet8getBytesEv
__ZNK8weaselab11ConflictSet9MetricsV18getValueEv
-3
View File
@@ -1,5 +1,3 @@
___stack_chk_fail
___stack_chk_guard
__tlv_bootstrap
_abort
_bzero
@@ -7,4 +5,3 @@ _free
_malloc
_memcpy
_memmove
dyld_stub_binder
-138
View File
@@ -1,138 +0,0 @@
import ctypes
import enum
import os
from typing import Optional
class _Key(ctypes.Structure):
_fields_ = [("p", ctypes.POINTER(ctypes.c_ubyte)), ("len", ctypes.c_int)]
class ReadRange(ctypes.Structure):
_fields_ = [
("begin", _Key),
("end", _Key),
("readVersion", ctypes.c_int64),
]
class WriteRange(ctypes.Structure):
_fields_ = [("begin", _Key), ("end", _Key)]
class Result(enum.Enum):
COMMIT = 0
CONFLICT = 1
TOO_OLD = 2
def write(begin: bytes, end: Optional[bytes] = None) -> WriteRange:
b = (ctypes.c_ubyte * len(begin)).from_buffer(bytearray(begin))
if end is None:
e = (ctypes.c_ubyte * 0)()
else:
e = (ctypes.c_ubyte * len(end)).from_buffer(bytearray(end))
return WriteRange(_Key(b, len(b)), _Key(e, len(e)))
def read(version: int, begin: bytes, end: Optional[bytes] = None) -> ReadRange:
b = (ctypes.c_ubyte * len(begin)).from_buffer(bytearray(begin))
if end is None:
e = (ctypes.c_ubyte * 0)()
else:
e = (ctypes.c_ubyte * len(end)).from_buffer(bytearray(end))
return ReadRange(_Key(b, len(b)), _Key(e, len(e)), version)
class ConflictSet:
def __init__(
self,
version: int = 0,
build_dir: Optional[str] = None,
implementation: Optional[str] = None,
) -> None:
self._lib = None
if build_dir is None:
build_dir = os.path.dirname(__file__) + "/build"
if implementation is None:
implementation = "radix_tree"
for f in (
build_dir + "/" + implementation + "/libconflict-set.so.0",
os.path.dirname(__file__)
+ "/build/"
+ implementation
+ "/libconflict-set.0.dylib",
):
try:
self._lib = ctypes.cdll.LoadLibrary(f)
except:
pass
if self._lib is None:
import sys
print(
"Could not find libconflict-set implementation " + implementation,
file=sys.stderr,
)
sys.exit(1)
self._lib.ConflictSet_create.argtypes = (ctypes.c_int64,)
self._lib.ConflictSet_create.restype = ctypes.c_void_p
self._lib.ConflictSet_check.argtypes = (
ctypes.c_void_p,
ctypes.POINTER(ReadRange),
ctypes.POINTER(ctypes.c_int),
ctypes.c_int,
)
self._lib.ConflictSet_addWrites.argtypes = (
ctypes.c_void_p,
ctypes.POINTER(WriteRange),
ctypes.c_int,
ctypes.c_int64,
)
self._lib.ConflictSet_setOldestVersion.argtypes = (
ctypes.c_void_p,
ctypes.c_int64,
)
self._lib.ConflictSet_destroy.argtypes = (ctypes.c_void_p,)
self._lib.ConflictSet_getBytes.argtypes = (ctypes.c_void_p,)
self._lib.ConflictSet_getBytes.restype = ctypes.c_int64
self.p = self._lib.ConflictSet_create(version)
def addWrites(self, version: int, *writes: WriteRange):
self._lib.ConflictSet_addWrites(
self.p, (WriteRange * len(writes))(*writes), len(writes), version
)
def check(self, *reads: ReadRange) -> list[Result]:
r = (ctypes.c_int * len(reads))()
self._lib.ConflictSet_check(
self.p, (ReadRange * len(reads))(*reads), r, len(reads)
)
return [Result(x) for x in r]
def setOldestVersion(self, version: int) -> None:
self._lib.ConflictSet_setOldestVersion(self.p, version)
def getBytes(self) -> int:
return self._lib.ConflictSet_getBytes(self.p)
def __enter__(self):
return self
def close(self) -> None:
if self.p is not None:
self._lib.ConflictSet_destroy(self.p)
self.p = None
def __exit__(self, exception_type, exception_value, exception_traceback):
self.close()
-11
View File
@@ -1,7 +1,6 @@
#include "ConflictSet.h"
#include <cassert>
#include <cstdio>
using namespace weaselab;
@@ -22,14 +21,4 @@ int main(void) {
assert(result == ConflictSet::Conflict);
int64_t bytes = cs.getBytes();
assert(bytes > 0);
ConflictSet::MetricsV1 *metrics;
int metricsCount;
cs.getMetricsV1(&metrics, &metricsCount);
for (int i = 0; i < metricsCount; ++i) {
printf("# HELP %s %s\n", metrics[i].name, metrics[i].help);
printf("# TYPE %s %s\n", metrics[i].name,
metrics[i].type == metrics[i].Counter ? "counter" : "gauge");
printf("%s %g\n", metrics[i].name, metrics[i].getValue());
}
}
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.

Some files were not shown because too many files have changed in this diff Show More