Files
weaseldb/src/connection.hpp

373 lines
13 KiB
C++

#pragma once
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <deque>
#include <mutex>
#include <span>
#include <string_view>
#include <sys/socket.h>
#include <sys/uio.h>
#include <unistd.h>
#include "arena.hpp"
#include "connection_handler.hpp"
#include "reference.hpp"
// Portability shim: __has_feature is a Clang-style builtin. On compilers that
// do not provide it, define a fallback that answers 0 to every feature query
// so the `#if __has_feature(thread_sanitizer)` check further down in this
// header still preprocesses (and selects the no-op TSan hooks).
#ifndef __has_feature
#define __has_feature(x) 0
#endif
// Forward declaration: within this header Server is only named as a friend of
// Connection and as the template argument of WeakRef<Server>.
struct Server;
/**
 * How a connection should be terminated once its queued output has been sent.
 */
enum class ConnectionShutdown {
  /// Normal operation - no shutdown requested.
  None = 0,
  /// shutdown(SHUT_WR) after sending queued data.
  WriteOnly = 1,
  /// close() after sending queued data.
  Full = 2,
};
/**
 * Minimal, send-only view of a connection.
 *
 * The interface exposes nothing but send_response(), which makes it suitable
 * for handing to pipeline threads: they can deliver responses without being
 * able to reach connection functionality that is reserved for the I/O thread.
 * Pipeline threads should hold a WeakRef<MessageSender> and send through it.
 */
struct MessageSender {
  /**
   * @brief Send a response together with protocol-specific ordering context.
   *
   * Thread-safe entry point for pipeline threads to send responses back to
   * clients. The call is delegated to the connection's protocol handler,
   * which owns the ordering logic and may either queue the response or send
   * it immediately.
   *
   * @param protocol_context Arena-allocated protocol-specific context
   * @param response_json Response payload (may be empty for deferred
   * serialization)
   * @param arena Arena containing response data and context
   *
   * Example usage:
   * ```cpp
   * auto* ctx = arena.allocate<HttpResponseContext>();
   * ctx->sequence_id = 42;
   * std::string_view response_json = format_response(arena);
   * sender.send_response(ctx, response_json, std::move(arena));
   * ```
   */
  virtual void send_response(void *protocol_context,
                             std::string_view response_json, Arena arena) = 0;

  virtual ~MessageSender() = default;
};
/**
 * Represents a single client connection - the full interface available to the
 * I/O thread and connection handler.
 *
 * Connection ownership model:
 * - Server owns all connections
 * - Handlers receive Connection& references, and can keep a
 *   WeakRef<MessageSender> (see get_weak_ref()) for async responses.
 * - Multiple pipeline threads can safely access the MessageSender concurrently
 * - I/O thread has exclusive access to socket operations
 *
 * Threading model:
 * - Single mutex protects state shared with pipeline threads
 * - Pipeline threads only use the MessageSender interface (send_response)
 * - I/O thread processes socket events and message queue
 * - Pipeline threads register epoll write interest when queuing a response
 * - Connection tracks closed state to prevent EBADF errors
 *
 * Arena allocator usage:
 * - Request-scoped arenas created by handlers for each request
 * - No connection-owned arena for parsing/response generation
 * - Message queue stores spans + owning arenas until I/O completion
 */
struct Connection : MessageSender {
  // No public constructor or factory method - only Server can create
  // connections

  /**
   * @brief Destroy the Connection object.
   *
   * Automatically handles cleanup of all connection resources:
   * - Calls handler's on_connection_closed() method for protocol-specific
   *   cleanup
   * - Decrements the global active connection counter
   * - Closes the socket file descriptor
   * - Frees all arena-allocated memory
   *
   * @note The destructor will abort() if socket close fails, as this indicates
   * a serious system-level issue that should not be ignored.
   */
  ~Connection();

  // Handler interface - public methods that handlers can use

  /**
   * @brief Queue an atomic message to be sent to the client.
   *
   * Adds a complete message with all associated data to the connection's
   * outgoing byte queue with guaranteed ordering.
   *
   * I/O thread only method for protocol handlers to queue bytes for sending.
   * Bytes are queued in order and sent using efficient vectored I/O.
   *
   * @param data_parts Span of string_views pointing to arena-allocated data
   * @param arena Arena that owns all the memory referenced by data_parts
   * @param shutdown_mode Shutdown mode to apply after sending all queued data
   *
   * @note Thread Safety: Must be called from I/O thread only.
   * @note Ordering: Bytes are sent in the order calls are made.
   * @note The memory referenced by the data_parts span must outlive @p arena.
   * @note Shutdown Request: To request connection shutdown without sending
   * data, pass empty data_parts span with desired shutdown_mode. This ensures
   * all previously queued messages are sent before shutdown.
   *
   * Example usage (from ConnectionHandler::on_preprocess_writes):
   * ```cpp
   * Arena arena;
   * auto parts = arena.allocate_span<std::string_view>(2);
   * parts[0] = build_header(arena);
   * parts[1] = build_body(arena);
   * conn.append_bytes({parts, 2}, std::move(arena), ConnectionShutdown::None);
   * ```
   */
  void
  append_bytes(std::span<std::string_view> data_parts, Arena arena,
               ConnectionShutdown shutdown_mode = ConnectionShutdown::None);

