Remove release_back_to_server
This commit is contained in:
@@ -167,8 +167,9 @@ bool HttpHandler::process_release_batch(BatchType &batch) {
|
|||||||
if (!conn) {
|
if (!conn) {
|
||||||
return true; // Shutdown signal
|
return true; // Shutdown signal
|
||||||
}
|
}
|
||||||
// Return connection to server for further processing or cleanup
|
// Connection is server-owned - respond to client and connection
|
||||||
Server::release_back_to_server(std::move(conn));
|
// remains managed by server's connection registry
|
||||||
|
// TODO: Implement response sending with new server-owned connection model
|
||||||
}
|
}
|
||||||
return false; // Continue processing
|
return false; // Continue processing
|
||||||
}
|
}
|
||||||
@@ -180,12 +181,12 @@ bool HttpHandler::process_release_batch(BatchType &batch) {
|
|||||||
|
|
||||||
```cpp
|
```cpp
|
||||||
// 4-stage pipeline: sequence -> resolve -> persist -> release
|
// 4-stage pipeline: sequence -> resolve -> persist -> release
|
||||||
// TODO: Update pipeline type from std::unique_ptr<Connection> to PipelineEntry variant
|
// Pipeline with PipelineEntry variant instead of connection ownership transfer
|
||||||
StaticThreadPipeline<PipelineEntry, // Was: std::unique_ptr<Connection>
|
StaticThreadPipeline<PipelineEntry, // Was: std::unique_ptr<Connection>
|
||||||
WaitStrategy::WaitIfUpstreamIdle, 1, 1, 1, 1>
|
WaitStrategy::WaitIfUpstreamIdle, 1, 1, 1, 1>
|
||||||
commitPipeline{lg_size};
|
commitPipeline{lg_size};
|
||||||
|
|
||||||
// Pipeline entry type (to be implemented)
|
// Pipeline entry type for server-owned connection model
|
||||||
using PipelineEntry = std::variant<CommitEntry, StatusEntry, ShutdownEntry>;
|
using PipelineEntry = std::variant<CommitEntry, StatusEntry, ShutdownEntry>;
|
||||||
```
|
```
|
||||||
|
|
||||||
@@ -228,7 +229,7 @@ for (auto &conn : guard.batch) {
|
|||||||
Commit requests enter the pipeline via `HttpHandler::on_batch_complete()`:
|
Commit requests enter the pipeline via `HttpHandler::on_batch_complete()`:
|
||||||
|
|
||||||
```cpp
|
```cpp
|
||||||
void HttpHandler::on_batch_complete(std::span<std::unique_ptr<Connection>> batch) {
|
void HttpHandler::on_batch_complete(std::span<Connection*> batch) {
|
||||||
// Collect commit requests that passed basic validation for 4-stage pipeline processing
|
// Collect commit requests that passed basic validation for 4-stage pipeline processing
|
||||||
int commit_count = 0;
|
int commit_count = 0;
|
||||||
for (auto &conn : batch) {
|
for (auto &conn : batch) {
|
||||||
@@ -305,7 +306,7 @@ std::visit([&](auto&& entry) {
|
|||||||
- Failed CommitEntries are passed through the pipeline with error information
|
- Failed CommitEntries are passed through the pipeline with error information
|
||||||
- Downstream stages skip processing for error connections but forward them
|
- Downstream stages skip processing for error connections but forward them
|
||||||
- Error responses are sent when connection reaches release stage
|
- Error responses are sent when connection reaches release stage
|
||||||
- Connection ownership is always transferred to ensure cleanup
|
- Server-owned connections ensure proper cleanup and response handling
|
||||||
|
|
||||||
### Pipeline Integrity
|
### Pipeline Integrity
|
||||||
|
|
||||||
@@ -327,7 +328,7 @@ std::visit([&](auto&& entry) {
|
|||||||
|
|
||||||
- **Single-Pass Processing**: Each connection flows through all stages once
|
- **Single-Pass Processing**: Each connection flows through all stages once
|
||||||
- **Streaming Design**: Stages process concurrently
|
- **Streaming Design**: Stages process concurrently
|
||||||
- **Minimal Copying**: Connection ownership transfer, not data copying
|
- **Minimal Copying**: Request processing with server-owned connections
|
||||||
- **Direct Response**: Release stage triggers immediate response transmission
|
- **Direct Response**: Release stage triggers immediate response transmission
|
||||||
|
|
||||||
### Scalability Characteristics
|
### Scalability Characteristics
|
||||||
@@ -345,7 +346,7 @@ private:
|
|||||||
static constexpr int lg_size = 16; // Ring buffer size = 2^16 entries
|
static constexpr int lg_size = 16; // Ring buffer size = 2^16 entries
|
||||||
|
|
||||||
// 4-stage pipeline configuration
|
// 4-stage pipeline configuration
|
||||||
StaticThreadPipeline<std::unique_ptr<Connection>,
|
StaticThreadPipeline<PipelineEntry,
|
||||||
WaitStrategy::WaitIfUpstreamIdle, 1, 1, 1, 1>
|
WaitStrategy::WaitIfUpstreamIdle, 1, 1, 1, 1>
|
||||||
commitPipeline{lg_size};
|
commitPipeline{lg_size};
|
||||||
```
|
```
|
||||||
@@ -362,8 +363,8 @@ The pipeline processes different types of entries using a variant/union type sys
|
|||||||
|
|
||||||
### Pipeline Entry Variants
|
### Pipeline Entry Variants
|
||||||
|
|
||||||
- **CommitEntry**: Contains `std::unique_ptr<Connection>` with CommitRequest and connection state
|
- **CommitEntry**: Contains connection reference/ID with CommitRequest and connection state
|
||||||
- **StatusEntry**: Contains `std::unique_ptr<Connection>` with StatusRequest (transferred to status threadpool after sequence)
|
- **StatusEntry**: Contains connection reference/ID with StatusRequest (transferred to status threadpool after sequence)
|
||||||
- **ShutdownEntry**: Signals pipeline shutdown to all stages
|
- **ShutdownEntry**: Signals pipeline shutdown to all stages
|
||||||
- **Future types**: Pipeline design supports additional entry types
|
- **Future types**: Pipeline design supports additional entry types
|
||||||
|
|
||||||
@@ -472,7 +473,7 @@ void HttpHandler::handleGetSubscribe(Connection &conn, const HttpConnectionState
|
|||||||
The pipeline integrates with the HTTP handler at two points:
|
The pipeline integrates with the HTTP handler at two points:
|
||||||
|
|
||||||
1. **Entry**: `on_batch_complete()` feeds connections into sequence stage
|
1. **Entry**: `on_batch_complete()` feeds connections into sequence stage
|
||||||
1. **Exit**: Release stage calls `Server::release_back_to_server()`
|
1. **Exit**: Release stage responds to clients with server-owned connections
|
||||||
|
|
||||||
### Persistence Layer Integration
|
### Persistence Layer Integration
|
||||||
|
|
||||||
|
|||||||
20
design.md
20
design.md
@@ -243,19 +243,19 @@ CommitRequest {
|
|||||||
|
|
||||||
#### Connection Ownership Lifecycle
|
#### Connection Ownership Lifecycle
|
||||||
|
|
||||||
1. **Creation**: Accept threads create connections, transfer to epoll as raw pointers
|
1. **Creation**: Server creates connections and stores them in registry
|
||||||
1. **Processing**: Network threads claim ownership by wrapping in unique_ptr
|
1. **Processing**: I/O threads access connections via registry lookup
|
||||||
1. **Handler Transfer**: Handlers can take ownership for async processing via unique_ptr.release()
|
1. **Handler Access**: Handlers receive Connection& references, server retains ownership
|
||||||
1. **Return Path**: Handlers use Server::release_back_to_server() to return connections
|
1. **Async Processing**: Handlers use WeakRef<Connection> for safe async access
|
||||||
1. **Safety**: All transfers use weak_ptr to server for safe cleanup
|
1. **Safety**: Connection mutex synchronizes concurrent access between threads
|
||||||
1. **Cleanup**: RAII ensures proper resource cleanup in all scenarios
|
1. **Cleanup**: RAII ensures proper resource cleanup when connections are destroyed
|
||||||
|
|
||||||
#### Arena Memory Lifecycle
|
#### Arena Memory Lifecycle
|
||||||
|
|
||||||
1. **Request Processing**: Handler uses `conn->get_arena()` to allocate memory for parsing request data
|
1. **Request Processing**: Handler creates request-scoped arena for parsing request data
|
||||||
1. **Response Generation**: Handler uses arena for temporary response construction (headers, JSON, etc.)
|
1. **Response Generation**: Handler uses same arena for response construction (headers, JSON, etc.)
|
||||||
1. **Response Queuing**: Handler calls `conn->append_message()` which copies data to arena-backed message queue
|
1. **Response Queuing**: Handler calls `conn->append_message()` passing span + arena ownership
|
||||||
1. **Response Writing**: Server writes all queued messages to socket via `writeBytes()`
|
1. **Response Writing**: I/O thread writes messages to socket, arena freed after completion
|
||||||
|
|
||||||
> **Note**: Call `conn->reset()` periodically to reclaim arena memory. Best practice is after all outgoing bytes have been written.
|
> **Note**: Call `conn->reset()` periodically to reclaim arena memory. Best practice is after all outgoing bytes have been written.
|
||||||
|
|
||||||
|
|||||||
@@ -65,21 +65,19 @@
|
|||||||
* at a time
|
* at a time
|
||||||
*
|
*
|
||||||
* ### Thread Ownership Model:
|
* ### Thread Ownership Model:
|
||||||
* 1. **Network Thread**: Claims connection ownership, accesses arena for I/O
|
* 1. **I/O Thread**: Server owns connections, processes socket I/O events
|
||||||
* buffers
|
* 2. **Handler Thread**: Receives Connection& reference, creates request-scoped
|
||||||
* 2. **Handler Thread**: Can take ownership via unique_ptr.release(), uses
|
* arenas for parsing and response generation
|
||||||
* arena for request parsing and response generation
|
* 3. **Pipeline Thread**: Can use WeakRef<Connection> for async processing,
|
||||||
* 3. **Background Thread**: Can receive ownership for async processing, uses
|
* creates own arenas for temporary data structures
|
||||||
* arena for temporary data structures
|
* 4. **Arena Lifecycle**: Request-scoped arenas moved to message queue, freed
|
||||||
* 4. **Return Path**: Connection (and its arena) safely returned via
|
* after I/O completion without holding connection mutex
|
||||||
* Server::release_back_to_server()
|
|
||||||
*
|
*
|
||||||
* ### Why This Design is Thread-Safe:
|
* ### Why This Design is Thread-Safe:
|
||||||
* - **Exclusive Access**: Only the current owner thread should access the arena
|
* - **Request-Scoped**: Each request gets its own Arena instance for isolation
|
||||||
* - **Transfer Points**: Ownership transfers happen at well-defined
|
* - **Move Semantics**: Arenas transferred via move, avoiding shared access
|
||||||
* synchronization points with proper memory barriers.
|
* - **Deferred Cleanup**: Arena destruction deferred to avoid malloc contention
|
||||||
* - **No Shared State**: Each arena is completely isolated - no shared data
|
* while holding connection mutex
|
||||||
* between different arena instances
|
|
||||||
*
|
*
|
||||||
* @warning Do not share Arena instances between threads. Use separate
|
* @warning Do not share Arena instances between threads. Use separate
|
||||||
* instances per thread or per logical unit of work.
|
* instances per thread or per logical unit of work.
|
||||||
|
|||||||
@@ -4,9 +4,10 @@
|
|||||||
#include <climits>
|
#include <climits>
|
||||||
#include <cstdio>
|
#include <cstdio>
|
||||||
#include <cstdlib>
|
#include <cstdlib>
|
||||||
|
#include <sys/epoll.h>
|
||||||
|
|
||||||
#include "metric.hpp"
|
#include "metric.hpp"
|
||||||
#include "server.hpp" // Need this for release_back_to_server implementation
|
#include "server.hpp" // Need this for server reference
|
||||||
|
|
||||||
namespace {
|
namespace {
|
||||||
// Thread-local metric instances
|
// Thread-local metric instances
|
||||||
@@ -35,11 +36,13 @@ thread_local auto write_eagain_failures =
|
|||||||
|
|
||||||
// Static thread-local storage for iovec buffer
|
// Static thread-local storage for iovec buffer
|
||||||
static thread_local std::vector<struct iovec> g_iovec_buffer{IOV_MAX};
|
static thread_local std::vector<struct iovec> g_iovec_buffer{IOV_MAX};
|
||||||
|
// Thread-local storage for arenas to be freed after unlocking
|
||||||
|
static thread_local std::vector<Arena> g_arenas_to_free;
|
||||||
|
|
||||||
Connection::Connection(struct sockaddr_storage addr, int fd, int64_t id,
|
Connection::Connection(struct sockaddr_storage addr, int fd, int64_t id,
|
||||||
size_t epoll_index, ConnectionHandler *handler,
|
size_t epoll_index, ConnectionHandler *handler,
|
||||||
WeakRef<Server> server)
|
WeakRef<Server> server)
|
||||||
: fd_(fd), id_(id), epoll_index_(epoll_index), addr_(addr), arena_(),
|
: fd_(fd), id_(id), epoll_index_(epoll_index), addr_(addr),
|
||||||
handler_(handler), server_(std::move(server)) {
|
handler_(handler), server_(std::move(server)) {
|
||||||
auto server_ref = server_.lock();
|
auto server_ref = server_.lock();
|
||||||
// This should only be called from a member of Server itself, so I should
|
// This should only be called from a member of Server itself, so I should
|
||||||
@@ -75,15 +78,47 @@ Connection::~Connection() {
|
|||||||
// EINTR ignored - fd is guaranteed closed on Linux
|
// EINTR ignored - fd is guaranteed closed on Linux
|
||||||
}
|
}
|
||||||
|
|
||||||
void Connection::append_message(std::string_view s, bool copy_to_arena) {
|
void Connection::append_message(std::span<std::string_view> data_parts,
|
||||||
if (copy_to_arena) {
|
Arena arena, bool close_after_send) {
|
||||||
char *arena_str = arena_.allocate<char>(s.size());
|
// Calculate total bytes for this message. Don't need to hold the lock yet.
|
||||||
std::memcpy(arena_str, s.data(), s.size());
|
size_t total_bytes = 0;
|
||||||
messages_.emplace_back(arena_str, s.size());
|
for (const auto &part : data_parts) {
|
||||||
} else {
|
total_bytes += part.size();
|
||||||
messages_.push_back(s);
|
}
|
||||||
|
|
||||||
|
std::unique_lock<std::mutex> lock(mutex_);
|
||||||
|
|
||||||
|
if (is_closed_) {
|
||||||
|
return; // Connection is closed, ignore message
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if queue was empty to determine if we need to enable EPOLLOUT
|
||||||
|
bool was_empty = message_queue_.empty();
|
||||||
|
|
||||||
|
// Add message to queue
|
||||||
|
message_queue_.emplace_back(
|
||||||
|
Message{std::move(arena), data_parts, close_after_send});
|
||||||
|
outgoing_bytes_queued_ += total_bytes;
|
||||||
|
|
||||||
|
// If this message has close_after_send flag, set connection flag
|
||||||
|
if (close_after_send) {
|
||||||
|
close_after_send_ = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
lock.unlock();
|
||||||
|
|
||||||
|
// If queue was empty, we need to add EPOLLOUT interest. We don't need to hold
|
||||||
|
// the lock
|
||||||
|
if (was_empty) {
|
||||||
|
auto server = server_.lock();
|
||||||
|
if (server) {
|
||||||
|
// Add EPOLLOUT interest - pipeline thread manages epoll
|
||||||
|
struct epoll_event event;
|
||||||
|
event.data.fd = fd_;
|
||||||
|
event.events = EPOLLIN | EPOLLOUT;
|
||||||
|
epoll_ctl(server->epoll_fds_[epoll_index_], EPOLL_CTL_MOD, fd_, &event);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
outgoing_bytes_queued_ += s.size();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int Connection::readBytes(char *buf, size_t buffer_size) {
|
int Connection::readBytes(char *buf, size_t buffer_size) {
|
||||||
@@ -115,27 +150,47 @@ int Connection::readBytes(char *buf, size_t buffer_size) {
|
|||||||
|
|
||||||
bool Connection::writeBytes() {
|
bool Connection::writeBytes() {
|
||||||
ssize_t total_bytes_written = 0;
|
ssize_t total_bytes_written = 0;
|
||||||
while (!messages_.empty()) {
|
|
||||||
|
while (true) {
|
||||||
|
// Build iovec array while holding mutex using thread-local buffer
|
||||||
|
int iov_count = 0;
|
||||||
|
{
|
||||||
|
std::lock_guard lock(mutex_);
|
||||||
|
|
||||||
|
if (is_closed_ || message_queue_.empty()) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
// Build iovec array up to IOV_MAX limit using thread-local vector
|
// Build iovec array up to IOV_MAX limit using thread-local vector
|
||||||
assert(g_iovec_buffer.size() == IOV_MAX);
|
assert(g_iovec_buffer.size() == IOV_MAX);
|
||||||
struct iovec *iov = g_iovec_buffer.data();
|
struct iovec *iov = g_iovec_buffer.data();
|
||||||
int iov_count = 0;
|
|
||||||
|
|
||||||
for (auto it = messages_.begin();
|
for (auto &message : message_queue_) {
|
||||||
it != messages_.end() && iov_count < IOV_MAX; ++it) {
|
if (iov_count >= IOV_MAX)
|
||||||
const auto &msg = *it;
|
break;
|
||||||
|
|
||||||
|
for (const auto &part : message.data_parts) {
|
||||||
|
if (iov_count >= IOV_MAX)
|
||||||
|
break;
|
||||||
|
if (part.empty())
|
||||||
|
continue;
|
||||||
|
|
||||||
iov[iov_count] = {
|
iov[iov_count] = {
|
||||||
const_cast<void *>(static_cast<const void *>(msg.data())),
|
const_cast<void *>(static_cast<const void *>(part.data())),
|
||||||
msg.size()};
|
part.size()};
|
||||||
iov_count++;
|
iov_count++;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
assert(iov_count > 0);
|
if (iov_count == 0)
|
||||||
|
break;
|
||||||
|
} // Release mutex during I/O
|
||||||
|
|
||||||
|
// Perform I/O without holding mutex
|
||||||
ssize_t w;
|
ssize_t w;
|
||||||
for (;;) {
|
for (;;) {
|
||||||
struct msghdr msg = {};
|
struct msghdr msg = {};
|
||||||
msg.msg_iov = iov;
|
msg.msg_iov = g_iovec_buffer.data();
|
||||||
msg.msg_iovlen = iov_count;
|
msg.msg_iovlen = iov_count;
|
||||||
|
|
||||||
w = sendmsg(fd_, &msg, MSG_NOSIGNAL);
|
w = sendmsg(fd_, &msg, MSG_NOSIGNAL);
|
||||||
@@ -146,7 +201,6 @@ bool Connection::writeBytes() {
|
|||||||
if (errno == EAGAIN) {
|
if (errno == EAGAIN) {
|
||||||
// Increment EAGAIN failure metric
|
// Increment EAGAIN failure metric
|
||||||
write_eagain_failures.inc();
|
write_eagain_failures.inc();
|
||||||
// Increment bytes written metric before returning
|
|
||||||
if (total_bytes_written > 0) {
|
if (total_bytes_written > 0) {
|
||||||
bytes_written.inc(total_bytes_written);
|
bytes_written.inc(total_bytes_written);
|
||||||
}
|
}
|
||||||
@@ -161,30 +215,67 @@ bool Connection::writeBytes() {
|
|||||||
assert(w > 0);
|
assert(w > 0);
|
||||||
total_bytes_written += w;
|
total_bytes_written += w;
|
||||||
|
|
||||||
// Handle partial writes by updating string_view data/size
|
// Handle partial writes by updating message data_parts
|
||||||
size_t bytes_written = static_cast<size_t>(w);
|
{
|
||||||
outgoing_bytes_queued_ -= bytes_written;
|
std::lock_guard lock(mutex_);
|
||||||
while (bytes_written > 0 && !messages_.empty()) {
|
outgoing_bytes_queued_ -= w;
|
||||||
auto &front = messages_.front();
|
size_t bytes_remaining = static_cast<size_t>(w);
|
||||||
|
|
||||||
if (bytes_written >= front.size()) {
|
while (bytes_remaining > 0 && !message_queue_.empty()) {
|
||||||
// This message is completely written
|
auto &front_message = message_queue_.front();
|
||||||
bytes_written -= front.size();
|
bool message_complete = true;
|
||||||
messages_.pop_front();
|
|
||||||
|
for (auto &part : front_message.data_parts) {
|
||||||
|
if (part.empty())
|
||||||
|
continue;
|
||||||
|
|
||||||
|
if (bytes_remaining >= part.size()) {
|
||||||
|
// This part is completely written
|
||||||
|
bytes_remaining -= part.size();
|
||||||
|
part = std::string_view(); // Mark as consumed
|
||||||
} else {
|
} else {
|
||||||
// Partial write of this message - update string_view
|
// Partial write of this part
|
||||||
front = std::string_view(front.data() + bytes_written,
|
part = std::string_view(part.data() + bytes_remaining,
|
||||||
front.size() - bytes_written);
|
part.size() - bytes_remaining);
|
||||||
bytes_written = 0;
|
bytes_remaining = 0;
|
||||||
|
message_complete = false;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (message_complete) {
|
||||||
|
// Move arena to thread-local vector for deferred cleanup
|
||||||
|
g_arenas_to_free.emplace_back(std::move(front_message.arena));
|
||||||
|
message_queue_.pop_front();
|
||||||
|
} else {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if queue is empty and remove EPOLLOUT interest
|
||||||
|
{
|
||||||
|
std::lock_guard lock(mutex_);
|
||||||
|
if (message_queue_.empty()) {
|
||||||
|
auto server = server_.lock();
|
||||||
|
if (server) {
|
||||||
|
struct epoll_event event;
|
||||||
|
event.data.fd = fd_;
|
||||||
|
event.events = EPOLLIN; // Remove EPOLLOUT
|
||||||
|
epoll_ctl(server->epoll_fds_[epoll_index_], EPOLL_CTL_MOD, fd_, &event);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
assert(messages_.empty());
|
|
||||||
|
|
||||||
// Increment bytes written metric
|
// Increment bytes written metric
|
||||||
if (total_bytes_written > 0) {
|
if (total_bytes_written > 0) {
|
||||||
bytes_written.inc(total_bytes_written);
|
bytes_written.inc(total_bytes_written);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Clean up arenas after all mutex operations are complete
|
||||||
|
// This avoids holding the connection mutex while free() potentially contends
|
||||||
|
g_arenas_to_free.clear();
|
||||||
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -3,6 +3,8 @@
|
|||||||
#include <cassert>
|
#include <cassert>
|
||||||
#include <cstring>
|
#include <cstring>
|
||||||
#include <deque>
|
#include <deque>
|
||||||
|
#include <mutex>
|
||||||
|
#include <span>
|
||||||
#include <sys/socket.h>
|
#include <sys/socket.h>
|
||||||
#include <sys/uio.h>
|
#include <sys/uio.h>
|
||||||
#include <unistd.h>
|
#include <unistd.h>
|
||||||
@@ -15,33 +17,31 @@
|
|||||||
#define __has_feature(x) 0
|
#define __has_feature(x) 0
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
/**
|
|
||||||
* Represents a single client connection with efficient memory management.
|
|
||||||
*
|
|
||||||
* Connection ownership model:
|
|
||||||
* - Created by I/O thread, processed immediately, then transferred to epoll via
|
|
||||||
* raw pointer
|
|
||||||
* - I/O threads claim ownership by wrapping raw pointer in unique_ptr
|
|
||||||
* - I/O thread optionally passes ownership to a thread pipeline
|
|
||||||
* - Owner eventually transfers back to epoll by releasing unique_ptr to raw
|
|
||||||
* pointer
|
|
||||||
* - RAII cleanup happens if I/O thread doesn't transfer back
|
|
||||||
*
|
|
||||||
* Arena allocator thread safety:
|
|
||||||
* Each Connection contains its own Arena instance that is accessed
|
|
||||||
* exclusively by the thread that currently owns the connection. This ensures
|
|
||||||
* thread safety without requiring locks:
|
|
||||||
* - Arena is used by the owning thread for I/O buffers, request parsing, and
|
|
||||||
* response generation
|
|
||||||
* - Arena memory is automatically freed when the connection is destroyed
|
|
||||||
* - reset() should only be called by the current owner thread
|
|
||||||
*
|
|
||||||
* Only the handler interface methods are public - all networking details are
|
|
||||||
* private.
|
|
||||||
*/
|
|
||||||
// Forward declaration
|
// Forward declaration
|
||||||
struct Server;
|
struct Server;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Represents a single client connection with thread-safe concurrent access.
|
||||||
|
*
|
||||||
|
* Connection ownership model:
|
||||||
|
* - Server owns all connections
|
||||||
|
* - Handlers receive Connection& references, and can keep a WeakRef to
|
||||||
|
* Connection for async responses.
|
||||||
|
* - Multiple pipeline threads can safely access connection concurrently
|
||||||
|
* - I/O thread has exclusive access to socket operations
|
||||||
|
*
|
||||||
|
* Threading model:
|
||||||
|
* - Single mutex protects all connection state
|
||||||
|
* - Pipeline threads call Connection methods (append_message, etc.)
|
||||||
|
* - I/O thread processes socket events and message queue
|
||||||
|
* - Pipeline threads manage epoll interests via Connection methods
|
||||||
|
* - Connection tracks closed state to prevent EBADF errors
|
||||||
|
*
|
||||||
|
* Arena allocator usage:
|
||||||
|
* - Request-scoped arenas created by handlers for each request
|
||||||
|
* - No connection-owned arena for parsing/response generation
|
||||||
|
* - Message queue stores spans + owning arenas until I/O completion
|
||||||
|
*/
|
||||||
struct Connection {
|
struct Connection {
|
||||||
// No public constructor or factory method - only Server can create
|
// No public constructor or factory method - only Server can create
|
||||||
// connections
|
// connections
|
||||||
@@ -64,90 +64,63 @@ struct Connection {
|
|||||||
// Handler interface - public methods that handlers can use
|
// Handler interface - public methods that handlers can use
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief Queue a message to be sent to the client.
|
* @brief Queue an atomic message to be sent to the client.
|
||||||
*
|
*
|
||||||
* Adds data to the connection's outgoing message queue. The data will be sent
|
* Adds a complete message with all associated data to the connection's
|
||||||
* asynchronously by the server's I/O threads using efficient vectored
|
* outgoing message queue. The message will be sent asynchronously by a
|
||||||
* I/O.
|
* server I/O thread using efficient vectored I/O.
|
||||||
*
|
*
|
||||||
* @param s The data to send (string view parameter for efficiency)
|
* @param data_parts Span of string_views pointing to arena-allocated data
|
||||||
* @param copy_to_arena If true (default), copies data to the connection's
|
* @param arena Arena that owns all the memory referenced by data_parts
|
||||||
* arena for safe storage. If false, the caller must ensure the data remains
|
* @param close_after_send Whether to close connection after sending this
|
||||||
* valid until all queued messages are sent.
|
* message
|
||||||
*
|
*
|
||||||
* @warning Thread Safety: Only call from the thread that currently owns this
|
* @note Thread Safety: This method is thread-safe and can be called
|
||||||
* connection. The arena allocator is not thread-safe.
|
* concurrently from multiple pipeline threads.
|
||||||
*
|
*
|
||||||
* @note Performance: Use copy_to_arena=false for static strings or data with
|
* @note The memory referenced by the data_parts span, must outlive @p arena.
|
||||||
* guaranteed lifetime, copy_to_arena=true for temporary/dynamic data.
|
* The arena will be moved and kept alive until the message is fully sent.
|
||||||
*
|
*
|
||||||
* Example usage:
|
* Example usage:
|
||||||
* ```cpp
|
* ```cpp
|
||||||
* conn->append_message("HTTP/1.1 200 OK\r\n\r\n", false); // Static string
|
* Arena arena;
|
||||||
* conn->append_message(dynamic_response, true); // Dynamic data
|
* auto* parts = arena.allocate<std::string_view>(2);
|
||||||
* conn->append_message(arena_allocated_data, false); // Arena data
|
* parts[0] = build_header(arena);
|
||||||
|
* parts[1] = build_body(arena);
|
||||||
|
* conn.append_message({parts, 2}, std::move(arena));
|
||||||
* ```
|
* ```
|
||||||
*/
|
*/
|
||||||
void append_message(std::string_view s, bool copy_to_arena = true);
|
void append_message(std::span<std::string_view> data_parts, Arena arena,
|
||||||
|
bool close_after_send = false);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief Mark the connection to be closed after sending all queued messages.
|
* @brief Get a WeakRef to this connection for async operations.
|
||||||
*
|
*
|
||||||
* Sets a flag that instructs the server to close this connection gracefully
|
* Returns a WeakRef that can be safely used to access this connection
|
||||||
* after all currently queued messages have been successfully sent to the
|
* from other threads, such as pipeline processing threads. The WeakRef
|
||||||
* client. This enables proper connection cleanup for protocols like HTTP/1.0
|
* allows safe access even if the connection might be destroyed by the
|
||||||
* or when implementing connection limits.
|
* time the async operation executes.
|
||||||
*
|
*
|
||||||
* @note The connection will remain active until:
|
* @return WeakRef to this connection
|
||||||
* 1. All queued messages are sent to the client
|
|
||||||
* 2. The server processes the close flag during the next I/O cycle
|
|
||||||
* 3. The connection is properly closed and cleaned up
|
|
||||||
*
|
*
|
||||||
* @warning Thread Safety: Only call from the thread that currently owns this
|
* @note Thread Safety: This method is thread-safe.
|
||||||
* connection.
|
|
||||||
*
|
*
|
||||||
* Typical usage:
|
* @note The WeakRef should be used with lock() to safely access the
|
||||||
|
* connection. If lock() returns null, the connection has been destroyed.
|
||||||
|
*
|
||||||
|
* Example usage:
|
||||||
* ```cpp
|
* ```cpp
|
||||||
* conn->append_message("HTTP/1.1 200 OK\r\n\r\nBye!");
|
* auto weak_conn = conn.get_weak_ref();
|
||||||
* conn->close_after_send(); // Close after sending response
|
* async_processor.submit([weak_conn, request_data]() {
|
||||||
|
* if (auto conn = weak_conn.lock()) {
|
||||||
|
* Arena arena;
|
||||||
|
* auto response = process_request(request_data, arena);
|
||||||
|
* conn->append_message({&response, 1}, std::move(arena));
|
||||||
|
* }
|
||||||
|
* });
|
||||||
* ```
|
* ```
|
||||||
*/
|
*/
|
||||||
void close_after_send() { closeConnection_ = true; }
|
WeakRef<Connection> get_weak_ref() const { return self_ref_.copy(); }
|
||||||
|
|
||||||
/**
|
|
||||||
* @brief Get access to the connection's arena allocator.
|
|
||||||
*
|
|
||||||
* Returns a reference to this connection's private Arena instance,
|
|
||||||
* which should be used for all temporary allocations during request
|
|
||||||
* processing. The arena provides extremely fast allocation (~1ns) and
|
|
||||||
* automatic cleanup when the connection is destroyed or reset.
|
|
||||||
*
|
|
||||||
* @return Reference to the connection's arena allocator
|
|
||||||
*
|
|
||||||
* @warning Thread Safety: Only access from the thread that currently owns
|
|
||||||
* this connection. The arena allocator is not thread-safe and concurrent
|
|
||||||
* access will result in undefined behavior.
|
|
||||||
*
|
|
||||||
* @note Memory Lifecycle: Arena memory is automatically freed when:
|
|
||||||
* - The connection is destroyed
|
|
||||||
* - reset() is called (keeps first block, frees others)
|
|
||||||
* - The connection is moved (arena ownership transfers)
|
|
||||||
*
|
|
||||||
* Best practices:
|
|
||||||
* ```cpp
|
|
||||||
* Arena& arena = conn->get_arena();
|
|
||||||
*
|
|
||||||
* // Allocate temporary parsing buffers
|
|
||||||
* char* buffer = arena.allocate<char>(1024);
|
|
||||||
*
|
|
||||||
* // Construct temporary objects
|
|
||||||
* auto* request = arena.construct<HttpRequest>(arena);
|
|
||||||
*
|
|
||||||
* // Use arena-backed STL containers
|
|
||||||
* std::vector<Token, ArenaStlAllocator<Token>> tokens{&arena};
|
|
||||||
* ```
|
|
||||||
*/
|
|
||||||
Arena &get_arena() { return arena_; }
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief Get the unique identifier for this connection.
|
* @brief Get the unique identifier for this connection.
|
||||||
@@ -210,11 +183,14 @@ struct Connection {
|
|||||||
* ```
|
* ```
|
||||||
*/
|
*/
|
||||||
int64_t outgoing_bytes_queued() const {
|
int64_t outgoing_bytes_queued() const {
|
||||||
|
std::lock_guard lock(mutex_);
|
||||||
#ifndef NDEBUG
|
#ifndef NDEBUG
|
||||||
// Debug build: validate counter accuracy
|
// Debug build: validate counter accuracy
|
||||||
int64_t computed_total = 0;
|
int64_t computed_total = 0;
|
||||||
for (auto s : messages_) {
|
for (const auto &message : message_queue_) {
|
||||||
computed_total += s.size();
|
for (const auto &part : message.data_parts) {
|
||||||
|
computed_total += part.size();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
assert(
|
assert(
|
||||||
outgoing_bytes_queued_ == computed_total &&
|
outgoing_bytes_queued_ == computed_total &&
|
||||||
@@ -268,50 +244,14 @@ struct Connection {
|
|||||||
*/
|
*/
|
||||||
void *user_data = nullptr;
|
void *user_data = nullptr;
|
||||||
|
|
||||||
/**
|
|
||||||
* Reset the connection's arena allocator and message queue for reuse.
|
|
||||||
*
|
|
||||||
* This method efficiently reclaims arena memory by keeping the first block
|
|
||||||
* and freeing all others, then reinitializes the message queue.
|
|
||||||
*
|
|
||||||
* @warning Thread Safety: This method should ONLY be called by the thread
|
|
||||||
* that currently owns this connection. Calling reset() while the connection
|
|
||||||
* is being transferred between threads or accessed by another thread will
|
|
||||||
* result in undefined behavior.
|
|
||||||
*
|
|
||||||
* @note The assert(messages_.empty()) ensures all outgoing data has been
|
|
||||||
* sent before resetting. This prevents data loss and indicates the connection
|
|
||||||
* is in a clean state for reuse.
|
|
||||||
*
|
|
||||||
* Typical usage pattern:
|
|
||||||
* - HTTP handlers call this after completing a request/response cycle
|
|
||||||
*/
|
|
||||||
void reset() {
|
|
||||||
assert(messages_.empty());
|
|
||||||
outgoing_bytes_queued_ = 0;
|
|
||||||
arena_.reset();
|
|
||||||
messages_ =
|
|
||||||
std::deque<std::string_view, ArenaStlAllocator<std::string_view>>{
|
|
||||||
ArenaStlAllocator<std::string_view>{&arena_}};
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @note Ownership Transfer: To release a connection back to the server for
|
|
||||||
* continued processing, use the static method:
|
|
||||||
* ```cpp
|
|
||||||
* Server::release_back_to_server(std::move(connection_ptr));
|
|
||||||
* ```
|
|
||||||
*
|
|
||||||
* This is the correct way to return connection ownership when:
|
|
||||||
* - A handler has taken ownership via unique_ptr.release()
|
|
||||||
* - Background processing of the connection is complete
|
|
||||||
* - The connection should resume normal server-managed I/O processing
|
|
||||||
*
|
|
||||||
* The method is thread-safe and handles the case where the server may have
|
|
||||||
* been destroyed while the connection was being processed elsewhere.
|
|
||||||
*/
|
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
struct Message {
|
||||||
|
Arena arena; // Owns all the memory (movable)
|
||||||
|
std::span<std::string_view> data_parts; // Points to arena-allocated memory
|
||||||
|
// (mutable for partial writes)
|
||||||
|
bool close_after_send = false; // Close connection after sending
|
||||||
|
};
|
||||||
|
|
||||||
// Server is a friend and can access all networking internals
|
// Server is a friend and can access all networking internals
|
||||||
friend struct Server;
|
friend struct Server;
|
||||||
|
|
||||||
@@ -340,26 +280,31 @@ private:
|
|||||||
int readBytes(char *buf, size_t buffer_size);
|
int readBytes(char *buf, size_t buffer_size);
|
||||||
bool writeBytes();
|
bool writeBytes();
|
||||||
|
|
||||||
// Direct access methods for Server
|
// Direct access methods for Server (must hold mutex)
|
||||||
int getFd() const { return fd_; }
|
int getFd() const { return fd_; }
|
||||||
bool has_messages() const { return !messages_.empty(); }
|
bool has_messages() const { return !message_queue_.empty(); }
|
||||||
bool should_close() const { return closeConnection_; }
|
bool should_close() const { return close_after_send_; }
|
||||||
size_t getEpollIndex() const { return epoll_index_; }
|
size_t getEpollIndex() const { return epoll_index_; }
|
||||||
|
|
||||||
|
// Server can set self-reference after creation
|
||||||
|
void setSelfRef(WeakRef<Connection> self) { self_ref_ = std::move(self); }
|
||||||
|
|
||||||
|
// Immutable connection properties
|
||||||
const int fd_;
|
const int fd_;
|
||||||
const int64_t id_;
|
const int64_t id_;
|
||||||
const size_t epoll_index_; // Index of the epoll instance this connection uses
|
const size_t epoll_index_; // Index of the epoll instance this connection uses
|
||||||
struct sockaddr_storage addr_; // sockaddr_storage handles IPv4/IPv6
|
struct sockaddr_storage addr_; // sockaddr_storage handles IPv4/IPv6
|
||||||
Arena arena_;
|
|
||||||
ConnectionHandler *handler_;
|
ConnectionHandler *handler_;
|
||||||
WeakRef<Server> server_; // Weak reference to server for safe cleanup
|
WeakRef<Server> server_; // Weak reference to server for safe cleanup
|
||||||
|
WeakRef<Connection> self_ref_; // WeakRef to self for get_weak_ref()
|
||||||
|
|
||||||
std::deque<std::string_view, ArenaStlAllocator<std::string_view>> messages_{
|
// Thread-safe state (protected by mutex_)
|
||||||
ArenaStlAllocator<std::string_view>{&arena_}};
|
mutable std::mutex mutex_; // Protects all mutable state
|
||||||
|
std::deque<Message>
|
||||||
// Counter tracking total bytes queued for transmission
|
message_queue_; // Queue of messages to send. Protectec by
|
||||||
int64_t outgoing_bytes_queued_{0};
|
// mutex_, but if non-empty mutex_ can be
|
||||||
|
// dropped while server accesses existing elements.
|
||||||
// Whether or not to close the connection after completing writing the
|
int64_t outgoing_bytes_queued_{0}; // Counter of queued bytes
|
||||||
// response
|
bool close_after_send_{false}; // Close after sending all messages
|
||||||
bool closeConnection_{false};
|
bool is_closed_{false}; // Connection closed state
|
||||||
};
|
};
|
||||||
|
|||||||
@@ -26,21 +26,21 @@ public:
|
|||||||
* Process incoming data from a connection.
|
* Process incoming data from a connection.
|
||||||
*
|
*
|
||||||
* @param data Incoming data buffer (may be partial message)
|
* @param data Incoming data buffer (may be partial message)
|
||||||
* @param conn_ptr Unique pointer to connection - handler can take ownership
|
* @param conn Connection reference - server retains ownership
|
||||||
* by releasing it
|
|
||||||
*
|
*
|
||||||
* Implementation should:
|
* Implementation should:
|
||||||
* - Parse incoming data using arena allocator when needed
|
* - Create request-scoped Arena for parsing and response generation
|
||||||
* - Use conn_ptr->append_message() to queue response data to be sent
|
* - Parse incoming data using the request arena
|
||||||
|
* - Use conn.append_message() to queue response data to be sent
|
||||||
* - Handle partial messages and streaming protocols appropriately
|
* - Handle partial messages and streaming protocols appropriately
|
||||||
* - Can take ownership by calling conn_ptr.release() to pass to other threads
|
* - Use conn.get_weak_ref() for async processing if needed
|
||||||
* - If ownership is taken, handler must call Server::release_back_to_server()
|
*
|
||||||
* when done
|
* @note `data` lifetime ends after the call to on_data_arrived.
|
||||||
* @note `data` is *not* owned by the connection arena, and its lifetime ends
|
|
||||||
* after the call to on_data_arrived.
|
|
||||||
* @note May be called from an arbitrary server thread.
|
* @note May be called from an arbitrary server thread.
|
||||||
|
* @note Handler can safely access connection concurrently via thread-safe
|
||||||
|
* methods.
|
||||||
*/
|
*/
|
||||||
virtual void on_data_arrived(std::string_view /*data*/, Ref<Connection> &) {};
|
virtual void on_data_arrived(std::string_view /*data*/, Connection &) {};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Called when data has been successfully written to the connection.
|
* Called when data has been successfully written to the connection.
|
||||||
@@ -50,29 +50,26 @@ public:
|
|||||||
* - Implementing backpressure for continuous data streams
|
* - Implementing backpressure for continuous data streams
|
||||||
* - Progress monitoring for long-running transfers
|
* - Progress monitoring for long-running transfers
|
||||||
*
|
*
|
||||||
* @param conn_ptr Connection that made write progress - handler can take
|
* @param conn Connection that made write progress - server retains ownership
|
||||||
* ownership
|
|
||||||
* @note May be called from an arbitrary server thread.
|
* @note May be called from an arbitrary server thread.
|
||||||
* @note Called during writes, not necessarily when buffer becomes empty
|
* @note Called during writes, not necessarily when buffer becomes empty
|
||||||
*/
|
*/
|
||||||
virtual void on_write_progress(Ref<Connection> &) {}
|
virtual void on_write_progress(Connection &) {}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Called when the connection's outgoing write buffer becomes empty.
|
* Called when the connection's outgoing write buffer becomes empty.
|
||||||
*
|
*
|
||||||
* This indicates all queued messages have been successfully written
|
* This indicates all queued messages have been successfully written
|
||||||
* to the socket. Useful for:
|
* to the socket. Useful for:
|
||||||
* - Resetting arena allocators safely
|
|
||||||
* - Implementing keep-alive connection reuse
|
* - Implementing keep-alive connection reuse
|
||||||
* - Closing connections after final response
|
* - Closing connections after final response
|
||||||
* - Relieving backpressure conditions
|
* - Relieving backpressure conditions
|
||||||
*
|
*
|
||||||
* @param conn_ptr Connection with empty write buffer - handler can take
|
* @param conn Connection with empty write buffer - server retains ownership
|
||||||
* ownership
|
|
||||||
* @note May be called from an arbitrary server thread.
|
* @note May be called from an arbitrary server thread.
|
||||||
* @note Only called on transitions from non-empty → empty buffer
|
* @note Only called on transitions from non-empty → empty buffer
|
||||||
*/
|
*/
|
||||||
virtual void on_write_buffer_drained(Ref<Connection> &) {}
|
virtual void on_write_buffer_drained(Connection &) {}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Called when a new connection is established.
|
* Called when a new connection is established.
|
||||||
@@ -101,11 +98,9 @@ public:
|
|||||||
*
|
*
|
||||||
* This hook is called after on_data_arrived, on_write_progress, or
|
* This hook is called after on_data_arrived, on_write_progress, or
|
||||||
* on_write_buffer_drained has been called for each connection in the batch.
|
* on_write_buffer_drained has been called for each connection in the batch.
|
||||||
* The handler can take ownership of the connections by moving the unique_ptr
|
* All connections remain server-owned.
|
||||||
* out of the span. Any connections left in the span will remain owned by the
|
|
||||||
* server.
|
|
||||||
*
|
*
|
||||||
* @param batch A span of unique_ptrs to the connections in the batch.
|
* @param batch A span of connection references in the batch.
|
||||||
*/
|
*/
|
||||||
virtual void on_batch_complete(std::span<Ref<Connection>> /*batch*/) {}
|
virtual void on_batch_complete(std::span<Connection *> /*batch*/) {}
|
||||||
};
|
};
|
||||||
|
|||||||
@@ -896,9 +896,6 @@ bool HttpHandler::process_release_batch(BatchType &batch) {
|
|||||||
perfetto::Flow::Global(state->http_request_id));
|
perfetto::Flow::Global(state->http_request_id));
|
||||||
}
|
}
|
||||||
|
|
||||||
// Return connection to server for further processing or cleanup
|
|
||||||
Server::release_back_to_server(std::move(commit_entry.connection));
|
|
||||||
|
|
||||||
return false; // Continue processing
|
return false; // Continue processing
|
||||||
} else if constexpr (std::is_same_v<T, StatusEntry>) {
|
} else if constexpr (std::is_same_v<T, StatusEntry>) {
|
||||||
// Process status entry: return connection to server
|
// Process status entry: return connection to server
|
||||||
@@ -911,9 +908,6 @@ bool HttpHandler::process_release_batch(BatchType &batch) {
|
|||||||
perfetto::Flow::Global(state->http_request_id));
|
perfetto::Flow::Global(state->http_request_id));
|
||||||
}
|
}
|
||||||
|
|
||||||
// Return connection to server for further processing or cleanup
|
|
||||||
Server::release_back_to_server(std::move(status_entry.connection));
|
|
||||||
|
|
||||||
return false; // Continue processing
|
return false; // Continue processing
|
||||||
} else if constexpr (std::is_same_v<T, HealthCheckEntry>) {
|
} else if constexpr (std::is_same_v<T, HealthCheckEntry>) {
|
||||||
// Process health check entry: return connection to server
|
// Process health check entry: return connection to server
|
||||||
@@ -926,10 +920,6 @@ bool HttpHandler::process_release_batch(BatchType &batch) {
|
|||||||
perfetto::Flow::Global(state->http_request_id));
|
perfetto::Flow::Global(state->http_request_id));
|
||||||
}
|
}
|
||||||
|
|
||||||
// Return connection to server for further processing or cleanup
|
|
||||||
Server::release_back_to_server(
|
|
||||||
std::move(health_check_entry.connection));
|
|
||||||
|
|
||||||
return false; // Continue processing
|
return false; // Continue processing
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -138,51 +138,6 @@ void Server::shutdown() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void Server::release_back_to_server(Ref<Connection> connection) {
|
|
||||||
if (!connection) {
|
|
||||||
return; // Nothing to release
|
|
||||||
}
|
|
||||||
|
|
||||||
// Try to get the server from the connection's weak_ptr
|
|
||||||
if (auto server = connection->server_.lock()) {
|
|
||||||
// Server still exists - pass unique_ptr directly
|
|
||||||
server->receiveConnectionBack(std::move(connection));
|
|
||||||
}
|
|
||||||
|
|
||||||
// If server is gone, connection will be automatically cleaned up when
|
|
||||||
// unique_ptr destructs
|
|
||||||
}
|
|
||||||
|
|
||||||
void Server::receiveConnectionBack(Ref<Connection> connection) {
|
|
||||||
if (!connection) {
|
|
||||||
return; // Nothing to process
|
|
||||||
}
|
|
||||||
|
|
||||||
// Re-add the connection to epoll for continued processing
|
|
||||||
struct epoll_event event{};
|
|
||||||
|
|
||||||
if (!connection->has_messages()) {
|
|
||||||
event.events = EPOLLIN | EPOLLONESHOT;
|
|
||||||
} else {
|
|
||||||
event.events = EPOLLOUT | EPOLLONESHOT;
|
|
||||||
}
|
|
||||||
|
|
||||||
int fd = connection->getFd();
|
|
||||||
event.data.fd = fd;
|
|
||||||
|
|
||||||
// Store connection in registry before adding to epoll
|
|
||||||
// This mirrors the pattern used in process_connection_batch
|
|
||||||
size_t epoll_index = connection->getEpollIndex();
|
|
||||||
int epollfd = epoll_fds_[epoll_index];
|
|
||||||
connection_registry_.store(fd, std::move(connection));
|
|
||||||
|
|
||||||
if (epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &event) == -1) {
|
|
||||||
perror("epoll_ctl MOD in receiveConnectionBack");
|
|
||||||
// Remove from registry and clean up on failure
|
|
||||||
(void)connection_registry_.remove(fd);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
int Server::create_local_connection() {
|
int Server::create_local_connection() {
|
||||||
int sockets[2];
|
int sockets[2];
|
||||||
if (socketpair(AF_UNIX, SOCK_STREAM, 0, sockets) != 0) {
|
if (socketpair(AF_UNIX, SOCK_STREAM, 0, sockets) != 0) {
|
||||||
@@ -224,7 +179,7 @@ int Server::create_local_connection() {
|
|||||||
|
|
||||||
// Add to appropriate epoll instance
|
// Add to appropriate epoll instance
|
||||||
struct epoll_event event{};
|
struct epoll_event event{};
|
||||||
event.events = EPOLLIN | EPOLLONESHOT;
|
event.events = EPOLLIN;
|
||||||
event.data.fd = server_fd;
|
event.data.fd = server_fd;
|
||||||
|
|
||||||
int epollfd = epoll_fds_[epoll_index];
|
int epollfd = epoll_fds_[epoll_index];
|
||||||
@@ -353,7 +308,7 @@ void Server::start_io_threads(std::vector<std::thread> &threads) {
|
|||||||
assert(conn);
|
assert(conn);
|
||||||
|
|
||||||
if (events[i].events & (EPOLLERR | EPOLLHUP)) {
|
if (events[i].events & (EPOLLERR | EPOLLHUP)) {
|
||||||
// unique_ptr will automatically delete on scope exit
|
// Connection will be destroyed on scope exit
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -467,14 +422,8 @@ void Server::process_connection_reads(Ref<Connection> &conn, int events) {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Call handler with unique_ptr - handler can take ownership if needed
|
// Call handler with connection reference - server retains ownership
|
||||||
handler_.on_data_arrived(std::string_view{buf, size_t(r)}, conn);
|
handler_.on_data_arrived(std::string_view{buf, size_t(r)}, *conn);
|
||||||
|
|
||||||
// If handler took ownership (conn is now null), return true to indicate
|
|
||||||
// processing is done
|
|
||||||
if (!conn) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -492,21 +441,12 @@ void Server::process_connection_writes(Ref<Connection> &conn, int /*events*/) {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Call handler with unique_ptr - handler can take ownership if needed
|
// Call handler with connection reference - server retains ownership
|
||||||
handler_.on_write_progress(conn);
|
handler_.on_write_progress(*conn);
|
||||||
// If handler took ownership (conn is now null), return true to indicate
|
|
||||||
// processing is done
|
|
||||||
if (!conn) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check if buffer became empty (transition from non-empty -> empty)
|
// Check if buffer became empty (transition from non-empty -> empty)
|
||||||
if (had_messages && !conn->has_messages()) {
|
if (had_messages && !conn->has_messages()) {
|
||||||
handler_.on_write_buffer_drained(conn);
|
handler_.on_write_buffer_drained(*conn);
|
||||||
// If handler took ownership (conn is now null), return
|
|
||||||
if (!conn) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check if we should close the connection according to application
|
// Check if we should close the connection according to application
|
||||||
@@ -535,8 +475,15 @@ void Server::process_connection_batch(int epollfd,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Call batch complete handler - handlers can take ownership here
|
// Call batch complete handler with connection pointers
|
||||||
handler_.on_batch_complete(batch);
|
std::vector<Connection *> conn_ptrs;
|
||||||
|
conn_ptrs.reserve(batch.size());
|
||||||
|
for (auto &conn_ref : batch) {
|
||||||
|
if (conn_ref) {
|
||||||
|
conn_ptrs.push_back(conn_ref.get());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
handler_.on_batch_complete(conn_ptrs);
|
||||||
|
|
||||||
// Transfer all remaining connections back to epoll
|
// Transfer all remaining connections back to epoll
|
||||||
for (auto &conn_ptr : batch) {
|
for (auto &conn_ptr : batch) {
|
||||||
@@ -545,13 +492,13 @@ void Server::process_connection_batch(int epollfd,
|
|||||||
|
|
||||||
struct epoll_event event{};
|
struct epoll_event event{};
|
||||||
if (!conn_ptr->has_messages()) {
|
if (!conn_ptr->has_messages()) {
|
||||||
event.events = EPOLLIN | EPOLLONESHOT;
|
event.events = EPOLLIN;
|
||||||
} else {
|
} else {
|
||||||
event.events = EPOLLOUT | EPOLLONESHOT;
|
event.events = EPOLLIN | EPOLLOUT;
|
||||||
}
|
}
|
||||||
|
|
||||||
event.data.fd = fd; // Use file descriptor for epoll
|
event.data.fd = fd;
|
||||||
// Put connection back in registry since handler didn't take ownership.
|
// Put connection back in registry since handler didn't take ownership
|
||||||
// Must happen before epoll_ctl
|
// Must happen before epoll_ctl
|
||||||
connection_registry_.store(fd, std::move(conn_ptr));
|
connection_registry_.store(fd, std::move(conn_ptr));
|
||||||
if (epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &event) == -1) {
|
if (epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &event) == -1) {
|
||||||
|
|||||||
@@ -95,19 +95,6 @@ struct Server {
|
|||||||
*/
|
*/
|
||||||
int create_local_connection();
|
int create_local_connection();
|
||||||
|
|
||||||
/**
|
|
||||||
* Release a connection back to its server for continued processing.
|
|
||||||
*
|
|
||||||
* This static method safely returns ownership of a connection back to its
|
|
||||||
* server. If the server has been destroyed, the connection will be safely
|
|
||||||
* cleaned up.
|
|
||||||
*
|
|
||||||
* This method is thread-safe and can be called from any thread.
|
|
||||||
*
|
|
||||||
* @param connection unique_ptr to the connection being released back
|
|
||||||
*/
|
|
||||||
static void release_back_to_server(Ref<Connection> connection);
|
|
||||||
|
|
||||||
private:
|
private:
|
||||||
friend struct Connection;
|
friend struct Connection;
|
||||||
/**
|
/**
|
||||||
|
|||||||
@@ -55,7 +55,6 @@ TEST_CASE(
|
|||||||
}
|
}
|
||||||
assert(message.conn);
|
assert(message.conn);
|
||||||
message.conn->append_message(message.data);
|
message.conn->append_message(message.data);
|
||||||
Server::release_back_to_server(std::move(message.conn));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}};
|
}};
|
||||||
|
|||||||
Reference in New Issue
Block a user