Add test for releasing connections back to the server
This commit is contained in:
@@ -315,8 +315,8 @@ public:
|
||||
// none available) Returns: StageGuard with batch of items to process
|
||||
[[nodiscard]] StageGuard acquire(int stage, int thread, int maxBatch = 0,
|
||||
bool mayBlock = true) {
|
||||
assert(stage < threadState.size());
|
||||
assert(thread < threadState[stage].size());
|
||||
assert(stage < int(threadState.size()));
|
||||
assert(thread < int(threadState[stage].size()));
|
||||
auto batch = acquireHelper(stage, thread, maxBatch, mayBlock);
|
||||
return StageGuard{std::move(batch), &threadState[stage][thread]};
|
||||
}
|
||||
|
||||
@@ -131,15 +131,3 @@ bool Connection::writeBytes() {
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
void Connection::tsan_acquire() {
|
||||
#if __has_feature(thread_sanitizer)
|
||||
tsan_sync_.load(std::memory_order_acquire);
|
||||
#endif
|
||||
}
|
||||
|
||||
void Connection::tsan_release() {
|
||||
#if __has_feature(thread_sanitizer)
|
||||
tsan_sync_.store(0, std::memory_order_release);
|
||||
#endif
|
||||
}
|
||||
|
||||
@@ -2,7 +2,6 @@
|
||||
|
||||
#include "arena_allocator.hpp"
|
||||
#include "connection_handler.hpp"
|
||||
#include <atomic>
|
||||
#include <cassert>
|
||||
#include <cstring>
|
||||
#include <deque>
|
||||
@@ -352,8 +351,6 @@ private:
|
||||
// Networking interface - only accessible by Server
|
||||
int readBytes(char *buf, size_t buffer_size);
|
||||
bool writeBytes();
|
||||
void tsan_acquire();
|
||||
void tsan_release();
|
||||
|
||||
// Direct access methods for Server
|
||||
int getFd() const { return fd_; }
|
||||
@@ -374,9 +371,4 @@ private:
|
||||
// Whether or not to close the connection after completing writing the
|
||||
// response
|
||||
bool closeConnection_{false};
|
||||
|
||||
// TSAN support for epoll synchronization
|
||||
#if __has_feature(thread_sanitizer)
|
||||
std::atomic<int> tsan_sync_;
|
||||
#endif
|
||||
};
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
#include "connection_registry.hpp"
|
||||
#include "connection.hpp"
|
||||
#include <atomic>
|
||||
#include <cstring>
|
||||
#include <stdexcept>
|
||||
#include <unistd.h>
|
||||
@@ -19,7 +20,7 @@ ConnectionRegistry::ConnectionRegistry() : connections_(nullptr), max_fds_(0) {
|
||||
|
||||
// Allocate virtual address space using mmap
|
||||
// MAP_ANONYMOUS provides zero-initialized pages on-demand (lazy allocation)
|
||||
connections_ = static_cast<Connection **>(
|
||||
connections_ = static_cast<std::atomic<Connection *> *>(
|
||||
mmap(nullptr, aligned_size, PROT_READ | PROT_WRITE,
|
||||
MAP_PRIVATE | MAP_ANONYMOUS, -1, 0));
|
||||
|
||||
@@ -34,7 +35,7 @@ ConnectionRegistry::ConnectionRegistry() : connections_(nullptr), max_fds_(0) {
|
||||
ConnectionRegistry::~ConnectionRegistry() {
|
||||
if (connections_ != nullptr) {
|
||||
for (size_t fd = 0; fd < max_fds_; ++fd) {
|
||||
delete connections_[fd];
|
||||
delete connections_[fd].load(std::memory_order_relaxed);
|
||||
}
|
||||
if (munmap(connections_, aligned_size_) == -1) {
|
||||
perror("munmap");
|
||||
@@ -47,16 +48,13 @@ void ConnectionRegistry::store(int fd, std::unique_ptr<Connection> connection) {
|
||||
abort();
|
||||
}
|
||||
// Release ownership from unique_ptr and store raw pointer
|
||||
connections_[fd] = connection.release();
|
||||
connections_[fd].store(connection.release(), std::memory_order_release);
|
||||
}
|
||||
|
||||
std::unique_ptr<Connection> ConnectionRegistry::remove(int fd) {
|
||||
if (fd < 0 || static_cast<size_t>(fd) >= max_fds_) {
|
||||
abort();
|
||||
}
|
||||
|
||||
Connection *conn = connections_[fd];
|
||||
connections_[fd] = nullptr;
|
||||
// Transfer ownership back to unique_ptr
|
||||
return std::unique_ptr<Connection>(conn);
|
||||
return std::unique_ptr<Connection>(
|
||||
connections_[fd].exchange(nullptr, std::memory_order_acquire));
|
||||
}
|
||||
|
||||
@@ -63,7 +63,10 @@ public:
|
||||
ConnectionRegistry &operator=(ConnectionRegistry &&) = delete;
|
||||
|
||||
private:
|
||||
Connection **connections_; ///< mmap'd array of raw connection pointers
|
||||
size_t max_fds_; ///< Maximum file descriptor limit
|
||||
size_t aligned_size_; ///< Page-aligned size for munmap
|
||||
std::atomic<Connection *>
|
||||
*connections_; ///< mmap'd array of raw connection pointers. It's
|
||||
///< thread-safe without since epoll_ctl happens before
|
||||
///< epoll_wait, but this makes tsan happy /shrug.
|
||||
size_t max_fds_; ///< Maximum file descriptor limit
|
||||
size_t aligned_size_; ///< Page-aligned size for munmap
|
||||
};
|
||||
|
||||
@@ -42,6 +42,13 @@ Server::Server(const weaseldb::Config &config, ConnectionHandler &handler,
|
||||
throw std::runtime_error("Failed to set provided listen fd non-blocking");
|
||||
}
|
||||
}
|
||||
|
||||
// Setup shutdown pipe for graceful shutdown
|
||||
setup_shutdown_pipe();
|
||||
|
||||
// Create epoll instances immediately for createLocalConnection() support
|
||||
create_epoll_instances();
|
||||
|
||||
// If empty vector provided, listen_fds_ will be empty (no listening)
|
||||
// Server works purely with createLocalConnection()
|
||||
}
|
||||
@@ -78,8 +85,7 @@ Server::~Server() {
|
||||
}
|
||||
|
||||
void Server::run() {
|
||||
setup_shutdown_pipe();
|
||||
create_epoll_instances();
|
||||
// Shutdown pipe and epoll instances are now created in constructor
|
||||
|
||||
// Create I/O threads locally in this call frame
|
||||
// CRITICAL: By owning threads in run()'s call frame, we guarantee they are
|
||||
@@ -140,7 +146,6 @@ void Server::receiveConnectionBack(std::unique_ptr<Connection> connection) {
|
||||
event.events = EPOLLOUT | EPOLLONESHOT;
|
||||
}
|
||||
|
||||
connection->tsan_release();
|
||||
int fd = connection->getFd();
|
||||
event.data.fd = fd;
|
||||
|
||||
@@ -159,7 +164,7 @@ void Server::receiveConnectionBack(std::unique_ptr<Connection> connection) {
|
||||
|
||||
int Server::createLocalConnection() {
|
||||
int sockets[2];
|
||||
if (socketpair(AF_UNIX, SOCK_STREAM | SOCK_NONBLOCK, 0, sockets) != 0) {
|
||||
if (socketpair(AF_UNIX, SOCK_STREAM, 0, sockets) != 0) {
|
||||
perror("socketpair");
|
||||
return -1;
|
||||
}
|
||||
@@ -167,6 +172,18 @@ int Server::createLocalConnection() {
|
||||
int server_fd = sockets[0]; // Server keeps this end
|
||||
int client_fd = sockets[1]; // Return this end to caller
|
||||
|
||||
int flags = fcntl(server_fd, F_GETFL, 0);
|
||||
if (flags == -1) {
|
||||
perror("fcntl F_GETFL on provided listen fd");
|
||||
throw std::runtime_error(
|
||||
"Failed to get flags for server side of local connection");
|
||||
}
|
||||
if (fcntl(server_fd, F_SETFL, flags | O_NONBLOCK) == -1) {
|
||||
perror("fcntl F_SETFL O_NONBLOCK on provided listen fd");
|
||||
throw std::runtime_error(
|
||||
"Failed to set server side of local connection to non-blocking");
|
||||
}
|
||||
|
||||
// Create sockaddr_storage for the connection
|
||||
struct sockaddr_storage addr{};
|
||||
addr.ss_family = AF_UNIX;
|
||||
@@ -304,9 +321,7 @@ void Server::start_io_threads(std::vector<std::thread> &threads) {
|
||||
// Handle existing connection events
|
||||
int fd = events[i].data.fd;
|
||||
std::unique_ptr<Connection> conn = connection_registry_.remove(fd);
|
||||
if (conn) {
|
||||
conn->tsan_acquire();
|
||||
}
|
||||
assert(conn);
|
||||
|
||||
if (events[i].events & (EPOLLERR | EPOLLHUP)) {
|
||||
// unique_ptr will automatically delete on scope exit
|
||||
@@ -468,7 +483,6 @@ void Server::process_connection_batch(
|
||||
event.events = EPOLLOUT | EPOLLONESHOT;
|
||||
}
|
||||
|
||||
conn_ptr->tsan_release();
|
||||
event.data.fd = fd; // Use file descriptor for epoll
|
||||
// Put connection back in registry since handler didn't take ownership.
|
||||
// Must happen before epoll_ctl
|
||||
|
||||
Reference in New Issue
Block a user