  /**
   * @brief MessageSender implementation for this connection.
   *
   * @param protocol_context Protocol-specific context for the response
   * @param response_json Response payload
   * @param arena Arena handed over together with the response
   */
  void send_response(void *protocol_context, std::string_view response_json,
                     Arena arena) override;

  /**
   * @brief Get a WeakRef to this connection for async operations.
   *
   * Returns a WeakRef to the MessageSender interface of this connection that
   * can be safely used from other threads, such as pipeline processing
   * threads. The WeakRef allows safe access even if the connection might be
   * destroyed by the time the async operation executes.
   *
   * @return WeakRef<MessageSender> referring to this connection
   *
   * @note Thread Safety: This method is thread-safe.
   *
   * @note The WeakRef should be used with lock() to safely access the
   * connection. If lock() returns null, the connection has been destroyed.
   *
   * @note Debug builds assert that the internal self reference can still be
   * locked when this is called.
   *
   * Example usage:
   * ```cpp
   * auto weak_conn = conn.get_weak_ref();
   * async_processor.submit([weak_conn, request_data, protocol_context]() {
   *   if (auto sender = weak_conn.lock()) {
   *     Arena arena;
   *     std::string_view response = process_request(request_data, arena);
   *     sender->send_response(protocol_context, response, std::move(arena));
   *   }
   * });
   * ```
   */
  WeakRef<MessageSender> get_weak_ref() const {
    assert(self_ref_.lock());
    return self_ref_.copy();
  }

  /**
   * @brief Get the unique identifier for this connection.
   *
   * Returns a server-generated unique ID that can be used for logging,
   * debugging, or tracking individual connections. The ID is assigned
   * atomically when the connection is created and remains constant throughout
   * the connection's lifetime.
   *
   * @return Unique 64-bit connection identifier
   *
   * @note Thread Safety: This method is thread-safe as it only reads an
   * immutable value set during construction.
   *
   * @note The ID is generated using an atomic counter, ensuring uniqueness
   * across all connections within a single server instance. IDs are not
   * guaranteed to be sequential due to concurrent connection creation.
   *
   * Typical usage:
   * ```cpp
   * std::cout << "Processing request on connection " << conn->get_id()
   *           << std::endl;
   * logger.info("Connection {} sent {} bytes", conn->get_id(),
   *             response.size());
   * ```
   */
  int64_t get_id() const { return id_; }

  /**
   * @brief Get the number of bytes queued for transmission.
   *
   * Returns the total number of bytes in all messages currently
   * queued for transmission to the client. This includes all data added via
   * append_bytes() that has not yet been sent over the network.
   *
   * @return Total bytes queued for transmission
   *
   * @warning Thread Safety: Only call from the thread that currently owns this
   * connection. The counter is read while holding mutex_, but the debug-build
   * validation also walks the queued messages.
   * NOTE(review): per the comment on message_queue_, the server may access
   * existing queue elements without holding mutex_, so the debug walk looks
   * unsafe from any other thread - confirm before relaxing this warning.
   *
   * @note Performance: This method uses an O(1) counter for fast retrieval
   * in release builds. In debug builds, validates counter accuracy.
   *
   * @note The count decreases as the server sends data via write_bytes() and
   * removes completed messages from the queue.
   *
   * Use cases:
   * ```cpp
   * // Check if all data has been sent
   * if (conn->outgoing_bytes_queued() == 0) {
   *   // Nothing left to flush
   * }
   *
   * // Implement backpressure
   * if (conn->outgoing_bytes_queued() > MAX_BUFFER_SIZE) {
   *   // Stop adding more data until queue drains
   * }
   *
   * // Logging/monitoring
   * metrics.recordQueueDepth(conn->get_id(), conn->outgoing_bytes_queued());
   * ```
   */
  int64_t outgoing_bytes_queued() const {
    std::lock_guard lock(mutex_);
#ifndef NDEBUG
    // Debug build: recompute the total from the queue and validate the
    // cached counter against it.
    int64_t computed_total = 0;
    for (const auto &message : message_queue_) {
      for (const auto &part : message.data_parts) {
        // size() is unsigned; convert explicitly to the signed counter type.
        computed_total += static_cast<int64_t>(part.size());
      }
    }
    assert(
        outgoing_bytes_queued_ == computed_total &&
        "outgoing_bytes_queued_ counter is out of sync with actual queue size");
#endif
    return outgoing_bytes_queued_;
  }

