Finish std::shared_ptr -> Ref migration
This commit is contained in:
@@ -38,10 +38,14 @@ static thread_local std::vector<struct iovec> g_iovec_buffer{IOV_MAX};
|
|||||||
|
|
||||||
Connection::Connection(struct sockaddr_storage addr, int fd, int64_t id,
|
Connection::Connection(struct sockaddr_storage addr, int fd, int64_t id,
|
||||||
size_t epoll_index, ConnectionHandler *handler,
|
size_t epoll_index, ConnectionHandler *handler,
|
||||||
Server &server)
|
WeakRef<Server> server)
|
||||||
: fd_(fd), id_(id), epoll_index_(epoll_index), addr_(addr), arena_(),
|
: fd_(fd), id_(id), epoll_index_(epoll_index), addr_(addr), arena_(),
|
||||||
handler_(handler), server_(server.weak_from_this()) {
|
handler_(handler), server_(std::move(server)) {
|
||||||
server.active_connections_.fetch_add(1, std::memory_order_relaxed);
|
auto server_ref = server_.lock();
|
||||||
|
// This should only be called from a member of Server itself, so I should
|
||||||
|
// hope it's alive.
|
||||||
|
assert(server_ref);
|
||||||
|
server_ref->active_connections_.fetch_add(1, std::memory_order_relaxed);
|
||||||
|
|
||||||
// Increment connection metrics using thread-local instances
|
// Increment connection metrics using thread-local instances
|
||||||
connections_total.inc();
|
connections_total.inc();
|
||||||
@@ -55,6 +59,7 @@ Connection::~Connection() {
|
|||||||
if (handler_) {
|
if (handler_) {
|
||||||
handler_->on_connection_closed(*this);
|
handler_->on_connection_closed(*this);
|
||||||
}
|
}
|
||||||
|
// Server may legitimately be gone now
|
||||||
if (auto server_ptr = server_.lock()) {
|
if (auto server_ptr = server_.lock()) {
|
||||||
server_ptr->active_connections_.fetch_sub(1, std::memory_order_relaxed);
|
server_ptr->active_connections_.fetch_sub(1, std::memory_order_relaxed);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -3,13 +3,13 @@
|
|||||||
#include <cassert>
|
#include <cassert>
|
||||||
#include <cstring>
|
#include <cstring>
|
||||||
#include <deque>
|
#include <deque>
|
||||||
#include <memory>
|
|
||||||
#include <sys/socket.h>
|
#include <sys/socket.h>
|
||||||
#include <sys/uio.h>
|
#include <sys/uio.h>
|
||||||
#include <unistd.h>
|
#include <unistd.h>
|
||||||
|
|
||||||
#include "arena.hpp"
|
#include "arena.hpp"
|
||||||
#include "connection_handler.hpp"
|
#include "connection_handler.hpp"
|
||||||
|
#include "reference.hpp"
|
||||||
|
|
||||||
#ifndef __has_feature
|
#ifndef __has_feature
|
||||||
#define __has_feature(x) 0
|
#define __has_feature(x) 0
|
||||||
@@ -330,7 +330,8 @@ private:
|
|||||||
* @param server Reference to server associated with this connection
|
* @param server Reference to server associated with this connection
|
||||||
*/
|
*/
|
||||||
Connection(struct sockaddr_storage addr, int fd, int64_t id,
|
Connection(struct sockaddr_storage addr, int fd, int64_t id,
|
||||||
size_t epoll_index, ConnectionHandler *handler, Server &server);
|
size_t epoll_index, ConnectionHandler *handler,
|
||||||
|
WeakRef<Server> server);
|
||||||
|
|
||||||
// Networking interface - only accessible by Server
|
// Networking interface - only accessible by Server
|
||||||
int readBytes(char *buf, size_t buffer_size);
|
int readBytes(char *buf, size_t buffer_size);
|
||||||
@@ -347,7 +348,7 @@ private:
|
|||||||
struct sockaddr_storage addr_; // sockaddr_storage handles IPv4/IPv6
|
struct sockaddr_storage addr_; // sockaddr_storage handles IPv4/IPv6
|
||||||
Arena arena_;
|
Arena arena_;
|
||||||
ConnectionHandler *handler_;
|
ConnectionHandler *handler_;
|
||||||
std::weak_ptr<Server> server_; // Weak reference to server for safe cleanup
|
WeakRef<Server> server_; // Weak reference to server for safe cleanup
|
||||||
|
|
||||||
std::deque<std::string_view, ArenaStlAllocator<std::string_view>> messages_{
|
std::deque<std::string_view, ArenaStlAllocator<std::string_view>> messages_{
|
||||||
ArenaStlAllocator<std::string_view>{&arena_}};
|
ArenaStlAllocator<std::string_view>{&arena_}};
|
||||||
|
|||||||
@@ -318,7 +318,7 @@ template <typename T> struct WeakRef {
|
|||||||
* @brief Attempt to promote WeakRef to Ref
|
* @brief Attempt to promote WeakRef to Ref
|
||||||
* @return Valid Ref if object still alive, empty Ref otherwise
|
* @return Valid Ref if object still alive, empty Ref otherwise
|
||||||
*/
|
*/
|
||||||
Ref<T> lock() {
|
Ref<T> lock() const {
|
||||||
if (!control_block) {
|
if (!control_block) {
|
||||||
return Ref<T>();
|
return Ref<T>();
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -21,12 +21,12 @@
|
|||||||
// Static thread-local storage for read buffer (used across different functions)
|
// Static thread-local storage for read buffer (used across different functions)
|
||||||
static thread_local std::vector<char> g_read_buffer;
|
static thread_local std::vector<char> g_read_buffer;
|
||||||
|
|
||||||
std::shared_ptr<Server> Server::create(const weaseldb::Config &config,
|
Ref<Server> Server::create(const weaseldb::Config &config,
|
||||||
ConnectionHandler &handler,
|
ConnectionHandler &handler,
|
||||||
const std::vector<int> &listen_fds) {
|
const std::vector<int> &listen_fds) {
|
||||||
// Use std::shared_ptr constructor with private access
|
auto result = make_ref<Server>(config, handler, listen_fds);
|
||||||
// We can't use make_shared here because constructor is private
|
result->self_ = result;
|
||||||
return std::shared_ptr<Server>(new Server(config, handler, listen_fds));
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
Server::Server(const weaseldb::Config &config, ConnectionHandler &handler,
|
Server::Server(const weaseldb::Config &config, ConnectionHandler &handler,
|
||||||
@@ -218,7 +218,7 @@ int Server::create_local_connection() {
|
|||||||
// Create Connection object
|
// Create Connection object
|
||||||
auto connection = std::unique_ptr<Connection>(new Connection(
|
auto connection = std::unique_ptr<Connection>(new Connection(
|
||||||
addr, server_fd, connection_id_.fetch_add(1, std::memory_order_relaxed),
|
addr, server_fd, connection_id_.fetch_add(1, std::memory_order_relaxed),
|
||||||
epoll_index, &handler_, *this));
|
epoll_index, &handler_, self_));
|
||||||
|
|
||||||
// Store in registry
|
// Store in registry
|
||||||
connection_registry_.store(server_fd, std::move(connection));
|
connection_registry_.store(server_fd, std::move(connection));
|
||||||
@@ -422,7 +422,7 @@ void Server::start_io_threads(std::vector<std::thread> &threads) {
|
|||||||
batch[batch_count] = std::unique_ptr<Connection>(new Connection(
|
batch[batch_count] = std::unique_ptr<Connection>(new Connection(
|
||||||
addr, fd,
|
addr, fd,
|
||||||
connection_id_.fetch_add(1, std::memory_order_relaxed),
|
connection_id_.fetch_add(1, std::memory_order_relaxed),
|
||||||
epoll_index, &handler_, *this));
|
epoll_index, &handler_, self_));
|
||||||
batch_events[batch_count] =
|
batch_events[batch_count] =
|
||||||
EPOLLIN; // New connections always start with read
|
EPOLLIN; // New connections always start with read
|
||||||
batch_count++;
|
batch_count++;
|
||||||
|
|||||||
@@ -29,18 +29,18 @@
|
|||||||
*
|
*
|
||||||
* IMPORTANT: Server uses a factory pattern and MUST be created via
|
* IMPORTANT: Server uses a factory pattern and MUST be created via
|
||||||
* Server::create(). This ensures:
|
* Server::create(). This ensures:
|
||||||
* - Proper shared_ptr semantics for enable_shared_from_this
|
* - Proper Ref<Server> semantics for reference counting
|
||||||
* - Safe weak_ptr references from Connection objects
|
* - Safe WeakRef<Server> references from Connection objects
|
||||||
* - Prevention of accidental stack allocation that would break safety
|
* - Prevention of accidental stack allocation that would break safety
|
||||||
* guarantees
|
* guarantees
|
||||||
*/
|
*/
|
||||||
struct Server : std::enable_shared_from_this<Server> {
|
struct Server {
|
||||||
/**
|
/**
|
||||||
* Factory method to create a Server instance.
|
* Factory method to create a Server instance.
|
||||||
*
|
*
|
||||||
* This is the only way to create a Server - ensures proper shared_ptr
|
* This is the only way to create a Server - ensures proper Ref<Server>
|
||||||
* semantics and prevents accidental stack allocation that would break
|
* semantics and prevents accidental stack allocation that would break
|
||||||
* weak_ptr safety.
|
* WeakRef<Server> safety.
|
||||||
*
|
*
|
||||||
* @param config Server configuration (threads, ports, limits, etc.)
|
* @param config Server configuration (threads, ports, limits, etc.)
|
||||||
* @param handler Protocol handler for processing connection data
|
* @param handler Protocol handler for processing connection data
|
||||||
@@ -48,11 +48,11 @@ struct Server : std::enable_shared_from_this<Server> {
|
|||||||
* Server takes ownership and will close them on
|
* Server takes ownership and will close them on
|
||||||
* destruction. Server will set these to non-blocking mode for safe epoll
|
* destruction. Server will set these to non-blocking mode for safe epoll
|
||||||
* usage. Empty vector means no listening sockets.
|
* usage. Empty vector means no listening sockets.
|
||||||
* @return shared_ptr to the newly created Server
|
* @return Ref to the newly created Server
|
||||||
*/
|
*/
|
||||||
static std::shared_ptr<Server> create(const weaseldb::Config &config,
|
static Ref<Server> create(const weaseldb::Config &config,
|
||||||
ConnectionHandler &handler,
|
ConnectionHandler &handler,
|
||||||
const std::vector<int> &listen_fds);
|
const std::vector<int> &listen_fds);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Destructor ensures proper cleanup of all resources.
|
* Destructor ensures proper cleanup of all resources.
|
||||||
@@ -122,6 +122,10 @@ private:
|
|||||||
*/
|
*/
|
||||||
explicit Server(const weaseldb::Config &config, ConnectionHandler &handler,
|
explicit Server(const weaseldb::Config &config, ConnectionHandler &handler,
|
||||||
const std::vector<int> &listen_fds);
|
const std::vector<int> &listen_fds);
|
||||||
|
friend Ref<Server> make_ref<Server>(const weaseldb::Config &config,
|
||||||
|
ConnectionHandler &handler,
|
||||||
|
const std::vector<int> &listen_fds);
|
||||||
|
WeakRef<Server> self_;
|
||||||
|
|
||||||
const weaseldb::Config &config_;
|
const weaseldb::Config &config_;
|
||||||
ConnectionHandler &handler_;
|
ConnectionHandler &handler_;
|
||||||
|
|||||||
Reference in New Issue
Block a user