More cleanup
This commit is contained in:
@@ -1,4 +1,5 @@
|
||||
#include "arena.hpp"
|
||||
|
||||
#include <cassert>
|
||||
#include <iomanip>
|
||||
#include <limits>
|
||||
|
||||
@@ -59,10 +59,9 @@
|
||||
*
|
||||
* ### Safe Usage Patterns in WeaselDB:
|
||||
* - **Per-Connection Instances**: Each Connection owns its own Arena
|
||||
* instance, accessed only by the thread that currently owns the connection
|
||||
* - **Single Owner Principle**: Connection ownership transfers atomically
|
||||
* between threads using unique_ptr, ensuring only one thread accesses the arena
|
||||
* at a time
|
||||
* instance, accessed by its io thread
|
||||
* - **Server Ownership**: Server retains connection ownership, handlers access
|
||||
* arenas through Connection& references with proper mutex protection
|
||||
*
|
||||
* ### Thread Ownership Model:
|
||||
* 1. **I/O Thread**: Server owns connections, processes socket I/O events
|
||||
|
||||
@@ -45,8 +45,7 @@ Connection::Connection(struct sockaddr_storage addr, int fd, int64_t id,
|
||||
: fd_(fd), id_(id), epoll_index_(epoll_index), addr_(addr),
|
||||
handler_(handler), server_(std::move(server)) {
|
||||
auto server_ref = server_.lock();
|
||||
// This should only be called from a member of Server itself, so I should
|
||||
// hope it's alive.
|
||||
// Should only be called from the io thread
|
||||
assert(server_ref);
|
||||
server_ref->active_connections_.fetch_add(1, std::memory_order_relaxed);
|
||||
|
||||
@@ -96,19 +95,23 @@ void Connection::append_message(std::span<std::string_view> data_parts,
|
||||
bool was_empty = message_queue_.empty();
|
||||
|
||||
// Add message to queue
|
||||
// TODO this allocates while holding the connection lock
|
||||
message_queue_.emplace_back(
|
||||
Message{std::move(arena), data_parts, close_after_send});
|
||||
outgoing_bytes_queued_ += total_bytes;
|
||||
|
||||
// If queue was empty, we need to add EPOLLOUT interest.
|
||||
if (was_empty && fd_ >= 0) {
|
||||
if (was_empty) {
|
||||
auto server = server_.lock();
|
||||
if (server) {
|
||||
if (fd_ >= 0 && server) {
|
||||
// Add EPOLLOUT interest - pipeline thread manages epoll
|
||||
struct epoll_event event;
|
||||
event.data.fd = fd_;
|
||||
event.events = EPOLLIN | EPOLLOUT;
|
||||
tsan_release();
|
||||
// I think we have to call epoll_ctl while holding mutex_. Otherwise a
|
||||
// call that clears the write interest could get reordered with one that
|
||||
// sets it and we would hang.
|
||||
epoll_ctl(server->epoll_fds_[epoll_index_], EPOLL_CTL_MOD, fd_, &event);
|
||||
}
|
||||
}
|
||||
@@ -133,9 +136,8 @@ int Connection::readBytes(char *buf, size_t buffer_size) {
|
||||
}
|
||||
|
||||
// Increment bytes read metric
|
||||
if (r > 0) {
|
||||
bytes_read.inc(r);
|
||||
}
|
||||
assert(r > 0);
|
||||
bytes_read.inc(r);
|
||||
|
||||
return r;
|
||||
}
|
||||
@@ -196,9 +198,7 @@ uint32_t Connection::write_bytes() {
|
||||
if (errno == EAGAIN) {
|
||||
// Increment EAGAIN failure metric
|
||||
write_eagain_failures.inc();
|
||||
if (total_bytes_written > 0) {
|
||||
bytes_written.inc(total_bytes_written);
|
||||
}
|
||||
bytes_written.inc(total_bytes_written);
|
||||
return result;
|
||||
}
|
||||
perror("sendmsg");
|
||||
@@ -221,7 +221,6 @@ uint32_t Connection::write_bytes() {
|
||||
|
||||
while (bytes_remaining > 0 && !message_queue_.empty()) {
|
||||
auto &front_message = message_queue_.front();
|
||||
bool message_complete = true;
|
||||
|
||||
for (auto &part : front_message.data_parts) {
|
||||
if (part.empty())
|
||||
@@ -236,22 +235,17 @@ uint32_t Connection::write_bytes() {
|
||||
part = std::string_view(part.data() + bytes_remaining,
|
||||
part.size() - bytes_remaining);
|
||||
bytes_remaining = 0;
|
||||
message_complete = false;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (message_complete) {
|
||||
if (front_message.close_after_send) {
|
||||
result |= Close;
|
||||
}
|
||||
// Move arena to thread-local vector for deferred cleanup
|
||||
g_arenas_to_free.emplace_back(std::move(front_message.arena));
|
||||
message_queue_.pop_front();
|
||||
if (result & Close) {
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
if (front_message.close_after_send) {
|
||||
result |= Close;
|
||||
}
|
||||
// Move arena to thread-local vector for deferred cleanup
|
||||
g_arenas_to_free.emplace_back(std::move(front_message.arena));
|
||||
message_queue_.pop_front();
|
||||
if (result & Close) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
@@ -269,18 +263,19 @@ uint32_t Connection::write_bytes() {
|
||||
event.data.fd = fd_;
|
||||
event.events = EPOLLIN; // Remove EPOLLOUT
|
||||
tsan_release();
|
||||
// I think we have to call epoll_ctl while holding mutex_. Otherwise a
|
||||
// call that clears the write interest could get reordered with one that
|
||||
// sets it and we would hang.
|
||||
epoll_ctl(server->epoll_fds_[epoll_index_], EPOLL_CTL_MOD, fd_, &event);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Increment bytes written metric
|
||||
if (total_bytes_written > 0) {
|
||||
bytes_written.inc(total_bytes_written);
|
||||
}
|
||||
bytes_written.inc(total_bytes_written);
|
||||
|
||||
// Clean up arenas after all mutex operations are complete
|
||||
// This avoids holding the connection mutex while free() potentially contends
|
||||
// This avoids holding the connection mutex while calling free()
|
||||
g_arenas_to_free.clear();
|
||||
|
||||
return result;
|
||||
|
||||
@@ -34,7 +34,9 @@ struct MessageSender {
|
||||
*
|
||||
* Thread-safe method that can be called from any thread, including
|
||||
* pipeline processing threads. The arena is moved into the connection
|
||||
* to maintain data lifetime until the message is sent.
|
||||
* to maintain data lifetime until the message is sent. Messages appended
|
||||
* concurrently may be written in either order, but they will not be
|
||||
* interleaved.
|
||||
*
|
||||
* @param data_parts Span of string_view parts to send (arena-allocated)
|
||||
* @param arena Arena containing the memory for data_parts string_views
|
||||
@@ -55,20 +57,21 @@ struct MessageSender {
|
||||
};
|
||||
|
||||
/**
|
||||
* Represents a single client connection with thread-safe concurrent access.
|
||||
* Represents a single client connection - the full interface available to the
|
||||
* io thread and connection handler.
|
||||
*
|
||||
* Connection ownership model:
|
||||
* - Server owns all connections
|
||||
* - Handlers receive Connection& references, and can keep a WeakRef to
|
||||
* Connection for async responses.
|
||||
* - Multiple pipeline threads can safely access connection concurrently
|
||||
* MessageSender for async responses.
|
||||
* - Multiple pipeline threads can safely access the MessageSender concurrently
|
||||
* - I/O thread has exclusive access to socket operations
|
||||
*
|
||||
* Threading model:
|
||||
* - Single mutex protects all connection state
|
||||
* - Single mutex protects state shared with pipeline threads
|
||||
* - Pipeline threads call Connection methods (append_message, etc.)
|
||||
* - I/O thread processes socket events and message queue
|
||||
* - Pipeline threads manage epoll interests via Connection methods
|
||||
* - Pipeline threads register epoll write interest via append_message
|
||||
* - Connection tracks closed state to prevent EBADF errors
|
||||
*
|
||||
* Arena allocator usage:
|
||||
@@ -258,7 +261,7 @@ struct Connection : MessageSender {
|
||||
*
|
||||
* Example usage:
|
||||
* ```cpp
|
||||
* class HttpHandler : public ConnectionHandler {
|
||||
* class HttpHandler : ConnectionHandler {
|
||||
* void on_connection_established(Connection& conn) override {
|
||||
* // Allocate HTTP state in connection's arena or heap
|
||||
* auto* state = conn.get_arena().construct<HttpConnectionState>();
|
||||
@@ -272,8 +275,8 @@ struct Connection : MessageSender {
|
||||
* }
|
||||
*
|
||||
* void on_data_arrived(std::string_view data,
|
||||
* Ref<Connection>& conn_ptr) override {
|
||||
* auto* state = static_cast<HttpConnectionState*>(conn_ptr->user_data);
|
||||
* Connection& conn) override {
|
||||
* auto* state = static_cast<HttpConnectionState*>(conn.user_data);
|
||||
* // Use state for protocol processing...
|
||||
* }
|
||||
* };
|
||||
@@ -323,28 +326,25 @@ private:
|
||||
};
|
||||
uint32_t write_bytes();
|
||||
|
||||
// Direct access methods for Server (must hold mutex)
|
||||
int getFd() const { return fd_; }
|
||||
bool has_messages() const { return !message_queue_.empty(); }
|
||||
size_t getEpollIndex() const { return epoll_index_; }
|
||||
void close();
|
||||
|
||||
// Immutable connection properties
|
||||
int fd_;
|
||||
const int64_t id_;
|
||||
const size_t epoll_index_; // Index of the epoll instance this connection uses
|
||||
struct sockaddr_storage addr_; // sockaddr_storage handles IPv4/IPv6
|
||||
ConnectionHandler *const handler_;
|
||||
WeakRef<Server> server_; // Weak reference to server for safe cleanup
|
||||
WeakRef<Server> server_; // Weak reference to server for safe epoll_ctl calls
|
||||
WeakRef<Connection> self_ref_; // WeakRef to self for get_weak_ref()
|
||||
|
||||
// Thread-safe state (protected by mutex_)
|
||||
// state shared with pipeline threads (protected by mutex_)
|
||||
mutable std::mutex mutex_; // Protects all mutable state
|
||||
std::deque<Message>
|
||||
message_queue_; // Queue of messages to send. Protectec by
|
||||
message_queue_; // Queue of messages to send. Protected by
|
||||
// mutex_, but if non-empty mutex_ can be
|
||||
// dropped while server accesses existing elements.
|
||||
int64_t outgoing_bytes_queued_{0}; // Counter of queued bytes
|
||||
// Set to a negative number in `close`
|
||||
int fd_;
|
||||
|
||||
#if __has_feature(thread_sanitizer)
|
||||
void tsan_acquire() { tsan_sync.load(std::memory_order_acquire); }
|
||||
|
||||
@@ -51,6 +51,7 @@ public:
|
||||
* @param conn Connection that made write progress - server retains ownership
|
||||
* @note Called from this connection's io thread.
|
||||
* @note Called during writes, not necessarily when buffer becomes empty
|
||||
* TODO Add bytes written argument?
|
||||
*/
|
||||
virtual void on_write_progress(Connection &) {}
|
||||
|
||||
|
||||
@@ -13,6 +13,7 @@ ConnectionRegistry::ConnectionRegistry() : connections_(nullptr), max_fds_(0) {
|
||||
}
|
||||
max_fds_ = rlim.rlim_cur;
|
||||
|
||||
// TODO re-enable "ondemand pages" behavior
|
||||
// // Calculate size rounded up to page boundary
|
||||
// size_t array_size = max_fds_ * sizeof(Connection *);
|
||||
// size_t page_size = getpagesize();
|
||||
|
||||
@@ -34,10 +34,10 @@ public:
|
||||
|
||||
/**
|
||||
* Store a connection in the registry, indexed by its file descriptor.
|
||||
* Takes ownership of the connection via unique_ptr.
|
||||
* Takes a reference to the connection for storage.
|
||||
*
|
||||
* @param fd File descriptor (must be valid and < max_fds_)
|
||||
* @param connection unique_ptr to the connection (ownership transferred)
|
||||
* @param connection Ref<Connection> to store in the registry
|
||||
*/
|
||||
void store(int fd, Ref<Connection> connection);
|
||||
|
||||
|
||||
@@ -69,14 +69,17 @@ void HttpHandler::on_connection_closed(Connection &conn) {
|
||||
conn.user_data = nullptr;
|
||||
}
|
||||
|
||||
// TODO there might be an issue if we get pipelined requests here
|
||||
|
||||
void HttpHandler::on_write_buffer_drained(Connection &conn) {
|
||||
// Reset state after all messages have been written for the next request
|
||||
// Reset state after entire reply messages have been written for the next
|
||||
// request
|
||||
auto *state = static_cast<HttpConnectionState *>(conn.user_data);
|
||||
if (state) {
|
||||
TRACE_EVENT("http", "reply",
|
||||
perfetto::Flow::Global(state->http_request_id));
|
||||
}
|
||||
// TODO we don't need this anymore. Look at removing it.
|
||||
// TODO consider replacing with HttpConnectionState->reset()
|
||||
on_connection_closed(conn);
|
||||
// Note: Connection reset happens at server level, not connection level
|
||||
on_connection_established(conn);
|
||||
@@ -109,7 +112,8 @@ void HttpHandler::on_batch_complete(std::span<Connection *const> batch) {
|
||||
}
|
||||
}
|
||||
|
||||
// Send requests to 4-stage pipeline in batch
|
||||
// Send requests to 4-stage pipeline in batch. Batching here reduces
|
||||
// contention on the way into the pipeline.
|
||||
if (pipeline_count > 0) {
|
||||
auto guard = commitPipeline.push(pipeline_count, true);
|
||||
auto out_iter = guard.batch.begin();
|
||||
@@ -396,7 +400,7 @@ void HttpHandler::handle_get_metrics(Connection &conn,
|
||||
conn.append_message(result, std::move(state.arena));
|
||||
}
|
||||
|
||||
void HttpHandler::handle_get_ok(Connection &, HttpConnectionState &state) {
|
||||
void HttpHandler::handle_get_ok(Connection &, HttpConnectionState &) {
|
||||
ok_counter.inc();
|
||||
TRACE_EVENT("http", "GET /ok", perfetto::Flow::Global(state.http_request_id));
|
||||
|
||||
|
||||
@@ -133,7 +133,7 @@ struct HttpHandler : ConnectionHandler {
|
||||
void on_connection_closed(Connection &conn) override;
|
||||
void on_data_arrived(std::string_view data, Connection &conn) override;
|
||||
void on_batch_complete(std::span<Connection *const> batch) override;
|
||||
void on_write_buffer_drained(Connection &conn_ptr) override;
|
||||
void on_write_buffer_drained(Connection &conn) override;
|
||||
|
||||
// llhttp callbacks (public for HttpConnectionState access)
|
||||
static int onUrl(llhttp_t *parser, const char *at, size_t length);
|
||||
|
||||
@@ -45,7 +45,6 @@
|
||||
|
||||
#include <functional>
|
||||
#include <initializer_list>
|
||||
#include <memory>
|
||||
#include <span>
|
||||
#include <type_traits>
|
||||
#include <vector>
|
||||
|
||||
@@ -8,7 +8,7 @@
|
||||
* Gathers metrics like CPU usage, memory, and file descriptors by reading
|
||||
* files from the /proc filesystem.
|
||||
*/
|
||||
struct ProcessCollector : public metric::Collector {
|
||||
struct ProcessCollector : metric::Collector {
|
||||
/**
|
||||
* @brief Constructs the collector and initializes the process metrics.
|
||||
*/
|
||||
|
||||
@@ -324,7 +324,7 @@ void Server::start_io_threads(std::vector<std::thread> &threads) {
|
||||
// Process existing connections in batch
|
||||
if (batch_count > 0) {
|
||||
process_connection_batch(
|
||||
epollfd, std::span(batch).subspan(0, batch_count),
|
||||
std::span(batch).subspan(0, batch_count),
|
||||
std::span(batch_events).subspan(0, batch_count));
|
||||
}
|
||||
|
||||
@@ -388,7 +388,7 @@ void Server::start_io_threads(std::vector<std::thread> &threads) {
|
||||
// Process batch if full
|
||||
if (batch_count == config_.server.event_batch_size) {
|
||||
process_connection_batch(
|
||||
epollfd, {batch.data(), (size_t)batch_count},
|
||||
{batch.data(), (size_t)batch_count},
|
||||
{batch_events.data(), (size_t)batch_count});
|
||||
batch_count = 0;
|
||||
}
|
||||
@@ -398,7 +398,7 @@ void Server::start_io_threads(std::vector<std::thread> &threads) {
|
||||
// Process remaining accepted connections
|
||||
if (batch_count > 0) {
|
||||
process_connection_batch(
|
||||
epollfd, std::span(batch).subspan(0, batch_count),
|
||||
std::span(batch).subspan(0, batch_count),
|
||||
std::span(batch_events).subspan(0, batch_count));
|
||||
batch_count = 0;
|
||||
}
|
||||
@@ -463,10 +463,9 @@ void Server::close_connection(Ref<Connection> &conn) {
|
||||
conn.reset();
|
||||
}
|
||||
|
||||
static thread_local std::vector<Connection *> conn_ptrs;
|
||||
static thread_local std::vector<Connection *> batch_connections;
|
||||
|
||||
void Server::process_connection_batch(int epollfd,
|
||||
std::span<Ref<Connection>> batch,
|
||||
void Server::process_connection_batch(std::span<Ref<Connection>> batch,
|
||||
std::span<const int> events) {
|
||||
|
||||
// First process writes for each connection
|
||||
@@ -484,19 +483,19 @@ void Server::process_connection_batch(int epollfd,
|
||||
}
|
||||
|
||||
// Call batch complete handler with connection pointers
|
||||
conn_ptrs.clear();
|
||||
for (auto &conn_ref : batch) {
|
||||
if (conn_ref) {
|
||||
conn_ptrs.push_back(conn_ref.get());
|
||||
batch_connections.clear();
|
||||
for (auto &conn : batch) {
|
||||
if (conn) {
|
||||
batch_connections.push_back(conn.get());
|
||||
}
|
||||
}
|
||||
handler_.on_batch_complete(conn_ptrs);
|
||||
handler_.on_batch_complete(batch_connections);
|
||||
|
||||
// Transfer all remaining connections back to registry
|
||||
for (auto &conn_ptr : batch) {
|
||||
if (conn_ptr) {
|
||||
int fd = conn_ptr->getFd();
|
||||
connection_registry_.store(fd, std::move(conn_ptr));
|
||||
// Return all connections to registry
|
||||
for (auto &conn : batch) {
|
||||
if (conn) {
|
||||
const int fd = conn->fd_;
|
||||
connection_registry_.store(fd, std::move(conn));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
#pragma once
|
||||
|
||||
#include <atomic>
|
||||
#include <memory>
|
||||
#include <span>
|
||||
#include <thread>
|
||||
#include <vector>
|
||||
@@ -101,20 +100,21 @@ private:
|
||||
* Private constructor - use create() factory method instead.
|
||||
*
|
||||
* @param config Server configuration (threads, ports, limits, etc.)
|
||||
* @param handler Protocol handler for processing connection data
|
||||
* @param handler Protocol handler for processing connection data. Must
|
||||
* outlive the server.
|
||||
* @param listen_fds Vector of file descriptors to accept connections on.
|
||||
* Server takes ownership and will close them on
|
||||
* Server takes ownership and will close them on
|
||||
* destruction. Server will set these to non-blocking mode for safe epoll
|
||||
* usage.
|
||||
*/
|
||||
explicit Server(const weaseldb::Config &config, ConnectionHandler &handler,
|
||||
const std::vector<int> &listen_fds);
|
||||
friend Ref<Server> make_ref<Server>(const weaseldb::Config &config,
|
||||
ConnectionHandler &handler,
|
||||
const std::vector<int> &listen_fds);
|
||||
template <typename T, typename... Args>
|
||||
friend Ref<T> make_ref(Args &&...args);
|
||||
|
||||
WeakRef<Server> self_;
|
||||
|
||||
const weaseldb::Config &config_;
|
||||
weaseldb::Config config_;
|
||||
ConnectionHandler &handler_;
|
||||
|
||||
// Connection registry
|
||||
@@ -145,25 +145,15 @@ private:
|
||||
int get_epoll_for_thread(int thread_id) const;
|
||||
|
||||
// Helper for processing connection I/O
|
||||
void process_connection_reads(Ref<Connection> &conn_ptr, int events);
|
||||
void process_connection_writes(Ref<Connection> &conn_ptr, int events);
|
||||
void process_connection_reads(Ref<Connection> &conn, int events);
|
||||
void process_connection_writes(Ref<Connection> &conn, int events);
|
||||
|
||||
void close_connection(Ref<Connection> &conn);
|
||||
|
||||
// Helper for processing a batch of connections with their events
|
||||
void process_connection_batch(int epollfd, std::span<Ref<Connection>> batch,
|
||||
void process_connection_batch(std::span<Ref<Connection>> batch,
|
||||
std::span<const int> events);
|
||||
|
||||
/**
|
||||
* Called internally to return ownership to the server.
|
||||
*
|
||||
* This method is thread-safe and can be called from any thread.
|
||||
* The connection will be re-added to the epoll for continued processing.
|
||||
*
|
||||
* @param connection Unique pointer to the connection being released back
|
||||
*/
|
||||
void receiveConnectionBack(Ref<Connection> connection);
|
||||
|
||||
// Make non-copyable and non-movable
|
||||
Server(const Server &) = delete;
|
||||
Server &operator=(const Server &) = delete;
|
||||
|
||||
Reference in New Issue
Block a user