From de6f38694f02fc6dff1fb6ee6f7cb46628edf12f Mon Sep 17 00:00:00 2001 From: Andrew Noyes Date: Fri, 12 Sep 2025 18:31:57 -0400 Subject: [PATCH] std::unique_ptr -> Ref --- src/connection.hpp | 5 ++- src/connection_handler.hpp | 13 +++--- src/connection_registry.cpp | 59 ++++++++++++------------- src/connection_registry.hpp | 12 +++-- src/http_handler.cpp | 8 ++-- src/http_handler.hpp | 7 ++- src/pipeline_entry.hpp | 14 +++--- src/server.cpp | 30 ++++++------- src/server.hpp | 13 +++--- tests/test_http_handler.cpp | 6 +-- tests/test_server_connection_return.cpp | 4 +- 11 files changed, 79 insertions(+), 92 deletions(-) diff --git a/src/connection.hpp b/src/connection.hpp index 1aca340..155c581 100644 --- a/src/connection.hpp +++ b/src/connection.hpp @@ -259,7 +259,7 @@ struct Connection { * } * * void on_data_arrived(std::string_view data, - * std::unique_ptr& conn_ptr) override { + * Ref& conn_ptr) override { * auto* state = static_cast(conn_ptr->user_data); * // Use state for protocol processing... * } @@ -333,6 +333,9 @@ private: size_t epoll_index, ConnectionHandler *handler, WeakRef server); + template + friend Ref make_ref(Args &&...args); + // Networking interface - only accessible by Server int readBytes(char *buf, size_t buffer_size); bool writeBytes(); diff --git a/src/connection_handler.hpp b/src/connection_handler.hpp index 8491255..c364376 100644 --- a/src/connection_handler.hpp +++ b/src/connection_handler.hpp @@ -1,9 +1,10 @@ #pragma once -#include #include #include +#include "reference.hpp" + // Forward declaration to avoid circular dependency struct Connection; @@ -39,8 +40,7 @@ public: * after the call to on_data_arrived. * @note May be called from an arbitrary server thread. */ - virtual void on_data_arrived(std::string_view /*data*/, - std::unique_ptr &) {}; + virtual void on_data_arrived(std::string_view /*data*/, Ref &) {}; /** * Called when data has been successfully written to the connection. @@ -55,7 +55,7 @@ public: * @note May be called from an arbitrary server thread. * @note Called during writes, not necessarily when buffer becomes empty */ - virtual void on_write_progress(std::unique_ptr &) {} + virtual void on_write_progress(Ref &) {} /** * Called when the connection's outgoing write buffer becomes empty. @@ -72,7 +72,7 @@ public: * @note May be called from an arbitrary server thread. * @note Only called on transitions from non-empty → empty buffer */ - virtual void on_write_buffer_drained(std::unique_ptr &) {} + virtual void on_write_buffer_drained(Ref &) {} /** * Called when a new connection is established. @@ -107,6 +107,5 @@ public: * * @param batch A span of unique_ptrs to the connections in the batch. */ - virtual void - on_batch_complete(std::span> /*batch*/) {} + virtual void on_batch_complete(std::span> /*batch*/) {} }; diff --git a/src/connection_registry.cpp b/src/connection_registry.cpp index 3a7bd82..af98f8f 100644 --- a/src/connection_registry.cpp +++ b/src/connection_registry.cpp @@ -1,6 +1,5 @@ #include "connection_registry.hpp" #include "connection.hpp" -#include #include #include #include @@ -14,49 +13,49 @@ ConnectionRegistry::ConnectionRegistry() : connections_(nullptr), max_fds_(0) { } max_fds_ = rlim.rlim_cur; - // Calculate size rounded up to page boundary - size_t array_size = max_fds_ * sizeof(Connection *); - size_t page_size = getpagesize(); - size_t aligned_size = (array_size + page_size - 1) & ~(page_size - 1); + // // Calculate size rounded up to page boundary + // size_t array_size = max_fds_ * sizeof(Connection *); + // size_t page_size = getpagesize(); + // size_t aligned_size = (array_size + page_size - 1) & ~(page_size - 1); - // Allocate virtual address space using mmap - // MAP_ANONYMOUS provides zero-initialized pages on-demand (lazy allocation) - connections_ = static_cast *>( - mmap(nullptr, aligned_size, PROT_READ | PROT_WRITE, - MAP_PRIVATE | MAP_ANONYMOUS, -1, 0)); + // // Allocate virtual address space using mmap + // // MAP_ANONYMOUS provides zero-initialized pages on-demand (lazy + // allocation) connections_ = static_cast *>( + // mmap(nullptr, aligned_size, PROT_READ | PROT_WRITE, + // MAP_PRIVATE | MAP_ANONYMOUS, -1, 0)); - if (connections_ == MAP_FAILED) { - perror("mmap"); - std::abort(); - } + // if (connections_ == MAP_FAILED) { + // perror("mmap"); + // std::abort(); + // } - // Store aligned size for munmap - aligned_size_ = aligned_size; + // // Store aligned size for munmap + // aligned_size_ = aligned_size; + connections_ = new Ref[max_fds_]; } ConnectionRegistry::~ConnectionRegistry() { - if (connections_ != nullptr) { - for (int fd = 0; fd < static_cast(max_fds_); ++fd) { - delete connections_[fd].load(std::memory_order_relaxed); - } - if (munmap(connections_, aligned_size_) == -1) { - perror("munmap"); - } - } + delete[] connections_; + // if (connections_ != nullptr) { + // for (int fd = 0; fd < static_cast(max_fds_); ++fd) { + // delete connections_[fd].load(std::memory_order_relaxed); + // } + // if (munmap(connections_, aligned_size_) == -1) { + // perror("munmap"); + // } + // } } -void ConnectionRegistry::store(int fd, std::unique_ptr connection) { +void ConnectionRegistry::store(int fd, Ref connection) { if (fd < 0 || static_cast(fd) >= max_fds_) { std::abort(); } - // Release ownership from unique_ptr and store raw pointer - connections_[fd].store(connection.release(), std::memory_order_release); + connections_[fd] = std::move(connection); } -std::unique_ptr ConnectionRegistry::remove(int fd) { +Ref ConnectionRegistry::remove(int fd) { if (fd < 0 || static_cast(fd) >= max_fds_) { std::abort(); } - return std::unique_ptr( - connections_[fd].exchange(nullptr, std::memory_order_acquire)); + return std::move(connections_[fd]); } diff --git a/src/connection_registry.hpp b/src/connection_registry.hpp index 03ae0ef..7fb1e29 100644 --- a/src/connection_registry.hpp +++ b/src/connection_registry.hpp @@ -1,10 +1,11 @@ #pragma once #include -#include #include #include +#include "reference.hpp" + struct Connection; /** @@ -38,7 +39,7 @@ public: * @param fd File descriptor (must be valid and < max_fds_) * @param connection unique_ptr to the connection (ownership transferred) */ - void store(int fd, std::unique_ptr connection); + void store(int fd, Ref connection); /** * Remove a connection from the registry and transfer ownership to caller. @@ -47,7 +48,7 @@ public: * @param fd File descriptor * @return unique_ptr to the connection, or nullptr if not found */ - std::unique_ptr remove(int fd); + Ref remove(int fd); /** * Get the maximum number of file descriptors supported. @@ -63,10 +64,7 @@ public: ConnectionRegistry &operator=(ConnectionRegistry &&) = delete; private: - std::atomic - *connections_; ///< mmap'd array of raw connection pointers. It's - ///< thread-safe without since epoll_ctl happens before - ///< epoll_wait, but this makes tsan happy /shrug. + Ref *connections_; size_t max_fds_; ///< Maximum file descriptor limit size_t aligned_size_; ///< Page-aligned size for munmap }; diff --git a/src/http_handler.cpp b/src/http_handler.cpp index d348b3d..0631ba7 100644 --- a/src/http_handler.cpp +++ b/src/http_handler.cpp @@ -76,8 +76,7 @@ void HttpHandler::on_connection_closed(Connection &conn) { conn.user_data = nullptr; } -void HttpHandler::on_write_buffer_drained( - std::unique_ptr &conn_ptr) { +void HttpHandler::on_write_buffer_drained(Ref &conn_ptr) { // Reset arena after all messages have been written for the next request auto *state = static_cast(conn_ptr->user_data); if (state) { @@ -89,8 +88,7 @@ void HttpHandler::on_write_buffer_drained( on_connection_established(*conn_ptr); } -void HttpHandler::on_batch_complete( - std::span> batch) { +void HttpHandler::on_batch_complete(std::span> batch) { // Collect commit, status, and health check requests for pipeline processing int pipeline_count = 0; @@ -147,7 +145,7 @@ void HttpHandler::on_batch_complete( } void HttpHandler::on_data_arrived(std::string_view data, - std::unique_ptr &conn_ptr) { + Ref &conn_ptr) { auto *state = static_cast(conn_ptr->user_data); if (!state) { send_error_response(*conn_ptr, 500, "Internal server error", true); diff --git a/src/http_handler.hpp b/src/http_handler.hpp index f671799..b80bd9e 100644 --- a/src/http_handler.hpp +++ b/src/http_handler.hpp @@ -135,10 +135,9 @@ struct HttpHandler : ConnectionHandler { void on_connection_established(Connection &conn) override; void on_connection_closed(Connection &conn) override; void on_data_arrived(std::string_view data, - std::unique_ptr &conn_ptr) override; - void on_write_buffer_drained(std::unique_ptr &conn_ptr) override; - void on_batch_complete( - std::span> /*batch*/) override; + Ref &conn_ptr) override; + void on_write_buffer_drained(Ref &conn_ptr) override; + void on_batch_complete(std::span> /*batch*/) override; // llhttp callbacks (public for HttpConnectionState access) static int onUrl(llhttp_t *parser, const char *at, size_t length); diff --git a/src/pipeline_entry.hpp b/src/pipeline_entry.hpp index dc35654..0b78359 100644 --- a/src/pipeline_entry.hpp +++ b/src/pipeline_entry.hpp @@ -9,14 +9,13 @@ * Contains connection with parsed CommitRequest. */ struct CommitEntry { - std::unique_ptr connection; + Ref connection; int64_t assigned_version = 0; // Set by sequence stage bool resolve_success = false; // Set by resolve stage bool persist_success = false; // Set by persist stage CommitEntry() = default; // Default constructor for variant - explicit CommitEntry(std::unique_ptr conn) - : connection(std::move(conn)) {} + explicit CommitEntry(Ref conn) : connection(std::move(conn)) {} }; /** @@ -24,12 +23,11 @@ struct CommitEntry { * then transfer to status threadpool. */ struct StatusEntry { - std::unique_ptr connection; + Ref connection; int64_t version_upper_bound = 0; // Set by sequence stage StatusEntry() = default; // Default constructor for variant - explicit StatusEntry(std::unique_ptr conn) - : connection(std::move(conn)) {} + explicit StatusEntry(Ref conn) : connection(std::move(conn)) {} }; /** @@ -38,10 +36,10 @@ struct StatusEntry { * Resolve stage can perform configurable CPU work for benchmarking. */ struct HealthCheckEntry { - std::unique_ptr connection; + Ref connection; HealthCheckEntry() = default; // Default constructor for variant - explicit HealthCheckEntry(std::unique_ptr conn) + explicit HealthCheckEntry(Ref conn) : connection(std::move(conn)) {} }; diff --git a/src/server.cpp b/src/server.cpp index dd03ff4..9670e4a 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -5,7 +5,6 @@ #include #include #include -#include #include #include #include @@ -139,7 +138,7 @@ void Server::shutdown() { } } -void Server::release_back_to_server(std::unique_ptr connection) { +void Server::release_back_to_server(Ref connection) { if (!connection) { return; // Nothing to release } @@ -154,7 +153,7 @@ void Server::release_back_to_server(std::unique_ptr connection) { // unique_ptr destructs } -void Server::receiveConnectionBack(std::unique_ptr connection) { +void Server::receiveConnectionBack(Ref connection) { if (!connection) { return; // Nothing to process } @@ -216,9 +215,9 @@ int Server::create_local_connection() { epoll_fds_.size(); // Create Connection object - auto connection = std::unique_ptr(new Connection( + auto connection = make_ref( addr, server_fd, connection_id_.fetch_add(1, std::memory_order_relaxed), - epoll_index, &handler_, self_.copy())); + epoll_index, &handler_, self_.copy()); // Store in registry connection_registry_.store(server_fd, std::move(connection)); @@ -316,8 +315,7 @@ void Server::start_io_threads(std::vector &threads) { int epollfd = get_epoll_for_thread(thread_id); std::vector events(config_.server.event_batch_size); - std::vector> batch( - config_.server.event_batch_size); + std::vector> batch(config_.server.event_batch_size); std::vector batch_events(config_.server.event_batch_size); std::vector ready_listen_fds; // Reused across iterations to avoid allocation @@ -351,7 +349,7 @@ void Server::start_io_threads(std::vector &threads) { // Handle existing connection events int fd = events[i].data.fd; - std::unique_ptr conn = connection_registry_.remove(fd); + Ref conn = connection_registry_.remove(fd); assert(conn); if (events[i].events & (EPOLLERR | EPOLLHUP)) { @@ -419,10 +417,10 @@ void Server::start_io_threads(std::vector &threads) { // Transfer ownership from registry to batch processing size_t epoll_index = thread_id % epoll_fds_.size(); - batch[batch_count] = std::unique_ptr(new Connection( + batch[batch_count] = make_ref( addr, fd, connection_id_.fetch_add(1, std::memory_order_relaxed), - epoll_index, &handler_, self_.copy())); + epoll_index, &handler_, self_.copy()); batch_events[batch_count] = EPOLLIN; // New connections always start with read batch_count++; @@ -449,8 +447,7 @@ void Server::start_io_threads(std::vector &threads) { } } -void Server::process_connection_reads(std::unique_ptr &conn, - int events) { +void Server::process_connection_reads(Ref &conn, int events) { assert(conn); // Handle EPOLLIN - read data and process it if (events & EPOLLIN) { @@ -481,8 +478,7 @@ void Server::process_connection_reads(std::unique_ptr &conn, } } -void Server::process_connection_writes(std::unique_ptr &conn, - int /*events*/) { +void Server::process_connection_writes(Ref &conn, int /*events*/) { assert(conn); // For simplicity, we always attempt to write when an event fires. We could be // more precise and skip the write if we detect that we've already seen EAGAIN @@ -521,9 +517,9 @@ void Server::process_connection_writes(std::unique_ptr &conn, } } -void Server::process_connection_batch( - int epollfd, std::span> batch, - std::span events) { +void Server::process_connection_batch(int epollfd, + std::span> batch, + std::span events) { // First process writes for each connection for (int i = 0; i < static_cast(batch.size()); ++i) { diff --git a/src/server.hpp b/src/server.hpp index 128d433..dd3a205 100644 --- a/src/server.hpp +++ b/src/server.hpp @@ -106,7 +106,7 @@ struct Server { * * @param connection unique_ptr to the connection being released back */ - static void release_back_to_server(std::unique_ptr connection); + static void release_back_to_server(Ref connection); private: friend struct Connection; @@ -158,14 +158,11 @@ private: int get_epoll_for_thread(int thread_id) const; // Helper for processing connection I/O - void process_connection_reads(std::unique_ptr &conn_ptr, - int events); - void process_connection_writes(std::unique_ptr &conn_ptr, - int events); + void process_connection_reads(Ref &conn_ptr, int events); + void process_connection_writes(Ref &conn_ptr, int events); // Helper for processing a batch of connections with their events - void process_connection_batch(int epollfd, - std::span> batch, + void process_connection_batch(int epollfd, std::span> batch, std::span events); /** @@ -176,7 +173,7 @@ private: * * @param connection Unique pointer to the connection being released back */ - void receiveConnectionBack(std::unique_ptr connection); + void receiveConnectionBack(Ref connection); // Make non-copyable and non-movable Server(const Server &) = delete; diff --git a/tests/test_http_handler.cpp b/tests/test_http_handler.cpp index f9499bf..2b23aba 100644 --- a/tests/test_http_handler.cpp +++ b/tests/test_http_handler.cpp @@ -32,11 +32,11 @@ struct MockConnectionHandler : public ConnectionHandler { bool write_progress_called = false; bool write_buffer_drained_called = false; - void on_write_progress(std::unique_ptr &) override { + void on_write_progress(Ref &) override { write_progress_called = true; } - void on_write_buffer_drained(std::unique_ptr &) override { + void on_write_buffer_drained(Ref &) override { write_buffer_drained_called = true; } }; @@ -50,7 +50,7 @@ TEST_CASE("ConnectionHandler hooks") { CHECK_FALSE(handler.write_buffer_drained_called); // Would normally be called by Server during write operations - std::unique_ptr null_conn; + Ref null_conn; handler.on_write_progress(null_conn); handler.on_write_buffer_drained(null_conn); diff --git a/tests/test_server_connection_return.cpp b/tests/test_server_connection_return.cpp index f619743..a8b360f 100644 --- a/tests/test_server_connection_return.cpp +++ b/tests/test_server_connection_return.cpp @@ -11,7 +11,7 @@ PERFETTO_TRACK_EVENT_STATIC_STORAGE(); struct Message { - std::unique_ptr conn; + Ref conn; std::string data; bool done; }; @@ -27,7 +27,7 @@ public: : pipeline(pipeline) {} void on_data_arrived(std::string_view data, - std::unique_ptr &conn_ptr) override { + Ref &conn_ptr) override { assert(conn_ptr); auto guard = pipeline.push(1, true); for (auto &message : guard.batch) {