Reset connection state after finishing with it in http_handler

This commit is contained in:
2025-09-14 21:16:41 -04:00
parent 632113f792
commit 1f61f91bf5
7 changed files with 71 additions and 47 deletions

View File

@@ -163,7 +163,15 @@ public:
/** /**
* @brief Move constructor - transfers ownership of all blocks. * @brief Move constructor - transfers ownership of all blocks.
* @param other The Arena to move from (will be left empty) *
* @param other The Arena to move from (will be left in a valid, empty state)
*
* @note Post-move state: The moved-from Arena is left in a valid state
* equivalent to a newly constructed Arena. All operations remain safe:
* - allocate_raw(), allocate(), construct() work normally
* - reset() is safe and well-defined (no-op on empty arena)
* - used_bytes(), total_bytes() return 0
* - Destructor is safe to call
*/ */
Arena(Arena &&other) noexcept; Arena(Arena &&other) noexcept;
@@ -173,8 +181,15 @@ public:
* Frees any existing blocks in this allocator before taking ownership * Frees any existing blocks in this allocator before taking ownership
* of blocks from the other allocator. * of blocks from the other allocator.
* *
* @param other The Arena to move from (will be left empty) * @param other The Arena to move from (will be left in a valid, empty state)
* @return Reference to this allocator * @return Reference to this allocator
*
* @note Post-move state: The moved-from Arena is left in a valid state
* equivalent to a newly constructed Arena. All operations remain safe:
* - allocate_raw(), allocate(), construct() work normally
* - reset() is safe and well-defined (no-op on empty arena)
* - used_bytes(), total_bytes() return 0
* - Destructor is safe to call
*/ */
Arena &operator=(Arena &&other) noexcept; Arena &operator=(Arena &&other) noexcept;

View File

