// weaseldb/src/connection.hpp

#pragma once
#include "arena_allocator.hpp"
#include "connection_handler.hpp"
#include <atomic>
#include <cassert>
#include <cstdint>
#include <cstring>
#include <deque>
#include <memory>
#include <string_view>
#include <sys/socket.h>
#include <sys/uio.h>
#include <unistd.h>
// Global connection counter, defined in another translation unit.
// Presumably this is the "global active connection counter" that the
// Connection constructor increments and the destructor decrements (as their
// doc comments state) - confirm against the definition.
extern std::atomic<int> activeConnections;
// __has_feature is a Clang extension. Compilers that do not provide it get a
// fallback that always evaluates to 0, so the
// `#if __has_feature(thread_sanitizer)` check further down in this header
// still preprocesses and is simply false on those compilers.
#ifndef __has_feature
#define __has_feature(x) 0
#endif
// Forward declaration. Kept ahead of the Connection documentation block so
// that the block below sits directly in front of the struct it describes.
class Server;
/**
 * Represents a single client connection with efficient memory management.
 *
 * Connection ownership model:
 * - Created by an I/O thread, processed immediately, then transferred to
 *   epoll via raw pointer
 * - I/O threads claim ownership by wrapping the raw pointer in a unique_ptr
 * - The I/O thread optionally passes ownership to a thread pipeline
 * - The owner eventually transfers back to epoll by releasing the unique_ptr
 *   to a raw pointer
 * - RAII cleanup happens if the I/O thread doesn't transfer back
 *
 * Arena allocator thread safety:
 * Each Connection contains its own ArenaAllocator instance that is accessed
 * exclusively by the thread that currently owns the connection. This ensures
 * thread safety without requiring locks:
 * - The arena is used by the owning thread for I/O buffers, request parsing,
 *   and response generation
 * - Arena memory is automatically freed when the connection is destroyed
 * - reset() should only be called by the current owner thread
 *
 * Only the handler interface methods are public - all networking details are
 * private.
 */
