#include "server.hpp"

// NOTE(review): the system header names and every template argument in this
// file were missing from the copy under review. They have been reconstructed
// from how each name is used below; confirm them against server.hpp and the
// original include list before merging.
#include <fcntl.h>
#include <pthread.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>

#include <algorithm>
#include <atomic>
#include <cassert>
#include <cerrno>
#include <cstdio>
#include <cstdlib>
#include <memory>
#include <span>
#include <string>
#include <string_view>
#include <thread>
#include <vector>

#include "connection.hpp"
#include "connection_registry.hpp"

// Static thread-local storage for read buffer (used across different functions)
static thread_local std::vector<char> g_read_buffer;

namespace {

// Closes `fd`. Any failure other than EINTR is reported with perror(what)
// and aborts the process, which is the policy this file applies to every
// descriptor it closes.
void close_or_abort(int fd, const char *what) {
  if (close(fd) == -1 && errno != EINTR) {
    perror(what);
    std::abort();
  }
}

// True for accept() failures that describe a single pending connection rather
// than the listening socket itself. accept(2) on Linux recommends treating
// these like EAGAIN instead of as fatal errors.
bool is_transient_accept_error(int err) {
  switch (err) {
  case ECONNABORTED:
  case ENETDOWN:
  case EPROTO:
  case ENOPROTOOPT:
  case EHOSTDOWN:
  case ENONET:
  case EHOSTUNREACH:
  case EOPNOTSUPP:
  case ENETUNREACH:
    return true;
  default:
    return false;
  }
}

} // namespace

// Factory for Server instances.
//
// Builds the Server with `new` wrapped in a shared_ptr; make_shared cannot be
// used because the constructor is private.
std::shared_ptr<Server> Server::create(const weaseldb::Config &config,
                                       ConnectionHandler &handler,
                                       const std::vector<int> &listen_fds) {
  return std::shared_ptr<Server>(new Server(config, handler, listen_fds));
}

// Stores the configuration and handler, copies the provided listen
// descriptors (the destructor closes them), switches each of them to
// non-blocking mode, then creates the shutdown pipe and the epoll instances.
//
// An empty `provided_listen_fds` is valid: nothing is listened on and the
// server is driven only through create_local_connection().
// Any fcntl failure aborts the process.
Server::Server(const weaseldb::Config &config, ConnectionHandler &handler,
               const std::vector<int> &provided_listen_fds)
    : config_(config), handler_(handler), connection_registry_(),
      listen_fds_(provided_listen_fds) {
  // Ensure all listen fds are non-blocking for safe epoll usage
  for (int fd : listen_fds_) {
    int flags = fcntl(fd, F_GETFL, 0);
    if (flags == -1) {
      perror("fcntl F_GETFL");
      std::abort();
    }
    if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) {
      perror("fcntl F_SETFL O_NONBLOCK");
      std::abort();
    }
  }

  // The pipe must exist before the epoll instances, because each epoll
  // instance registers the pipe's read end.
  setup_shutdown_pipe();

  // Created here (not in run()) so create_local_connection() works before
  // run() is called.
  create_epoll_instances();
}

// Closes both ends of the shutdown pipe, every epoll instance and every
// listen socket, then unlinks the path of each configured Unix-socket
// interface. A close() failure other than EINTR aborts the process.
Server::~Server() {
  if (shutdown_pipe_[0] != -1) {
    close_or_abort(shutdown_pipe_[0], "close shutdown_pipe_[0]");
    shutdown_pipe_[0] = -1;
  }
  if (shutdown_pipe_[1] != -1) {
    close_or_abort(shutdown_pipe_[1], "close shutdown_pipe_[1]");
    shutdown_pipe_[1] = -1;
  }

  // Close all epoll instances
  for (int epollfd : epoll_fds_) {
    if (epollfd != -1) {
      close_or_abort(epollfd, "close epollfd");
    }
  }
  epoll_fds_.clear();

  // Close all listen sockets (Server always owns them)
  for (int fd : listen_fds_) {
    if (fd != -1) {
      close_or_abort(fd, "close listen_fd");
    }
  }

  // Clean up unix socket files if they exist; the unlink() result is
  // deliberately ignored.
  for (const auto &iface : config_.server.interfaces) {
    if (iface.type == weaseldb::ListenInterface::Type::Unix) {
      unlink(iface.path.c_str());
    }
  }
}