  /**
   * @brief Protocol-specific data pointer for handler use.
   *
   * A void pointer that can be used by ConnectionHandler implementations to
   * attach protocol-specific state or data structures to individual
   * connections. This enables handlers to maintain per-connection state without
   * requiring external lookup tables.
   *
   * @warning Memory Management: The handler is responsible for allocating and
   * freeing any data pointed to by user_data. Typically this should be done in
   * on_connection_established() and on_connection_closed() respectively.
   *
   * @warning Thread Safety: Only access from the thread that currently owns
   * this connection. The pointer itself and any referenced data must not be
   * accessed concurrently.
   *
   * @note Lifetime: The user_data pointer is set to nullptr in the destructor
   * for safety, but the handler must ensure proper cleanup in
   * on_connection_closed() before the connection is destroyed.
   *
   * Example usage:
   * ```cpp
   * class HttpHandler : ConnectionHandler {
   *   void on_connection_established(Connection& conn) override {
   *     // Heap-allocate per-connection HTTP state
   *     conn.user_data = new HttpConnectionState();
   *   }
   *
   *   void on_connection_closed(Connection& conn) override {
   *     // Release the state allocated in on_connection_established()
   *     delete static_cast<HttpConnectionState*>(conn.user_data);
   *     conn.user_data = nullptr;
   *   }
   *
   *   void on_data_arrived(std::string_view data,
   *                        Connection& conn) override {
   *     auto* state = static_cast<HttpConnectionState*>(conn.user_data);
   *     // Use state for protocol processing...
   *   }
   * };
   * ```
   */
  void *user_data = nullptr;

private:
  // One queued outgoing message: the byte ranges still to be written plus the
  // arena that keeps them alive.
  struct Message {
    Arena arena;                            // Owns all the memory (movable)
    std::span<std::string_view> data_parts; // Points to arena-allocated memory
                                            // (mutable for partial writes)
  };

  // Server is a friend and can access all networking internals
  friend struct Server;

  /**
   * @brief Private constructor - only accessible by Server.
   *
   * Creates a new connection with the specified network address, file
   * descriptor, and associated handler. Automatically increments the global
   * active connection counter and calls the handler's
   * on_connection_established() method.
   *
   * @param addr Network address of the remote client (IPv4/IPv6 compatible)
   * @param fd File descriptor for the socket connection
   * @param id Unique connection identifier generated by the server
   * @param epoll_index Index of the epoll instance this connection uses
   * @param handler Protocol handler for processing connection data
   * @param server Weak reference to the server associated with this connection
   */
  Connection(struct sockaddr_storage addr, int fd, int64_t id,
             size_t epoll_index, ConnectionHandler *handler,
             WeakRef<Server> server);

  // make_ref needs access to the private constructor.
  template <typename T, typename... Args>
  friend Ref<T> make_ref(Args &&...args);

  // Networking interface - only accessible by Server
  int readBytes(char *buf, size_t buffer_size);

  // Bit flags (distinct powers of two, so they can be OR-ed together).
  // NOTE(review): presumably the bitmask returned by write_bytes() - confirm
  // against the implementation.
  enum WriteBytesResult {
    Error = 1 << 0,
    Progress = 1 << 1,
    Close = 1 << 2,
  };
  uint32_t write_bytes();
  void close();

  // Immutable connection properties
  const int64_t id_;
  const size_t epoll_index_; // Index of the epoll instance this connection uses
  struct sockaddr_storage addr_; // sockaddr_storage handles IPv4/IPv6
  ConnectionHandler *const handler_;
  WeakRef<Server> server_; // Weak reference to server for safe epoll_ctl calls
  WeakRef<Connection> self_ref_; // WeakRef to self for get_weak_ref()

  // state shared with pipeline threads (protected by mutex_)
  mutable std::mutex mutex_; // Protects all mutable state
  std::deque<Message>
      message_queue_; // Queue of messages to send. Protected by
                      // mutex_, but if non-empty mutex_ can be
                      // dropped while server accesses existing elements.
  int64_t outgoing_bytes_queued_{0}; // Counter of queued bytes
  std::deque<PendingResponse>
      pending_response_queue_; // Responses awaiting protocol processing
  ConnectionShutdown shutdown_requested_{
      ConnectionShutdown::None}; // Shutdown mode requested

  // Set to a negative number in `close`
  int fd_;

  // ThreadSanitizer hooks: an acquire load / release store pair on a dummy
  // atomic when building with TSan, no-ops otherwise.
#if __has_feature(thread_sanitizer)
  void tsan_acquire() { tsan_sync.load(std::memory_order_acquire); }
  void tsan_release() { tsan_sync.store(0, std::memory_order_release); }
  // Explicitly zero-initialised: a default-constructed std::atomic is only
  // guaranteed to be value-initialised from C++20 on, and tsan_acquire() may
  // load it before the first tsan_release() stores to it.
  std::atomic<int> tsan_sync{0};
#else
  void tsan_acquire() {}
  void tsan_release() {}
#endif
};