#include "server.hpp" #include "connection.hpp" #include "connection_registry.hpp" #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include std::shared_ptr Server::create(const weaseldb::Config &config, ConnectionHandler &handler) { // Use std::shared_ptr constructor with private access // We can't use make_shared here because constructor is private return std::shared_ptr(new Server(config, handler)); } Server::Server(const weaseldb::Config &config, ConnectionHandler &handler) : config_(config), handler_(handler), connection_registry_() {} Server::~Server() { if (shutdown_pipe_[0] != -1) { close(shutdown_pipe_[0]); shutdown_pipe_[0] = -1; } if (shutdown_pipe_[1] != -1) { close(shutdown_pipe_[1]); shutdown_pipe_[1] = -1; } // Close all epoll instances for (int epollfd : epoll_fds_) { if (epollfd != -1) { close(epollfd); } } epoll_fds_.clear(); if (listen_sockfd_ != -1) { close(listen_sockfd_); listen_sockfd_ = -1; } // Clean up unix socket file if it exists if (!config_.server.unix_socket_path.empty()) { unlink(config_.server.unix_socket_path.c_str()); } } void Server::run() { setup_shutdown_pipe(); listen_sockfd_ = create_listen_socket(); create_epoll_instances(); // Create I/O threads locally in this call frame // CRITICAL: By owning threads in run()'s call frame, we guarantee they are // joined before run() returns, eliminating any race conditions in ~Server() std::vector threads; start_io_threads(threads); // Wait for all threads to complete before returning // This ensures all I/O threads are fully stopped before the Server // destructor can be called, preventing race conditions during connection // cleanup for (auto &thread : threads) { thread.join(); } // At this point, all threads are joined and it's safe to destroy the Server } void Server::shutdown() { if (shutdown_pipe_[1] != -1) { char val = 1; // write() is async-signal-safe per POSIX - safe to use in signal handler // Write single byte to avoid partial write complexity while (write(shutdown_pipe_[1], &val, 1) == -1) { if (errno != EINTR) { abort(); // graceful shutdown didn't work. Let's go ungraceful. 
void Server::releaseBackToServer(std::unique_ptr<Connection> connection) {
  if (!connection) {
    return; // Nothing to release
  }

  // Try to get the server from the connection's weak_ptr
  if (auto server = connection->server_.lock()) {
    // The server still exists - pass the unique_ptr directly
    server->receiveConnectionBack(std::move(connection));
  }
  // If the server is gone, the connection is cleaned up automatically when
  // the unique_ptr destructs.
}

void Server::receiveConnectionBack(std::unique_ptr<Connection> connection) {
  if (!connection) {
    return; // Nothing to process
  }

  // Re-add the connection to epoll for continued processing
  struct epoll_event event{};
  if (!connection->hasMessages()) {
    event.events = EPOLLIN | EPOLLONESHOT;
  } else {
    event.events = EPOLLOUT | EPOLLONESHOT;
  }
  connection->tsan_release();

  int fd = connection->getFd();
  event.data.fd = fd;

  // Store the connection in the registry before adding it to epoll.
  // This mirrors the pattern used in process_connection_batch.
  size_t epoll_index = connection->getEpollIndex();
  int epollfd = epoll_fds_[epoll_index];
  connection_registry_.store(fd, std::move(connection));
  if (epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &event) == -1) {
    perror("epoll_ctl MOD in receiveConnectionBack");
    // Remove from the registry and clean up on failure
    (void)connection_registry_.remove(fd);
  }
}
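// Ownership recap (an invariant inferred from the code above and from
// process_connection_batch below, not a formal spec): at any instant a live
// Connection is owned by exactly one of
//
//   1. connection_registry_  - while armed in epoll; EPOLLONESHOT guarantees
//                              at most one thread wins the wakeup,
//   2. an I/O thread's batch - between epoll_wait and re-arming, or
//   3. the handler           - if it stole the unique_ptr, returning it later
//                              via Server::releaseBackToServer().
//
// tsan_release()/tsan_acquire() pair up across the registry hand-off so
// ThreadSanitizer sees each transfer as a happens-before edge.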
void Server::setup_shutdown_pipe() {
  if (pipe(shutdown_pipe_) == -1) {
    perror("pipe");
    throw std::runtime_error("Failed to create shutdown pipe");
  }

  // Set both ends to close-on-exec
  if (fcntl(shutdown_pipe_[0], F_SETFD, FD_CLOEXEC) == -1 ||
      fcntl(shutdown_pipe_[1], F_SETFD, FD_CLOEXEC) == -1) {
    perror("fcntl FD_CLOEXEC");
    throw std::runtime_error("Failed to set close-on-exec for shutdown pipe");
  }
}

int Server::create_listen_socket() {
  int sfd;

  // Check whether a unix socket path is specified
  if (!config_.server.unix_socket_path.empty()) {
    // Create a unix socket
    sfd = socket(AF_UNIX, SOCK_STREAM, 0);
    if (sfd == -1) {
      perror("socket");
      throw std::runtime_error("Failed to create unix socket");
    }

    // Remove the existing socket file if it exists
    unlink(config_.server.unix_socket_path.c_str());

    struct sockaddr_un addr;
    memset(&addr, 0, sizeof(addr));
    addr.sun_family = AF_UNIX;
    if (config_.server.unix_socket_path.length() >= sizeof(addr.sun_path)) {
      close(sfd);
      throw std::runtime_error("Unix socket path too long");
    }
    strncpy(addr.sun_path, config_.server.unix_socket_path.c_str(),
            sizeof(addr.sun_path) - 1);

    // Set the socket to non-blocking for graceful shutdown
    int flags = fcntl(sfd, F_GETFL, 0);
    if (flags == -1) {
      perror("fcntl F_GETFL");
      close(sfd);
      throw std::runtime_error("Failed to get socket flags");
    }
    if (fcntl(sfd, F_SETFL, flags | O_NONBLOCK) == -1) {
      perror("fcntl F_SETFL");
      close(sfd);
      throw std::runtime_error("Failed to set socket non-blocking");
    }

    if (bind(sfd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
      perror("bind");
      close(sfd);
      throw std::runtime_error("Failed to bind unix socket");
    }

    if (listen(sfd, SOMAXCONN) == -1) {
      perror("listen");
      close(sfd);
      throw std::runtime_error("Failed to listen on unix socket");
    }

    return sfd;
  }

  // TCP socket creation
  struct addrinfo hints;
  struct addrinfo *result, *rp;
  int s;

  memset(&hints, 0, sizeof(hints));
  hints.ai_family = AF_UNSPEC;     /* Allow IPv4 or IPv6 */
  hints.ai_socktype = SOCK_STREAM; /* Stream socket */
  hints.ai_flags = AI_PASSIVE;     /* For wildcard IP address */
  hints.ai_protocol = 0;           /* Any protocol */
  hints.ai_canonname = nullptr;
  hints.ai_addr = nullptr;
  hints.ai_next = nullptr;

  s = getaddrinfo(config_.server.bind_address.c_str(),
                  std::to_string(config_.server.port).c_str(), &hints,
                  &result);
  if (s != 0) {
    fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(s));
    throw std::runtime_error("Failed to resolve bind address");
  }

  for (rp = result; rp != nullptr; rp = rp->ai_next) {
    sfd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
    if (sfd == -1) {
      continue;
    }

    int val = 1;
    if (setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)) == -1) {
      perror("setsockopt SO_REUSEADDR");
      close(sfd);
      continue;
    }

    // Enable TCP_NODELAY for low latency (only for TCP sockets)
    if (rp->ai_family == AF_INET || rp->ai_family == AF_INET6) {
      if (setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val)) ==
          -1) {
        perror("setsockopt TCP_NODELAY");
        close(sfd);
        continue;
      }
    }

    // Set the socket to non-blocking for graceful shutdown
    int flags = fcntl(sfd, F_GETFL, 0);
    if (flags == -1) {
      perror("fcntl F_GETFL");
      close(sfd);
      continue;
    }
    if (fcntl(sfd, F_SETFL, flags | O_NONBLOCK) == -1) {
      perror("fcntl F_SETFL");
      close(sfd);
      continue;
    }

    if (bind(sfd, rp->ai_addr, rp->ai_addrlen) == 0) {
      break; /* Success */
    }

    close(sfd);
  }

  freeaddrinfo(result);

  if (rp == nullptr) {
    throw std::runtime_error("Could not bind to any address");
  }

  if (listen(sfd, SOMAXCONN) == -1) {
    perror("listen");
    close(sfd);
    throw std::runtime_error("Failed to listen on socket");
  }

  return sfd;
}

void Server::create_epoll_instances() {
  // Create multiple epoll instances to reduce contention
  epoll_fds_.resize(config_.server.epoll_instances);
  for (int i = 0; i < config_.server.epoll_instances; ++i) {
    epoll_fds_[i] = epoll_create1(EPOLL_CLOEXEC);
    if (epoll_fds_[i] == -1) {
      perror("epoll_create1");
      throw std::runtime_error("Failed to create epoll instance");
    }

    // Add the shutdown pipe to each epoll instance
    struct epoll_event shutdown_event;
    shutdown_event.events = EPOLLIN;
    shutdown_event.data.fd = shutdown_pipe_[0];
    if (epoll_ctl(epoll_fds_[i], EPOLL_CTL_ADD, shutdown_pipe_[0],
                  &shutdown_event) == -1) {
      perror("epoll_ctl shutdown pipe");
      throw std::runtime_error("Failed to add shutdown pipe to epoll");
    }

    // Add the listen socket to each epoll instance with EPOLLEXCLUSIVE to
    // prevent a thundering herd
    struct epoll_event listen_event;
    listen_event.events = EPOLLIN | EPOLLEXCLUSIVE;
    listen_event.data.fd = listen_sockfd_;
    if (epoll_ctl(epoll_fds_[i], EPOLL_CTL_ADD, listen_sockfd_,
                  &listen_event) == -1) {
      perror("epoll_ctl listen socket");
      throw std::runtime_error("Failed to add listen socket to epoll");
    }
  }
}

int Server::get_epoll_for_thread(int thread_id) const {
  // Round-robin assignment of threads to epoll instances
  return epoll_fds_[thread_id % epoll_fds_.size()];
}
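// Worked example of the fan-out (config values chosen purely for
// illustration): with epoll_instances = 2 and io_threads = 4,
// get_epoll_for_thread maps threads {0, 2} to instance 0 and threads {1, 3}
// to instance 1. Because each instance registered the listen socket with
// EPOLLEXCLUSIVE, an accept-ready event wakes at most one instance instead of
// all of them.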
void Server::start_io_threads(std::vector<std::thread> &threads) {
  int io_threads = config_.server.io_threads;
  for (int thread_id = 0; thread_id < io_threads; ++thread_id) {
    threads.emplace_back([this, thread_id]() {
      pthread_setname_np(pthread_self(),
                         ("io-" + std::to_string(thread_id)).c_str());

      // Each thread uses its assigned epoll instance (round-robin)
      int epollfd = get_epoll_for_thread(thread_id);

      // Per-thread scratch arrays, sized by config (relies on the VLA
      // compiler extension)
      struct epoll_event events[config_.server.event_batch_size];
      std::unique_ptr<Connection> batch[config_.server.event_batch_size];
      int batch_events[config_.server.event_batch_size];

      for (;;) {
        int event_count = epoll_wait(epollfd, events,
                                     config_.server.event_batch_size, -1);
        if (event_count == -1) {
          if (errno == EINTR) {
            continue;
          }
          perror("epoll_wait");
          abort();
        }

        bool listenReady = false;
        int batch_count = 0;
        for (int i = 0; i < event_count; ++i) {
          // Check for the shutdown event
          if (events[i].data.fd == shutdown_pipe_[0]) {
            return;
          }

          // Check for new connections
          if (events[i].data.fd == listen_sockfd_) {
            listenReady = true;
            continue;
          }

          // Handle existing connection events
          int fd = events[i].data.fd;
          std::unique_ptr<Connection> conn = connection_registry_.remove(fd);
          if (conn) {
            conn->tsan_acquire();
          }
          if (events[i].events & (EPOLLERR | EPOLLHUP)) {
            // The unique_ptr automatically deletes the connection on scope
            // exit
            continue;
          }

          // Transfer ownership from the registry to batch processing
          batch[batch_count] = std::move(conn);
          batch_events[batch_count] = events[i].events;
          batch_count++;
        }

        // Process existing connections in a batch
        if (batch_count > 0) {
          process_connection_batch(epollfd, {batch, (size_t)batch_count},
                                   {batch_events, (size_t)batch_count},
                                   false);
        }

        // Reuse the same batch array for accepting connections
        if (listenReady) {
          for (;;) {
            struct sockaddr_storage addr;
            socklen_t addrlen = sizeof(addr);
            int fd = accept4(listen_sockfd_, (struct sockaddr *)&addr,
                             &addrlen, SOCK_NONBLOCK);
            if (fd == -1) {
              if (errno == EAGAIN || errno == EWOULDBLOCK)
                break;
              if (errno == EINTR)
                continue;
              perror("accept4");
              abort();
            }

            // Check the connection limit
            if (config_.server.max_connections > 0 &&
                active_connections_.load(std::memory_order_relaxed) >=
                    config_.server.max_connections) {
              close(fd);
              continue;
            }

            // Enable keepalive
            int keepalive = 1;
            if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &keepalive,
                           sizeof(keepalive)) == -1) {
              perror("setsockopt SO_KEEPALIVE");
            }

            // Construct the new Connection directly into the batch
            size_t epoll_index = thread_id % epoll_fds_.size();
            batch[batch_count] = std::unique_ptr<Connection>(new Connection(
                addr, fd,
                connection_id_.fetch_add(1, std::memory_order_relaxed),
                epoll_index, &handler_, weak_from_this()));
            batch_events[batch_count] =
                EPOLLIN; // New connections always start with a read
            batch_count++;

            // Process the batch if it is full
            if (batch_count == config_.server.event_batch_size) {
              process_connection_batch(epollfd, {batch, (size_t)batch_count},
                                       {batch_events, (size_t)batch_count},
                                       true);
              batch_count = 0;
            }
          }

          // Process the remaining accepted connections
          if (batch_count > 0) {
            process_connection_batch(epollfd, {batch, (size_t)batch_count},
                                     {batch_events, (size_t)batch_count},
                                     true);
            batch_count = 0;
          }
        }
      }
    });
  }
}

void Server::process_connection_io(std::unique_ptr<Connection> &conn,
                                   int events) {
  assert(conn);

  // Handle EPOLLIN - read data and process it
  if (events & EPOLLIN) {
    auto buf_size = config_.server.read_buffer_size;
    char buf[buf_size];
    int r = conn->readBytes(buf, buf_size);
    if (r < 0) {
      // Error or EOF - the connection should be closed
      conn.reset();
      return;
    }
    if (r == 0) {
      // No data available (EAGAIN) - skip read processing but continue
      return;
    }

    // Call the handler with the unique_ptr - it can take ownership if needed
    handler_.on_data_arrived(std::string_view{buf, size_t(r)}, conn);
    // If the handler took ownership (conn is now null), processing is done
    if (!conn) {
      return;
    }
  }

  // Send immediately if we have outgoing messages (either from EPOLLOUT or
  // after reading)
  if ((events & EPOLLOUT) || ((events & EPOLLIN) && conn->hasMessages())) {
    bool error = conn->writeBytes();
    if (error) {
      conn.reset(); // Connection should be closed
      return;
    }

    // Call the handler with the unique_ptr - it can take ownership if needed
    handler_.on_write_progress(conn);
    // If the handler took ownership (conn is now null), processing is done
    if (!conn) {
      return;
    }

    // Check whether the application asked us to close the connection
    if (!conn->hasMessages() && conn->closeConnection_) {
      conn.reset(); // Connection should be closed
      return;
    }
  }
}
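// Handler contract sketch (hypothetical implementation, matching the
// on_data_arrived call above; MyHandler, needs_backend_call, and enqueue are
// assumptions, not part of this codebase): a handler may steal the
// unique_ptr to park a connection, e.g. while waiting on a slow backend.
//
//   void MyHandler::on_data_arrived(std::string_view data,
//                                   std::unique_ptr<Connection> &conn) {
//     if (needs_backend_call(data)) {  // hypothetical predicate
//       enqueue(std::move(conn));      // park it; epoll no longer owns it
//       // ... later, from a worker thread:
//       //   Server::releaseBackToServer(std::move(parked));
//     }
//   }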
void Server::process_connection_batch(
    int epollfd, std::span<std::unique_ptr<Connection>> batch,
    std::span<int> events, bool is_new) {
  // First process I/O for each connection
  for (size_t i = 0; i < batch.size(); ++i) {
    if (batch[i]) {
      process_connection_io(batch[i], events[i]);
    }
  }

  // Call the post-batch handler - handlers can take ownership here
  handler_.on_post_batch(batch);

  // Transfer all remaining connections back to epoll
  for (auto &conn_ptr : batch) {
    if (conn_ptr) {
      int fd = conn_ptr->getFd();
      struct epoll_event event{};
      if (!conn_ptr->hasMessages()) {
        event.events = EPOLLIN | EPOLLONESHOT;
      } else {
        event.events = EPOLLOUT | EPOLLONESHOT;
      }
      conn_ptr->tsan_release();
      event.data.fd = fd; // Use the file descriptor as the epoll key

      // Put the connection back in the registry since the handler didn't
      // take ownership. Must happen before epoll_ctl.
      connection_registry_.store(fd, std::move(conn_ptr));

      int epoll_op = is_new ? EPOLL_CTL_ADD : EPOLL_CTL_MOD;
      if (epoll_ctl(epollfd, epoll_op, fd, &event) == -1) {
        perror(is_new ? "epoll_ctl ADD" : "epoll_ctl MOD");
        (void)connection_registry_.remove(fd);
      }
    }
  }
}
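// Usage sketch (hypothetical driver; MyHandler and load_config are
// assumptions, not part of this translation unit). Server::create and run()
// are used as defined above; run() blocks until shutdown() wakes every I/O
// thread through the shutdown pipe.
//
//   int main() {
//     weaseldb::Config config = load_config("weaseldb.conf"); // hypothetical
//     MyHandler handler;
//     auto server = Server::create(config, handler);
//     server->run(); // joins all I/O threads before returning
//   }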