// Starts the I/O threads and blocks until all of them have exited.
//
// The thread objects live in this call frame, so every I/O thread is joined
// before run() returns and none can still be running when the caller goes on
// to destroy the Server.
void Server::run() {
  std::vector<std::thread> threads;
  start_io_threads(threads);

  for (auto &thread : threads) {
    thread.join();
  }
}

// Requests shutdown by writing one byte to the shutdown pipe, which every
// epoll instance watches.
//
// Only write() is used, which POSIX lists as async-signal-safe, so this may
// be called from a signal handler. A single byte avoids partial writes.
void Server::shutdown() {
  if (shutdown_pipe_[1] == -1) {
    return;
  }
  char val = 1;
  while (write(shutdown_pipe_[1], &val, 1) == -1) {
    if (errno != EINTR) {
      std::abort(); // graceful shutdown didn't work. Let's go ungraceful.
    }
  }
}

// Returns a connection to the Server it belongs to.
//
// If the connection's weak reference to its server can no longer be locked,
// the connection is simply destroyed when the unique_ptr goes out of scope.
// A null `connection` is ignored.
void Server::release_back_to_server(std::unique_ptr<Connection> connection) {
  if (!connection) {
    return; // Nothing to release
  }

  if (auto server = connection->server_.lock()) {
    server->receiveConnectionBack(std::move(connection));
  }
}

// Takes ownership of `connection` again and re-arms it on its epoll instance:
// EPOLLOUT if it has queued messages, EPOLLIN otherwise, always one-shot.
//
// If re-arming fails the connection is removed from the registry again, which
// destroys it. A null `connection` is ignored.
void Server::receiveConnectionBack(std::unique_ptr<Connection> connection) {
  if (!connection) {
    return; // Nothing to process
  }

  struct epoll_event event{};
  event.events =
      (connection->hasMessages() ? EPOLLOUT : EPOLLIN) | EPOLLONESHOT;
  const int fd = connection->getFd();
  event.data.fd = fd;

  // Read everything needed from the connection before giving it away.
  const size_t epoll_index = connection->getEpollIndex();
  const int epollfd = epoll_fds_[epoll_index];

  // The registry must own the connection before epoll can report the fd,
  // because the I/O threads look connections up by fd when an event fires.
  connection_registry_.store(fd, std::move(connection));
  if (epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &event) == -1) {
    perror("epoll_ctl MOD in receiveConnectionBack");
    // Remove from registry and clean up on failure
    (void)connection_registry_.remove(fd);
  }
}

// Creates an in-process connection backed by a Unix socketpair.
//
// The server end is made non-blocking, wrapped in a Connection, stored in the
// registry and added to one epoll instance chosen round-robin.
//
// Returns the caller's end of the pair, or -1 if there is no epoll instance,
// socketpair() fails, or the epoll registration fails. fcntl failures abort.
int Server::create_local_connection() {
  if (epoll_fds_.empty()) {
    // Without this guard the modulo below would divide by zero.
    std::fprintf(stderr,
                 "Server::create_local_connection: no epoll instances\n");
    return -1;
  }

  int sockets[2];
  if (socketpair(AF_UNIX, SOCK_STREAM, 0, sockets) != 0) {
    perror("socketpair");
    return -1;
  }

  const int server_fd = sockets[0]; // Server keeps this end
  const int client_fd = sockets[1]; // Return this end to caller

  int flags = fcntl(server_fd, F_GETFL, 0);
  if (flags == -1) {
    std::fprintf(stderr,
                 "Server::create_local_connection: fcntl F_GETFL failed\n");
    std::abort();
  }
  if (fcntl(server_fd, F_SETFL, flags | O_NONBLOCK) == -1) {
    std::fprintf(stderr,
                 "Server::create_local_connection: fcntl F_SETFL failed\n");
    std::abort();
  }

  // Peer address handed to the Connection: only the family is filled in.
  struct sockaddr_storage addr{};
  addr.ss_family = AF_UNIX;

  // Spread local connections across the epoll instances.
  const size_t epoll_index =
      connection_distribution_counter_.fetch_add(1,
                                                 std::memory_order_relaxed) %
      epoll_fds_.size();

  auto connection = std::unique_ptr<Connection>(new Connection(
      addr, server_fd, connection_id_.fetch_add(1, std::memory_order_relaxed),
      epoll_index, &handler_, *this));

  // Store in registry before the fd becomes visible to epoll.
  connection_registry_.store(server_fd, std::move(connection));

  struct epoll_event event{};
  event.events = EPOLLIN | EPOLLONESHOT;
  event.data.fd = server_fd;

  const int epollfd = epoll_fds_[epoll_index];
  if (epoll_ctl(epollfd, EPOLL_CTL_ADD, server_fd, &event) == -1) {
    perror("epoll_ctl ADD local connection");
    // NOTE(review): remove() hands back the owning pointer, which is
    // destroyed right here. If ~Connection also closes its fd, the explicit
    // close of server_fd below is a double close - confirm against
    // connection.hpp and drop one of the two.
    connection_registry_.remove(server_fd);
    close_or_abort(server_fd, "close server_fd");
    close_or_abort(client_fd, "close client_fd");
    return -1;
  }

  return client_fd;
}

