Simplify connection registry

This commit is contained in:
2025-08-21 19:53:18 -04:00
parent d1b1e6d589
commit 9ee23fdc46
4 changed files with 43 additions and 102 deletions

View File

@@ -12,46 +12,45 @@ ConnectionRegistry::ConnectionRegistry() : connections_(nullptr), max_fds_(0) {
}
max_fds_ = rlim.rlim_cur;
// Calculate size rounded up to page boundary
size_t array_size = max_fds_ * sizeof(Connection *);
size_t page_size = getpagesize();
size_t aligned_size = (array_size + page_size - 1) & ~(page_size - 1);
// Allocate virtual address space using mmap
// This reserves virtual memory but doesn't allocate physical pages until
// touched
// MAP_ANONYMOUS provides zero-initialized pages on-demand (lazy allocation)
connections_ = static_cast<Connection **>(
mmap(nullptr, max_fds_ * sizeof(Connection *), PROT_READ | PROT_WRITE,
mmap(nullptr, aligned_size, PROT_READ | PROT_WRITE,
MAP_PRIVATE | MAP_ANONYMOUS, -1, 0));
if (connections_ == MAP_FAILED) {
throw std::runtime_error("Failed to mmap for connection registry");
}
// Initialize all pointers to null
// This will cause physical pages to be allocated on-demand
memset(connections_, 0, max_fds_ * sizeof(Connection *));
// Store aligned size for munmap
aligned_size_ = aligned_size;
}
ConnectionRegistry::~ConnectionRegistry() {
if (connections_ != MAP_FAILED && connections_ != nullptr) {
munmap(connections_, max_fds_ * sizeof(Connection *));
if (connections_ != nullptr) {
for (size_t fd = 0; fd < max_fds_; ++fd) {
delete connections_[fd];
}
munmap(connections_, aligned_size_);
}
}
void ConnectionRegistry::store(int fd, std::unique_ptr<Connection> connection) {
if (fd < 0 || static_cast<size_t>(fd) >= max_fds_) {
return; // Invalid fd - silently ignore to avoid crashes
abort();
}
// Release ownership from unique_ptr and store raw pointer
connections_[fd] = connection.release();
}
bool ConnectionRegistry::has(int fd) const {
if (fd < 0 || static_cast<size_t>(fd) >= max_fds_) {
return false; // Invalid fd
}
return connections_[fd] != nullptr;
}
std::unique_ptr<Connection> ConnectionRegistry::remove(int fd) {
if (fd < 0 || static_cast<size_t>(fd) >= max_fds_) {
return nullptr; // Invalid fd
abort();
}
Connection *conn = connections_[fd];
@@ -59,25 +58,3 @@ std::unique_ptr<Connection> ConnectionRegistry::remove(int fd) {
// Transfer ownership back to unique_ptr
return std::unique_ptr<Connection>(conn);
}
void ConnectionRegistry::shutdown_cleanup() {
// Iterate through all possible file descriptors and clean up any connections
// Following the critical ordering: remove -> delete (destructor handles
// close)
size_t connections_found = 0;
for (size_t fd = 0; fd < max_fds_; ++fd) {
Connection *conn = connections_[fd];
if (conn != nullptr) {
connections_found++;
// Step 1: Remove from registry (set to null)
connections_[fd] = nullptr;
// Steps 2 & 3: Delete the connection object (destructor handles closing
// fd)
delete conn;
}
}
// Note: In normal shutdown, this should be 0 since all connections
// should have been properly cleaned up during normal operation
}

View File

@@ -15,14 +15,6 @@ class Connection;
* allocate a large virtual address space efficiently, with physical memory
* allocated on-demand as connections are created.
*
* CRITICAL ORDERING REQUIREMENT:
* All connection cleanup MUST follow this exact sequence:
* 1. Remove from registry: auto conn = registry.remove(fd)
* 2. Delete the connection: unique_ptr destructor handles it automatically
*
* This ordering prevents race conditions between cleanup and fd reuse.
* The unique_ptr interface ensures ownership is always clear and prevents
* double-delete bugs.
*/
class ConnectionRegistry {
public:
@@ -48,14 +40,6 @@ public:
*/
void store(int fd, std::unique_ptr<Connection> connection);
/**
* Check if a connection exists in the registry by file descriptor.
*
* @param fd File descriptor
* @return true if connection exists, false otherwise
*/
bool has(int fd) const;
/**
* Remove a connection from the registry and transfer ownership to caller.
* This transfers ownership via unique_ptr move semantics.
@@ -72,15 +56,6 @@ public:
*/
size_t max_fds() const { return max_fds_; }
/**
* Perform graceful shutdown cleanup.
* Iterates through all registry entries and cleans up any remaining
* connections using the critical ordering: remove -> close -> delete.
*
* This method is called during server shutdown to ensure no connections leak.
*/
virtual void shutdown_cleanup();
// Non-copyable and non-movable
ConnectionRegistry(const ConnectionRegistry &) = delete;
ConnectionRegistry &operator=(const ConnectionRegistry &) = delete;
@@ -90,4 +65,5 @@ public:
private:
Connection **connections_; ///< mmap'd array of raw connection pointers
size_t max_fds_; ///< Maximum file descriptor limit
size_t aligned_size_; ///< Page-aligned size for munmap
};

View File

@@ -30,14 +30,32 @@ Server::Server(const weaseldb::Config &config, ConnectionHandler &handler)
: config_(config), handler_(handler), connection_registry_() {}
Server::~Server() {
// CRITICAL: All I/O threads are guaranteed to be joined before the destructor
// is called because they are owned by the run() method's call frame.
// This eliminates any possibility of race conditions during connection
// cleanup.
if (shutdown_pipe_[0] != -1) {
close(shutdown_pipe_[0]);
shutdown_pipe_[0] = -1;
}
if (shutdown_pipe_[1] != -1) {
close(shutdown_pipe_[1]);
shutdown_pipe_[1] = -1;
}
// Clean up any remaining connections using proper ordering
connection_registry_.shutdown_cleanup();
cleanup_resources();
// Close all epoll instances
for (int epollfd : epoll_fds_) {
if (epollfd != -1) {
close(epollfd);
}
}
epoll_fds_.clear();
if (listen_sockfd_ != -1) {
close(listen_sockfd_);
listen_sockfd_ = -1;
}
// Clean up unix socket file if it exists
if (!config_.server.unix_socket_path.empty()) {
unlink(config_.server.unix_socket_path.c_str());
}
}
void Server::run() {
@@ -525,32 +543,3 @@ void Server::process_connection_batch(
}
}
}
void Server::cleanup_resources() {
if (shutdown_pipe_[0] != -1) {
close(shutdown_pipe_[0]);
shutdown_pipe_[0] = -1;
}
if (shutdown_pipe_[1] != -1) {
close(shutdown_pipe_[1]);
shutdown_pipe_[1] = -1;
}
// Close all epoll instances
for (int epollfd : epoll_fds_) {
if (epollfd != -1) {
close(epollfd);
}
}
epoll_fds_.clear();
if (listen_sockfd_ != -1) {
close(listen_sockfd_);
listen_sockfd_ = -1;
}
// Clean up unix socket file if it exists
if (!config_.server.unix_socket_path.empty()) {
unlink(config_.server.unix_socket_path.c_str());
}
}

View File

@@ -126,7 +126,6 @@ private:
int create_listen_socket();
void create_epoll_instances();
void start_io_threads(std::vector<std::thread> &threads);
void cleanup_resources();
// Helper to get epoll fd for a thread using round-robin
int get_epoll_for_thread(int thread_id) const;