@@ -42,8 +42,8 @@ static thread_local std::vector<Arena> g_arenas_to_free;
Connection::Connection(struct sockaddr_storage addr, int fd, int64_t id, Connection::Connection(struct sockaddr_storage addr, int fd, int64_t id,
size_t epoll_index, ConnectionHandler *handler, size_t epoll_index, ConnectionHandler *handler,
WeakRef<Server> server) WeakRef<Server> server)
: fd_(fd), id_(id), epoll_index_(epoll_index), addr_(addr), : id_(id), epoll_index_(epoll_index), addr_(addr), handler_(handler),
handler_(handler), server_(std::move(server)) { server_(std::move(server)), fd_(fd) {
auto server_ref = server_.lock(); auto server_ref = server_.lock();
// Should only be called from the io thread // Should only be called from the io thread
assert(server_ref); assert(server_ref);
@@ -256,7 +256,6 @@ uint32_t Connection::write_bytes() {
{ {
std::lock_guard lock(mutex_); std::lock_guard lock(mutex_);
if (message_queue_.empty()) { if (message_queue_.empty()) {
result |= Drained;
auto server = server_.lock(); auto server = server_.lock();
if (server) { if (server) {
struct epoll_event event; struct epoll_event event;

View File

@@ -321,8 +321,7 @@ private:
enum WriteBytesResult { enum WriteBytesResult {
Error = 1 << 0, Error = 1 << 0,
Progress = 1 << 1, Progress = 1 << 1,
Drained = 1 << 2, Close = 1 << 2,
Close = 1 << 3,
}; };
uint32_t write_bytes(); uint32_t write_bytes();

View File

@@ -55,21 +55,6 @@ public:
*/ */
virtual void on_write_progress(Connection &) {} virtual void on_write_progress(Connection &) {}
/**
* Called when the connection's outgoing write buffer becomes empty.
*
* This indicates all queued messages have been successfully written
* to the socket. Useful for:
* - Implementing keep-alive connection reuse
* - Closing connections after final response
* - Relieving backpressure conditions
*
* @param conn Connection with empty write buffer - server retains ownership
* @note Called from this connection's io thread.
* @note Only called on transitions from non-empty → empty buffer
*/
virtual void on_write_buffer_drained(Connection &) {}
/** /**
* Called when a new connection is established. * Called when a new connection is established.
* *
@@ -96,9 +81,9 @@ public:
/** /**
* @brief Called after a batch of connections has been processed. * @brief Called after a batch of connections has been processed.
* *
* This hook is called after on_data_arrived, on_write_progress, or * This hook is called after on_data_arrived or on_write_progress has been
* on_write_buffer_drained has been called for each connection in the batch. * called for each connection in the batch. All connections remain
* All connections remain server-owned. * server-owned.
* *
* @param batch A span of connection references in the batch. * @param batch A span of connection references in the batch.
* @note Called from this connection's io thread. * @note Called from this connection's io thread.

View File

@@ -56,6 +56,38 @@ HttpConnectionState::HttpConnectionState()
parser.data = this; parser.data = this;
} }
void HttpConnectionState::reset() {
TRACE_EVENT("http", "reply", perfetto::Flow::Global(http_request_id));
// Reset request-specific state for next HTTP request
method = {};
url = {};
headers_complete = false;
message_complete = false;
connection_close = false;
route = HttpRoute::NotFound;
status_request_id = {};
// Reset header buffers - need to recreate with arena allocator
current_header_field_buf = ArenaString(ArenaStlAllocator<char>(&arena));
current_header_value_buf = ArenaString(ArenaStlAllocator<char>(&arena));
header_field_complete = false;
http_request_id = 0;
// Reset commit parsing state - safe to reset Arena::Ptr objects here
// since request processing is complete
commit_parser.reset();
commit_request.reset();
parsing_commit = false;
basic_validation_passed = false;
// Reset arena memory for next request to prevent memory growth
arena.reset();
// Reset llhttp parser for next request
llhttp_init(&parser, HTTP_REQUEST, &settings);
parser.data = this;
}
// HttpHandler implementation // HttpHandler implementation
void HttpHandler::on_connection_established(Connection &conn) { void HttpHandler::on_connection_established(Connection &conn) {
// Allocate HTTP state using server-provided arena for connection lifecycle // Allocate HTTP state using server-provided arena for connection lifecycle
@@ -69,22 +101,6 @@ void HttpHandler::on_connection_closed(Connection &conn) {
conn.user_data = nullptr; conn.user_data = nullptr;
} }
// TODO there might be an issue if we get pipelined requests here
void HttpHandler::on_write_buffer_drained(Connection &conn) {
// Reset state after entire reply messages have been written for the next
// request
auto *state = static_cast<HttpConnectionState *>(conn.user_data);
if (state) {
TRACE_EVENT("http", "reply",
perfetto::Flow::Global(state->http_request_id));
}
// TODO consider replacing with HttpConnectionState->reset()
on_connection_closed(conn);
// Note: Connection reset happens at server level, not connection level
on_connection_established(conn);
}
void HttpHandler::on_batch_complete(std::span<Connection *const> batch) { void HttpHandler::on_batch_complete(std::span<Connection *const> batch) {
// Collect commit, status, and health check requests for pipeline processing // Collect commit, status, and health check requests for pipeline processing
int pipeline_count = 0; int pipeline_count = 0;
@@ -128,6 +144,7 @@ void HttpHandler::on_batch_complete(std::span<Connection *const> batch) {
CommitEntry{conn->get_weak_ref(), state->http_request_id, CommitEntry{conn->get_weak_ref(), state->http_request_id,
state->connection_close, state->commit_request.get(), state->connection_close, state->commit_request.get(),
std::move(state->arena)}; std::move(state->arena)};
state->reset();
} }
// Create StatusEntry for status requests // Create StatusEntry for status requests
else if (state->route == HttpRoute::GetStatus) { else if (state->route == HttpRoute::GetStatus) {
@@ -135,12 +152,14 @@ void HttpHandler::on_batch_complete(std::span<Connection *const> batch) {
StatusEntry{conn->get_weak_ref(), state->http_request_id, StatusEntry{conn->get_weak_ref(), state->http_request_id,
state->connection_close, state->status_request_id, state->connection_close, state->status_request_id,
std::move(state->arena)}; std::move(state->arena)};
state->reset();
} }
// Create HealthCheckEntry for health check requests // Create HealthCheckEntry for health check requests
else if (state->route == HttpRoute::GetOk) { else if (state->route == HttpRoute::GetOk) {
*out_iter++ = *out_iter++ =
HealthCheckEntry{conn->get_weak_ref(), state->http_request_id, HealthCheckEntry{conn->get_weak_ref(), state->http_request_id,
state->connection_close, std::move(state->arena)}; state->connection_close, std::move(state->arena)};
state->reset();
} }
} }
} }
@@ -165,6 +184,7 @@ void HttpHandler::on_data_arrived(std::string_view data, Connection &conn) {
if (err != HPE_OK) { if (err != HPE_OK) {
send_error_response(conn, 400, "Bad request", std::move(state->arena), 0, send_error_response(conn, 400, "Bad request", std::move(state->arena), 0,
true); true);
state->reset();
return; return;
} }
@@ -183,6 +203,7 @@ void HttpHandler::on_data_arrived(std::string_view data, Connection &conn) {
// Handle malformed URL encoding // Handle malformed URL encoding
send_error_response(conn, 400, "Malformed URL encoding", send_error_response(conn, 400, "Malformed URL encoding",
std::move(state->arena), 0, true); std::move(state->arena), 0, true);
state->reset();
return; return;
} }
@@ -234,6 +255,7 @@ void HttpHandler::handle_get_version(Connection &conn,
format(state.arena, R"({"version":%ld,"leader":""})", format(state.arena, R"({"version":%ld,"leader":""})",
this->committed_version.load(std::memory_order_seq_cst)), this->committed_version.load(std::memory_order_seq_cst)),
std::move(state.arena), state.http_request_id, state.connection_close); std::move(state.arena), state.http_request_id, state.connection_close);
state.reset();
} }
void HttpHandler::handle_post_commit(Connection &conn, void HttpHandler::handle_post_commit(Connection &conn,
@@ -243,6 +265,7 @@ void HttpHandler::handle_post_commit(Connection &conn,
if (!state.commit_request || !state.parsing_commit) { if (!state.commit_request || !state.parsing_commit) {
send_error_response(conn, 400, "Parse failed", std::move(state.arena), send_error_response(conn, 400, "Parse failed", std::move(state.arena),
state.http_request_id, state.connection_close); state.http_request_id, state.connection_close);
state.reset();
return; return;
} }
@@ -284,6 +307,7 @@ void HttpHandler::handle_post_commit(Connection &conn,
if (!valid) { if (!valid) {
send_error_response(conn, 400, error_msg, std::move(state.arena), send_error_response(conn, 400, error_msg, std::move(state.arena),
state.http_request_id, state.connection_close); state.http_request_id, state.connection_close);
state.reset();
return; return;
} }
@@ -299,6 +323,7 @@ void HttpHandler::handle_get_subscribe(Connection &conn,
conn, 200, conn, 200,
R"({"message":"Subscription endpoint - streaming not yet implemented"})", R"({"message":"Subscription endpoint - streaming not yet implemented"})",
std::move(state.arena), state.http_request_id, state.connection_close); std::move(state.arena), state.http_request_id, state.connection_close);
state.reset();
} }
void HttpHandler::handle_get_status(Connection &conn, void HttpHandler::handle_get_status(Connection &conn,
@@ -316,6 +341,7 @@ void HttpHandler::handle_get_status(Connection &conn,
send_error_response( send_error_response(
conn, 400, "Missing required query parameter: request_id", conn, 400, "Missing required query parameter: request_id",
std::move(state.arena), state.http_request_id, state.connection_close); std::move(state.arena), state.http_request_id, state.connection_close);
state.reset();
return; return;
} }
@@ -323,6 +349,7 @@ void HttpHandler::handle_get_status(Connection &conn,
send_error_response(conn, 400, "Empty request_id parameter", send_error_response(conn, 400, "Empty request_id parameter",
std::move(state.arena), state.http_request_id, std::move(state.arena), state.http_request_id,
state.connection_close); state.connection_close);
state.reset();
return; return;
} }
@@ -339,6 +366,7 @@ void HttpHandler::handle_put_retention(Connection &conn,
send_json_response(conn, 200, R"({"policy_id":"example","status":"created"})", send_json_response(conn, 200, R"({"policy_id":"example","status":"created"})",
std::move(state.arena), state.http_request_id, std::move(state.arena), state.http_request_id,
state.connection_close); state.connection_close);
state.reset();
} }
void HttpHandler::handle_get_retention(Connection &conn, void HttpHandler::handle_get_retention(Connection &conn,
@@ -347,6 +375,7 @@ void HttpHandler::handle_get_retention(Connection &conn,
// TODO: Extract policy_id from URL or return all policies // TODO: Extract policy_id from URL or return all policies
send_json_response(conn, 200, R"({"policies":[]})", std::move(state.arena), send_json_response(conn, 200, R"({"policies":[]})", std::move(state.arena),
state.http_request_id, state.connection_close); state.http_request_id, state.connection_close);
state.reset();
} }
void HttpHandler::handle_delete_retention(Connection &conn, void HttpHandler::handle_delete_retention(Connection &conn,
@@ -356,6 +385,7 @@ void HttpHandler::handle_delete_retention(Connection &conn,
send_json_response(conn, 200, R"({"policy_id":"example","status":"deleted"})", send_json_response(conn, 200, R"({"policy_id":"example","status":"deleted"})",
std::move(state.arena), state.http_request_id, std::move(state.arena), state.http_request_id,
state.connection_close); state.connection_close);
state.reset();
} }
void HttpHandler::handle_get_metrics(Connection &conn, void HttpHandler::handle_get_metrics(Connection &conn,
@@ -397,6 +427,7 @@ void HttpHandler::handle_get_metrics(Connection &conn,
*out++ = sv; *out++ = sv;
} }
conn.append_message(result, std::move(state.arena)); conn.append_message(result, std::move(state.arena));
state.reset();
} }
void HttpHandler::handle_get_ok(Connection &, HttpConnectionState &) { void HttpHandler::handle_get_ok(Connection &, HttpConnectionState &) {

View File

@@ -60,6 +60,7 @@ struct HttpConnectionState {
false; // Set to true if basic validation passes false; // Set to true if basic validation passes
HttpConnectionState(); HttpConnectionState();
void reset(); // Reset state for next HTTP request (keeps arena)
}; };
/** /**
@@ -133,7 +134,6 @@ struct HttpHandler : ConnectionHandler {
void on_connection_closed(Connection &conn) override; void on_connection_closed(Connection &conn) override;
void on_data_arrived(std::string_view data, Connection &conn) override; void on_data_arrived(std::string_view data, Connection &conn) override;
void on_batch_complete(std::span<Connection *const> batch) override; void on_batch_complete(std::span<Connection *const> batch) override;
void on_write_buffer_drained(Connection &conn) override;
// llhttp callbacks (public for HttpConnectionState access) // llhttp callbacks (public for HttpConnectionState access)
static int onUrl(llhttp_t *parser, const char *at, size_t length); static int onUrl(llhttp_t *parser, const char *at, size_t length);

View File

@@ -446,11 +446,6 @@ void Server::process_connection_writes(Ref<Connection> &conn, int /*events*/) {
handler_.on_write_progress(*conn); handler_.on_write_progress(*conn);
} }
if (result & Connection::WriteBytesResult::Drained) {
// Call handler with connection reference - server retains ownership
handler_.on_write_buffer_drained(*conn);
}
// Check if we should close the connection according to application // Check if we should close the connection according to application
if (result & Connection::WriteBytesResult::Close) { if (result & Connection::WriteBytesResult::Close) {
close_connection(conn); close_connection(conn);