From 1a85e91113d27138b186dce5f4d773110c8e538c Mon Sep 17 00:00:00 2001 From: Andrew Noyes Date: Fri, 22 Aug 2025 13:10:26 -0400 Subject: [PATCH] Add test for releasing connections back to the server --- CMakeLists.txt | 26 +++++++ design.md | 11 ++- src/ThreadPipeline.h | 4 +- src/connection.cpp | 12 ---- src/connection.hpp | 8 --- src/connection_registry.cpp | 14 ++-- src/connection_registry.hpp | 9 ++- src/server.cpp | 30 +++++--- tests/test_server_connection_return.cpp | 96 +++++++++++++++++++++++++ 9 files changed, 168 insertions(+), 42 deletions(-) create mode 100644 tests/test_server_connection_return.cpp diff --git a/CMakeLists.txt b/CMakeLists.txt index e71a0f5..a74bc9d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -178,6 +178,30 @@ target_include_directories(test_http_handler PRIVATE src) target_compile_definitions(test_http_handler PRIVATE DOCTEST_CONFIG_IMPLEMENT_WITH_MAIN) +add_executable( + test_server_connection_return + tests/test_server_connection_return.cpp + src/server.cpp + src/connection.cpp + src/connection_registry.cpp + src/arena_allocator.cpp + src/config.cpp + src/http_handler.cpp + ${CMAKE_BINARY_DIR}/json_tokens.cpp) +add_dependencies(test_server_connection_return generate_json_tokens) +target_link_libraries( + test_server_connection_return + doctest::doctest + llhttp_static + Threads::Threads + toml11::toml11 + perfetto + weaseljson + simdutf::simdutf) +target_include_directories(test_server_connection_return PRIVATE src) +target_compile_definitions(test_server_connection_return + PRIVATE DOCTEST_CONFIG_IMPLEMENT_WITH_MAIN) + add_executable(bench_arena_allocator benchmarks/bench_arena_allocator.cpp src/arena_allocator.cpp) target_link_libraries(bench_arena_allocator nanobench) @@ -218,6 +242,8 @@ add_test(NAME arena_allocator_tests COMMAND test_arena_allocator) add_test(NAME connection_registry_tests COMMAND test_connection_registry) add_test(NAME commit_request_tests COMMAND test_commit_request) add_test(NAME http_handler_tests COMMAND test_http_handler) +add_test(NAME server_connection_return_tests + COMMAND test_server_connection_return) add_test(NAME arena_allocator_benchmarks COMMAND bench_arena_allocator) add_test(NAME commit_request_benchmarks COMMAND bench_commit_request) add_test(NAME parser_comparison_benchmarks COMMAND bench_parser_comparison) diff --git a/design.md b/design.md index 5176159..302d28c 100644 --- a/design.md +++ b/design.md @@ -298,7 +298,16 @@ This write-side component is designed to integrate with: - **Configuration**: All configuration is TOML-based using `config.toml` (see `config.md`) - **Testing Strategy**: Run unit tests, benchmarks, and debug tools before submitting changes - **Build System**: CMake generates gperf hash tables at build time; always use ninja -- **Test Synchronization**: NEVER use sleep() for test synchronization - it makes tests slow and flaky. Use proper synchronization primitives like std::latch (C++20), condition variables, or promises/futures instead +- **Test Synchronization**: + - **ABSOLUTELY NEVER use sleep(), std::this_thread::sleep_for(), or any timeout-based waiting in tests** + - **NEVER use condition_variable.wait_for() or other timeout variants** + - Use deterministic synchronization only: + - **Blocking I/O** (blocking read/write calls that naturally wait) + - **condition_variable.wait()** with no timeout (waits indefinitely until condition is met) + - **std::latch, std::barrier, futures/promises** for coordination + - **RAII guards and resource management** for cleanup + - Tests should either pass (correct) or hang forever (indicates real bug to investigate) + - No timeouts, no flaky behavior, no false positives/negatives --- diff --git a/src/ThreadPipeline.h b/src/ThreadPipeline.h index dc26a40..155654a 100644 --- a/src/ThreadPipeline.h +++ b/src/ThreadPipeline.h @@ -315,8 +315,8 @@ public: // none available) Returns: StageGuard with batch of items to process [[nodiscard]] StageGuard acquire(int stage, int thread, int maxBatch = 0, bool mayBlock = true) { - assert(stage < threadState.size()); - assert(thread < threadState[stage].size()); + assert(stage < int(threadState.size())); + assert(thread < int(threadState[stage].size())); auto batch = acquireHelper(stage, thread, maxBatch, mayBlock); return StageGuard{std::move(batch), &threadState[stage][thread]}; } diff --git a/src/connection.cpp b/src/connection.cpp index 91dfd32..75dafb5 100644 --- a/src/connection.cpp +++ b/src/connection.cpp @@ -131,15 +131,3 @@ bool Connection::writeBytes() { return false; } - -void Connection::tsan_acquire() { -#if __has_feature(thread_sanitizer) - tsan_sync_.load(std::memory_order_acquire); -#endif -} - -void Connection::tsan_release() { -#if __has_feature(thread_sanitizer) - tsan_sync_.store(0, std::memory_order_release); -#endif -} diff --git a/src/connection.hpp b/src/connection.hpp index 060075c..a757dac 100644 --- a/src/connection.hpp +++ b/src/connection.hpp @@ -2,7 +2,6 @@ #include "arena_allocator.hpp" #include "connection_handler.hpp" -#include #include #include #include @@ -352,8 +351,6 @@ private: // Networking interface - only accessible by Server int readBytes(char *buf, size_t buffer_size); bool writeBytes(); - void tsan_acquire(); - void tsan_release(); // Direct access methods for Server int getFd() const { return fd_; } @@ -374,9 +371,4 @@ private: // Whether or not to close the connection after completing writing the // response bool closeConnection_{false}; - - // TSAN support for epoll synchronization -#if __has_feature(thread_sanitizer) - std::atomic tsan_sync_; -#endif }; diff --git a/src/connection_registry.cpp b/src/connection_registry.cpp index a6ae0cb..5ff013f 100644 --- a/src/connection_registry.cpp +++ b/src/connection_registry.cpp @@ -1,5 +1,6 @@ #include "connection_registry.hpp" #include "connection.hpp" +#include #include #include #include @@ -19,7 +20,7 @@ ConnectionRegistry::ConnectionRegistry() : connections_(nullptr), max_fds_(0) { // Allocate virtual address space using mmap // MAP_ANONYMOUS provides zero-initialized pages on-demand (lazy allocation) - connections_ = static_cast( + connections_ = static_cast *>( mmap(nullptr, aligned_size, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0)); @@ -34,7 +35,7 @@ ConnectionRegistry::ConnectionRegistry() : connections_(nullptr), max_fds_(0) { ConnectionRegistry::~ConnectionRegistry() { if (connections_ != nullptr) { for (size_t fd = 0; fd < max_fds_; ++fd) { - delete connections_[fd]; + delete connections_[fd].load(std::memory_order_relaxed); } if (munmap(connections_, aligned_size_) == -1) { perror("munmap"); @@ -47,16 +48,13 @@ void ConnectionRegistry::store(int fd, std::unique_ptr connection) { abort(); } // Release ownership from unique_ptr and store raw pointer - connections_[fd] = connection.release(); + connections_[fd].store(connection.release(), std::memory_order_release); } std::unique_ptr ConnectionRegistry::remove(int fd) { if (fd < 0 || static_cast(fd) >= max_fds_) { abort(); } - - Connection *conn = connections_[fd]; - connections_[fd] = nullptr; - // Transfer ownership back to unique_ptr - return std::unique_ptr(conn); + return std::unique_ptr( + connections_[fd].exchange(nullptr, std::memory_order_acquire)); } diff --git a/src/connection_registry.hpp b/src/connection_registry.hpp index db380e2..1a64544 100644 --- a/src/connection_registry.hpp +++ b/src/connection_registry.hpp @@ -63,7 +63,10 @@ public: ConnectionRegistry &operator=(ConnectionRegistry &&) = delete; private: - Connection **connections_; ///< mmap'd array of raw connection pointers - size_t max_fds_; ///< Maximum file descriptor limit - size_t aligned_size_; ///< Page-aligned size for munmap + std::atomic + *connections_; ///< mmap'd array of raw connection pointers. It's + ///< thread-safe without since epoll_ctl happens before + ///< epoll_wait, but this makes tsan happy /shrug. + size_t max_fds_; ///< Maximum file descriptor limit + size_t aligned_size_; ///< Page-aligned size for munmap }; diff --git a/src/server.cpp b/src/server.cpp index 17244ae..5d501ea 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -42,6 +42,13 @@ Server::Server(const weaseldb::Config &config, ConnectionHandler &handler, throw std::runtime_error("Failed to set provided listen fd non-blocking"); } } + + // Setup shutdown pipe for graceful shutdown + setup_shutdown_pipe(); + + // Create epoll instances immediately for createLocalConnection() support + create_epoll_instances(); + // If empty vector provided, listen_fds_ will be empty (no listening) // Server works purely with createLocalConnection() } @@ -78,8 +85,7 @@ Server::~Server() { } void Server::run() { - setup_shutdown_pipe(); - create_epoll_instances(); + // Shutdown pipe and epoll instances are now created in constructor // Create I/O threads locally in this call frame // CRITICAL: By owning threads in run()'s call frame, we guarantee they are @@ -140,7 +146,6 @@ void Server::receiveConnectionBack(std::unique_ptr connection) { event.events = EPOLLOUT | EPOLLONESHOT; } - connection->tsan_release(); int fd = connection->getFd(); event.data.fd = fd; @@ -159,7 +164,7 @@ void Server::receiveConnectionBack(std::unique_ptr connection) { int Server::createLocalConnection() { int sockets[2]; - if (socketpair(AF_UNIX, SOCK_STREAM | SOCK_NONBLOCK, 0, sockets) != 0) { + if (socketpair(AF_UNIX, SOCK_STREAM, 0, sockets) != 0) { perror("socketpair"); return -1; } @@ -167,6 +172,18 @@ int Server::createLocalConnection() { int server_fd = sockets[0]; // Server keeps this end int client_fd = sockets[1]; // Return this end to caller + int flags = fcntl(server_fd, F_GETFL, 0); + if (flags == -1) { + perror("fcntl F_GETFL on provided listen fd"); + throw std::runtime_error( + "Failed to get flags for server side of local connection"); + } + if (fcntl(server_fd, F_SETFL, flags | O_NONBLOCK) == -1) { + perror("fcntl F_SETFL O_NONBLOCK on provided listen fd"); + throw std::runtime_error( + "Failed to set server side of local connection to non-blocking"); + } + // Create sockaddr_storage for the connection struct sockaddr_storage addr{}; addr.ss_family = AF_UNIX; @@ -304,9 +321,7 @@ void Server::start_io_threads(std::vector &threads) { // Handle existing connection events int fd = events[i].data.fd; std::unique_ptr conn = connection_registry_.remove(fd); - if (conn) { - conn->tsan_acquire(); - } + assert(conn); if (events[i].events & (EPOLLERR | EPOLLHUP)) { // unique_ptr will automatically delete on scope exit @@ -468,7 +483,6 @@ void Server::process_connection_batch( event.events = EPOLLOUT | EPOLLONESHOT; } - conn_ptr->tsan_release(); event.data.fd = fd; // Use file descriptor for epoll // Put connection back in registry since handler didn't take ownership. // Must happen before epoll_ctl diff --git a/tests/test_server_connection_return.cpp b/tests/test_server_connection_return.cpp new file mode 100644 index 0000000..5e9674c --- /dev/null +++ b/tests/test_server_connection_return.cpp @@ -0,0 +1,96 @@ +#include "../src/ThreadPipeline.h" +#include "config.hpp" +#include "connection.hpp" +#include "perfetto_categories.hpp" +#include "server.hpp" +#include +#include + +// Perfetto static storage for tests +PERFETTO_TRACK_EVENT_STATIC_STORAGE(); + +struct Message { + std::unique_ptr conn; + std::string data; + bool done; +}; + +struct EchoHandler : public ConnectionHandler { +private: + ThreadPipeline &pipeline; + +public: + explicit EchoHandler(ThreadPipeline &pipeline) + : pipeline(pipeline) {} + + void on_data_arrived(std::string_view data, + std::unique_ptr &conn_ptr) override { + assert(conn_ptr); + auto guard = pipeline.push(1, true); + for (auto &message : guard.batch) { + message.conn = std::move(conn_ptr); + message.data = data; + message.done = false; + } + } +}; + +TEST_CASE("Echo server with connection ownership transfer") { + weaseldb::Config config; + config.server.io_threads = 1; + config.server.epoll_instances = 1; + + ThreadPipeline pipeline{10, {1}}; + EchoHandler handler{pipeline}; + auto echoThread = std::thread{[&]() { + for (;;) { + auto guard = pipeline.acquire(0, 0); + for (auto &message : guard.batch) { + bool done = message.done; + if (done) { + return; + } + assert(message.conn); + message.conn->appendMessage(message.data); + Server::releaseBackToServer(std::move(message.conn)); + } + } + }}; + + // Create server with NO listen sockets (empty vector) + auto server = Server::create(config, handler, {}); + + std::thread server_thread([&server]() { server->run(); }); + + // Create local connection + int client_fd = server->createLocalConnection(); + REQUIRE(client_fd > 0); + + // Write some test data + const char *test_message = "Hello, World!"; + ssize_t bytes_written = write(client_fd, test_message, strlen(test_message)); + REQUIRE(bytes_written == strlen(test_message)); + + // Read the echoed response + char buffer[1024] = {0}; + ssize_t bytes_read = read(client_fd, buffer, sizeof(buffer) - 1); + if (bytes_read == -1) { + perror("read failed"); + } + REQUIRE(bytes_read == strlen(test_message)); + + // Verify we got back exactly what we sent + CHECK(std::string(buffer, bytes_read) == std::string(test_message)); + + // Cleanup + close(client_fd); + server->shutdown(); + server_thread.join(); + { + auto guard = pipeline.push(1, true); + for (auto &message : guard.batch) { + message.done = true; + } + } + echoThread.join(); +}