struct Connection {
// No public constructor or factory method - only Server can create
// connections
/**
* @brief Destroy the Connection object.
*
* Automatically handles cleanup of all connection resources:
* - Calls handler's on_connection_closed() method for protocol-specific
* cleanup
* - Decrements the global active connection counter
* - Closes the socket file descriptor
* - Frees all arena-allocated memory
*
* @note The destructor will abort() if socket close fails, as this indicates
* a serious system-level issue that should not be ignored.
*/
~Connection();
// Handler interface - public methods that handlers can use
/**
* @brief Queue a message to be sent to the client.
*
* Adds data to the connection's outgoing message queue. The data will be sent
* asynchronously by the server's I/O threads using efficient vectored
* I/O.
*
* @param s The data to send (string view for zero-copy efficiency)
* @param copyToArena If true (default), copies data to the connection's arena
* for safe storage. If false, the caller must ensure the
* data remains valid until all queued messages are sent.
*
* @warning Thread Safety: Only call from the thread that currently owns this
* connection. The arena allocator is not thread-safe.
*
* @note Performance: Use copyToArena=false for static strings or data with
* guaranteed lifetime, copyToArena=true for temporary/dynamic data.
*
* Example usage:
* ```cpp
* conn->appendMessage("HTTP/1.1 200 OK\r\n\r\n", false); // Static string
* conn->appendMessage(dynamic_response, true); // Dynamic data
* conn->appendMessage(arena_allocated_data, false); // Arena data
* ```
*/
void appendMessage(std::string_view s, bool copyToArena = true);
/**
* @brief Mark the connection to be closed after sending all queued messages.
*
* Sets a flag that instructs the server to close this connection gracefully
* after all currently queued messages have been successfully sent to the
* client. This enables proper connection cleanup for protocols like HTTP/1.0
* or when implementing connection limits.
*
* @note The connection will remain active until:
* 1. All queued messages are sent to the client
* 2. The server processes the close flag during the next I/O cycle
* 3. The connection is properly closed and cleaned up
*
* @warning Thread Safety: Only call from the thread that currently owns this
* connection.
*
* Typical usage:
* ```cpp
* conn->appendMessage("HTTP/1.1 200 OK\r\n\r\nBye!");
* conn->closeAfterSend(); // Close after sending response
* ```
*/
void closeAfterSend() { closeConnection_ = true; }
/**
* @brief Get access to the connection's arena allocator.
*
* Returns a reference to this connection's private ArenaAllocator instance,
* which should be used for all temporary allocations during request
* processing. The arena provides extremely fast allocation (~1ns) and
* automatic cleanup when the connection is destroyed or reset.
*
* @return Reference to the connection's arena allocator
*
* @warning Thread Safety: Only access from the thread that currently owns
* this connection. The arena allocator is not thread-safe and concurrent
* access will result in undefined behavior.
*
* @note Memory Lifecycle: Arena memory is automatically freed when:
* - The connection is destroyed
* - reset() is called (keeps first block, frees others)
* - The connection is moved (arena ownership transfers)
*
* Best practices:
* ```cpp
* ArenaAllocator& arena = conn->getArena();
*
* // Allocate temporary parsing buffers
* char* buffer = arena.allocate<char>(1024);
*
* // Construct temporary objects
* auto* request = arena.construct<HttpRequest>(arena);
*
* // Use arena-backed STL containers
* std::vector<Token, ArenaStlAllocator<Token>> tokens{&arena};
* ```
*/
ArenaAllocator &getArena() { return arena_; }
/**
* @brief Get the unique identifier for this connection.
*
* Returns a server-generated unique ID that can be used for logging,
* debugging, or tracking individual connections. The ID is assigned
* atomically when the connection is created and remains constant throughout
* the connection's lifetime.
*
* @return Unique 64-bit connection identifier
*
* @note Thread Safety: This method is thread-safe as it only reads an
* immutable value set during construction.
*
* @note The ID is generated using an atomic counter, ensuring uniqueness
* across all connections within a single server instance. IDs are not
* guaranteed to be sequential due to concurrent connection creation.
*
* Typical usage:
* ```cpp
* std::cout << "Processing request on connection " << conn->getId() <<
* std::endl; logger.info("Connection {} sent {} bytes", conn->getId(),
* response.size());
* ```
*/
int64_t getId() const { return id_; }
/**
* @brief Get the number of bytes queued for transmission.
*
* Calculates and returns the total number of bytes in all messages currently
* queued for transmission to the client. This includes all data added via
* appendMessage() that has not yet been sent over the network.
*
* @return Total bytes queued for transmission
*
* @warning Thread Safety: Only call from the thread that currently owns this
* connection. Concurrent access to the message queue is not thread-safe.
*
* @note Performance: This method iterates through all queued messages to
* calculate the total, so avoid calling it frequently in hot paths.
*
* @note The count decreases as the server sends data via writeBytes() and
* removes completed messages from the queue.
*
* Use cases:
* ```cpp
* // Check if all data has been sent
* if (conn->outgoingBytesQueued() == 0) {
* conn->reset(); // Safe to reset arena
* }
*
* // Implement backpressure
* if (conn->outgoingBytesQueued() > MAX_BUFFER_SIZE) {
* // Stop adding more data until queue drains
* }
*
* // Logging/monitoring
* metrics.recordQueueDepth(conn->getId(), conn->outgoingBytesQueued());
* ```
*/
int64_t outgoingBytesQueued() const {
int64_t result = 0;
for (auto s : messages_) {
result += s.size();
}
return result;
}
/**
* @brief Protocol-specific data pointer for handler use.
*
* A void pointer that can be used by ConnectionHandler implementations to
* attach protocol-specific state or data structures to individual
* connections. This enables handlers to maintain per-connection state without
* requiring external lookup tables.
*
* @warning Memory Management: The handler is responsible for allocating and
* freeing any data pointed to by user_data. Typically this should be done in
* on_connection_established() and on_connection_closed() respectively.
*
* @warning Thread Safety: Only access from the thread that currently owns
* this connection. The pointer itself and any referenced data must not be
* accessed concurrently.
*
* @note Lifetime: The user_data pointer is set to nullptr in the destructor
* for safety, but the handler must ensure proper cleanup in
* on_connection_closed() before the connection is destroyed.
*
* Example usage:
* ```cpp
* class HttpHandler : public ConnectionHandler {
* void on_connection_established(Connection& conn) override {
* // Allocate HTTP state in connection's arena or heap
* auto* state = conn.getArena().construct<HttpConnectionState>();
* conn.user_data = state;
* }
*
* void on_connection_closed(Connection& conn) override {
* // Cleanup if using heap allocation
* delete static_cast<HttpConnectionState*>(conn.user_data);
* conn.user_data = nullptr;
* }
*
* void on_data_arrived(std::string_view data,
* std::unique_ptr<Connection>& conn_ptr) override {
* auto* state = static_cast<HttpConnectionState*>(conn_ptr->user_data);
* // Use state for protocol processing...
* }
* };
* ```
*/
void *user_data = nullptr;
/**
* Reset the connection's arena allocator and message queue for reuse.
*
* This method efficiently reclaims arena memory by keeping the first block
* and freeing all others, then reinitializes the message queue.
*
* @warning Thread Safety: This method should ONLY be called by the thread
* that currently owns this connection. Calling reset() while the connection
* is being transferred between threads or accessed by another thread will
* result in undefined behavior.
*
* @note The assert(messages_.empty()) ensures all outgoing data has been
* sent before resetting. This prevents data loss and indicates the connection
* is in a clean state for reuse.
*
* Typical usage pattern:
* - HTTP handlers call this after completing a request/response cycle
*/
void reset() {
assert(messages_.empty());
arena_.reset();
messages_ =
std::deque<std::string_view, ArenaStlAllocator<std::string_view>>{
ArenaStlAllocator<std::string_view>{&arena_}};
}
/**
* @note Ownership Transfer: To release a connection back to the server for
* continued processing, use the static method:
* ```cpp
* Server::releaseBackToServer(std::move(connection_ptr));
* ```
*
* This is the correct way to return connection ownership when:
* - A handler has taken ownership via unique_ptr.release()
* - Background processing of the connection is complete
* - The connection should resume normal server-managed I/O processing
*
* The method is thread-safe and handles the case where the server may have
* been destroyed while the connection was being processed elsewhere.
*/
private:
// Server is a friend and can access all networking internals
friend class Server;
/**
* @brief Private constructor - only accessible by Server.
*
* Creates a new connection with the specified network address, file
* descriptor, and associated handler. Automatically increments the global
* active connection counter and calls the handler's
* on_connection_established() method.
*
* @param addr Network address of the remote client (IPv4/IPv6 compatible)
* @param fd File descriptor for the socket connection
* @param id Unique connection identifier generated by the server
* @param handler Protocol handler for processing connection data
* @param server Weak reference to the server for safe cleanup
*/
Connection(struct sockaddr_storage addr, int fd, int64_t id,
ConnectionHandler *handler, std::weak_ptr<Server> server);
/**
* @brief Server-only factory method for creating connections.
*
* This factory method can only be called by the Server class due to friend
* access. It provides controlled connection creation with proper resource
* management.
*
* @param addr Network address of the remote client (IPv4/IPv6 compatible)
* @param fd File descriptor for the socket connection
* @param id Unique connection identifier generated by the server
* @param handler Protocol handler for processing connection data
* @param server Weak reference to the server for safe cleanup
*
* @return std::unique_ptr<Connection> to the newly created connection
*
* @note This method is only accessible to the Server class and should be used
* exclusively by I/O threads when new connections arrive.
*/
static std::unique_ptr<Connection>
createForServer(struct sockaddr_storage addr, int fd, int64_t id,
ConnectionHandler *handler, std::weak_ptr<Server> server);
// Networking interface - only accessible by Server
int readBytes(char *buf, size_t buffer_size);
bool writeBytes();
void tsan_acquire();
void tsan_release();
// Direct access methods for Server
int getFd() const { return fd_; }
bool hasMessages() const { return !messages_.empty(); }
bool shouldClose() const { return closeConnection_; }
const int fd_;
const int64_t id_;
struct sockaddr_storage addr_; // sockaddr_storage handles IPv4/IPv6
ArenaAllocator arena_;
ConnectionHandler *handler_;
std::weak_ptr<Server> server_; // Weak reference to server for safe cleanup
std::deque<std::string_view, ArenaStlAllocator<std::string_view>> messages_{
ArenaStlAllocator<std::string_view>{&arena_}};
// Whether or not to close the connection after completing writing the
// response
bool closeConnection_{false};
// TSAN support for epoll synchronization
#if __has_feature(thread_sanitizer)
std::atomic<int> tsan_sync_;
#endif
};