Add on_write_progress
This commit is contained in:
11
design.md
11
design.md
@@ -105,7 +105,6 @@ See `config.md` for complete configuration documentation.
|
|||||||
- **Safe shutdown mechanism** with async-signal-safe shutdown() method
|
- **Safe shutdown mechanism** with async-signal-safe shutdown() method
|
||||||
- **Connection ownership management** with automatic cleanup on server destruction
|
- **Connection ownership management** with automatic cleanup on server destruction
|
||||||
- **Pluggable protocol handlers** via ConnectionHandler interface
|
- **Pluggable protocol handlers** via ConnectionHandler interface
|
||||||
- TODO design a way to stream server sent events
|
|
||||||
|
|
||||||
Key features:
|
Key features:
|
||||||
- Multi-threaded architecture: separate accept and network thread pools
|
- Multi-threaded architecture: separate accept and network thread pools
|
||||||
@@ -135,7 +134,7 @@ Key features:
|
|||||||
- **Connection lifecycle hooks** for initialization and cleanup
|
- **Connection lifecycle hooks** for initialization and cleanup
|
||||||
|
|
||||||
Key features:
|
Key features:
|
||||||
- process_data() with unique_ptr<Connection>& for ownership transfer
|
- on_data_arrived()/on_write_progress() with unique_ptr<Connection>& for ownership transfer
|
||||||
- on_connection_established/closed() hooks for protocol state management
|
- on_connection_established/closed() hooks for protocol state management
|
||||||
- Zero-copy data processing with arena allocator integration
|
- Zero-copy data processing with arena allocator integration
|
||||||
- Thread-safe ownership transfer via Server::releaseBackToServer()
|
- Thread-safe ownership transfer via Server::releaseBackToServer()
|
||||||
@@ -282,7 +281,7 @@ The modular design allows each component to be optimized independently while mai
|
|||||||
|
|
||||||
### Adding New Protocol Handlers
|
### Adding New Protocol Handlers
|
||||||
- Inherit from `ConnectionHandler` in `src/connection_handler.hpp`
|
- Inherit from `ConnectionHandler` in `src/connection_handler.hpp`
|
||||||
- Implement `process_data()` with proper ownership semantics
|
- Implement `on_data_arrived()` with proper ownership semantics
|
||||||
- Use connection's arena allocator for temporary allocations: `conn->getArena()`
|
- Use connection's arena allocator for temporary allocations: `conn->getArena()`
|
||||||
- Handle partial messages and streaming protocols appropriately
|
- Handle partial messages and streaming protocols appropriately
|
||||||
- Use `Server::releaseBackToServer()` if taking ownership for async processing
|
- Use `Server::releaseBackToServer()` if taking ownership for async processing
|
||||||
@@ -338,7 +337,7 @@ auto server = Server::create(config, handler);
|
|||||||
```cpp
|
```cpp
|
||||||
class HttpHandler : public ConnectionHandler {
|
class HttpHandler : public ConnectionHandler {
|
||||||
public:
|
public:
|
||||||
void process_data(std::string_view data, std::unique_ptr<Connection>& conn_ptr) override {
|
void on_data_arrived(std::string_view data, std::unique_ptr<Connection>& conn_ptr) override {
|
||||||
// Parse HTTP request using connection's arena
|
// Parse HTTP request using connection's arena
|
||||||
ArenaAllocator& arena = conn_ptr->getArena();
|
ArenaAllocator& arena = conn_ptr->getArena();
|
||||||
|
|
||||||
@@ -354,7 +353,7 @@ public:
|
|||||||
```cpp
|
```cpp
|
||||||
class AsyncHandler : public ConnectionHandler {
|
class AsyncHandler : public ConnectionHandler {
|
||||||
public:
|
public:
|
||||||
void process_data(std::string_view data, std::unique_ptr<Connection>& conn_ptr) override {
|
void on_data_arrived(std::string_view data, std::unique_ptr<Connection>& conn_ptr) override {
|
||||||
// Take ownership for async processing
|
// Take ownership for async processing
|
||||||
auto connection = std::move(conn_ptr); // conn_ptr is now null
|
auto connection = std::move(conn_ptr); // conn_ptr is now null
|
||||||
|
|
||||||
@@ -369,6 +368,8 @@ public:
|
|||||||
};
|
};
|
||||||
```
|
```
|
||||||
|
|
||||||
|
TODO add example of streaming data
|
||||||
|
|
||||||
### Arena-Based String Handling
|
### Arena-Based String Handling
|
||||||
```cpp
|
```cpp
|
||||||
// Preferred: Zero-copy string view with arena allocation
|
// Preferred: Zero-copy string view with arena allocation
|
||||||
|
|||||||
@@ -122,7 +122,7 @@ bool Connection::writeBytes() {
|
|||||||
// This reclaims memory from request parsing and response generation
|
// This reclaims memory from request parsing and response generation
|
||||||
arena_.reset();
|
arena_.reset();
|
||||||
|
|
||||||
return closeConnection_;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
void Connection::tsan_acquire() {
|
void Connection::tsan_acquire() {
|
||||||
|
|||||||
@@ -44,6 +44,13 @@ struct Connection {
|
|||||||
void closeAfterSend() { closeConnection_ = true; }
|
void closeAfterSend() { closeConnection_ = true; }
|
||||||
ArenaAllocator &getArena() { return arena_; }
|
ArenaAllocator &getArena() { return arena_; }
|
||||||
int64_t getId() const { return id_; }
|
int64_t getId() const { return id_; }
|
||||||
|
int64_t outgoingBytesQueued() const {
|
||||||
|
int64_t result = 0;
|
||||||
|
for (auto s : messages_) {
|
||||||
|
result += s.size();
|
||||||
|
}
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
// Note: To release connection back to server, use
|
// Note: To release connection back to server, use
|
||||||
// Server::releaseBackToServer(std::move(connection_ptr))
|
// Server::releaseBackToServer(std::move(connection_ptr))
|
||||||
|
|||||||
@@ -35,12 +35,18 @@ public:
|
|||||||
* - If ownership is taken, handler must call Server::releaseBackToServer()
|
* - If ownership is taken, handler must call Server::releaseBackToServer()
|
||||||
* when done
|
* when done
|
||||||
* @note `data` is *not* owned by the connection arena, and its lifetime ends
|
* @note `data` is *not* owned by the connection arena, and its lifetime ends
|
||||||
* after the call to process_data.
|
* after the call to on_data_arrived.
|
||||||
* @note May be called from an arbitrary network thread.
|
* @note May be called from an arbitrary network thread.
|
||||||
*/
|
*/
|
||||||
virtual void process_data(std::string_view data,
|
virtual void on_data_arrived(std::string_view data,
|
||||||
std::unique_ptr<Connection> &conn_ptr) = 0;
|
std::unique_ptr<Connection> &conn_ptr) = 0;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Successfully wrote data on the connection.
|
||||||
|
* @note May be called from an arbitrary network thread.
|
||||||
|
*/
|
||||||
|
virtual void on_write_progress(std::unique_ptr<Connection> &) {}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Called when a new connection is established.
|
* Called when a new connection is established.
|
||||||
*
|
*
|
||||||
|
|||||||
@@ -29,7 +29,7 @@ void signal_handler(int sig) {
|
|||||||
*/
|
*/
|
||||||
class EchoHandler : public ConnectionHandler {
|
class EchoHandler : public ConnectionHandler {
|
||||||
public:
|
public:
|
||||||
void process_data(std::string_view data,
|
void on_data_arrived(std::string_view data,
|
||||||
std::unique_ptr<Connection> &conn_ptr) override {
|
std::unique_ptr<Connection> &conn_ptr) override {
|
||||||
// Echo the received data back to the client
|
// Echo the received data back to the client
|
||||||
conn_ptr->appendMessage(data);
|
conn_ptr->appendMessage(data);
|
||||||
|
|||||||
@@ -270,7 +270,7 @@ void Server::start_network_threads() {
|
|||||||
|
|
||||||
// Call handler with unique_ptr - handler can take ownership if
|
// Call handler with unique_ptr - handler can take ownership if
|
||||||
// needed
|
// needed
|
||||||
handler_.process_data(data, conn);
|
handler_.on_data_arrived(data, conn);
|
||||||
|
|
||||||
// If handler took ownership (conn is now null), don't continue
|
// If handler took ownership (conn is now null), don't continue
|
||||||
// processing
|
// processing
|
||||||
@@ -280,8 +280,22 @@ void Server::start_network_threads() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (events[i].events & EPOLLOUT) {
|
if (events[i].events & EPOLLOUT) {
|
||||||
bool done = conn->writeBytes();
|
bool error = conn->writeBytes();
|
||||||
if (done) {
|
if (error) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Call handler with unique_ptr - handler can take ownership if
|
||||||
|
// needed
|
||||||
|
handler_.on_write_progress(conn);
|
||||||
|
// If handler took ownership (conn is now null), don't continue
|
||||||
|
// processing
|
||||||
|
if (!conn) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if we should close the connection according to application
|
||||||
|
if (!conn->hasMessages() && conn->closeConnection_) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user