// Creates the shutdown pipe and marks both ends close-on-exec.
// Any failure aborts the process.
void Server::setup_shutdown_pipe() {
  if (pipe(shutdown_pipe_) == -1) {
    perror("pipe");
    std::abort();
  }
  if (fcntl(shutdown_pipe_[0], F_SETFD, FD_CLOEXEC) == -1 ||
      fcntl(shutdown_pipe_[1], F_SETFD, FD_CLOEXEC) == -1) {
    perror("fcntl FD_CLOEXEC");
    std::abort();
  }
}

// Creates config_.server.epoll_instances epoll descriptors and registers the
// shutdown pipe's read end and every listen socket with each of them.
// Any failure aborts the process.
void Server::create_epoll_instances() {
  // Create multiple epoll instances to reduce contention
  epoll_fds_.resize(config_.server.epoll_instances);

  for (int &epollfd : epoll_fds_) {
    epollfd = epoll_create1(EPOLL_CLOEXEC);
    if (epollfd == -1) {
      perror("epoll_create1");
      std::abort();
    }

    // Level-triggered and never drained, so every waiter on every instance
    // keeps seeing the shutdown request once it is written.
    struct epoll_event shutdown_event{};
    shutdown_event.events = EPOLLIN;
    shutdown_event.data.fd = shutdown_pipe_[0];
    if (epoll_ctl(epollfd, EPOLL_CTL_ADD, shutdown_pipe_[0],
                  &shutdown_event) == -1) {
      perror("epoll_ctl shutdown pipe");
      std::abort();
    }

    // EPOLLEXCLUSIVE prevents a thundering herd when several epoll instances
    // watch the same listen socket.
    for (int listen_fd : listen_fds_) {
      struct epoll_event listen_event{};
      listen_event.events = EPOLLIN | EPOLLEXCLUSIVE;
      listen_event.data.fd = listen_fd;
      if (epoll_ctl(epollfd, EPOLL_CTL_ADD, listen_fd, &listen_event) == -1) {
        perror("epoll_ctl listen socket");
        std::abort();
      }
    }
  }
}

// Maps an I/O thread index to an epoll descriptor, round-robin.
// Requires at least one epoll instance.
int Server::get_epoll_for_thread(int thread_id) const {
  assert(!epoll_fds_.empty());
  return epoll_fds_[thread_id % epoll_fds_.size()];
}

