From cb322bbb2b382234215d567e2e9795486a034947 Mon Sep 17 00:00:00 2001
From: Andrew Noyes
Date: Tue, 19 Aug 2025 13:23:18 -0400
Subject: [PATCH] Separate Connection, ConnectionHandler, Server

---
 CMakeLists.txt             |  10 +-
 src/connection.cpp         | 130 +++++++++
 src/connection.hpp         |  83 ++++++
 src/connection_handler.hpp |  72 +++++
 src/main.cpp               | 523 +++----------------------------------
 src/server.cpp             | 426 ++++++++++++++++++++++++++++++
 src/server.hpp             | 136 ++++++++++
 7 files changed, 888 insertions(+), 492 deletions(-)
 create mode 100644 src/connection.cpp
 create mode 100644 src/connection.hpp
 create mode 100644 src/connection_handler.hpp
 create mode 100644 src/server.cpp
 create mode 100644 src/server.hpp

diff --git a/CMakeLists.txt b/CMakeLists.txt
index 82f5b27..695de84 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -122,8 +122,14 @@ add_custom_command(
 add_custom_target(generate_json_tokens
                   DEPENDS ${CMAKE_BINARY_DIR}/json_tokens.cpp)
 
-set(SOURCES src/main.cpp src/config.cpp src/json_commit_request_parser.cpp
-    src/arena_allocator.cpp ${CMAKE_BINARY_DIR}/json_tokens.cpp)
+set(SOURCES
+    src/main.cpp
+    src/config.cpp
+    src/connection.cpp
+    src/server.cpp
+    src/json_commit_request_parser.cpp
+    src/arena_allocator.cpp
+    ${CMAKE_BINARY_DIR}/json_tokens.cpp)
 
 add_executable(weaseldb ${SOURCES})
 add_dependencies(weaseldb generate_json_tokens)
diff --git a/src/connection.cpp b/src/connection.cpp
new file mode 100644
index 0000000..292b25c
--- /dev/null
+++ b/src/connection.cpp
@@ -0,0 +1,130 @@
+#include "connection.hpp"
+#include "server.hpp" // Need this for releaseBackToServer implementation
+#include <cassert>
+#include <cstring>
+#include <sys/uio.h>
+#include <unistd.h>
+
+Connection::Connection(struct sockaddr_storage addr, int fd, int64_t id,
+                       ConnectionHandler *handler,
+                       std::weak_ptr<Server> server)
+    : fd_(fd), id_(id), addr_(addr), arena_(), handler_(handler),
+      server_(server) {
+  activeConnections.fetch_add(1, std::memory_order_relaxed);
+  if (handler_) {
+    handler_->on_connection_established(*this);
+  }
+}
+
+Connection::~Connection() {
+  if (handler_) {
+    handler_->on_connection_closed(*this);
+  }
+  activeConnections.fetch_sub(1, std::memory_order_relaxed);
+  int e = close(fd_);
+  if (e == -1) {
+    perror("close");
+    abort();
+  }
+}
+
+void Connection::appendMessage(std::string_view s) {
+  char *arena_str = arena_.allocate(s.size());
+  std::memcpy(arena_str, s.data(), s.size());
+  messages_.emplace_back(arena_str, s.size());
+}
+
+std::string_view Connection::readBytes(size_t /*max_request_size*/,
+                                       size_t buffer_size) {
+  // Use Variable Length Array for optimal stack allocation
+  char buf[buffer_size];
+
+  ssize_t r;
+  for (;;) {
+    r = read(fd_, buf, buffer_size);
+    if (r == -1 && errno == EINTR) {
+      continue; // Retry on signal interruption
+    }
+    break;
+  }
+  if (r == -1) {
+    if (errno == EAGAIN || errno == EWOULDBLOCK) {
+      return {}; // Would block - empty result lets server close or re-arm
+    }
+    perror("read");
+    return {}; // Error - let server handle connection cleanup
+  }
+  if (r == 0) {
+    return {}; // EOF - let server handle connection cleanup
+  }
+
+  // Copy data to arena for persistent storage
+  char *arena_data = arena_.allocate(r);
+  std::memcpy(arena_data, buf, r);
+  return {arena_data, size_t(r)};
+}
+
+bool Connection::writeBytes() {
+  while (!messages_.empty()) {
+    // Build iovec array up to IOV_MAX limit
+    struct iovec iov[IOV_MAX];
+    int iov_count = 0;
+
+    for (auto it = messages_.begin();
+         it != messages_.end() && iov_count < IOV_MAX; ++it) {
+      const auto &msg = *it;
+      if (msg.size() > 0) {
+        iov[iov_count] = {
+            const_cast<void *>(static_cast<const void *>(msg.data())),
+            msg.size()};
+        iov_count++;
+      }
+    }
+
+    if (iov_count == 0) {
+      break;
+    }
+
+    ssize_t w;
+    for (;;) {
+      w = writev(fd_, iov, iov_count);
+      if (w == -1) {
+        if (errno == EINTR) {
+          continue; // Standard practice: retry on signal interruption
+        }
+        if (errno == EAGAIN) {
+          return false;
+        }
+        perror("writev");
+        return true;
+      }
+      break;
+    }
+
+    assert(w > 0);
+
+    // Handle partial writes by updating string_view data/size
+    size_t bytes_written = static_cast<size_t>(w);
+    while (bytes_written > 0 && !messages_.empty()) {
+      auto &front = messages_.front();
+
+      if (bytes_written >= front.size()) {
+        // This message is completely written
+        bytes_written -= front.size();
+        messages_.pop_front();
+      } else {
+        // Partial write of this message - update string_view
+        front = std::string_view(front.data() + bytes_written,
+                                 front.size() - bytes_written);
+        bytes_written = 0;
+      }
+    }
+  }
+  assert(messages_.empty());
+  arena_.reset();
+  return closeConnection_;
+}
+
+void Connection::tsan_acquire() {
+#if __has_feature(thread_sanitizer)
+  tsan_sync_.load(std::memory_order_acquire);
+#endif
+}
+
+void Connection::tsan_release() {
+#if __has_feature(thread_sanitizer)
+  tsan_sync_.store(0, std::memory_order_release);
+#endif
+}
\ No newline at end of file
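The partial-write loop above is the subtle part of writeBytes(): writev() may consume any prefix of the queued iovecs, so the queue has to be trimmed by byte count rather than by whole messages. A minimal sketch of the same bookkeeping in isolation (consume() is a name invented for this example, not part of the patch):

#include <cassert>
#include <cstddef>
#include <deque>
#include <string_view>

// Consume `n` bytes from the front of `q`, mirroring Connection::writeBytes.
// Fully written entries are popped; a partially written entry is narrowed
// in place so the next writev resumes exactly where this one stopped.
static void consume(std::deque<std::string_view> &q, size_t n) {
  while (n > 0 && !q.empty()) {
    auto &front = q.front();
    if (n >= front.size()) {
      n -= front.size();
      q.pop_front();
    } else {
      front = std::string_view(front.data() + n, front.size() - n);
      n = 0;
    }
  }
}

int main() {
  std::deque<std::string_view> q{"HTTP/1.1 200 OK\r\n", "body"};
  consume(q, 19); // 17-byte status line plus 2 bytes of "body"
  assert(q.size() == 1 && q.front() == "dy");
  return 0;
}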
diff --git a/src/connection.hpp b/src/connection.hpp
new file mode 100644
index 0000000..ac6e783
--- /dev/null
+++ b/src/connection.hpp
@@ -0,0 +1,83 @@
+#pragma once
+
+#include "arena_allocator.hpp"
+#include "connection_handler.hpp"
+#include <atomic>
+#include <cstddef>
+#include <cstdint>
+#include <deque>
+#include <memory>
+#include <string_view>
+#include <sys/socket.h>
+#include <sys/types.h>
+
+extern std::atomic<int64_t> activeConnections;
+
+#ifndef __has_feature
+#define __has_feature(x) 0
+#endif
+
+// Forward declaration
+class Server;
+
+/**
+ * Represents a single client connection with efficient memory management.
+ *
+ * Connection ownership model:
+ * - Created by accept thread, transferred to epoll via raw pointer
+ * - Network threads claim ownership by wrapping raw pointer in unique_ptr
+ * - Network thread optionally passes ownership to a thread pipeline
+ * - Owner eventually transfers back to epoll by releasing unique_ptr to raw
+ *   pointer
+ * - RAII cleanup happens if network thread doesn't transfer back
+ *
+ * Only the handler interface methods are public - all networking details are
+ * private.
+ */
+struct Connection {
+  Connection(struct sockaddr_storage addr, int fd, int64_t id,
+             ConnectionHandler *handler, std::weak_ptr<Server> server);
+  ~Connection();
+
+  // Handler interface - public methods that handlers can use
+  void appendMessage(std::string_view s);
+  void closeAfterSend() { closeConnection_ = true; }
+  ArenaAllocator &getArena() { return arena_; }
+  int64_t getId() const { return id_; }
+
+  // Note: To release connection back to server, use
+  // Server::releaseBackToServer(std::move(connection_ptr))
+
+private:
+  // Server is a friend and can access all networking internals
+  friend class Server;
+
+  // Networking interface - only accessible by Server
+  std::string_view readBytes(size_t max_request_size, size_t buffer_size);
+  bool writeBytes();
+  void tsan_acquire();
+  void tsan_release();
+
+  // Direct access methods for Server
+  int getFd() const { return fd_; }
+  bool hasMessages() const { return !messages_.empty(); }
+  bool shouldClose() const { return closeConnection_; }
+
+  const int fd_;
+  const int64_t id_;
+  struct sockaddr_storage addr_; // sockaddr_storage handles IPv4/IPv6
+  ArenaAllocator arena_;
+  ConnectionHandler *handler_;
+  std::weak_ptr<Server> server_; // Weak reference to server for safe cleanup
+
+  std::deque<std::string_view, ArenaStlAllocator<std::string_view>> messages_{
+      ArenaStlAllocator<std::string_view>{&arena_}};
+
+  // Whether or not to close the connection after completing writing the
+  // response
+  bool closeConnection_{false};
+
+  // TSAN support for epoll synchronization
+#if __has_feature(thread_sanitizer)
+  std::atomic<int> tsan_sync_;
+#endif
+};
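The ownership comment above is the load-bearing design idea: exactly one owner at a time, with epoll's user-data pointer serving as the hand-off slot. A compressed sketch of the round trip, with error handling elided and a bare-bones Conn struct standing in for Connection (park/claim are names invented for this example):

#include <memory>
#include <sys/epoll.h>

struct Conn { int fd; };

// Hand a connection to epoll: ownership leaves the unique_ptr.
// (First-time registration would use EPOLL_CTL_ADD instead of MOD.)
void park(int epfd, std::unique_ptr<Conn> c) {
  epoll_event ev{};
  ev.events = EPOLLIN | EPOLLONESHOT | EPOLLRDHUP;
  int fd = c->fd;
  ev.data.ptr = c.release(); // epoll's data.ptr is now the sole owner
  epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev);
}

// Claim a connection after epoll_wait: ownership re-enters RAII.
std::unique_ptr<Conn> claim(epoll_event &ev) {
  auto c = std::unique_ptr<Conn>(static_cast<Conn *>(ev.data.ptr));
  ev.data.ptr = nullptr; // we own it now; drop the raw alias
  return c;              // destroyed here unless parked again
}

EPOLLONESHOT is what makes the single-owner claim sound: after an event fires, the fd stays registered but disarmed, so no second thread can receive the same pointer until the current owner re-arms it.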
diff --git a/src/connection_handler.hpp b/src/connection_handler.hpp
new file mode 100644
index 0000000..e8b2d0c
--- /dev/null
+++ b/src/connection_handler.hpp
@@ -0,0 +1,72 @@
+#pragma once
+
+#include <memory>
+#include <string_view>
+
+// Forward declaration to avoid circular dependency
+struct Connection;
+
+enum class ProcessResult {
+  Continue,       // Keep connection open, continue processing
+  CloseAfterSend, // Send response then close connection
+  CloseNow        // Close connection immediately
+};
+
+/**
+ * Abstract interface for handling connection data processing.
+ *
+ * This interface decouples protocol-specific logic (HTTP, WebSocket, etc.)
+ * from the underlying networking infrastructure. Implementations handle
+ * parsing incoming data and generating appropriate responses.
+ *
+ * The networking layer manages connection lifecycle, I/O multiplexing,
+ * and efficient data transfer, while handlers focus purely on protocol logic.
+ */
+class ConnectionHandler {
+public:
+  virtual ~ConnectionHandler() = default;
+
+  /**
+   * Process incoming data from a connection.
+   *
+   * @param data Incoming data buffer (may be partial message)
+   * @param conn_ptr Unique pointer to connection - handler can take ownership
+   *                 by releasing it
+   * @return ProcessResult indicating how to handle the connection
+   *
+   * Implementation should:
+   * - Parse incoming data using arena allocator when needed
+   * - Use conn_ptr->appendMessage() to queue response data
+   * - Return appropriate ProcessResult for connection management
+   * - Handle partial messages and streaming protocols appropriately
+   * - Can take ownership by moving out of conn_ptr to pass to other threads
+   * - If ownership is taken, handler must call
+   *   Server::releaseBackToServer(std::move(conn)) when done
+   */
+  virtual ProcessResult process_data(std::string_view data,
+                                     std::unique_ptr<Connection> &conn_ptr) = 0;
+
+  /**
+   * Called when a new connection is established.
+   *
+   * @param conn Newly established connection
+   *
+   * Use this for:
+   * - Connection-specific initialization
+   * - Sending greeting messages
+   * - Setting up connection state
+   */
+  virtual void on_connection_established(Connection &) {}
+
+  /**
+   * Called when a connection is about to be closed.
+   *
+   * @param conn Connection being closed
+   *
+   * Use this for:
+   * - Cleanup of connection-specific resources
+   * - Logging connection statistics
+   * - Finalizing protocol state
+   */
+  virtual void on_connection_closed(Connection &) {}
+};
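For a protocol less trivial than the echo handler in main.cpp below, process_data typically dispatches on message content. A sketch of a line-oriented handler that answers PING and hangs up after QUIT - assuming, for brevity, that chunk boundaries coincide with message boundaries; a real implementation would buffer partial lines in per-connection state (PingHandler is a name invented for this example):

#include "connection.hpp"
#include "connection_handler.hpp"

class PingHandler : public ConnectionHandler {
public:
  ProcessResult process_data(std::string_view data,
                             std::unique_ptr<Connection> &conn_ptr) override {
    if (data.substr(0, 4) == "PING") {
      conn_ptr->appendMessage("PONG\n"); // copied into the connection arena
      return ProcessResult::Continue;
    }
    if (data.substr(0, 4) == "QUIT") {
      conn_ptr->appendMessage("BYE\n");
      return ProcessResult::CloseAfterSend; // flush the farewell, then close
    }
    return ProcessResult::CloseNow; // unknown command: drop immediately
  }
};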
diff --git a/src/main.cpp b/src/main.cpp
index 3f3755b..b019d52 100644
--- a/src/main.cpp
+++ b/src/main.cpp
@@ -1,270 +1,44 @@
-#include "arena_allocator.hpp"
 #include "config.hpp"
+#include "connection.hpp"
+#include "connection_handler.hpp"
+#include "server.hpp"
 #include <atomic>
-#include <cassert>
 #include <csignal>
-#include <cstdint>
-#include <cstdio>
-#include <cstdlib>
-#include <cstring>
-#include <deque>
-#include <fcntl.h>
 #include <iostream>
-#include <memory>
-#include <netdb.h>
-#include <netinet/in.h>
-#include <netinet/tcp.h>
-#include <pthread.h>
-#include <string_view>
-#include <sys/epoll.h>
-#include <sys/socket.h>
-#include <sys/uio.h>
-#include <thread>
-#include <unistd.h>
 
 std::atomic<int64_t> activeConnections{0};
-int shutdown_pipe[2] = {-1, -1};
 
-#ifndef __has_feature
-#define __has_feature(x) 0
-#endif
+// Global server instance for signal handler access
+Server *g_server = nullptr;
 
 void signal_handler(int sig) {
   if (sig == SIGTERM || sig == SIGINT) {
-    if (shutdown_pipe[1] != -1) {
-      char val = 1;
-      // write() is async-signal-safe per POSIX - safe to use in signal handler
-      // Write single byte to avoid partial write complexity
-      while (write(shutdown_pipe[1], &val, 1) == -1) {
-        if (errno != EINTR) {
-          abort(); // graceful shutdown didn't work. Let's go ungraceful.
-        }
-      }
+    if (g_server) {
+      g_server->shutdown();
     }
   }
 }
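The new handler body stays async-signal-safe because Server::shutdown() (see server.cpp below) does nothing but write one byte to a pipe whose read end every epoll set watches. The same self-pipe pattern in isolation, outside this patch:

#include <csignal>
#include <unistd.h>

static int pipe_fds[2]; // [0] = read end watched by epoll, [1] = write end

// Only async-signal-safe calls are allowed here; write(2) is one of them.
extern "C" void on_signal(int) {
  char byte = 1;
  // A single byte cannot be partially written, so no loop bookkeeping.
  (void)write(pipe_fds[1], &byte, 1);
}

int main() {
  if (pipe(pipe_fds) == -1)
    return 1;
  std::signal(SIGTERM, on_signal);
  // ... add pipe_fds[0] to every epoll set; a readable event means
  // "shut down", exactly as the server threads below interpret it.
}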
-// Adapted from getaddrinfo man page
-int getListenFd(const char *node, const char *service) {
-
-  struct addrinfo hints;
-  struct addrinfo *result, *rp;
-  int sfd, s;
-
-  memset(&hints, 0, sizeof(hints));
-  hints.ai_family = AF_UNSPEC;     /* Allow IPv4 or IPv6 */
-  hints.ai_socktype = SOCK_STREAM; /* stream socket */
-  hints.ai_flags = AI_PASSIVE;     /* For wildcard IP address */
-  hints.ai_protocol = 0;           /* Any protocol */
-  hints.ai_canonname = nullptr;
-  hints.ai_addr = nullptr;
-  hints.ai_next = nullptr;
-
-  s = getaddrinfo(node, service, &hints, &result);
-  if (s != 0) {
-    fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(s));
-    abort();
+/**
+ * Simple echo handler that mirrors received data back to the client.
+ *
+ * This implementation preserves the current behavior of the server
+ * while demonstrating the ConnectionHandler interface pattern.
+ */
+class EchoHandler : public ConnectionHandler {
+public:
+  ProcessResult process_data(std::string_view data,
+                             std::unique_ptr<Connection> &conn_ptr) override {
+    // Echo the received data back to the client
+    conn_ptr->appendMessage(data);
+    return ProcessResult::Continue;
   }
 
-  /* getaddrinfo() returns a list of address structures.
-     Try each address until we successfully bind(2).
-     If socket(2) (or bind(2)) fails, we (close the socket
-     and) try the next address. */
-
-  for (rp = result; rp != nullptr; rp = rp->ai_next) {
-    sfd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
-    if (sfd == -1) {
-      continue;
-    }
-
-    int val = 1;
-    if (setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)) == -1) {
-      perror("setsockopt SO_REUSEADDR");
-      close(sfd);
-      continue; // Try next address
-    }
-
-    // Enable TCP_NODELAY for low latency (disable Nagle's algorithm)
-    if (setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val)) == -1) {
-      perror("setsockopt TCP_NODELAY");
-      close(sfd);
-      continue; // Try next address
-    }
-
-    // Set socket to non-blocking for graceful shutdown
-    int flags = fcntl(sfd, F_GETFL, 0);
-    if (flags == -1) {
-      perror("fcntl F_GETFL");
-      close(sfd);
-      continue; // Try next address
-    }
-    if (fcntl(sfd, F_SETFL, flags | O_NONBLOCK) == -1) {
-      perror("fcntl F_SETFL");
-      close(sfd);
-      continue; // Try next address
-    }
-
-    if (bind(sfd, rp->ai_addr, rp->ai_addrlen) == 0) {
-      break; /* Success */
-    }
-
-    close(sfd);
+  void on_connection_established(Connection &conn) override {
+    // Could send a welcome message if desired
+    // conn.appendMessage("Welcome to WeaselDB echo server\n");
+    (void)conn; // Suppress unused parameter warning
   }
-
-  freeaddrinfo(result); /* No longer needed */
-
-  if (rp == nullptr) { /* No address succeeded */
-    fprintf(stderr, "Could not bind\n");
-    abort();
-  }
-
-  int rv = listen(sfd, SOMAXCONN);
-  if (rv) {
-    perror("listen");
-    abort();
-  }
-
-  return sfd;
-}
-
-// Since only one thread owns a connection at a time, no synchronization is
-// necessary
-// Connection ownership model:
-// - Created by accept thread, transferred to epoll via raw pointer
-// - Network threads claim ownership by wrapping raw pointer in unique_ptr
-// - Network thread optionally passes ownership to a thread pipeline
-// - Owner eventually transfers back to epoll by releasing unique_ptr to raw
-//   pointer
-// - RAII cleanup happens if network thread doesn't transfer back
-struct Connection {
-  const int fd;
-  const int64_t id;
-  struct sockaddr_storage addr; // sockaddr_storage handles IPv4/IPv6
-  ArenaAllocator arena;
-
-  Connection(struct sockaddr_storage addr, int fd, int64_t id)
-      : fd(fd), id(id), addr(addr) {
-    activeConnections.fetch_add(1, std::memory_order_relaxed);
-  }
-
-  ~Connection() {
-    activeConnections.fetch_sub(1, std::memory_order_relaxed);
-    int e = close(fd);
-    if (e == -1) {
-      perror("close");
-      abort();
-    }
-  }
-
-  std::deque<std::string_view, ArenaStlAllocator<std::string_view>> messages{
-      ArenaStlAllocator<std::string_view>{&arena}};
-  // Copies s into arena, and appends to messages
-  void appendMessage(std::string_view s) {
-    char *arena_str = arena.allocate(s.size());
-    std::memcpy(arena_str, s.data(), s.size());
-    messages.emplace_back(arena_str, s.size());
-  }
-  // Whether or not to close the connection after completing writing the
-  // response
-  bool closeConnection{false};
-
-  bool readBytes(size_t /*max_request_size*/, size_t buffer_size) {
-    // Use Variable Length Array for optimal stack allocation
-    char buf[buffer_size];
-
-    for (;;) {
-      int r = read(fd, buf, buffer_size);
-      if (r == -1) {
-        if (errno == EINTR) {
-          continue;
-        }
-        if (errno == EAGAIN) {
-          return false;
-        }
-        perror("read");
-        return true;
-      }
-      if (r == 0) {
-        return true;
-      }
-      // "pump parser"
-      // TODO revisit
-      appendMessage({buf, size_t(r)});
-    }
-  }
-
-  bool writeBytes() {
-    while (!messages.empty()) {
-      // Build iovec array up to IOV_MAX limit
-      struct iovec iov[IOV_MAX];
-      int iov_count = 0;
-
-      for (auto it = messages.begin();
-           it != messages.end() && iov_count < IOV_MAX; ++it) {
-        const auto &msg = *it;
-        if (msg.size() > 0) {
-          iov[iov_count] = {
-              const_cast<void *>(static_cast<const void *>(msg.data())),
-              msg.size()};
-          iov_count++;
-        }
-      }
-
-      if (iov_count == 0) {
-        break;
-      }
-
-      ssize_t w;
-      for (;;) {
-        w = writev(fd, iov, iov_count);
-        if (w == -1) {
-          if (errno == EINTR) {
-            continue; // Standard practice: retry on signal interruption
-          }
-          if (errno == EAGAIN) {
-            return false;
-          }
-          perror("writev");
-          return true;
-        }
-        break;
-      }
-
-      assert(w > 0);
-
-      // Handle partial writes by updating string_view data/size
-      size_t bytes_written = static_cast<size_t>(w);
-      while (bytes_written > 0 && !messages.empty()) {
-        auto &front = messages.front();
-
-        if (bytes_written >= front.size()) {
-          // This message is completely written
-          bytes_written -= front.size();
-          messages.pop_front();
-        } else {
-          // Partial write of this message - update string_view
-          front = std::string_view(front.data() + bytes_written,
-                                   front.size() - bytes_written);
-          bytes_written = 0;
-        }
-      }
-    }
-    assert(messages.empty());
-    arena.reset();
-    return closeConnection;
-  }
-
-  // This is necessary because tsan doesn't (yet?) understand that there's a
-  // happens-before relationship for epoll_ctl(..., EPOLL_CTL_MOD, ...) and
-  // epoll_wait
-#if __has_feature(thread_sanitizer)
-  void tsan_acquire() { tsan_sync.load(std::memory_order_acquire); }
-  void tsan_release() { tsan_sync.store(0, std::memory_order_release); }
-  std::atomic<int> tsan_sync;
-#else
-  void tsan_acquire() {}
-  void tsan_release() {}
-#endif
-};
+};
 
 void print_help(const char *program_name) {
@@ -351,251 +125,20 @@ int main(int argc, char *argv[]) {
           << config->subscription.keepalive_interval.count() << " seconds"
           << std::endl;
 
-  // Create shutdown pipe for graceful shutdown
-  if (pipe(shutdown_pipe) == -1) {
-    perror("pipe");
-    abort();
-  }
-  // Set both ends to close-on-exec
-  if (fcntl(shutdown_pipe[0], F_SETFD, FD_CLOEXEC) == -1 ||
-      fcntl(shutdown_pipe[1], F_SETFD, FD_CLOEXEC) == -1) {
-    perror("fcntl FD_CLOEXEC");
-    abort();
-  }
+  // Create handler and server
+  EchoHandler echo_handler;
+  auto server = Server::create(*config, echo_handler);
+  g_server = server.get();
 
+  // Setup signal handling
   signal(SIGPIPE, SIG_IGN);
   signal(SIGTERM, signal_handler);
   signal(SIGINT, signal_handler);
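Because run() blocks until shutdown, embedding the server in a test harness or a larger process means giving it a dedicated thread. A sketch of a purely programmatic lifecycle - no signals involved; the sleep is a stand-in for real client traffic, and smoke_test is a name invented for the example:

#include "server.hpp"
#include <chrono>
#include <thread>

void smoke_test(const weaseldb::Config &config) {
  EchoHandler handler; // any ConnectionHandler implementation works here
  auto server = Server::create(config, handler);

  std::thread runner([&] { server->run(); }); // blocks inside run()

  // ... connect clients and exercise the protocol ...
  std::this_thread::sleep_for(std::chrono::milliseconds(100));

  server->shutdown(); // async-signal-safe, so a plain thread call is fine too
  runner.join();      // run() returns once all worker threads exit
}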
-  int sockfd = getListenFd(config->server.bind_address.c_str(),
-                           std::to_string(config->server.port).c_str());
-
-  std::vector<std::thread> threads;
-  int network_epollfd = epoll_create1(EPOLL_CLOEXEC);
-  if (network_epollfd == -1) {
-    perror("epoll_create");
-    abort();
-  }
-  // Add shutdown pipe read end to network thread epoll
-  struct epoll_event shutdown_event;
-  shutdown_event.events = EPOLLIN;
-  shutdown_event.data.fd = shutdown_pipe[0];
-  if (epoll_ctl(network_epollfd, EPOLL_CTL_ADD, shutdown_pipe[0],
-                &shutdown_event) == -1) {
-    perror("epoll_ctl add shutdown event");
-    abort();
-  }
-
-  std::atomic<int64_t> connectionId{0};
-
-  // Network threads from configuration
-  int networkThreads = config->server.network_threads;
-
-  for (int networkThreadId = 0; networkThreadId < networkThreads;
-       ++networkThreadId) {
-    threads.emplace_back(
-        [network_epollfd, networkThreadId,
-         max_request_size = config->server.max_request_size_bytes,
-         read_buffer_size = config->server.read_buffer_size,
-         event_batch_size = config->server.event_batch_size]() {
-          pthread_setname_np(
-              pthread_self(),
-              ("network-" + std::to_string(networkThreadId)).c_str());
-          std::vector<struct epoll_event> events(event_batch_size);
-          for (;;) {
-            int eventCount = epoll_wait(network_epollfd, events.data(),
-                                        event_batch_size, -1 /* no timeout */);
-            if (eventCount == -1) {
-              if (errno == EINTR) {
-                continue;
-              }
-              perror("epoll_wait");
-              abort();
-            }
-
-            for (int i = 0; i < eventCount; ++i) {
-              // Check for shutdown event
-              if (events[i].data.fd == shutdown_pipe[0]) {
-                // Don't read pipe - all threads need to see shutdown signal
-                return;
-              }
-
-              // Take ownership from epoll: raw pointer -> unique_ptr
-              std::unique_ptr<Connection> conn{
-                  static_cast<Connection *>(events[i].data.ptr)};
-              conn->tsan_acquire();
-              events[i].data.ptr =
-                  nullptr; // Clear epoll pointer (we own it now)
-              const int fd = conn->fd;
-
-              if (events[i].events & (EPOLLERR | EPOLLHUP | EPOLLRDHUP)) {
-                // Connection closed or error occurred - unique_ptr destructor
-                // cleans up
-                continue;
-              }
-
-              // When we register our epoll interest, if we have something to
-              // write, we write. Otherwise we read.
-              assert(!((events[i].events & EPOLLIN) &&
-                       (events[i].events & EPOLLOUT)));
-
-              if (events[i].events & EPOLLIN) {
-                bool done = conn->readBytes(max_request_size, read_buffer_size);
-                if (done) {
-                  continue;
-                }
-              }
-
-              if (events[i].events & EPOLLOUT) {
-                bool done = conn->writeBytes();
-                if (done) {
-                  continue;
-                }
-              }
-
-              if (conn->messages.empty()) {
-                events[i].events = EPOLLIN | EPOLLONESHOT | EPOLLRDHUP;
-              } else {
-                events[i].events = EPOLLOUT | EPOLLONESHOT | EPOLLRDHUP;
-              }
-              // Transfer ownership back to epoll: unique_ptr -> raw pointer
-              conn->tsan_release();
-              Connection *raw_conn =
-                  conn.release(); // Get raw pointer before epoll_ctl
-              events[i].data.ptr = raw_conn; // epoll now owns the connection
-              int e = epoll_ctl(network_epollfd, EPOLL_CTL_MOD, fd, &events[i]);
-              if (e == -1) {
-                perror("epoll_ctl");
-                delete raw_conn; // Clean up connection on epoll failure
-                continue;
-              }
-            }
-          }
-        });
-  }
-
-  // Accept threads from configuration
-  int acceptThreads = config->server.accept_threads;
-  // epoll instance for accept threads
-  int accept_epollfd = epoll_create1(EPOLL_CLOEXEC);
-  if (accept_epollfd == -1) {
-    perror("epoll_create1");
-    abort();
-  }
-
-  // Add shutdown pipe read end to accept epoll
-  if (epoll_ctl(accept_epollfd, EPOLL_CTL_ADD, shutdown_pipe[0],
-                &shutdown_event) == -1) {
-    perror("epoll_ctl shutdown pipe");
-    abort();
-  }
-
-  // Add listen socket to accept epoll with EPOLLEXCLUSIVE for better load
-  // balancing
-  struct epoll_event listen_event;
-  listen_event.events = EPOLLIN | EPOLLEXCLUSIVE;
-  listen_event.data.fd = sockfd;
-  if (epoll_ctl(accept_epollfd, EPOLL_CTL_ADD, sockfd, &listen_event) == -1) {
-    perror("epoll_ctl listen socket");
-    abort();
-  }
-
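EPOLLEXCLUSIVE does quiet but important work in the block above (and in its replacement in server.cpp): without it, every accept thread blocked on the same epoll fd would wake for every incoming connection, the classic thundering herd. A minimal sketch of the registration; watch_exclusive is an illustrative name, and note that EPOLLEXCLUSIVE requires Linux 4.5+ and is only valid with EPOLL_CTL_ADD:

#include <sys/epoll.h>

// Register a listen socket so that, of the N threads blocked in epoll_wait
// on this epoll fd, only one (or a few) is woken per readiness event
// instead of all of them.
bool watch_exclusive(int epfd, int listen_fd) {
  epoll_event ev{};
  ev.events = EPOLLIN | EPOLLEXCLUSIVE;
  ev.data.fd = listen_fd;
  return epoll_ctl(epfd, EPOLL_CTL_ADD, listen_fd, &ev) == 0;
}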
-  for (int acceptThreadId = 0; acceptThreadId < acceptThreads;
-       ++acceptThreadId) {
-    threads.emplace_back([network_epollfd, acceptThreadId, sockfd,
-                          &connectionId,
-                          max_connections = config->server.max_connections,
-                          accept_epollfd]() {
-      pthread_setname_np(pthread_self(),
-                         ("accept-" + std::to_string(acceptThreadId)).c_str());
-
-      for (;;) {
-        struct epoll_event events[2]; // listen socket + shutdown pipe
-        int ready = epoll_wait(accept_epollfd, events, 2, -1 /* no timeout */);
-
-        if (ready == -1) {
-          if (errno == EINTR)
-            continue;
-          perror("epoll_wait");
-          abort();
-        }
-
-        for (int i = 0; i < ready; ++i) {
-          if (events[i].data.fd == shutdown_pipe[0]) {
-            // Don't read pipe - all threads need to see shutdown signal
-            return;
-          }
-
-          if (events[i].data.fd == sockfd) {
-            // Listen socket ready - accept connections
-            for (;;) {
-              struct sockaddr_storage addr;
-              socklen_t addrlen = sizeof(addr);
-              int fd = accept4(sockfd, (struct sockaddr *)&addr, &addrlen,
-                               SOCK_NONBLOCK);
-
-              if (fd == -1) {
-                if (errno == EAGAIN || errno == EWOULDBLOCK)
-                  break; // No more connections
-                if (errno == EINTR)
-                  continue;
-                perror("accept4");
-                abort();
-              }
-
-              // Check connection limit (0 means unlimited). Limiting
-              // connections is best effort - race condition between check and
-              // increment is acceptable for this use case
-              if (max_connections > 0 &&
-                  activeConnections.load(std::memory_order_relaxed) >=
-                      max_connections) {
-                // Reject connection by immediately closing it
-                close(fd);
-                continue;
-              }
-
-              // Enable keepalive to detect dead connections
-              int keepalive = 1;
-              if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &keepalive,
-                             sizeof(keepalive)) == -1) {
-                perror("setsockopt SO_KEEPALIVE");
-                // Continue anyway - not critical
-              }
-
-              auto conn = std::make_unique<Connection>(
-                  addr, fd,
-                  connectionId.fetch_add(1, std::memory_order_relaxed));
-
-              // Transfer new connection to network thread epoll
-              struct epoll_event event{};
-              event.events = EPOLLIN | EPOLLONESHOT | EPOLLRDHUP;
-              conn->tsan_release();
-              Connection *raw_conn =
-                  conn.release(); // Get raw pointer before epoll_ctl
-              event.data.ptr =
-                  raw_conn; // network epoll now owns the connection
-              int e = epoll_ctl(network_epollfd, EPOLL_CTL_ADD, fd, &event);
-              if (e == -1) {
-                perror("epoll_ctl");
-                delete raw_conn; // Clean up connection on epoll failure
-                continue;
-              }
-            }
-          }
-        }
-      }
-    });
-  }
-
-  for (auto &t : threads) {
-    t.join();
-  }
-
-  // Cleanup
-  close(shutdown_pipe[0]);
-  close(shutdown_pipe[1]);
-  close(accept_epollfd);
-  close(network_epollfd);
-  close(sockfd);
+  std::cout << "Starting WeaselDB server..." << std::endl;
+  server->run();
+  std::cout << "Server shutdown complete." << std::endl;
 
+  g_server = nullptr;
   return 0;
 }
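With main.cpp reduced to wiring, the quickest end-to-end check is a throwaway client against the EchoHandler. A minimal blocking client sketch; the loopback address and port are assumptions and must match server.bind_address and server.port in the config:

#include <arpa/inet.h>
#include <cstdio>
#include <sys/socket.h>
#include <unistd.h>

int main() {
  int fd = socket(AF_INET, SOCK_STREAM, 0);
  if (fd == -1) {
    perror("socket");
    return 1;
  }

  sockaddr_in addr{};
  addr.sin_family = AF_INET;
  addr.sin_port = htons(8080); // assumed port; match the server config
  inet_pton(AF_INET, "127.0.0.1", &addr.sin_addr);

  if (connect(fd, (sockaddr *)&addr, sizeof(addr)) == -1) {
    perror("connect");
    return 1;
  }

  const char msg[] = "hello";
  write(fd, msg, sizeof(msg) - 1);

  char buf[64];
  ssize_t n = read(fd, buf, sizeof(buf)); // EchoHandler sends "hello" back
  if (n > 0)
    printf("echoed: %.*s\n", (int)n, buf);
  close(fd);
  return 0;
}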
diff --git a/src/server.cpp b/src/server.cpp
new file mode 100644
index 0000000..78b458c
--- /dev/null
+++ b/src/server.cpp
@@ -0,0 +1,426 @@
+#include "server.hpp"
+#include "connection.hpp"
+#include <cerrno>
+#include <cstdio>
+#include <cstring>
+#include <fcntl.h>
+#include <netdb.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+#include <pthread.h>
+#include <stdexcept>
+#include <sys/epoll.h>
+#include <sys/socket.h>
+#include <unistd.h>
+
+extern std::atomic<int64_t> activeConnections;
+
+std::shared_ptr<Server> Server::create(const weaseldb::Config &config,
+                                       ConnectionHandler &handler) {
+  // We can't use std::make_shared here because the constructor is private,
+  // so construct directly and hand the raw pointer to shared_ptr
+  return std::shared_ptr<Server>(new Server(config, handler));
+}
+
+Server::Server(const weaseldb::Config &config, ConnectionHandler &handler)
+    : config_(config), handler_(handler) {}
+
+Server::~Server() { cleanup_resources(); }
+
+void Server::run() {
+  setup_shutdown_pipe();
+
+  listen_sockfd_ = create_listen_socket();
+
+  // Create epoll instances
+  network_epollfd_ = epoll_create1(EPOLL_CLOEXEC);
+  if (network_epollfd_ == -1) {
+    perror("epoll_create network");
+    throw std::runtime_error("Failed to create network epoll instance");
+  }
+
+  accept_epollfd_ = epoll_create1(EPOLL_CLOEXEC);
+  if (accept_epollfd_ == -1) {
+    perror("epoll_create accept");
+    throw std::runtime_error("Failed to create accept epoll instance");
+  }
+
+  // Add shutdown pipe to both epoll instances
+  struct epoll_event shutdown_event;
+  shutdown_event.events = EPOLLIN;
+  shutdown_event.data.fd = shutdown_pipe_[0];
+
+  if (epoll_ctl(network_epollfd_, EPOLL_CTL_ADD, shutdown_pipe_[0],
+                &shutdown_event) == -1) {
+    perror("epoll_ctl add shutdown to network");
+    throw std::runtime_error("Failed to add shutdown pipe to network epoll");
+  }
+
+  if (epoll_ctl(accept_epollfd_, EPOLL_CTL_ADD, shutdown_pipe_[0],
+                &shutdown_event) == -1) {
+    perror("epoll_ctl add shutdown to accept");
+    throw std::runtime_error("Failed to add shutdown pipe to accept epoll");
+  }
+
+  // Add listen socket to accept epoll
+  struct epoll_event listen_event;
+  listen_event.events = EPOLLIN | EPOLLEXCLUSIVE;
+  listen_event.data.fd = listen_sockfd_;
+  if (epoll_ctl(accept_epollfd_, EPOLL_CTL_ADD, listen_sockfd_,
+                &listen_event) == -1) {
+    perror("epoll_ctl add listen socket");
+    throw std::runtime_error("Failed to add listen socket to accept epoll");
+  }
+
+  start_network_threads();
+  start_accept_threads();
+
+  // Wait for all threads to complete
+  for (auto &thread : threads_) {
+    thread.join();
+  }
+}
+
+void Server::shutdown() {
+  if (shutdown_pipe_[1] != -1) {
+    char val = 1;
+    // write() is async-signal-safe per POSIX - safe to use in signal handler
+    // Write single byte to avoid partial write complexity
+    while (write(shutdown_pipe_[1], &val, 1) == -1) {
+      if (errno != EINTR) {
+        abort(); // graceful shutdown didn't work. Let's go ungraceful.
+      }
+    }
+  }
+}
+
+void Server::releaseBackToServer(std::unique_ptr<Connection> connection) {
+  if (!connection) {
+    return; // Nothing to release
+  }
+
+  // Try to get the server from the connection's weak_ptr
+  if (auto server = connection->server_.lock()) {
+    // Server still exists - release raw pointer and let server take over
+    Connection *raw_conn = connection.release();
+    server->receiveConnectionBack(raw_conn);
+  }
+
+  // If server is gone, connection will be automatically cleaned up when
+  // unique_ptr destructs
+}
+
+void Server::receiveConnectionBack(Connection *connection) {
+  // Re-arm the connection in epoll for continued processing
+  struct epoll_event event{};
+
+  if (!connection->hasMessages()) {
+    event.events = EPOLLIN | EPOLLONESHOT | EPOLLRDHUP;
+  } else {
+    event.events = EPOLLOUT | EPOLLONESHOT | EPOLLRDHUP;
+  }
+
+  connection->tsan_release();
+  event.data.ptr = connection;
+
+  // The fd is still registered with the network epoll (EPOLLONESHOT only
+  // disarms it), so re-arm with MOD rather than ADD
+  if (epoll_ctl(network_epollfd_, EPOLL_CTL_MOD, connection->getFd(),
+                &event) == -1) {
+    perror("epoll_ctl MOD in receiveConnectionBack");
+    delete connection; // Clean up on failure
+  }
+}
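releaseBackToServer() plus the weak_ptr stored in each Connection is what lets a handler ship a connection to another thread without racing server teardown. A sketch of that hand-off; Queue stands in for any thread-safe queue of unique_ptr<Connection> and is not part of this patch:

#include "connection.hpp"
#include "connection_handler.hpp"
#include "server.hpp"
#include <memory>
#include <utility>

template <typename Queue>
class PipelineHandler : public ConnectionHandler {
public:
  explicit PipelineHandler(Queue &q) : queue_(q) {}

  ProcessResult process_data(std::string_view data,
                             std::unique_ptr<Connection> &conn_ptr) override {
    conn_ptr->appendMessage(data); // queue the reply before handing off
    // Move ownership out: the network thread sees a null conn_ptr and
    // stops touching this connection.
    queue_.push(std::move(conn_ptr));
    return ProcessResult::Continue;
  }

private:
  Queue &queue_;
};

// On the worker thread, once the job is finished:
//   Server::releaseBackToServer(std::move(conn)); // re-arms epoll, or frees
//                                                 // the connection if the
//                                                 // Server is already gone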
+
+void Server::setup_shutdown_pipe() {
+  if (pipe(shutdown_pipe_) == -1) {
+    perror("pipe");
+    throw std::runtime_error("Failed to create shutdown pipe");
+  }
+
+  // Set both ends to close-on-exec
+  if (fcntl(shutdown_pipe_[0], F_SETFD, FD_CLOEXEC) == -1 ||
+      fcntl(shutdown_pipe_[1], F_SETFD, FD_CLOEXEC) == -1) {
+    perror("fcntl FD_CLOEXEC");
+    throw std::runtime_error("Failed to set close-on-exec for shutdown pipe");
+  }
+}
+
+int Server::create_listen_socket() {
+  struct addrinfo hints;
+  struct addrinfo *result, *rp;
+  int sfd, s;
+
+  memset(&hints, 0, sizeof(hints));
+  hints.ai_family = AF_UNSPEC;     /* Allow IPv4 or IPv6 */
+  hints.ai_socktype = SOCK_STREAM; /* stream socket */
+  hints.ai_flags = AI_PASSIVE;     /* For wildcard IP address */
+  hints.ai_protocol = 0;           /* Any protocol */
+  hints.ai_canonname = nullptr;
+  hints.ai_addr = nullptr;
+  hints.ai_next = nullptr;
+
+  s = getaddrinfo(config_.server.bind_address.c_str(),
+                  std::to_string(config_.server.port).c_str(), &hints,
+                  &result);
+  if (s != 0) {
+    fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(s));
+    throw std::runtime_error("Failed to resolve bind address");
+  }
+
+  for (rp = result; rp != nullptr; rp = rp->ai_next) {
+    sfd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
+    if (sfd == -1) {
+      continue;
+    }
+
+    int val = 1;
+    if (setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)) == -1) {
+      perror("setsockopt SO_REUSEADDR");
+      close(sfd);
+      continue;
+    }
+
+    // Enable TCP_NODELAY for low latency
+    if (setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val)) == -1) {
+      perror("setsockopt TCP_NODELAY");
+      close(sfd);
+      continue;
+    }
+
+    // Set socket to non-blocking for graceful shutdown
+    int flags = fcntl(sfd, F_GETFL, 0);
+    if (flags == -1) {
+      perror("fcntl F_GETFL");
+      close(sfd);
+      continue;
+    }
+    if (fcntl(sfd, F_SETFL, flags | O_NONBLOCK) == -1) {
+      perror("fcntl F_SETFL");
+      close(sfd);
+      continue;
+    }
+
+    if (bind(sfd, rp->ai_addr, rp->ai_addrlen) == 0) {
+      break; /* Success */
+    }
+
+    close(sfd);
+  }
+
+  freeaddrinfo(result);
+
+  if (rp == nullptr) {
+    throw std::runtime_error("Could not bind to any address");
+  }
+
+  if (listen(sfd, SOMAXCONN) == -1) {
+    perror("listen");
+    close(sfd);
+    throw std::runtime_error("Failed to listen on socket");
+  }
+
+  return sfd;
+}
+
+void Server::start_network_threads() {
+  int network_threads = config_.server.network_threads;
+
+  for (int thread_id = 0; thread_id < network_threads; ++thread_id) {
+    threads_.emplace_back([this, thread_id]() {
+      pthread_setname_np(pthread_self(),
+                         ("network-" + std::to_string(thread_id)).c_str());
+
+      std::vector<struct epoll_event> events(config_.server.event_batch_size);
+
+      for (;;) {
+        int event_count = epoll_wait(network_epollfd_, events.data(),
+                                     config_.server.event_batch_size, -1);
+        if (event_count == -1) {
+          if (errno == EINTR) {
+            continue;
+          }
+          perror("epoll_wait");
+          abort();
+        }
+
+        for (int i = 0; i < event_count; ++i) {
+          // Check for shutdown event
+          if (events[i].data.fd == shutdown_pipe_[0]) {
+            return;
+          }
+
+          // Take ownership from epoll: raw pointer -> unique_ptr
+          std::unique_ptr<Connection> conn{
+              static_cast<Connection *>(events[i].data.ptr)};
+          conn->tsan_acquire();
+          events[i].data.ptr = nullptr;
+          const int fd = conn->getFd();
+
+          if (events[i].events & (EPOLLERR | EPOLLHUP | EPOLLRDHUP)) {
+            continue; // Connection closed - unique_ptr destructor cleans up
+          }
+
+          if (events[i].events & EPOLLIN) {
+            std::string_view data =
+                conn->readBytes(config_.server.max_request_size_bytes,
+                                config_.server.read_buffer_size);
+
+            if (data.empty()) {
+              // No data, error, or EOF - connection should be closed
+              continue;
+            }
+
+            // Call handler with unique_ptr - handler can take ownership if
+            // needed
+            ProcessResult result = handler_.process_data(data, conn);
+            switch (result) {
+            case ProcessResult::Continue:
+              break;
+            case ProcessResult::CloseAfterSend:
+              conn->closeAfterSend();
+              break;
+            case ProcessResult::CloseNow:
+              continue; // Connection will be destroyed when unique_ptr goes
+                        // out of scope
+            }
+
+            // If handler took ownership (conn is now null), don't continue
+            // processing
+            if (!conn) {
+              continue;
+            }
+          }
+
+          if (events[i].events & EPOLLOUT) {
+            bool done = conn->writeBytes();
+            if (done) {
+              continue;
+            }
+          }
+
+          // Determine next epoll interest
+          if (!conn->hasMessages()) {
+            events[i].events = EPOLLIN | EPOLLONESHOT | EPOLLRDHUP;
+          } else {
+            events[i].events = EPOLLOUT | EPOLLONESHOT | EPOLLRDHUP;
+          }
+
+          // Transfer ownership back to epoll
+          conn->tsan_release();
+          Connection *raw_conn = conn.release();
+          events[i].data.ptr = raw_conn;
+
+          if (epoll_ctl(network_epollfd_, EPOLL_CTL_MOD, fd, &events[i]) ==
+              -1) {
+            perror("epoll_ctl MOD");
+            delete raw_conn;
+            continue;
+          }
+        }
+      }
+    });
+  }
+}
+
+void Server::start_accept_threads() {
+  int accept_threads = config_.server.accept_threads;
+
+  for (int thread_id = 0; thread_id < accept_threads; ++thread_id) {
+    threads_.emplace_back([this, thread_id]() {
+      pthread_setname_np(pthread_self(),
+                         ("accept-" + std::to_string(thread_id)).c_str());
+
+      for (;;) {
+        struct epoll_event events[2]; // listen socket + shutdown pipe
+        int ready = epoll_wait(accept_epollfd_, events, 2, -1);
+
+        if (ready == -1) {
+          if (errno == EINTR)
+            continue;
+          perror("epoll_wait accept");
+          abort();
+        }
+
+        for (int i = 0; i < ready; ++i) {
+          if (events[i].data.fd == shutdown_pipe_[0]) {
+            return; // Shutdown signal
+          }
+
+          if (events[i].data.fd == listen_sockfd_) {
+            // Accept new connections
+            for (;;) {
+              struct sockaddr_storage addr;
+              socklen_t addrlen = sizeof(addr);
+              int fd = accept4(listen_sockfd_, (struct sockaddr *)&addr,
+                               &addrlen, SOCK_NONBLOCK);
+
+              if (fd == -1) {
+                if (errno == EAGAIN || errno == EWOULDBLOCK)
+                  break;
+                if (errno == EINTR)
+                  continue;
+                perror("accept4");
+                abort();
+              }
+
+              // Check connection limit
+              if (config_.server.max_connections > 0 &&
+                  activeConnections.load(std::memory_order_relaxed) >=
+                      config_.server.max_connections) {
+                close(fd);
+                continue;
+              }
+
+              // Enable keepalive
+              int keepalive = 1;
+              if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &keepalive,
+                             sizeof(keepalive)) == -1) {
+                perror("setsockopt SO_KEEPALIVE");
+              }
+
+              auto conn = std::make_unique<Connection>(
+                  addr, fd,
+                  connection_id_.fetch_add(1, std::memory_order_relaxed),
+                  &handler_, weak_from_this());
+
+              // Transfer to network epoll
+              struct epoll_event event{};
+              event.events = EPOLLIN | EPOLLONESHOT | EPOLLRDHUP;
+              conn->tsan_release();
+              Connection *raw_conn = conn.release();
+              event.data.ptr = raw_conn;
+
+              if (epoll_ctl(network_epollfd_, EPOLL_CTL_ADD, fd, &event) ==
+                  -1) {
+                perror("epoll_ctl ADD");
+                delete raw_conn;
+                continue;
+              }
+            }
+          }
+        }
+      }
+    });
+  }
+}
+
+void Server::cleanup_resources() {
+  if (shutdown_pipe_[0] != -1) {
+    close(shutdown_pipe_[0]);
+    shutdown_pipe_[0] = -1;
+  }
+  if (shutdown_pipe_[1] != -1) {
+    close(shutdown_pipe_[1]);
+    shutdown_pipe_[1] = -1;
+  }
+  if (network_epollfd_ != -1) {
+    close(network_epollfd_);
+    network_epollfd_ = -1;
+  }
+  if (accept_epollfd_ != -1) {
+    close(accept_epollfd_);
+    accept_epollfd_ = -1;
+  }
+  if (listen_sockfd_ != -1) {
+    close(listen_sockfd_);
+    listen_sockfd_ = -1;
+  }
+}
\ No newline at end of file
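One subtlety shared by both thread loops above: no thread ever reads the shutdown byte back out of the pipe. The read end therefore stays readable forever, and every thread that next reaches epoll_wait sees it, so a single write behaves like a broadcast. A standalone demonstration of that property:

#include <cassert>
#include <sys/epoll.h>
#include <unistd.h>

int main() {
  int p[2];
  if (pipe(p) == -1)
    return 1;
  char byte = 1;
  (void)write(p[1], &byte, 1); // one write, never consumed

  // Any number of epoll instances (one per thread pool in the server)
  // observe the same level-triggered readability, again and again.
  for (int i = 0; i < 3; ++i) {
    int ep = epoll_create1(0);
    epoll_event ev{};
    ev.events = EPOLLIN;
    ev.data.fd = p[0];
    epoll_ctl(ep, EPOLL_CTL_ADD, p[0], &ev);
    epoll_event out;
    int n = epoll_wait(ep, &out, 1, 0);
    assert(n == 1 && out.data.fd == p[0]);
    close(ep);
  }
  return 0;
}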
diff --git a/src/server.hpp b/src/server.hpp
new file mode 100644
index 0000000..96dd448
--- /dev/null
+++ b/src/server.hpp
@@ -0,0 +1,136 @@
+#pragma once
+
+#include "config.hpp"
+#include "connection_handler.hpp"
+#include <atomic>
+#include <memory>
+#include <thread>
+#include <vector>
+
+/**
+ * High-performance multi-threaded server for handling network connections.
+ *
+ * The Server class encapsulates all networking logic including:
+ * - Socket management and configuration
+ * - Multi-threaded epoll-based I/O multiplexing
+ * - Connection lifecycle management
+ * - Graceful shutdown handling
+ *
+ * The server uses a configurable thread pool architecture:
+ * - Accept threads: Handle incoming connections with load balancing
+ * - Network threads: Process I/O events for established connections
+ *
+ * All protocol-specific logic is delegated to the provided ConnectionHandler,
+ * maintaining clean separation between networking and application logic.
+ *
+ * IMPORTANT: Server uses a factory pattern and MUST be created via
+ * Server::create(). This ensures:
+ * - Proper shared_ptr semantics for enable_shared_from_this
+ * - Safe weak_ptr references from Connection objects
+ * - Prevention of accidental stack allocation that would break safety
+ *   guarantees
+ */
+class Server : public std::enable_shared_from_this<Server> {
+public:
+  /**
+   * Factory method to create a Server instance.
+   *
+   * This is the only way to create a Server - ensures proper shared_ptr
+   * semantics and prevents accidental stack allocation that would break
+   * weak_ptr safety.
+   *
+   * @param config Server configuration (threads, ports, limits, etc.)
+   * @param handler Protocol handler for processing connection data
+   * @return shared_ptr to the newly created Server
+   */
+  static std::shared_ptr<Server> create(const weaseldb::Config &config,
+                                        ConnectionHandler &handler);
+
+  /**
+   * Destructor ensures proper cleanup of all resources.
+   */
+  ~Server();
+
+  /**
+   * Start the server and begin accepting connections.
+   *
+   * This method:
+   * - Creates and configures the listen socket
+   * - Starts all worker threads
+   * - Blocks until shutdown() is called or an error occurs
+   *
+   * @throws std::runtime_error on socket creation or configuration errors
+   */
+  void run();
+
+  /**
+   * Initiate graceful server shutdown.
+   *
+   * This method is async-signal-safe and can be called from signal handlers.
+   * It signals all threads to stop processing and begin cleanup.
+   *
+   * The run() method will return after all threads have completed shutdown.
+   */
+  void shutdown();
+
+  /**
+   * Release a connection back to its server for continued processing.
+   *
+   * This static method safely returns ownership of a connection back to its
+   * server. If the server has been destroyed, the connection will be safely
+   * cleaned up.
+   *
+   * This method is thread-safe and can be called from any thread.
+   *
+   * @param connection unique_ptr to the connection being released back
+   */
+  static void releaseBackToServer(std::unique_ptr<Connection> connection);
+
+private:
+  /**
+   * Private constructor - use create() factory method instead.
+   *
+   * @param config Server configuration (threads, ports, limits, etc.)
+   * @param handler Protocol handler for processing connection data
+   */
+  explicit Server(const weaseldb::Config &config, ConnectionHandler &handler);
+
+  const weaseldb::Config &config_;
+  ConnectionHandler &handler_;
+
+  // Thread management
+  std::vector<std::thread> threads_;
+  std::atomic<int64_t> connection_id_{0};
+
+  // Shutdown coordination
+  int shutdown_pipe_[2] = {-1, -1};
+
+  // Epoll file descriptors
+  int network_epollfd_ = -1;
+  int accept_epollfd_ = -1;
+  int listen_sockfd_ = -1;
+
+  // Private helper methods
+  void setup_shutdown_pipe();
+  int create_listen_socket();
+  void start_network_threads();
+  void start_accept_threads();
+  void cleanup_resources();
+
+  /**
+   * Called internally to return ownership to the server.
+   *
+   * This method is thread-safe and can be called from any thread.
+   * The connection will be re-armed in the epoll for continued processing.
+   *
+   * @param connection Raw pointer to the connection being released back
+   */
+  void receiveConnectionBack(Connection *connection);
+
+  // Make non-copyable and non-movable
+  Server(const Server &) = delete;
+  Server &operator=(const Server &) = delete;
+  Server(Server &&) = delete;
+  Server &operator=(Server &&) = delete;
+};
\ No newline at end of file
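The factory-only construction that server.hpp insists on is not a style preference: enable_shared_from_this is only usable once the object is already owned by a shared_ptr, so weak_from_this() on a stack-allocated Server would hand every Connection an already-expired weak_ptr. A minimal illustration of the failure mode the private constructor rules out, using a stand-in Widget type:

#include <cassert>
#include <memory>

struct Widget : std::enable_shared_from_this<Widget> {};

int main() {
  // Correct: heap object owned by shared_ptr; weak_from_this is usable.
  auto good = std::make_shared<Widget>();
  assert(!good->weak_from_this().expired());

  // The mistake Server's private constructor makes impossible:
  Widget bad;                             // stack allocation
  assert(bad.weak_from_this().expired()); // weak_ptr is useless
  return 0;
}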