#include "server.hpp" #include "connection.hpp" #include #include #include #include #include #include #include #include #include #include #include #include extern std::atomic activeConnections; std::shared_ptr Server::create(const weaseldb::Config &config, ConnectionHandler &handler) { // Use std::shared_ptr constructor with private access // We can't use make_shared here because constructor is private return std::shared_ptr(new Server(config, handler)); } Server::Server(const weaseldb::Config &config, ConnectionHandler &handler) : config_(config), handler_(handler) {} Server::~Server() { cleanup_resources(); } void Server::run() { setup_shutdown_pipe(); listen_sockfd_ = create_listen_socket(); // Create epoll instances network_epollfd_ = epoll_create1(EPOLL_CLOEXEC); if (network_epollfd_ == -1) { perror("epoll_create network"); throw std::runtime_error("Failed to create network epoll instance"); } accept_epollfd_ = epoll_create1(EPOLL_CLOEXEC); if (accept_epollfd_ == -1) { perror("epoll_create accept"); throw std::runtime_error("Failed to create accept epoll instance"); } // Add shutdown pipe to both epoll instances struct epoll_event shutdown_event; shutdown_event.events = EPOLLIN; shutdown_event.data.fd = shutdown_pipe_[0]; if (epoll_ctl(network_epollfd_, EPOLL_CTL_ADD, shutdown_pipe_[0], &shutdown_event) == -1) { perror("epoll_ctl add shutdown to network"); throw std::runtime_error("Failed to add shutdown pipe to network epoll"); } if (epoll_ctl(accept_epollfd_, EPOLL_CTL_ADD, shutdown_pipe_[0], &shutdown_event) == -1) { perror("epoll_ctl add shutdown to accept"); throw std::runtime_error("Failed to add shutdown pipe to accept epoll"); } // Add listen socket to accept epoll struct epoll_event listen_event; listen_event.events = EPOLLIN | EPOLLEXCLUSIVE; listen_event.data.fd = listen_sockfd_; if (epoll_ctl(accept_epollfd_, EPOLL_CTL_ADD, listen_sockfd_, &listen_event) == -1) { perror("epoll_ctl add listen socket"); throw std::runtime_error("Failed to add listen socket to accept epoll"); } start_network_threads(); start_accept_threads(); // Wait for all threads to complete for (auto &thread : threads_) { thread.join(); } } void Server::shutdown() { if (shutdown_pipe_[1] != -1) { char val = 1; // write() is async-signal-safe per POSIX - safe to use in signal handler // Write single byte to avoid partial write complexity while (write(shutdown_pipe_[1], &val, 1) == -1) { if (errno != EINTR) { abort(); // graceful shutdown didn't work. Let's go ungraceful. } } } } void Server::releaseBackToServer(std::unique_ptr connection) { if (!connection) { return; // Nothing to release } // Try to get the server from the connection's weak_ptr if (auto server = connection->server_.lock()) { // Server still exists - release raw pointer and let server take over Connection *raw_conn = connection.release(); server->receiveConnectionBack(raw_conn); } // If server is gone, connection will be automatically cleaned up when // unique_ptr destructs } void Server::receiveConnectionBack(Connection *connection) { // Re-add the connection to epoll for continued processing struct epoll_event event{}; if (!connection->hasMessages()) { event.events = EPOLLIN | EPOLLONESHOT | EPOLLRDHUP; } else { event.events = EPOLLOUT | EPOLLONESHOT | EPOLLRDHUP; } connection->tsan_release(); event.data.ptr = connection; if (epoll_ctl(network_epollfd_, EPOLL_CTL_ADD, connection->getFd(), &event) == -1) { perror("epoll_ctl ADD in receiveConnectionBack"); delete connection; // Clean up on failure } } void Server::setup_shutdown_pipe() { if (pipe(shutdown_pipe_) == -1) { perror("pipe"); throw std::runtime_error("Failed to create shutdown pipe"); } // Set both ends to close-on-exec if (fcntl(shutdown_pipe_[0], F_SETFD, FD_CLOEXEC) == -1 || fcntl(shutdown_pipe_[1], F_SETFD, FD_CLOEXEC) == -1) { perror("fcntl FD_CLOEXEC"); throw std::runtime_error("Failed to set close-on-exec for shutdown pipe"); } } int Server::create_listen_socket() { struct addrinfo hints; struct addrinfo *result, *rp; int sfd, s; memset(&hints, 0, sizeof(hints)); hints.ai_family = AF_UNSPEC; /* Allow IPv4 or IPv6 */ hints.ai_socktype = SOCK_STREAM; /* stream socket */ hints.ai_flags = AI_PASSIVE; /* For wildcard IP address */ hints.ai_protocol = 0; /* Any protocol */ hints.ai_canonname = nullptr; hints.ai_addr = nullptr; hints.ai_next = nullptr; s = getaddrinfo(config_.server.bind_address.c_str(), std::to_string(config_.server.port).c_str(), &hints, &result); if (s != 0) { fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(s)); throw std::runtime_error("Failed to resolve bind address"); } for (rp = result; rp != nullptr; rp = rp->ai_next) { sfd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol); if (sfd == -1) { continue; } int val = 1; if (setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)) == -1) { perror("setsockopt SO_REUSEADDR"); close(sfd); continue; } // Enable TCP_NODELAY for low latency if (setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val)) == -1) { perror("setsockopt TCP_NODELAY"); close(sfd); continue; } // Set socket to non-blocking for graceful shutdown int flags = fcntl(sfd, F_GETFL, 0); if (flags == -1) { perror("fcntl F_GETFL"); close(sfd); continue; } if (fcntl(sfd, F_SETFL, flags | O_NONBLOCK) == -1) { perror("fcntl F_SETFL"); close(sfd); continue; } if (bind(sfd, rp->ai_addr, rp->ai_addrlen) == 0) { break; /* Success */ } close(sfd); } freeaddrinfo(result); if (rp == nullptr) { throw std::runtime_error("Could not bind to any address"); } if (listen(sfd, SOMAXCONN) == -1) { perror("listen"); close(sfd); throw std::runtime_error("Failed to listen on socket"); } return sfd; } void Server::start_network_threads() { int network_threads = config_.server.network_threads; for (int thread_id = 0; thread_id < network_threads; ++thread_id) { threads_.emplace_back([this, thread_id]() { pthread_setname_np(pthread_self(), ("network-" + std::to_string(thread_id)).c_str()); struct epoll_event events[config_.server.event_batch_size]; std::unique_ptr batch[config_.server.event_batch_size]; for (;;) { int event_count = epoll_wait(network_epollfd_, events, config_.server.event_batch_size, -1); if (event_count == -1) { if (errno == EINTR) { continue; } perror("epoll_wait"); abort(); } int batch_count = 0; for (int i = 0; i < event_count; ++i) { // Check for shutdown event if (events[i].data.fd == shutdown_pipe_[0]) { return; } // Take ownership from epoll: raw pointer -> unique_ptr std::unique_ptr conn{ static_cast(events[i].data.ptr)}; conn->tsan_acquire(); events[i].data.ptr = nullptr; if (events[i].events & (EPOLLERR | EPOLLHUP | EPOLLRDHUP)) { continue; // Connection closed - unique_ptr destructor cleans up } if (events[i].events & EPOLLIN) { std::string_view data = conn->readBytes(config_.server.max_request_size_bytes, config_.server.read_buffer_size); if (data.empty()) { // No data, error, or EOF - connection should be closed continue; } // Call handler with unique_ptr - handler can take ownership if // needed handler_.on_data_arrived(data, conn); // If handler took ownership (conn is now null), don't continue // processing if (!conn) { continue; } } if (events[i].events & EPOLLOUT) { bool error = conn->writeBytes(); if (error) { continue; } // Call handler with unique_ptr - handler can take ownership if // needed handler_.on_write_progress(conn); // If handler took ownership (conn is now null), don't continue // processing if (!conn) { continue; } // Check if we should close the connection according to application if (!conn->hasMessages() && conn->closeConnection_) { continue; } } batch[batch_count++] = std::move(conn); } handler_.on_post_batch({batch, (size_t)batch_count}); for (int i = 0; i < batch_count; ++i) { auto &conn = batch[i]; if (!conn) { continue; } // Determine next epoll interest struct epoll_event event{}; if (!conn->hasMessages()) { event.events = EPOLLIN | EPOLLONESHOT | EPOLLRDHUP; } else { event.events = EPOLLOUT | EPOLLONESHOT | EPOLLRDHUP; } // Transfer ownership back to epoll conn->tsan_release(); Connection *raw_conn = conn.release(); event.data.ptr = raw_conn; if (epoll_ctl(network_epollfd_, EPOLL_CTL_MOD, raw_conn->getFd(), &event) == -1) { perror("epoll_ctl MOD"); delete raw_conn; continue; } } } }); } } void Server::start_accept_threads() { int accept_threads = config_.server.accept_threads; for (int thread_id = 0; thread_id < accept_threads; ++thread_id) { threads_.emplace_back([this, thread_id]() { pthread_setname_np(pthread_self(), ("accept-" + std::to_string(thread_id)).c_str()); for (;;) { struct epoll_event events[2]; // listen socket + shutdown pipe int ready = epoll_wait(accept_epollfd_, events, 2, -1); if (ready == -1) { if (errno == EINTR) continue; perror("epoll_wait accept"); abort(); } for (int i = 0; i < ready; ++i) { if (events[i].data.fd == shutdown_pipe_[0]) { return; // Shutdown signal } if (events[i].data.fd == listen_sockfd_) { // Accept new connections for (;;) { struct sockaddr_storage addr; socklen_t addrlen = sizeof(addr); int fd = accept4(listen_sockfd_, (struct sockaddr *)&addr, &addrlen, SOCK_NONBLOCK); if (fd == -1) { if (errno == EAGAIN || errno == EWOULDBLOCK) break; if (errno == EINTR) continue; perror("accept4"); abort(); } // Check connection limit if (config_.server.max_connections > 0 && activeConnections.load(std::memory_order_relaxed) >= config_.server.max_connections) { close(fd); continue; } // Enable keepalive int keepalive = 1; if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &keepalive, sizeof(keepalive)) == -1) { perror("setsockopt SO_KEEPALIVE"); } auto conn = Connection::createForServer( addr, fd, connection_id_.fetch_add(1, std::memory_order_relaxed), &handler_, weak_from_this()); // Transfer to network epoll struct epoll_event event{}; event.events = EPOLLIN | EPOLLONESHOT | EPOLLRDHUP; conn->tsan_release(); Connection *raw_conn = conn.release(); event.data.ptr = raw_conn; if (epoll_ctl(network_epollfd_, EPOLL_CTL_ADD, fd, &event) == -1) { perror("epoll_ctl ADD"); delete raw_conn; continue; } } } } } }); } } void Server::cleanup_resources() { if (shutdown_pipe_[0] != -1) { close(shutdown_pipe_[0]); shutdown_pipe_[0] = -1; } if (shutdown_pipe_[1] != -1) { close(shutdown_pipe_[1]); shutdown_pipe_[1] = -1; } if (network_epollfd_ != -1) { close(network_epollfd_); network_epollfd_ = -1; } if (accept_epollfd_ != -1) { close(accept_epollfd_); accept_epollfd_ = -1; } if (listen_sockfd_ != -1) { close(listen_sockfd_); listen_sockfd_ = -1; } }