Files
weaseldb/src/connection.cpp
2025-08-22 13:52:32 -04:00

119 lines
3.0 KiB
C++

#include "connection.hpp"
#include "server.hpp" // Need this for releaseBackToServer implementation
#include <cstdio>
#include <cstdlib>
#include <errno.h>
#include <limits.h>
/// Builds a connection around an already-open socket descriptor.
///
/// @param addr        Peer address stored with the connection.
/// @param fd          Descriptor this connection reads from and writes to; it
///                    is closed by the destructor.
/// @param id          Identifier stored with the connection.
/// @param epoll_index Index stored with the connection.
/// @param handler     Callback sink; must not be null.
/// @param server      Server whose active-connection counter is incremented
///                    here. Only a weak reference to it is retained.
Connection::Connection(struct sockaddr_storage addr, int fd, int64_t id,
                       size_t epoll_index, ConnectionHandler *handler,
                       Server &server)
    : fd_(fd), id_(id), epoll_index_(epoll_index), addr_(addr), arena_(),
      handler_(handler), server_(server.weak_from_this()) {
  // Count this connection before any handler code runs.
  server.active_connections_.fetch_add(1, std::memory_order_relaxed);

  // A connection without a handler is a programming error.
  assert(handler != nullptr);

  // The handler is told about the connection from inside the constructor.
  handler_->on_connection_established(*this);
}
/// Tears the connection down: notifies the handler, decrements the server's
/// active-connection counter if the server is still alive, then closes the
/// descriptor. A failing close() is reported with perror() and aborts the
/// process.
Connection::~Connection() {
  if (handler_) {
    handler_->on_connection_closed(*this);
  }

  // The server is only weakly referenced; skip the bookkeeping if it is gone.
  // The temporary strong reference is released before the descriptor closes.
  if (auto owner = server_.lock(); owner) {
    owner->active_connections_.fetch_sub(1, std::memory_order_relaxed);
  }

  if (close(fd_) == -1) {
    perror("close");
    abort();
  }
}
/// Queues bytes to be sent by a later writeBytes() call.
///
/// @param s           Bytes to send. An empty view is ignored.
/// @param copyToArena When true, the bytes are copied into this connection's
///                    arena and the copy is queued. When false, the view is
///                    queued as-is, so the caller must keep the underlying
///                    bytes alive until they have been written.
void Connection::appendMessage(std::string_view s, bool copyToArena) {
  // An empty message carries no bytes. Queuing it would hand writev() a
  // zero-length buffer, and a queue holding only such buffers can never be
  // drained by writeBytes(). Skipping it also avoids a zero-sized arena
  // allocation followed by memcpy() on a possibly null pointer.
  if (s.empty()) {
    return;
  }

  if (!copyToArena) {
    messages_.push_back(s);
    return;
  }

  char *arena_str = arena_.allocate<char>(s.size());
  std::memcpy(arena_str, s.data(), s.size());
  messages_.emplace_back(arena_str, s.size());
}
/// Reads available bytes from the connection's descriptor into `buf`.
///
/// Retries transparently when read() is interrupted by a signal (EINTR).
///
/// @param buf         Destination buffer.
/// @param buffer_size Capacity of `buf` in bytes. At most INT_MAX bytes are
///                    requested per call so the result fits the return type.
/// @return The number of bytes read (> 0); 0 if read() reported EAGAIN or
///         `buffer_size` is 0; -1 if the peer closed the connection or
///         read() failed with any other error (reported via perror()).
int Connection::readBytes(char *buf, size_t buffer_size) {
  // read() with a zero count returns 0, which would be indistinguishable
  // from end-of-file below. Nothing can be read into an empty buffer anyway.
  if (buffer_size == 0) {
    return 0;
  }

  // Clamp the request so the ssize_t result can be returned as int without
  // truncation.
  const size_t max_request = static_cast<size_t>(INT_MAX);
  const size_t request = buffer_size > max_request ? max_request : buffer_size;

  for (;;) {
    const ssize_t r = read(fd_, buf, request);
    if (r > 0) {
      return static_cast<int>(r);
    }
    if (r == 0) {
      // End of file: the peer closed its side.
      return -1;
    }
    if (errno == EINTR) {
      continue;
    }
    if (errno == EAGAIN) {
      return 0;
    }
    perror("read");
    return -1;
  }
}
/// Sends queued messages to the connection's descriptor using writev().
///
/// Messages are sent in queue order, at most IOV_MAX per writev() call.
/// Fully written messages are removed from the queue; a partially written
/// message is trimmed in place so the next write resumes at its first unsent
/// byte. Zero-length messages are discarded without being sent.
///
/// @return true if writev() failed with an error other than EINTR or EAGAIN
///         (reported via perror()); false if the queue was fully drained or
///         writev() reported EAGAIN, in which case unsent data stays queued.
bool Connection::writeBytes() {
  while (!messages_.empty()) {
    // Discard zero-length messages at the front. If every buffer in a batch
    // were empty, writev() would return 0 and the queue would never advance.
    if (messages_.front().empty()) {
      messages_.pop_front();
      continue;
    }

    // Build iovec array up to IOV_MAX limit
    struct iovec iov[IOV_MAX];
    int iov_count = 0;
    for (auto it = messages_.begin();
         it != messages_.end() && iov_count < IOV_MAX; ++it) {
      const auto &msg = *it;
      iov[iov_count] = {
          const_cast<void *>(static_cast<const void *>(msg.data())),
          msg.size()};
      iov_count++;
    }
    assert(iov_count > 0);

    ssize_t w;
    for (;;) {
      w = writev(fd_, iov, iov_count);
      if (w == -1) {
        if (errno == EINTR) {
          continue; // Standard practice: retry on signal interruption
        }
        if (errno == EAGAIN) {
          return false;
        }
        perror("writev");
        return true;
      }
      break;
    }
    // The first buffer is non-empty, so a successful call made progress.
    assert(w > 0);

    // Handle partial writes by updating string_view data/size
    size_t bytes_written = static_cast<size_t>(w);
    while (bytes_written > 0 && !messages_.empty()) {
      auto &front = messages_.front();
      if (bytes_written >= front.size()) {
        // This message is completely written
        bytes_written -= front.size();
        messages_.pop_front();
      } else {
        // Partial write of this message - keep only the unsent tail
        front = std::string_view(front.data() + bytes_written,
                                 front.size() - bytes_written);
        bytes_written = 0;
      }
    }
  }
  assert(messages_.empty());
  return false;
}