// Appends config_.server.io_threads running I/O threads to `threads`.
//
// Each thread loops on epoll_wait() for its assigned epoll instance and, per
// wake-up:
//   1. returns as soon as it sees the shutdown pipe;
//   2. takes the connection for every ready fd out of the registry (dropping
//      it on EPOLLERR/EPOLLHUP) and processes those as one batch;
//   3. accepts from every listen socket that was reported ready until it
//      would block, processing accepted connections in batches as well.
void Server::start_io_threads(std::vector<std::thread> &threads) {
  const int io_threads = config_.server.io_threads;

  for (int thread_id = 0; thread_id < io_threads; ++thread_id) {
    threads.emplace_back([this, thread_id]() {
      pthread_setname_np(pthread_self(),
                         ("io-" + std::to_string(thread_id)).c_str());

      // Each thread uses its assigned epoll instance (round-robin)
      const int epollfd = get_epoll_for_thread(thread_id);
      const size_t epoll_index = thread_id % epoll_fds_.size();

      // All of these are reused across iterations to avoid allocation.
      std::vector<struct epoll_event> events(config_.server.event_batch_size);
      std::vector<std::unique_ptr<Connection>> batch(
          config_.server.event_batch_size);
      std::vector<int> batch_events(config_.server.event_batch_size);
      std::vector<int> ready_listen_fds;

      for (;;) {
        const int event_count = epoll_wait(
            epollfd, events.data(), config_.server.event_batch_size, -1);
        if (event_count == -1) {
          if (errno == EINTR) {
            continue;
          }
          perror("epoll_wait");
          std::abort();
        }

        ready_listen_fds.clear();
        int batch_count = 0;

        // Hands the first batch_count slots to process_connection_batch and
        // starts a fresh batch. Processing leaves those slots empty, so
        // restarting at index 0 is safe.
        auto flush_batch = [&]() {
          if (batch_count == 0) {
            return;
          }
          process_connection_batch(
              epollfd, std::span(batch).subspan(0, batch_count),
              std::span(batch_events).subspan(0, batch_count));
          batch_count = 0;
        };

        for (int i = 0; i < event_count; ++i) {
          const int fd = events[i].data.fd;

          // Check for shutdown event
          if (fd == shutdown_pipe_[0]) {
            return;
          }

          // Listen sockets are only recorded here and drained after the
          // existing connections have been processed.
          const bool is_listen_socket =
              std::find(listen_fds_.begin(), listen_fds_.end(), fd) !=
              listen_fds_.end();
          if (is_listen_socket) {
            ready_listen_fds.push_back(fd);
            continue;
          }

          // Existing connection: this thread takes ownership from the
          // registry.
          std::unique_ptr<Connection> conn = connection_registry_.remove(fd);
          assert(conn);

          if (events[i].events & (EPOLLERR | EPOLLHUP)) {
            // unique_ptr will automatically delete on scope exit
            continue;
          }

          batch[batch_count] = std::move(conn);
          batch_events[batch_count] = events[i].events;
          batch_count++;
        }

        // Process existing connections in batch
        flush_batch();

        // Only accept on listen sockets that epoll indicates are ready
        for (int listen_fd : ready_listen_fds) {
          for (;;) {
            struct sockaddr_storage addr{};
            socklen_t addrlen = sizeof(addr);
            const int fd = accept4(
                listen_fd, reinterpret_cast<struct sockaddr *>(&addr),
                &addrlen, SOCK_NONBLOCK);
            if (fd == -1) {
              if (errno == EAGAIN || errno == EWOULDBLOCK) {
                break; // Try next listen socket
              }
              if (errno == EINTR) {
                continue;
              }
              if (is_transient_accept_error(errno)) {
                // Concerns one pending connection only. The listen socket is
                // level-triggered, so anything still queued is reported by
                // the next epoll_wait().
                break;
              }
              perror("accept4");
              std::abort();
            }

            // Check connection limit
            if (config_.server.max_connections > 0 &&
                active_connections_.load(std::memory_order_relaxed) >=
                    config_.server.max_connections) {
              close_or_abort(fd, "close fd (max connections)");
              continue;
            }

            // Enable keepalive (best effort: failure is only logged)
            int keepalive = 1;
            if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &keepalive,
                           sizeof(keepalive)) == -1) {
              perror("setsockopt SO_KEEPALIVE");
            }

            // Add to epoll with no interests; process_connection_batch arms
            // the fd with EPOLL_CTL_MOD once the connection is back in the
            // registry.
            struct epoll_event event{};
            event.events = 0;
            event.data.fd = fd;
            if (epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event) == -1) {
              perror("epoll_ctl ADD");
              // epoll is not watching this fd, so a Connection built on it
              // could never be re-armed. Nothing has been stored in the
              // registry for it yet; just close it and move on.
              close_or_abort(fd, "close fd (epoll_ctl ADD)");
              continue;
            }

            batch[batch_count] = std::unique_ptr<Connection>(new Connection(
                addr, fd,
                connection_id_.fetch_add(1, std::memory_order_relaxed),
                epoll_index, &handler_, *this));
            batch_events[batch_count] =
                EPOLLIN; // New connections always start with read
            batch_count++;

            // Process batch if full
            if (batch_count == config_.server.event_batch_size) {
              flush_batch();
            }
          } // End inner accept loop
        } // End loop over ready listen sockets

        // Process remaining accepted connections
        flush_batch();
      }
    });
  }
}

