diff --git a/commit_pipeline.md b/commit_pipeline.md index 98e7ca3..dafdfd3 100644 --- a/commit_pipeline.md +++ b/commit_pipeline.md @@ -167,8 +167,9 @@ bool HttpHandler::process_release_batch(BatchType &batch) { if (!conn) { return true; // Shutdown signal } - // Return connection to server for further processing or cleanup - Server::release_back_to_server(std::move(conn)); + // Connection is server-owned - respond to client and connection + // remains managed by server's connection registry + // TODO: Implement response sending with new server-owned connection model } return false; // Continue processing } @@ -180,12 +181,12 @@ bool HttpHandler::process_release_batch(BatchType &batch) { ```cpp // 4-stage pipeline: sequence -> resolve -> persist -> release -// TODO: Update pipeline type from std::unique_ptr to PipelineEntry variant +// Pipeline with PipelineEntry variant instead of connection ownership transfer StaticThreadPipeline WaitStrategy::WaitIfUpstreamIdle, 1, 1, 1, 1> commitPipeline{lg_size}; -// Pipeline entry type (to be implemented) +// Pipeline entry type for server-owned connection model using PipelineEntry = std::variant; ``` @@ -228,7 +229,7 @@ for (auto &conn : guard.batch) { Commit requests enter the pipeline via `HttpHandler::on_batch_complete()`: ```cpp -void HttpHandler::on_batch_complete(std::span> batch) { +void HttpHandler::on_batch_complete(std::span batch) { // Collect commit requests that passed basic validation for 4-stage pipeline processing int commit_count = 0; for (auto &conn : batch) { @@ -305,7 +306,7 @@ std::visit([&](auto&& entry) { - Failed CommitEntries are passed through the pipeline with error information - Downstream stages skip processing for error connections but forward them - Error responses are sent when connection reaches release stage -- Connection ownership is always transferred to ensure cleanup +- Server-owned connections ensure proper cleanup and response handling ### Pipeline Integrity @@ -327,7 +328,7 @@ std::visit([&](auto&& entry) { - **Single-Pass Processing**: Each connection flows through all stages once - **Streaming Design**: Stages process concurrently -- **Minimal Copying**: Connection ownership transfer, not data copying +- **Minimal Copying**: Request processing with server-owned connections - **Direct Response**: Release stage triggers immediate response transmission ### Scalability Characteristics @@ -345,7 +346,7 @@ private: static constexpr int lg_size = 16; // Ring buffer size = 2^16 entries // 4-stage pipeline configuration - StaticThreadPipeline, + StaticThreadPipeline commitPipeline{lg_size}; ``` @@ -362,8 +363,8 @@ The pipeline processes different types of entries using a variant/union type sys ### Pipeline Entry Variants -- **CommitEntry**: Contains `std::unique_ptr` with CommitRequest and connection state -- **StatusEntry**: Contains `std::unique_ptr` with StatusRequest (transferred to status threadpool after sequence) +- **CommitEntry**: Contains connection reference/ID with CommitRequest and connection state +- **StatusEntry**: Contains connection reference/ID with StatusRequest (transferred to status threadpool after sequence) - **ShutdownEntry**: Signals pipeline shutdown to all stages - **Future types**: Pipeline design supports additional entry types @@ -472,7 +473,7 @@ void HttpHandler::handleGetSubscribe(Connection &conn, const HttpConnectionState The pipeline integrates with the HTTP handler at two points: 1. **Entry**: `on_batch_complete()` feeds connections into sequence stage -1. **Exit**: Release stage calls `Server::release_back_to_server()` +1. **Exit**: Release stage responds to clients with server-owned connections ### Persistence Layer Integration diff --git a/design.md b/design.md index 8a5da7b..e48329c 100644 --- a/design.md +++ b/design.md @@ -243,19 +243,19 @@ CommitRequest { #### Connection Ownership Lifecycle -1. **Creation**: Accept threads create connections, transfer to epoll as raw pointers -1. **Processing**: Network threads claim ownership by wrapping in unique_ptr -1. **Handler Transfer**: Handlers can take ownership for async processing via unique_ptr.release() -1. **Return Path**: Handlers use Server::release_back_to_server() to return connections -1. **Safety**: All transfers use weak_ptr to server for safe cleanup -1. **Cleanup**: RAII ensures proper resource cleanup in all scenarios +1. **Creation**: Server creates connections and stores them in registry +1. **Processing**: I/O threads access connections via registry lookup +1. **Handler Access**: Handlers receive Connection& references, server retains ownership +1. **Async Processing**: Handlers use WeakRef for safe async access +1. **Safety**: Connection mutex synchronizes concurrent access between threads +1. **Cleanup**: RAII ensures proper resource cleanup when connections are destroyed #### Arena Memory Lifecycle -1. **Request Processing**: Handler uses `conn->get_arena()` to allocate memory for parsing request data -1. **Response Generation**: Handler uses arena for temporary response construction (headers, JSON, etc.) -1. **Response Queuing**: Handler calls `conn->append_message()` which copies data to arena-backed message queue -1. **Response Writing**: Server writes all queued messages to socket via `writeBytes()` +1. **Request Processing**: Handler creates request-scoped arena for parsing request data +1. **Response Generation**: Handler uses same arena for response construction (headers, JSON, etc.) +1. **Response Queuing**: Handler calls `conn->append_message()` passing span + arena ownership +1. **Response Writing**: I/O thread writes messages to socket, arena freed after completion > **Note**: Call `conn->reset()` periodically to reclaim arena memory. Best practice is after all outgoing bytes have been written. diff --git a/src/arena.hpp b/src/arena.hpp index 3c7096f..87ce709 100644 --- a/src/arena.hpp +++ b/src/arena.hpp @@ -65,21 +65,19 @@ * at a time * * ### Thread Ownership Model: - * 1. **Network Thread**: Claims connection ownership, accesses arena for I/O - * buffers - * 2. **Handler Thread**: Can take ownership via unique_ptr.release(), uses - * arena for request parsing and response generation - * 3. **Background Thread**: Can receive ownership for async processing, uses - * arena for temporary data structures - * 4. **Return Path**: Connection (and its arena) safely returned via - * Server::release_back_to_server() + * 1. **I/O Thread**: Server owns connections, processes socket I/O events + * 2. **Handler Thread**: Receives Connection& reference, creates request-scoped + * arenas for parsing and response generation + * 3. **Pipeline Thread**: Can use WeakRef for async processing, + * creates own arenas for temporary data structures + * 4. **Arena Lifecycle**: Request-scoped arenas moved to message queue, freed + * after I/O completion without holding connection mutex * * ### Why This Design is Thread-Safe: - * - **Exclusive Access**: Only the current owner thread should access the arena - * - **Transfer Points**: Ownership transfers happen at well-defined - * synchronization points with proper memory barriers. - * - **No Shared State**: Each arena is completely isolated - no shared data - * between different arena instances + * - **Request-Scoped**: Each request gets its own Arena instance for isolation + * - **Move Semantics**: Arenas transferred via move, avoiding shared access + * - **Deferred Cleanup**: Arena destruction deferred to avoid malloc contention + * while holding connection mutex * * @warning Do not share Arena instances between threads. Use separate * instances per thread or per logical unit of work. diff --git a/src/connection.cpp b/src/connection.cpp index b9b8fe1..d036822 100644 --- a/src/connection.cpp +++ b/src/connection.cpp @@ -4,9 +4,10 @@ #include #include #include +#include #include "metric.hpp" -#include "server.hpp" // Need this for release_back_to_server implementation +#include "server.hpp" // Need this for server reference namespace { // Thread-local metric instances @@ -35,11 +36,13 @@ thread_local auto write_eagain_failures = // Static thread-local storage for iovec buffer static thread_local std::vector g_iovec_buffer{IOV_MAX}; +// Thread-local storage for arenas to be freed after unlocking +static thread_local std::vector g_arenas_to_free; Connection::Connection(struct sockaddr_storage addr, int fd, int64_t id, size_t epoll_index, ConnectionHandler *handler, WeakRef server) - : fd_(fd), id_(id), epoll_index_(epoll_index), addr_(addr), arena_(), + : fd_(fd), id_(id), epoll_index_(epoll_index), addr_(addr), handler_(handler), server_(std::move(server)) { auto server_ref = server_.lock(); // This should only be called from a member of Server itself, so I should @@ -75,15 +78,47 @@ Connection::~Connection() { // EINTR ignored - fd is guaranteed closed on Linux } -void Connection::append_message(std::string_view s, bool copy_to_arena) { - if (copy_to_arena) { - char *arena_str = arena_.allocate(s.size()); - std::memcpy(arena_str, s.data(), s.size()); - messages_.emplace_back(arena_str, s.size()); - } else { - messages_.push_back(s); +void Connection::append_message(std::span data_parts, + Arena arena, bool close_after_send) { + // Calculate total bytes for this message. Don't need to hold the lock yet. + size_t total_bytes = 0; + for (const auto &part : data_parts) { + total_bytes += part.size(); + } + + std::unique_lock lock(mutex_); + + if (is_closed_) { + return; // Connection is closed, ignore message + } + + // Check if queue was empty to determine if we need to enable EPOLLOUT + bool was_empty = message_queue_.empty(); + + // Add message to queue + message_queue_.emplace_back( + Message{std::move(arena), data_parts, close_after_send}); + outgoing_bytes_queued_ += total_bytes; + + // If this message has close_after_send flag, set connection flag + if (close_after_send) { + close_after_send_ = true; + } + + lock.unlock(); + + // If queue was empty, we need to add EPOLLOUT interest. We don't need to hold + // the lock + if (was_empty) { + auto server = server_.lock(); + if (server) { + // Add EPOLLOUT interest - pipeline thread manages epoll + struct epoll_event event; + event.data.fd = fd_; + event.events = EPOLLIN | EPOLLOUT; + epoll_ctl(server->epoll_fds_[epoll_index_], EPOLL_CTL_MOD, fd_, &event); + } } - outgoing_bytes_queued_ += s.size(); } int Connection::readBytes(char *buf, size_t buffer_size) { @@ -115,27 +150,47 @@ int Connection::readBytes(char *buf, size_t buffer_size) { bool Connection::writeBytes() { ssize_t total_bytes_written = 0; - while (!messages_.empty()) { - // Build iovec array up to IOV_MAX limit using thread-local vector - assert(g_iovec_buffer.size() == IOV_MAX); - struct iovec *iov = g_iovec_buffer.data(); + + while (true) { + // Build iovec array while holding mutex using thread-local buffer int iov_count = 0; + { + std::lock_guard lock(mutex_); - for (auto it = messages_.begin(); - it != messages_.end() && iov_count < IOV_MAX; ++it) { - const auto &msg = *it; - iov[iov_count] = { - const_cast(static_cast(msg.data())), - msg.size()}; - iov_count++; - } + if (is_closed_ || message_queue_.empty()) { + break; + } - assert(iov_count > 0); + // Build iovec array up to IOV_MAX limit using thread-local vector + assert(g_iovec_buffer.size() == IOV_MAX); + struct iovec *iov = g_iovec_buffer.data(); + for (auto &message : message_queue_) { + if (iov_count >= IOV_MAX) + break; + + for (const auto &part : message.data_parts) { + if (iov_count >= IOV_MAX) + break; + if (part.empty()) + continue; + + iov[iov_count] = { + const_cast(static_cast(part.data())), + part.size()}; + iov_count++; + } + } + + if (iov_count == 0) + break; + } // Release mutex during I/O + + // Perform I/O without holding mutex ssize_t w; for (;;) { struct msghdr msg = {}; - msg.msg_iov = iov; + msg.msg_iov = g_iovec_buffer.data(); msg.msg_iovlen = iov_count; w = sendmsg(fd_, &msg, MSG_NOSIGNAL); @@ -146,7 +201,6 @@ bool Connection::writeBytes() { if (errno == EAGAIN) { // Increment EAGAIN failure metric write_eagain_failures.inc(); - // Increment bytes written metric before returning if (total_bytes_written > 0) { bytes_written.inc(total_bytes_written); } @@ -161,30 +215,67 @@ bool Connection::writeBytes() { assert(w > 0); total_bytes_written += w; - // Handle partial writes by updating string_view data/size - size_t bytes_written = static_cast(w); - outgoing_bytes_queued_ -= bytes_written; - while (bytes_written > 0 && !messages_.empty()) { - auto &front = messages_.front(); + // Handle partial writes by updating message data_parts + { + std::lock_guard lock(mutex_); + outgoing_bytes_queued_ -= w; + size_t bytes_remaining = static_cast(w); - if (bytes_written >= front.size()) { - // This message is completely written - bytes_written -= front.size(); - messages_.pop_front(); - } else { - // Partial write of this message - update string_view - front = std::string_view(front.data() + bytes_written, - front.size() - bytes_written); - bytes_written = 0; + while (bytes_remaining > 0 && !message_queue_.empty()) { + auto &front_message = message_queue_.front(); + bool message_complete = true; + + for (auto &part : front_message.data_parts) { + if (part.empty()) + continue; + + if (bytes_remaining >= part.size()) { + // This part is completely written + bytes_remaining -= part.size(); + part = std::string_view(); // Mark as consumed + } else { + // Partial write of this part + part = std::string_view(part.data() + bytes_remaining, + part.size() - bytes_remaining); + bytes_remaining = 0; + message_complete = false; + break; + } + } + + if (message_complete) { + // Move arena to thread-local vector for deferred cleanup + g_arenas_to_free.emplace_back(std::move(front_message.arena)); + message_queue_.pop_front(); + } else { + break; + } + } + } + } + + // Check if queue is empty and remove EPOLLOUT interest + { + std::lock_guard lock(mutex_); + if (message_queue_.empty()) { + auto server = server_.lock(); + if (server) { + struct epoll_event event; + event.data.fd = fd_; + event.events = EPOLLIN; // Remove EPOLLOUT + epoll_ctl(server->epoll_fds_[epoll_index_], EPOLL_CTL_MOD, fd_, &event); } } } - assert(messages_.empty()); // Increment bytes written metric if (total_bytes_written > 0) { bytes_written.inc(total_bytes_written); } + // Clean up arenas after all mutex operations are complete + // This avoids holding the connection mutex while free() potentially contends + g_arenas_to_free.clear(); + return false; } diff --git a/src/connection.hpp b/src/connection.hpp index 155c581..73265fc 100644 --- a/src/connection.hpp +++ b/src/connection.hpp @@ -3,6 +3,8 @@ #include #include #include +#include +#include #include #include #include @@ -15,33 +17,31 @@ #define __has_feature(x) 0 #endif -/** - * Represents a single client connection with efficient memory management. - * - * Connection ownership model: - * - Created by I/O thread, processed immediately, then transferred to epoll via - * raw pointer - * - I/O threads claim ownership by wrapping raw pointer in unique_ptr - * - I/O thread optionally passes ownership to a thread pipeline - * - Owner eventually transfers back to epoll by releasing unique_ptr to raw - * pointer - * - RAII cleanup happens if I/O thread doesn't transfer back - * - * Arena allocator thread safety: - * Each Connection contains its own Arena instance that is accessed - * exclusively by the thread that currently owns the connection. This ensures - * thread safety without requiring locks: - * - Arena is used by the owning thread for I/O buffers, request parsing, and - * response generation - * - Arena memory is automatically freed when the connection is destroyed - * - reset() should only be called by the current owner thread - * - * Only the handler interface methods are public - all networking details are - * private. - */ // Forward declaration struct Server; +/** + * Represents a single client connection with thread-safe concurrent access. + * + * Connection ownership model: + * - Server owns all connections + * - Handlers receive Connection& references, and can keep a WeakRef to + * Connection for async responses. + * - Multiple pipeline threads can safely access connection concurrently + * - I/O thread has exclusive access to socket operations + * + * Threading model: + * - Single mutex protects all connection state + * - Pipeline threads call Connection methods (append_message, etc.) + * - I/O thread processes socket events and message queue + * - Pipeline threads manage epoll interests via Connection methods + * - Connection tracks closed state to prevent EBADF errors + * + * Arena allocator usage: + * - Request-scoped arenas created by handlers for each request + * - No connection-owned arena for parsing/response generation + * - Message queue stores spans + owning arenas until I/O completion + */ struct Connection { // No public constructor or factory method - only Server can create // connections @@ -64,90 +64,63 @@ struct Connection { // Handler interface - public methods that handlers can use /** - * @brief Queue a message to be sent to the client. + * @brief Queue an atomic message to be sent to the client. * - * Adds data to the connection's outgoing message queue. The data will be sent - * asynchronously by the server's I/O threads using efficient vectored - * I/O. + * Adds a complete message with all associated data to the connection's + * outgoing message queue. The message will be sent asynchronously by a + * server I/O thread using efficient vectored I/O. * - * @param s The data to send (string view parameter for efficiency) - * @param copy_to_arena If true (default), copies data to the connection's - * arena for safe storage. If false, the caller must ensure the data remains - * valid until all queued messages are sent. + * @param data_parts Span of string_views pointing to arena-allocated data + * @param arena Arena that owns all the memory referenced by data_parts + * @param close_after_send Whether to close connection after sending this + * message * - * @warning Thread Safety: Only call from the thread that currently owns this - * connection. The arena allocator is not thread-safe. + * @note Thread Safety: This method is thread-safe and can be called + * concurrently from multiple pipeline threads. * - * @note Performance: Use copy_to_arena=false for static strings or data with - * guaranteed lifetime, copy_to_arena=true for temporary/dynamic data. + * @note The memory referenced by the data_parts span, must outlive @p arena. + * The arena will be moved and kept alive until the message is fully sent. * * Example usage: * ```cpp - * conn->append_message("HTTP/1.1 200 OK\r\n\r\n", false); // Static string - * conn->append_message(dynamic_response, true); // Dynamic data - * conn->append_message(arena_allocated_data, false); // Arena data + * Arena arena; + * auto* parts = arena.allocate(2); + * parts[0] = build_header(arena); + * parts[1] = build_body(arena); + * conn.append_message({parts, 2}, std::move(arena)); * ``` */ - void append_message(std::string_view s, bool copy_to_arena = true); + void append_message(std::span data_parts, Arena arena, + bool close_after_send = false); /** - * @brief Mark the connection to be closed after sending all queued messages. + * @brief Get a WeakRef to this connection for async operations. * - * Sets a flag that instructs the server to close this connection gracefully - * after all currently queued messages have been successfully sent to the - * client. This enables proper connection cleanup for protocols like HTTP/1.0 - * or when implementing connection limits. + * Returns a WeakRef that can be safely used to access this connection + * from other threads, such as pipeline processing threads. The WeakRef + * allows safe access even if the connection might be destroyed by the + * time the async operation executes. * - * @note The connection will remain active until: - * 1. All queued messages are sent to the client - * 2. The server processes the close flag during the next I/O cycle - * 3. The connection is properly closed and cleaned up + * @return WeakRef to this connection * - * @warning Thread Safety: Only call from the thread that currently owns this - * connection. + * @note Thread Safety: This method is thread-safe. * - * Typical usage: + * @note The WeakRef should be used with lock() to safely access the + * connection. If lock() returns null, the connection has been destroyed. + * + * Example usage: * ```cpp - * conn->append_message("HTTP/1.1 200 OK\r\n\r\nBye!"); - * conn->close_after_send(); // Close after sending response + * auto weak_conn = conn.get_weak_ref(); + * async_processor.submit([weak_conn, request_data]() { + * if (auto conn = weak_conn.lock()) { + * Arena arena; + * auto response = process_request(request_data, arena); + * conn->append_message({&response, 1}, std::move(arena)); + * } + * }); * ``` */ - void close_after_send() { closeConnection_ = true; } - - /** - * @brief Get access to the connection's arena allocator. - * - * Returns a reference to this connection's private Arena instance, - * which should be used for all temporary allocations during request - * processing. The arena provides extremely fast allocation (~1ns) and - * automatic cleanup when the connection is destroyed or reset. - * - * @return Reference to the connection's arena allocator - * - * @warning Thread Safety: Only access from the thread that currently owns - * this connection. The arena allocator is not thread-safe and concurrent - * access will result in undefined behavior. - * - * @note Memory Lifecycle: Arena memory is automatically freed when: - * - The connection is destroyed - * - reset() is called (keeps first block, frees others) - * - The connection is moved (arena ownership transfers) - * - * Best practices: - * ```cpp - * Arena& arena = conn->get_arena(); - * - * // Allocate temporary parsing buffers - * char* buffer = arena.allocate(1024); - * - * // Construct temporary objects - * auto* request = arena.construct(arena); - * - * // Use arena-backed STL containers - * std::vector> tokens{&arena}; - * ``` - */ - Arena &get_arena() { return arena_; } + WeakRef get_weak_ref() const { return self_ref_.copy(); } /** * @brief Get the unique identifier for this connection. @@ -210,11 +183,14 @@ struct Connection { * ``` */ int64_t outgoing_bytes_queued() const { + std::lock_guard lock(mutex_); #ifndef NDEBUG // Debug build: validate counter accuracy int64_t computed_total = 0; - for (auto s : messages_) { - computed_total += s.size(); + for (const auto &message : message_queue_) { + for (const auto &part : message.data_parts) { + computed_total += part.size(); + } } assert( outgoing_bytes_queued_ == computed_total && @@ -268,50 +244,14 @@ struct Connection { */ void *user_data = nullptr; - /** - * Reset the connection's arena allocator and message queue for reuse. - * - * This method efficiently reclaims arena memory by keeping the first block - * and freeing all others, then reinitializes the message queue. - * - * @warning Thread Safety: This method should ONLY be called by the thread - * that currently owns this connection. Calling reset() while the connection - * is being transferred between threads or accessed by another thread will - * result in undefined behavior. - * - * @note The assert(messages_.empty()) ensures all outgoing data has been - * sent before resetting. This prevents data loss and indicates the connection - * is in a clean state for reuse. - * - * Typical usage pattern: - * - HTTP handlers call this after completing a request/response cycle - */ - void reset() { - assert(messages_.empty()); - outgoing_bytes_queued_ = 0; - arena_.reset(); - messages_ = - std::deque>{ - ArenaStlAllocator{&arena_}}; - } - - /** - * @note Ownership Transfer: To release a connection back to the server for - * continued processing, use the static method: - * ```cpp - * Server::release_back_to_server(std::move(connection_ptr)); - * ``` - * - * This is the correct way to return connection ownership when: - * - A handler has taken ownership via unique_ptr.release() - * - Background processing of the connection is complete - * - The connection should resume normal server-managed I/O processing - * - * The method is thread-safe and handles the case where the server may have - * been destroyed while the connection was being processed elsewhere. - */ - private: + struct Message { + Arena arena; // Owns all the memory (movable) + std::span data_parts; // Points to arena-allocated memory + // (mutable for partial writes) + bool close_after_send = false; // Close connection after sending + }; + // Server is a friend and can access all networking internals friend struct Server; @@ -340,26 +280,31 @@ private: int readBytes(char *buf, size_t buffer_size); bool writeBytes(); - // Direct access methods for Server + // Direct access methods for Server (must hold mutex) int getFd() const { return fd_; } - bool has_messages() const { return !messages_.empty(); } - bool should_close() const { return closeConnection_; } + bool has_messages() const { return !message_queue_.empty(); } + bool should_close() const { return close_after_send_; } size_t getEpollIndex() const { return epoll_index_; } + + // Server can set self-reference after creation + void setSelfRef(WeakRef self) { self_ref_ = std::move(self); } + + // Immutable connection properties const int fd_; const int64_t id_; const size_t epoll_index_; // Index of the epoll instance this connection uses struct sockaddr_storage addr_; // sockaddr_storage handles IPv4/IPv6 - Arena arena_; ConnectionHandler *handler_; - WeakRef server_; // Weak reference to server for safe cleanup + WeakRef server_; // Weak reference to server for safe cleanup + WeakRef self_ref_; // WeakRef to self for get_weak_ref() - std::deque> messages_{ - ArenaStlAllocator{&arena_}}; - - // Counter tracking total bytes queued for transmission - int64_t outgoing_bytes_queued_{0}; - - // Whether or not to close the connection after completing writing the - // response - bool closeConnection_{false}; + // Thread-safe state (protected by mutex_) + mutable std::mutex mutex_; // Protects all mutable state + std::deque + message_queue_; // Queue of messages to send. Protectec by + // mutex_, but if non-empty mutex_ can be + // dropped while server accesses existing elements. + int64_t outgoing_bytes_queued_{0}; // Counter of queued bytes + bool close_after_send_{false}; // Close after sending all messages + bool is_closed_{false}; // Connection closed state }; diff --git a/src/connection_handler.hpp b/src/connection_handler.hpp index c364376..bedc864 100644 --- a/src/connection_handler.hpp +++ b/src/connection_handler.hpp @@ -26,21 +26,21 @@ public: * Process incoming data from a connection. * * @param data Incoming data buffer (may be partial message) - * @param conn_ptr Unique pointer to connection - handler can take ownership - * by releasing it + * @param conn Connection reference - server retains ownership * * Implementation should: - * - Parse incoming data using arena allocator when needed - * - Use conn_ptr->append_message() to queue response data to be sent + * - Create request-scoped Arena for parsing and response generation + * - Parse incoming data using the request arena + * - Use conn.append_message() to queue response data to be sent * - Handle partial messages and streaming protocols appropriately - * - Can take ownership by calling conn_ptr.release() to pass to other threads - * - If ownership is taken, handler must call Server::release_back_to_server() - * when done - * @note `data` is *not* owned by the connection arena, and its lifetime ends - * after the call to on_data_arrived. + * - Use conn.get_weak_ref() for async processing if needed + * + * @note `data` lifetime ends after the call to on_data_arrived. * @note May be called from an arbitrary server thread. + * @note Handler can safely access connection concurrently via thread-safe + * methods. */ - virtual void on_data_arrived(std::string_view /*data*/, Ref &) {}; + virtual void on_data_arrived(std::string_view /*data*/, Connection &) {}; /** * Called when data has been successfully written to the connection. @@ -50,29 +50,26 @@ public: * - Implementing backpressure for continuous data streams * - Progress monitoring for long-running transfers * - * @param conn_ptr Connection that made write progress - handler can take - * ownership + * @param conn Connection that made write progress - server retains ownership * @note May be called from an arbitrary server thread. * @note Called during writes, not necessarily when buffer becomes empty */ - virtual void on_write_progress(Ref &) {} + virtual void on_write_progress(Connection &) {} /** * Called when the connection's outgoing write buffer becomes empty. * * This indicates all queued messages have been successfully written * to the socket. Useful for: - * - Resetting arena allocators safely * - Implementing keep-alive connection reuse * - Closing connections after final response * - Relieving backpressure conditions * - * @param conn_ptr Connection with empty write buffer - handler can take - * ownership + * @param conn Connection with empty write buffer - server retains ownership * @note May be called from an arbitrary server thread. * @note Only called on transitions from non-empty → empty buffer */ - virtual void on_write_buffer_drained(Ref &) {} + virtual void on_write_buffer_drained(Connection &) {} /** * Called when a new connection is established. @@ -101,11 +98,9 @@ public: * * This hook is called after on_data_arrived, on_write_progress, or * on_write_buffer_drained has been called for each connection in the batch. - * The handler can take ownership of the connections by moving the unique_ptr - * out of the span. Any connections left in the span will remain owned by the - * server. + * All connections remain server-owned. * - * @param batch A span of unique_ptrs to the connections in the batch. + * @param batch A span of connection references in the batch. */ - virtual void on_batch_complete(std::span> /*batch*/) {} + virtual void on_batch_complete(std::span /*batch*/) {} }; diff --git a/src/http_handler.cpp b/src/http_handler.cpp index 0631ba7..6b47347 100644 --- a/src/http_handler.cpp +++ b/src/http_handler.cpp @@ -896,9 +896,6 @@ bool HttpHandler::process_release_batch(BatchType &batch) { perfetto::Flow::Global(state->http_request_id)); } - // Return connection to server for further processing or cleanup - Server::release_back_to_server(std::move(commit_entry.connection)); - return false; // Continue processing } else if constexpr (std::is_same_v) { // Process status entry: return connection to server @@ -911,9 +908,6 @@ bool HttpHandler::process_release_batch(BatchType &batch) { perfetto::Flow::Global(state->http_request_id)); } - // Return connection to server for further processing or cleanup - Server::release_back_to_server(std::move(status_entry.connection)); - return false; // Continue processing } else if constexpr (std::is_same_v) { // Process health check entry: return connection to server @@ -926,10 +920,6 @@ bool HttpHandler::process_release_batch(BatchType &batch) { perfetto::Flow::Global(state->http_request_id)); } - // Return connection to server for further processing or cleanup - Server::release_back_to_server( - std::move(health_check_entry.connection)); - return false; // Continue processing } diff --git a/src/server.cpp b/src/server.cpp index 9670e4a..8a53e64 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -138,51 +138,6 @@ void Server::shutdown() { } } -void Server::release_back_to_server(Ref connection) { - if (!connection) { - return; // Nothing to release - } - - // Try to get the server from the connection's weak_ptr - if (auto server = connection->server_.lock()) { - // Server still exists - pass unique_ptr directly - server->receiveConnectionBack(std::move(connection)); - } - - // If server is gone, connection will be automatically cleaned up when - // unique_ptr destructs -} - -void Server::receiveConnectionBack(Ref connection) { - if (!connection) { - return; // Nothing to process - } - - // Re-add the connection to epoll for continued processing - struct epoll_event event{}; - - if (!connection->has_messages()) { - event.events = EPOLLIN | EPOLLONESHOT; - } else { - event.events = EPOLLOUT | EPOLLONESHOT; - } - - int fd = connection->getFd(); - event.data.fd = fd; - - // Store connection in registry before adding to epoll - // This mirrors the pattern used in process_connection_batch - size_t epoll_index = connection->getEpollIndex(); - int epollfd = epoll_fds_[epoll_index]; - connection_registry_.store(fd, std::move(connection)); - - if (epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &event) == -1) { - perror("epoll_ctl MOD in receiveConnectionBack"); - // Remove from registry and clean up on failure - (void)connection_registry_.remove(fd); - } -} - int Server::create_local_connection() { int sockets[2]; if (socketpair(AF_UNIX, SOCK_STREAM, 0, sockets) != 0) { @@ -224,7 +179,7 @@ int Server::create_local_connection() { // Add to appropriate epoll instance struct epoll_event event{}; - event.events = EPOLLIN | EPOLLONESHOT; + event.events = EPOLLIN; event.data.fd = server_fd; int epollfd = epoll_fds_[epoll_index]; @@ -353,7 +308,7 @@ void Server::start_io_threads(std::vector &threads) { assert(conn); if (events[i].events & (EPOLLERR | EPOLLHUP)) { - // unique_ptr will automatically delete on scope exit + // Connection will be destroyed on scope exit continue; } @@ -467,14 +422,8 @@ void Server::process_connection_reads(Ref &conn, int events) { return; } - // Call handler with unique_ptr - handler can take ownership if needed - handler_.on_data_arrived(std::string_view{buf, size_t(r)}, conn); - - // If handler took ownership (conn is now null), return true to indicate - // processing is done - if (!conn) { - return; - } + // Call handler with connection reference - server retains ownership + handler_.on_data_arrived(std::string_view{buf, size_t(r)}, *conn); } } @@ -492,21 +441,12 @@ void Server::process_connection_writes(Ref &conn, int /*events*/) { return; } - // Call handler with unique_ptr - handler can take ownership if needed - handler_.on_write_progress(conn); - // If handler took ownership (conn is now null), return true to indicate - // processing is done - if (!conn) { - return; - } + // Call handler with connection reference - server retains ownership + handler_.on_write_progress(*conn); // Check if buffer became empty (transition from non-empty -> empty) if (had_messages && !conn->has_messages()) { - handler_.on_write_buffer_drained(conn); - // If handler took ownership (conn is now null), return - if (!conn) { - return; - } + handler_.on_write_buffer_drained(*conn); } // Check if we should close the connection according to application @@ -535,8 +475,15 @@ void Server::process_connection_batch(int epollfd, } } - // Call batch complete handler - handlers can take ownership here - handler_.on_batch_complete(batch); + // Call batch complete handler with connection pointers + std::vector conn_ptrs; + conn_ptrs.reserve(batch.size()); + for (auto &conn_ref : batch) { + if (conn_ref) { + conn_ptrs.push_back(conn_ref.get()); + } + } + handler_.on_batch_complete(conn_ptrs); // Transfer all remaining connections back to epoll for (auto &conn_ptr : batch) { @@ -545,13 +492,13 @@ void Server::process_connection_batch(int epollfd, struct epoll_event event{}; if (!conn_ptr->has_messages()) { - event.events = EPOLLIN | EPOLLONESHOT; + event.events = EPOLLIN; } else { - event.events = EPOLLOUT | EPOLLONESHOT; + event.events = EPOLLIN | EPOLLOUT; } - event.data.fd = fd; // Use file descriptor for epoll - // Put connection back in registry since handler didn't take ownership. + event.data.fd = fd; + // Put connection back in registry since handler didn't take ownership // Must happen before epoll_ctl connection_registry_.store(fd, std::move(conn_ptr)); if (epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &event) == -1) { diff --git a/src/server.hpp b/src/server.hpp index dd3a205..a9a0cfa 100644 --- a/src/server.hpp +++ b/src/server.hpp @@ -95,19 +95,6 @@ struct Server { */ int create_local_connection(); - /** - * Release a connection back to its server for continued processing. - * - * This static method safely returns ownership of a connection back to its - * server. If the server has been destroyed, the connection will be safely - * cleaned up. - * - * This method is thread-safe and can be called from any thread. - * - * @param connection unique_ptr to the connection being released back - */ - static void release_back_to_server(Ref connection); - private: friend struct Connection; /** diff --git a/tests/test_server_connection_return.cpp b/tests/test_server_connection_return.cpp index a8b360f..e16d53e 100644 --- a/tests/test_server_connection_return.cpp +++ b/tests/test_server_connection_return.cpp @@ -55,7 +55,6 @@ TEST_CASE( } assert(message.conn); message.conn->append_message(message.data); - Server::release_back_to_server(std::move(message.conn)); } } }};