Separate Connection, ConnectionHandler, Server

This commit is contained in:
2025-08-19 13:23:18 -04:00
parent addac1b0b7
commit cb322bbb2b
7 changed files with 888 additions and 492 deletions

130
src/connection.cpp Normal file
View File

@@ -0,0 +1,130 @@
#include "connection.hpp"
#include "server.hpp" // Need this for releaseBackToServer implementation
#include <cstdio>
#include <cstdlib>
#include <errno.h>
#include <limits.h>
Connection::Connection(struct sockaddr_storage addr, int fd, int64_t id,
ConnectionHandler *handler, std::weak_ptr<Server> server)
: fd_(fd), id_(id), addr_(addr), arena_(), handler_(handler),
server_(server) {
activeConnections.fetch_add(1, std::memory_order_relaxed);
if (handler_) {
handler_->on_connection_established(*this);
}
}
Connection::~Connection() {
if (handler_) {
handler_->on_connection_closed(*this);
}
activeConnections.fetch_sub(1, std::memory_order_relaxed);
int e = close(fd_);
if (e == -1) {
perror("close");
abort();
}
}
void Connection::appendMessage(std::string_view s) {
char *arena_str = arena_.allocate<char>(s.size());
std::memcpy(arena_str, s.data(), s.size());
messages_.emplace_back(arena_str, s.size());
}
std::string_view Connection::readBytes(size_t /*max_request_size*/,
size_t buffer_size) {
// Use Variable Length Array for optimal stack allocation
char buf[buffer_size];
int r = read(fd_, buf, buffer_size);
if (r == -1) {
if (errno == EINTR || errno == EAGAIN) {
return {}; // Empty string_view indicates no data or would block
}
perror("read");
return {}; // Error - let server handle connection cleanup
}
if (r == 0) {
return {}; // EOF - let server handle connection cleanup
}
// Copy data to arena for persistent storage
char *arena_data = arena_.allocate<char>(r);
std::memcpy(arena_data, buf, r);
return {arena_data, size_t(r)};
}
bool Connection::writeBytes() {
while (!messages_.empty()) {
// Build iovec array up to IOV_MAX limit
struct iovec iov[IOV_MAX];
int iov_count = 0;
for (auto it = messages_.begin();
it != messages_.end() && iov_count < IOV_MAX; ++it) {
const auto &msg = *it;
if (msg.size() > 0) {
iov[iov_count] = {
const_cast<void *>(static_cast<const void *>(msg.data())),
msg.size()};
iov_count++;
}
}
if (iov_count == 0) {
break;
}
ssize_t w;
for (;;) {
w = writev(fd_, iov, iov_count);
if (w == -1) {
if (errno == EINTR) {
continue; // Standard practice: retry on signal interruption
}
if (errno == EAGAIN) {
return false;
}
perror("writev");
return true;
}
break;
}
assert(w > 0);
// Handle partial writes by updating string_view data/size
size_t bytes_written = static_cast<size_t>(w);
while (bytes_written > 0 && !messages_.empty()) {
auto &front = messages_.front();
if (bytes_written >= front.size()) {
// This message is completely written
bytes_written -= front.size();
messages_.pop_front();
} else {
// Partial write of this message - update string_view
front = std::string_view(front.data() + bytes_written,
front.size() - bytes_written);
bytes_written = 0;
}
}
}
assert(messages_.empty());
arena_.reset();
return closeConnection_;
}
void Connection::tsan_acquire() {
#if __has_feature(thread_sanitizer)
tsan_sync_.load(std::memory_order_acquire);
#endif
}
void Connection::tsan_release() {
#if __has_feature(thread_sanitizer)
tsan_sync_.store(0, std::memory_order_release);
#endif
}