// Read phase for one connection.
//
// When `events` contains EPOLLIN, reads up to config_.server.read_buffer_size
// bytes into the thread-local buffer and passes them to
// handler_.on_data_arrived().
//
// On return `conn` may be null: it is reset when readBytes() returns a
// negative value, and the handler may take ownership of it.
void Server::process_connection_reads(std::unique_ptr<Connection> &conn,
                                      int events) {
  assert(conn);

  if (!(events & EPOLLIN)) {
    return;
  }

  auto buf_size = config_.server.read_buffer_size;
  g_read_buffer.resize(buf_size);
  char *buf = g_read_buffer.data();
  int r = conn->readBytes(buf, buf_size);

  if (r < 0) {
    // Error or EOF - connection should be closed
    conn.reset();
    return;
  }

  if (r == 0) {
    // No data available (EAGAIN) - nothing to hand to the handler
    return;
  }

  // The handler receives the unique_ptr by reference and may take ownership,
  // leaving conn null for the caller to observe.
  handler_.on_data_arrived(std::string_view{buf, size_t(r)}, conn);
}

// Write phase for one connection.
//
// Runs when `events` contains EPOLLOUT, or contains EPOLLIN while the
// connection has queued messages. Calls writeBytes(), then
// on_write_progress(), then on_write_buffer_drained() if the queue went from
// non-empty to empty.
//
// On return `conn` may be null: it is reset on a write error or when the
// queue is empty and shouldClose() is true, and any handler callback may take
// ownership of it.
void Server::process_connection_writes(std::unique_ptr<Connection> &conn,
                                       int events) {
  assert(conn);

  const bool should_write =
      (events & EPOLLOUT) || ((events & EPOLLIN) && conn->hasMessages());
  if (!should_write) {
    return;
  }

  const bool had_messages = conn->hasMessages();
  const bool error = conn->writeBytes();

  if (error) {
    conn.reset(); // Connection should be closed
    return;
  }

  handler_.on_write_progress(conn);
  if (!conn) {
    return; // Handler took ownership
  }

  // Check if buffer became empty (transition from non-empty -> empty)
  if (had_messages && !conn->hasMessages()) {
    handler_.on_write_buffer_drained(conn);
    if (!conn) {
      return; // Handler took ownership
    }
  }

  // Check if we should close the connection according to application
  if (!conn->hasMessages() && conn->shouldClose()) {
    conn.reset(); // Connection should be closed
  }
}

// Processes a batch of connections owned by the calling thread.
//
// `batch[i]` is processed with the epoll event mask `events[i]`; null entries
// are skipped. All writes run first, then all reads, then
// handler_.on_batch_complete(). Every connection that is still non-null
// afterwards is stored in the registry and re-armed on `epollfd` (EPOLLOUT if
// it has queued messages, EPOLLIN otherwise, one-shot), which leaves its
// batch slot empty.
void Server::process_connection_batch(
    int epollfd, std::span<std::unique_ptr<Connection>> batch,
    std::span<const int> events) {

  // First process writes for each connection
  for (size_t i = 0; i < batch.size(); ++i) {
    if (batch[i]) {
      process_connection_writes(batch[i], events[i]);
    }
  }

  // Then process reads for each connection
  for (size_t i = 0; i < batch.size(); ++i) {
    if (batch[i]) {
      process_connection_reads(batch[i], events[i]);
    }
  }

  // Call batch complete handler - handlers can take ownership here
  handler_.on_batch_complete(batch);

  // Transfer all remaining connections back to epoll
  for (auto &conn_ptr : batch) {
    if (!conn_ptr) {
      continue;
    }

    const int fd = conn_ptr->getFd();

    struct epoll_event event{};
    event.events =
        (conn_ptr->hasMessages() ? EPOLLOUT : EPOLLIN) | EPOLLONESHOT;
    event.data.fd = fd; // Use file descriptor for epoll

    // Put connection back in registry since handler didn't take ownership.
    // Must happen before epoll_ctl: once the fd is armed another thread may
    // look it up in the registry.
    connection_registry_.store(fd, std::move(conn_ptr));
    if (epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &event) == -1) {
      perror("epoll_ctl MOD");
      (void)connection_registry_.remove(fd);
    }
  }
}