Compare commits
3 Commits
772797155b
...
e56cf41a01
| Author | SHA1 | Date | |
|---|---|---|---|
| e56cf41a01 | |||
| c0242317ed | |||
| 6f1806d0b8 |
@@ -395,7 +395,7 @@ public:
|
|||||||
proto_data->process(data);
|
proto_data->process(data);
|
||||||
}
|
}
|
||||||
|
|
||||||
void on_post_batch(std::span<std::unique_ptr<Connection>> batch) override {
|
void on_batch_complete(std::span<std::unique_ptr<Connection>> batch) override {
|
||||||
// Process a batch of connections
|
// Process a batch of connections
|
||||||
for (auto& conn_ptr : batch) {
|
for (auto& conn_ptr : batch) {
|
||||||
if (conn_ptr) {
|
if (conn_ptr) {
|
||||||
|
|||||||
@@ -18,7 +18,7 @@ public:
|
|||||||
/**
|
/**
|
||||||
* @brief Status returned by streaming parse operations.
|
* @brief Status returned by streaming parse operations.
|
||||||
*/
|
*/
|
||||||
enum class ParseStatus {
|
enum class [[nodiscard]] ParseStatus {
|
||||||
Incomplete, ///< Parser needs more data to complete parsing
|
Incomplete, ///< Parser needs more data to complete parsing
|
||||||
Complete, ///< Successfully parsed a complete commit request
|
Complete, ///< Successfully parsed a complete commit request
|
||||||
Error ///< Parse error occurred (check get_parse_error() for details)
|
Error ///< Parse error occurred (check get_parse_error() for details)
|
||||||
@@ -27,7 +27,7 @@ public:
|
|||||||
/**
|
/**
|
||||||
* @brief Result type for one-shot parsing operations.
|
* @brief Result type for one-shot parsing operations.
|
||||||
*/
|
*/
|
||||||
enum class ParseResult {
|
enum class [[nodiscard]] ParseResult {
|
||||||
Success, ///< Parsing completed successfully
|
Success, ///< Parsing completed successfully
|
||||||
InvalidJson, ///< Invalid JSON format or structure
|
InvalidJson, ///< Invalid JSON format or structure
|
||||||
MissingField, ///< Required field missing from input
|
MissingField, ///< Required field missing from input
|
||||||
|
|||||||
@@ -43,6 +43,7 @@ void Connection::appendMessage(std::string_view s, bool copy_to_arena) {
|
|||||||
} else {
|
} else {
|
||||||
messages_.push_back(s);
|
messages_.push_back(s);
|
||||||
}
|
}
|
||||||
|
outgoing_bytes_queued_ += s.size();
|
||||||
}
|
}
|
||||||
|
|
||||||
int Connection::readBytes(char *buf, size_t buffer_size) {
|
int Connection::readBytes(char *buf, size_t buffer_size) {
|
||||||
@@ -104,6 +105,7 @@ bool Connection::writeBytes() {
|
|||||||
|
|
||||||
// Handle partial writes by updating string_view data/size
|
// Handle partial writes by updating string_view data/size
|
||||||
size_t bytes_written = static_cast<size_t>(w);
|
size_t bytes_written = static_cast<size_t>(w);
|
||||||
|
outgoing_bytes_queued_ -= bytes_written;
|
||||||
while (bytes_written > 0 && !messages_.empty()) {
|
while (bytes_written > 0 && !messages_.empty()) {
|
||||||
auto &front = messages_.front();
|
auto &front = messages_.front();
|
||||||
|
|
||||||
|
|||||||
@@ -178,7 +178,7 @@ struct Connection {
|
|||||||
/**
|
/**
|
||||||
* @brief Get the number of bytes queued for transmission.
|
* @brief Get the number of bytes queued for transmission.
|
||||||
*
|
*
|
||||||
* Calculates and returns the total number of bytes in all messages currently
|
* Returns the total number of bytes in all messages currently
|
||||||
* queued for transmission to the client. This includes all data added via
|
* queued for transmission to the client. This includes all data added via
|
||||||
* appendMessage() that has not yet been sent over the network.
|
* appendMessage() that has not yet been sent over the network.
|
||||||
*
|
*
|
||||||
@@ -187,8 +187,8 @@ struct Connection {
|
|||||||
* @warning Thread Safety: Only call from the thread that currently owns this
|
* @warning Thread Safety: Only call from the thread that currently owns this
|
||||||
* connection. Concurrent access to the message queue is not thread-safe.
|
* connection. Concurrent access to the message queue is not thread-safe.
|
||||||
*
|
*
|
||||||
* @note Performance: This method iterates through all queued messages to
|
* @note Performance: This method uses an O(1) counter for fast retrieval
|
||||||
* calculate the total, so avoid calling it frequently in hot paths.
|
* in release builds. In debug builds, validates counter accuracy.
|
||||||
*
|
*
|
||||||
* @note The count decreases as the server sends data via writeBytes() and
|
* @note The count decreases as the server sends data via writeBytes() and
|
||||||
* removes completed messages from the queue.
|
* removes completed messages from the queue.
|
||||||
@@ -210,11 +210,17 @@ struct Connection {
|
|||||||
* ```
|
* ```
|
||||||
*/
|
*/
|
||||||
int64_t outgoingBytesQueued() const {
|
int64_t outgoingBytesQueued() const {
|
||||||
int64_t result = 0;
|
#ifndef NDEBUG
|
||||||
|
// Debug build: validate counter accuracy
|
||||||
|
int64_t computed_total = 0;
|
||||||
for (auto s : messages_) {
|
for (auto s : messages_) {
|
||||||
result += s.size();
|
computed_total += s.size();
|
||||||
}
|
}
|
||||||
return result;
|
assert(
|
||||||
|
outgoing_bytes_queued_ == computed_total &&
|
||||||
|
"outgoing_bytes_queued_ counter is out of sync with actual queue size");
|
||||||
|
#endif
|
||||||
|
return outgoing_bytes_queued_;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -282,6 +288,7 @@ struct Connection {
|
|||||||
*/
|
*/
|
||||||
void reset() {
|
void reset() {
|
||||||
assert(messages_.empty());
|
assert(messages_.empty());
|
||||||
|
outgoing_bytes_queued_ = 0;
|
||||||
arena_.reset();
|
arena_.reset();
|
||||||
messages_ =
|
messages_ =
|
||||||
std::deque<std::string_view, ArenaStlAllocator<std::string_view>>{
|
std::deque<std::string_view, ArenaStlAllocator<std::string_view>>{
|
||||||
@@ -345,6 +352,9 @@ private:
|
|||||||
std::deque<std::string_view, ArenaStlAllocator<std::string_view>> messages_{
|
std::deque<std::string_view, ArenaStlAllocator<std::string_view>> messages_{
|
||||||
ArenaStlAllocator<std::string_view>{&arena_}};
|
ArenaStlAllocator<std::string_view>{&arena_}};
|
||||||
|
|
||||||
|
// Counter tracking total bytes queued for transmission
|
||||||
|
int64_t outgoing_bytes_queued_{0};
|
||||||
|
|
||||||
// Whether or not to close the connection after completing writing the
|
// Whether or not to close the connection after completing writing the
|
||||||
// response
|
// response
|
||||||
bool closeConnection_{false};
|
bool closeConnection_{false};
|
||||||
|
|||||||
@@ -107,6 +107,6 @@ public:
|
|||||||
*
|
*
|
||||||
* @param batch A span of unique_ptrs to the connections in the batch.
|
* @param batch A span of unique_ptrs to the connections in the batch.
|
||||||
*/
|
*/
|
||||||
virtual void on_post_batch(std::span<std::unique_ptr<Connection>> /*batch*/) {
|
virtual void
|
||||||
}
|
on_batch_complete(std::span<std::unique_ptr<Connection>> /*batch*/) {}
|
||||||
};
|
};
|
||||||
|
|||||||
@@ -50,7 +50,8 @@ void HttpHandler::on_write_buffer_drained(
|
|||||||
on_connection_established(*conn_ptr);
|
on_connection_established(*conn_ptr);
|
||||||
}
|
}
|
||||||
|
|
||||||
void HttpHandler::on_post_batch(std::span<std::unique_ptr<Connection>> batch) {
|
void HttpHandler::on_batch_complete(
|
||||||
|
std::span<std::unique_ptr<Connection>> batch) {
|
||||||
int readyCount = 0;
|
int readyCount = 0;
|
||||||
for (int i = 0; i < int(batch.size()); ++i) {
|
for (int i = 0; i < int(batch.size()); ++i) {
|
||||||
readyCount += batch[i] && batch[i]->outgoingBytesQueued() > 0;
|
readyCount += batch[i] && batch[i]->outgoingBytesQueued() > 0;
|
||||||
|
|||||||
@@ -103,7 +103,8 @@ struct HttpHandler : ConnectionHandler {
|
|||||||
void on_data_arrived(std::string_view data,
|
void on_data_arrived(std::string_view data,
|
||||||
std::unique_ptr<Connection> &conn_ptr) override;
|
std::unique_ptr<Connection> &conn_ptr) override;
|
||||||
void on_write_buffer_drained(std::unique_ptr<Connection> &conn_ptr) override;
|
void on_write_buffer_drained(std::unique_ptr<Connection> &conn_ptr) override;
|
||||||
void on_post_batch(std::span<std::unique_ptr<Connection>> /*batch*/) override;
|
void on_batch_complete(
|
||||||
|
std::span<std::unique_ptr<Connection>> /*batch*/) override;
|
||||||
|
|
||||||
// Route parsing (public for testing)
|
// Route parsing (public for testing)
|
||||||
static HttpRoute parseRoute(std::string_view method, std::string_view url);
|
static HttpRoute parseRoute(std::string_view method, std::string_view url);
|
||||||
|
|||||||
@@ -535,8 +535,8 @@ void Server::process_connection_batch(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Call post-batch handler - handlers can take ownership here
|
// Call batch complete handler - handlers can take ownership here
|
||||||
handler_.on_post_batch(batch);
|
handler_.on_batch_complete(batch);
|
||||||
|
|
||||||
// Transfer all remaining connections back to epoll
|
// Transfer all remaining connections back to epoll
|
||||||
for (auto &conn_ptr : batch) {
|
for (auto &conn_ptr : batch) {
|
||||||
|
|||||||
3
style.md
3
style.md
@@ -344,9 +344,10 @@ arena_allocator.reset(); // Reset arena memory
|
|||||||
- **Error messages are human-readable only** - never parse message strings
|
- **Error messages are human-readable only** - never parse message strings
|
||||||
- **Consistent error boundaries** - each component defines what it can/cannot recover from
|
- **Consistent error boundaries** - each component defines what it can/cannot recover from
|
||||||
- **Interface precondition violations are undefined behavior** - acceptable to skip checks for performance in hot paths
|
- **Interface precondition violations are undefined behavior** - acceptable to skip checks for performance in hot paths
|
||||||
|
- **Error code types must be nodiscard** - mark error code enums with `[[nodiscard]]` to prevent silent failures
|
||||||
|
|
||||||
```cpp
|
```cpp
|
||||||
enum class ParseResult { Success, InvalidJson, MissingField };
|
enum class [[nodiscard]] ParseResult { Success, InvalidJson, MissingField };
|
||||||
|
|
||||||
// System failure - abort immediately
|
// System failure - abort immediately
|
||||||
void* memory = std::malloc(size);
|
void* memory = std::malloc(size);
|
||||||
|
|||||||
Reference in New Issue
Block a user