Files
weaseldb/src/server.cpp
2025-08-23 17:32:37 -04:00

546 lines
17 KiB
C++

#include "server.hpp"
#include <csignal>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <fcntl.h>
#include <memory>
#include <netdb.h>
#include <netinet/tcp.h>
#include <pthread.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <unistd.h>
#include <vector>
#include "connection.hpp"
#include "connection_registry.hpp"
// Per-thread scratch buffer for socket reads. Being thread_local, every thread
// gets its own instance; Server::process_connection_reads resizes it to
// config_.server.read_buffer_size before each read and reads into it.
static thread_local std::vector<char> g_read_buffer;
// Factory for Server objects.
//
// Allocates a Server with `new` and hands ownership to a std::shared_ptr.
// std::make_shared cannot be used because the Server constructor is private.
//
// config     - forwarded to the Server constructor.
// handler    - forwarded to the Server constructor.
// listen_fds - forwarded to the Server constructor.
//
// Returns the shared_ptr that owns the new Server.
std::shared_ptr<Server> Server::create(const weaseldb::Config &config,
                                       ConnectionHandler &handler,
                                       const std::vector<int> &listen_fds) {
  std::shared_ptr<Server> instance{new Server(config, handler, listen_fds)};
  return instance;
}
// Builds a Server around a set of listening sockets supplied by the caller.
//
// The descriptors in provided_listen_fds are copied into listen_fds_ (the
// destructor closes them). Every one of them is switched to non-blocking
// mode; any fcntl failure is reported with perror and the process aborts.
// The shutdown pipe and the epoll instances are created right away so that
// createLocalConnection() can be used before run() is called.
//
// An empty provided_listen_fds is allowed: nothing is listened on and the
// server is only reachable through createLocalConnection().
Server::Server(const weaseldb::Config &config, ConnectionHandler &handler,
               const std::vector<int> &provided_listen_fds)
    : config_(config), handler_(handler), connection_registry_(),
      listen_fds_(provided_listen_fds) {
  // Report the failing call (errno is still intact) and terminate.
  const auto die = [](const char *what) {
    perror(what);
    std::abort();
  };

  for (const int listen_fd : listen_fds_) {
    const int current_flags = fcntl(listen_fd, F_GETFL, 0);
    if (current_flags == -1) {
      die("fcntl F_GETFL");
    }
    const int rc = fcntl(listen_fd, F_SETFL, current_flags | O_NONBLOCK);
    if (rc == -1) {
      die("fcntl F_SETFL O_NONBLOCK");
    }
  }

  // Order matters: the epoll instances register the read end of the pipe.
  setup_shutdown_pipe();
  create_epoll_instances();
}
// Releases every descriptor the Server owns: both ends of the shutdown pipe,
// all epoll instances and all listen sockets. Finally removes the unix socket
// file named in the configuration, if one is configured.
Server::~Server() {
  // close() is deliberately NOT retried on EINTR. On Linux the descriptor is
  // released even when close() is interrupted, so a second close() could hit
  // an unrelated descriptor that another thread was handed in the meantime.
  const auto close_if_open = [](int fd) {
    if (fd != -1) {
      (void)close(fd);
    }
  };

  close_if_open(shutdown_pipe_[0]);
  shutdown_pipe_[0] = -1;
  close_if_open(shutdown_pipe_[1]);
  shutdown_pipe_[1] = -1;

  // Close all epoll instances
  for (int epollfd : epoll_fds_) {
    close_if_open(epollfd);
  }
  epoll_fds_.clear();

  // Close all listen sockets (Server always owns them)
  for (int fd : listen_fds_) {
    close_if_open(fd);
  }

  // Clean up unix socket file if one is configured; failure (e.g. the file
  // does not exist) is intentionally ignored.
  if (!config_.server.unix_socket_path.empty()) {
    (void)unlink(config_.server.unix_socket_path.c_str());
  }
}
// Runs the server until every I/O thread has exited.
//
// The worker threads live in a vector local to this call frame: they are
// started by start_io_threads() and each one is joined before run() returns.
// Because of that, once run() has returned no I/O thread is still touching
// the Server, so it is safe to destroy it afterwards.
// (The shutdown pipe and epoll instances already exist; the constructor
// creates them.)
void Server::run() {
  std::vector<std::thread> io_workers;
  start_io_threads(io_workers);

  for (auto it = io_workers.begin(); it != io_workers.end(); ++it) {
    it->join();
  }
}
// Requests shutdown by writing one byte to the write end of the shutdown
// pipe. Does nothing when the pipe's write end is not open (-1).
//
// Only write() is used, which POSIX lists as async-signal-safe, so this can
// be invoked from a signal handler. A single byte is written so that partial
// writes cannot occur. The write is retried on EINTR; any other failure
// aborts the process, since a graceful shutdown is then not possible.
void Server::shutdown() {
  const int write_end = shutdown_pipe_[1];
  if (write_end == -1) {
    return;
  }

  const char wake_byte = 1;
  for (;;) {
    if (write(write_end, &wake_byte, 1) != -1) {
      break;
    }
    if (errno == EINTR) {
      continue;
    }
    abort(); // graceful shutdown didn't work. Let's go ungraceful.
  }
}
// Hands a connection back to the Server it belongs to.
//
// A null connection is ignored. Otherwise the connection's weak reference to
// its server is locked; if the server is still alive it receives the
// connection through receiveConnectionBack(). If the server is gone, nothing
// else is done and the connection is destroyed when the unique_ptr parameter
// goes out of scope.
void Server::releaseBackToServer(std::unique_ptr<Connection> connection) {
  if (connection) {
    const auto owner = connection->server_.lock();
    if (owner) {
      owner->receiveConnectionBack(std::move(connection));
    }
  }
}
void Server::receiveConnectionBack(std::unique_ptr<Connection> connection) {
if (!connection) {
return; // Nothing to process
}
// Re-add the connection to epoll for continued processing
struct epoll_event event{};
if (!connection->hasMessages()) {
event.events = EPOLLIN | EPOLLONESHOT;
} else {
event.events = EPOLLOUT | EPOLLONESHOT;
}
int fd = connection->getFd();
event.data.fd = fd;
// Store connection in registry before adding to epoll
// This mirrors the pattern used in process_connection_batch
size_t epoll_index = connection->getEpollIndex();
int epollfd = epoll_fds_[epoll_index];
connection_registry_.store(fd, std::move(connection));
if (epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &event) == -1) {
perror("epoll_ctl MOD in receiveConnectionBack");
// Remove from registry and clean up on failure
(void)connection_registry_.remove(fd);
}
}
// Creates an in-process connection to this server.
//
// A connected AF_UNIX stream socket pair is created. One end is made
// non-blocking, wrapped in a Connection, stored in the registry and added
// (EPOLLIN, one-shot) to one of the epoll instances chosen round-robin. The
// other end is returned to the caller.
//
// Returns the caller's descriptor, or -1 if socketpair() or epoll_ctl()
// fails. A failing fcntl() aborts the process.
int Server::createLocalConnection() {
  int sockets[2];
  if (socketpair(AF_UNIX, SOCK_STREAM, 0, sockets) != 0) {
    perror("socketpair");
    return -1;
  }
  const int server_fd = sockets[0]; // Server keeps this end
  const int client_fd = sockets[1]; // Return this end to caller

  const int flags = fcntl(server_fd, F_GETFL, 0);
  if (flags == -1) {
    std::fprintf(stderr,
                 "Server::createLocalConnection: fcntl F_GETFL failed\n");
    std::abort();
  }
  if (fcntl(server_fd, F_SETFL, flags | O_NONBLOCK) == -1) {
    std::fprintf(stderr,
                 "Server::createLocalConnection: fcntl F_SETFL failed\n");
    std::abort();
  }

  // A socket pair has no meaningful peer address; only the family is set.
  struct sockaddr_storage addr{};
  addr.ss_family = AF_UNIX;

  // Spread local connections over the epoll instances round-robin.
  const size_t epoll_index =
      connection_distribution_counter_.fetch_add(1, std::memory_order_relaxed) %
      epoll_fds_.size();

  auto connection = std::unique_ptr<Connection>(new Connection(
      addr, server_fd, connection_id_.fetch_add(1, std::memory_order_relaxed),
      epoll_index, &handler_, *this));

  // Store in the registry before adding to epoll so the entry exists by the
  // time an I/O thread can see an event for server_fd.
  connection_registry_.store(server_fd, std::move(connection));

  struct epoll_event event{};
  event.events = EPOLLIN | EPOLLONESHOT;
  event.data.fd = server_fd;
  const int epollfd = epoll_fds_[epoll_index];
  if (epoll_ctl(epollfd, EPOLL_CTL_ADD, server_fd, &event) == -1) {
    perror("epoll_ctl ADD local connection");
    // Dropping the registry entry destroys the Connection.
    // NOTE(review): everywhere else in this file a connection's socket is
    // closed purely by destroying its Connection, so the Connection is
    // treated as the owner of server_fd here as well and server_fd is not
    // closed a second time (a double close could hit a descriptor reused by
    // another thread). Confirm against Connection's destructor.
    (void)connection_registry_.remove(server_fd);
    // close() is not retried on EINTR: on Linux the descriptor is already
    // released when the call returns.
    (void)close(client_fd);
    return -1;
  }
  return client_fd;
}
// Creates the pipe used to signal shutdown, with both ends marked
// close-on-exec. Aborts the process if the pipe cannot be created.
//
// pipe2(O_CLOEXEC) sets the flag atomically at creation time. With a separate
// pipe() + fcntl(FD_CLOEXEC) sequence there is a window in which a fork/exec
// on another thread would inherit the descriptors.
void Server::setup_shutdown_pipe() {
  if (pipe2(shutdown_pipe_, O_CLOEXEC) == -1) {
    perror("pipe2");
    std::abort();
  }
}
// Creates config_.server.epoll_instances epoll instances (several instances
// reduce contention) and registers with each one:
//   - the read end of the shutdown pipe (EPOLLIN), and
//   - every listen socket (EPOLLIN | EPOLLEXCLUSIVE, to prevent a thundering
//     herd when a connection arrives).
// Any failure is reported with perror and aborts the process.
//
// Must run after setup_shutdown_pipe(), since it registers shutdown_pipe_[0].
void Server::create_epoll_instances() {
  epoll_fds_.resize(config_.server.epoll_instances);

  for (auto &epollfd : epoll_fds_) {
    epollfd = epoll_create1(EPOLL_CLOEXEC);
    if (epollfd == -1) {
      perror("epoll_create1");
      std::abort();
    }

    // epoll_event is value-initialized (as everywhere else in this file) so
    // no indeterminate bytes of the data union are handed to the kernel.
    struct epoll_event shutdown_event{};
    shutdown_event.events = EPOLLIN;
    shutdown_event.data.fd = shutdown_pipe_[0];
    if (epoll_ctl(epollfd, EPOLL_CTL_ADD, shutdown_pipe_[0],
                  &shutdown_event) == -1) {
      perror("epoll_ctl shutdown pipe");
      std::abort();
    }

    for (const int listen_fd : listen_fds_) {
      struct epoll_event listen_event{};
      listen_event.events = EPOLLIN | EPOLLEXCLUSIVE;
      listen_event.data.fd = listen_fd;
      if (epoll_ctl(epollfd, EPOLL_CTL_ADD, listen_fd, &listen_event) == -1) {
        perror("epoll_ctl listen socket");
        std::abort();
      }
    }
  }
}
// Maps an I/O thread to an epoll descriptor: thread ids are assigned to the
// entries of epoll_fds_ round-robin (thread_id modulo the number of
// instances). Returns the epoll descriptor for that slot.
int Server::get_epoll_for_thread(int thread_id) const {
  const auto instance_count = epoll_fds_.size();
  const auto slot = thread_id % instance_count;
  return epoll_fds_[slot];
}
// Spawns config_.server.io_threads I/O worker threads and appends them to
// `threads`; the caller is responsible for joining them.
//
// Each worker names itself "io-<thread_id>", waits on the epoll instance
// returned by get_epoll_for_thread(thread_id) and loops over:
//   1. epoll_wait for up to config_.server.event_batch_size events
//      (EINTR is retried, any other failure aborts);
//   2. return as soon as the shutdown pipe shows up among the events;
//   3. for connection events: take the connection out of the registry, drop
//      it on EPOLLERR/EPOLLHUP, otherwise queue it in the batch;
//   4. run the queued connections through process_connection_batch;
//   5. drain accept4() on every listen socket reported ready, wrapping each
//      accepted socket in a Connection and processing those in batches too.
void Server::start_io_threads(std::vector<std::thread> &threads) {
  const int io_threads = config_.server.io_threads;
  for (int thread_id = 0; thread_id < io_threads; ++thread_id) {
    threads.emplace_back([this, thread_id]() {
      pthread_setname_np(pthread_self(),
                         ("io-" + std::to_string(thread_id)).c_str());

      // Each thread uses its assigned epoll instance (round-robin). The index
      // is the same slot get_epoll_for_thread() picks and never changes.
      const int epollfd = get_epoll_for_thread(thread_id);
      const size_t epoll_index = thread_id % epoll_fds_.size();

      std::vector<epoll_event> events(config_.server.event_batch_size);
      std::vector<std::unique_ptr<Connection>> batch(
          config_.server.event_batch_size);
      std::vector<int> batch_events(config_.server.event_batch_size);
      std::vector<int>
          ready_listen_fds; // Reused across iterations to avoid allocation

      for (;;) {
        const int event_count = epoll_wait(
            epollfd, events.data(), config_.server.event_batch_size, -1);
        if (event_count == -1) {
          if (errno == EINTR) {
            continue;
          }
          perror("epoll_wait");
          abort();
        }

        ready_listen_fds.clear(); // Clear from previous iteration
        int batch_count = 0;

        // Processes the first batch_count slots and marks the batch empty.
        // Resetting the count here keeps already-emptied slots from being
        // handed to process_connection_batch (and the handler) a second time
        // and keeps the full batch capacity available for what follows.
        const auto flush_batch = [&]() {
          if (batch_count == 0) {
            return;
          }
          process_connection_batch(
              epollfd, {batch.data(), static_cast<size_t>(batch_count)},
              {batch_events.data(), static_cast<size_t>(batch_count)});
          batch_count = 0;
        };

        for (int i = 0; i < event_count; ++i) {
          const int fd = events[i].data.fd;

          // Check for shutdown event
          if (fd == shutdown_pipe_[0]) {
            return;
          }

          // Listen sockets are only remembered here; accepting happens after
          // the existing connections have been processed.
          const bool is_listen_socket =
              std::find(listen_fds_.begin(), listen_fds_.end(), fd) !=
              listen_fds_.end();
          if (is_listen_socket) {
            ready_listen_fds.push_back(fd);
            continue;
          }

          // Existing connection: this thread takes it out of the registry.
          std::unique_ptr<Connection> conn = connection_registry_.remove(fd);
          assert(conn);
          if (events[i].events & (EPOLLERR | EPOLLHUP)) {
            // unique_ptr deletes the connection when it goes out of scope
            continue;
          }
          batch[batch_count] = std::move(conn);
          batch_events[batch_count] = events[i].events;
          ++batch_count;
        }

        // Process existing connections in batch
        flush_batch();

        // Only accept on listen sockets that epoll indicates are ready
        for (const int listen_fd : ready_listen_fds) {
          for (;;) {
            // Zero-initialized so bytes beyond the returned address length
            // are well defined when the struct is copied into the Connection.
            struct sockaddr_storage addr{};
            socklen_t addrlen = sizeof(addr);
            const int fd = accept4(
                listen_fd, reinterpret_cast<struct sockaddr *>(&addr),
                &addrlen, SOCK_NONBLOCK);
            if (fd == -1) {
              if (errno == EAGAIN || errno == EWOULDBLOCK) {
                break; // Try next listen socket
              }
              // ECONNABORTED only means that one pending connection went
              // away before it was accepted; it must not take the whole
              // server down, so it is skipped just like EINTR.
              if (errno == EINTR || errno == ECONNABORTED) {
                continue;
              }
              perror("accept4");
              abort();
            }

            // Check connection limit
            if (config_.server.max_connections > 0 &&
                active_connections_.load(std::memory_order_relaxed) >=
                    config_.server.max_connections) {
              // close() is not retried on EINTR: on Linux the descriptor is
              // already released when the call returns.
              (void)close(fd);
              continue;
            }

            // Enable keepalive (best effort; failure is only reported)
            const int keepalive = 1;
            if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &keepalive,
                           sizeof(keepalive)) == -1) {
              perror("setsockopt SO_KEEPALIVE");
            }

            // Add to epoll with no interests; process_connection_batch arms
            // the descriptor later with EPOLL_CTL_MOD.
            struct epoll_event event{};
            event.events = 0;
            event.data.fd = fd;
            if (epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event) == -1) {
              perror("epoll_ctl ADD");
              // The descriptor is not registered, so it could never be
              // re-armed. Nothing has been stored in the registry and no
              // Connection exists yet, so closing the socket is the entire
              // cleanup.
              (void)close(fd);
              continue;
            }

            // The new connection goes straight into the batch (it is not in
            // the registry while a thread is working on it).
            batch[batch_count] = std::unique_ptr<Connection>(new Connection(
                addr, fd,
                connection_id_.fetch_add(1, std::memory_order_relaxed),
                epoll_index, &handler_, *this));
            batch_events[batch_count] =
                EPOLLIN; // New connections always start with read
            ++batch_count;

            // Process batch if full
            if (batch_count == config_.server.event_batch_size) {
              flush_batch();
            }
          } // End inner accept loop
        } // End loop over ready listen sockets

        // Process remaining accepted connections
        flush_batch();
      }
    });
  }
}
// Read half of connection processing.
//
// Does nothing unless `events` contains EPOLLIN. Otherwise reads up to
// config_.server.read_buffer_size bytes into the thread-local read buffer:
//   - negative result: the connection is destroyed (conn is reset);
//     the original code treats this as "error or EOF";
//   - zero: nothing to process (treated as "no data available"), conn is
//     left untouched;
//   - positive: the bytes are passed to handler_.on_data_arrived together
//     with conn. The handler receives the unique_ptr by reference and may
//     take ownership, in which case conn is null on return.
// NOTE(review): the meaning of the negative/zero results is taken from how
// they are handled here — confirm against Connection::readBytes.
void Server::process_connection_reads(std::unique_ptr<Connection> &conn,
                                      int events) {
  assert(conn);

  if ((events & EPOLLIN) == 0) {
    return;
  }

  auto capacity = config_.server.read_buffer_size;
  g_read_buffer.resize(capacity);
  char *scratch = g_read_buffer.data();

  const int bytes_read = conn->readBytes(scratch, capacity);
  if (bytes_read < 0) {
    conn.reset();
    return;
  }
  if (bytes_read == 0) {
    return;
  }

  handler_.on_data_arrived(std::string_view{scratch, size_t(bytes_read)},
                           conn);
}
// Write half of connection processing.
//
// Output is flushed when `events` contains EPOLLOUT, or when it contains
// EPOLLIN and the connection already has queued messages. In that case:
//   - writeBytes() reporting an error destroys the connection;
//   - otherwise handler_.on_write_progress(conn) is called; the handler gets
//     the unique_ptr by reference and may take ownership (conn is then null);
//   - if the connection is still here, has no queued messages left and its
//     closeConnection_ flag is set, it is destroyed.
void Server::process_connection_writes(std::unique_ptr<Connection> &conn,
                                       int events) {
  assert(conn);

  // hasMessages() is only consulted when EPOLLOUT is absent and EPOLLIN set.
  bool should_flush = (events & EPOLLOUT) != 0;
  if (!should_flush && (events & EPOLLIN)) {
    should_flush = conn->hasMessages();
  }
  if (!should_flush) {
    return;
  }

  const bool write_failed = conn->writeBytes();
  if (write_failed) {
    conn.reset(); // Connection should be closed
    return;
  }

  handler_.on_write_progress(conn);
  if (!conn) {
    return; // handler took ownership
  }

  // Close once everything is sent and the application asked for it.
  const bool drained = !conn->hasMessages();
  if (drained && conn->closeConnection_) {
    conn.reset();
  }
}
// Runs one batch of connections through the processing pipeline.
//
// batch[i] is processed with the event mask events[i]. Null slots are skipped
// at every stage (a stage or the handler may have emptied a slot):
//   1. process_connection_writes for every connection,
//   2. process_connection_reads for every connection,
//   3. handler_.on_post_batch(batch) — the handler may take ownership here,
//   4. every connection still in the batch is stored in the registry and
//      re-armed one-shot on `epollfd`: EPOLLOUT if it has queued messages,
//      EPOLLIN otherwise. The registry store happens before epoll_ctl. If
//      epoll_ctl fails, the error is printed and the connection is removed
//      from the registry again, which destroys it.
void Server::process_connection_batch(
    int epollfd, std::span<std::unique_ptr<Connection>> batch,
    std::span<const int> events) {
  const size_t count = batch.size();

  // Stage 1: writes
  for (size_t idx = 0; idx != count; ++idx) {
    auto &slot = batch[idx];
    if (slot) {
      process_connection_writes(slot, events[idx]);
    }
  }

  // Stage 2: reads
  for (size_t idx = 0; idx != count; ++idx) {
    auto &slot = batch[idx];
    if (slot) {
      process_connection_reads(slot, events[idx]);
    }
  }

  // Stage 3: post-batch hook
  handler_.on_post_batch(batch);

  // Stage 4: give whatever is left back to epoll
  for (size_t idx = 0; idx != count; ++idx) {
    auto &slot = batch[idx];
    if (!slot) {
      continue;
    }

    const int fd = slot->getFd();
    struct epoll_event rearm{};
    rearm.events = slot->hasMessages() ? (EPOLLOUT | EPOLLONESHOT)
                                       : (EPOLLIN | EPOLLONESHOT);
    rearm.data.fd = fd; // events are looked up by file descriptor

    // Must be in the registry before the descriptor is armed.
    connection_registry_.store(fd, std::move(slot));
    if (epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &rearm) == -1) {
      perror("epoll_ctl MOD");
      (void)connection_registry_.remove(fd);
    }
  }
}