diff --git a/src/arena.hpp b/src/arena.hpp index 61f7c33..f8d4298 100644 --- a/src/arena.hpp +++ b/src/arena.hpp @@ -163,7 +163,15 @@ public: /** * @brief Move constructor - transfers ownership of all blocks. - * @param other The Arena to move from (will be left empty) + * + * @param other The Arena to move from (will be left in a valid, empty state) + * + * @note Post-move state: The moved-from Arena is left in a valid state + * equivalent to a newly constructed Arena. All operations remain safe: + * - allocate_raw(), allocate(), construct() work normally + * - reset() is safe and well-defined (no-op on empty arena) + * - used_bytes(), total_bytes() return 0 + * - Destructor is safe to call */ Arena(Arena &&other) noexcept; @@ -173,8 +181,15 @@ public: * Frees any existing blocks in this allocator before taking ownership * of blocks from the other allocator. * - * @param other The Arena to move from (will be left empty) + * @param other The Arena to move from (will be left in a valid, empty state) * @return Reference to this allocator + * + * @note Post-move state: The moved-from Arena is left in a valid state + * equivalent to a newly constructed Arena. All operations remain safe: + * - allocate_raw(), allocate(), construct() work normally + * - reset() is safe and well-defined (no-op on empty arena) + * - used_bytes(), total_bytes() return 0 + * - Destructor is safe to call */ Arena &operator=(Arena &&other) noexcept; diff --git a/src/connection.cpp b/src/connection.cpp index a988503..22e24d4 100644 --- a/src/connection.cpp +++ b/src/connection.cpp @@ -42,8 +42,8 @@ static thread_local std::vector g_arenas_to_free; Connection::Connection(struct sockaddr_storage addr, int fd, int64_t id, size_t epoll_index, ConnectionHandler *handler, WeakRef server) - : fd_(fd), id_(id), epoll_index_(epoll_index), addr_(addr), - handler_(handler), server_(std::move(server)) { + : id_(id), epoll_index_(epoll_index), addr_(addr), handler_(handler), + server_(std::move(server)), fd_(fd) { auto server_ref = server_.lock(); // Should only be called from the io thread assert(server_ref); @@ -256,7 +256,6 @@ uint32_t Connection::write_bytes() { { std::lock_guard lock(mutex_); if (message_queue_.empty()) { - result |= Drained; auto server = server_.lock(); if (server) { struct epoll_event event; diff --git a/src/connection.hpp b/src/connection.hpp index 56b5bf0..ae96c84 100644 --- a/src/connection.hpp +++ b/src/connection.hpp @@ -321,8 +321,7 @@ private: enum WriteBytesResult { Error = 1 << 0, Progress = 1 << 1, - Drained = 1 << 2, - Close = 1 << 3, + Close = 1 << 2, }; uint32_t write_bytes(); diff --git a/src/connection_handler.hpp b/src/connection_handler.hpp index 4020e34..3306f2a 100644 --- a/src/connection_handler.hpp +++ b/src/connection_handler.hpp @@ -55,21 +55,6 @@ public: */ virtual void on_write_progress(Connection &) {} - /** - * Called when the connection's outgoing write buffer becomes empty. - * - * This indicates all queued messages have been successfully written - * to the socket. Useful for: - * - Implementing keep-alive connection reuse - * - Closing connections after final response - * - Relieving backpressure conditions - * - * @param conn Connection with empty write buffer - server retains ownership - * @note Called from this connection's io thread. - * @note Only called on transitions from non-empty → empty buffer - */ - virtual void on_write_buffer_drained(Connection &) {} - /** * Called when a new connection is established. * @@ -96,9 +81,9 @@ public: /** * @brief Called after a batch of connections has been processed. * - * This hook is called after on_data_arrived, on_write_progress, or - * on_write_buffer_drained has been called for each connection in the batch. - * All connections remain server-owned. + * This hook is called after on_data_arrived or on_write_progress has been + * called for each connection in the batch. All connections remain + * server-owned. * * @param batch A span of connection references in the batch. * @note Called from this connection's io thread. diff --git a/src/http_handler.cpp b/src/http_handler.cpp index 1e63cdd..ca10c09 100644 --- a/src/http_handler.cpp +++ b/src/http_handler.cpp @@ -56,6 +56,38 @@ HttpConnectionState::HttpConnectionState() parser.data = this; } +void HttpConnectionState::reset() { + TRACE_EVENT("http", "reply", perfetto::Flow::Global(http_request_id)); + // Reset request-specific state for next HTTP request + method = {}; + url = {}; + headers_complete = false; + message_complete = false; + connection_close = false; + route = HttpRoute::NotFound; + status_request_id = {}; + + // Reset header buffers - need to recreate with arena allocator + current_header_field_buf = ArenaString(ArenaStlAllocator(&arena)); + current_header_value_buf = ArenaString(ArenaStlAllocator(&arena)); + header_field_complete = false; + http_request_id = 0; + + // Reset commit parsing state - safe to reset Arena::Ptr objects here + // since request processing is complete + commit_parser.reset(); + commit_request.reset(); + parsing_commit = false; + basic_validation_passed = false; + + // Reset arena memory for next request to prevent memory growth + arena.reset(); + + // Reset llhttp parser for next request + llhttp_init(&parser, HTTP_REQUEST, &settings); + parser.data = this; +} + // HttpHandler implementation void HttpHandler::on_connection_established(Connection &conn) { // Allocate HTTP state using server-provided arena for connection lifecycle @@ -69,22 +101,6 @@ void HttpHandler::on_connection_closed(Connection &conn) { conn.user_data = nullptr; } -// TODO there might be an issue if we get pipelined requests here - -void HttpHandler::on_write_buffer_drained(Connection &conn) { - // Reset state after entire reply messages have been written for the next - // request - auto *state = static_cast(conn.user_data); - if (state) { - TRACE_EVENT("http", "reply", - perfetto::Flow::Global(state->http_request_id)); - } - // TODO consider replacing with HttpConnectionState->reset() - on_connection_closed(conn); - // Note: Connection reset happens at server level, not connection level - on_connection_established(conn); -} - void HttpHandler::on_batch_complete(std::span batch) { // Collect commit, status, and health check requests for pipeline processing int pipeline_count = 0; @@ -128,6 +144,7 @@ void HttpHandler::on_batch_complete(std::span batch) { CommitEntry{conn->get_weak_ref(), state->http_request_id, state->connection_close, state->commit_request.get(), std::move(state->arena)}; + state->reset(); } // Create StatusEntry for status requests else if (state->route == HttpRoute::GetStatus) { @@ -135,12 +152,14 @@ void HttpHandler::on_batch_complete(std::span batch) { StatusEntry{conn->get_weak_ref(), state->http_request_id, state->connection_close, state->status_request_id, std::move(state->arena)}; + state->reset(); } // Create HealthCheckEntry for health check requests else if (state->route == HttpRoute::GetOk) { *out_iter++ = HealthCheckEntry{conn->get_weak_ref(), state->http_request_id, state->connection_close, std::move(state->arena)}; + state->reset(); } } } @@ -165,6 +184,7 @@ void HttpHandler::on_data_arrived(std::string_view data, Connection &conn) { if (err != HPE_OK) { send_error_response(conn, 400, "Bad request", std::move(state->arena), 0, true); + state->reset(); return; } @@ -183,6 +203,7 @@ void HttpHandler::on_data_arrived(std::string_view data, Connection &conn) { // Handle malformed URL encoding send_error_response(conn, 400, "Malformed URL encoding", std::move(state->arena), 0, true); + state->reset(); return; } @@ -234,6 +255,7 @@ void HttpHandler::handle_get_version(Connection &conn, format(state.arena, R"({"version":%ld,"leader":""})", this->committed_version.load(std::memory_order_seq_cst)), std::move(state.arena), state.http_request_id, state.connection_close); + state.reset(); } void HttpHandler::handle_post_commit(Connection &conn, @@ -243,6 +265,7 @@ void HttpHandler::handle_post_commit(Connection &conn, if (!state.commit_request || !state.parsing_commit) { send_error_response(conn, 400, "Parse failed", std::move(state.arena), state.http_request_id, state.connection_close); + state.reset(); return; } @@ -284,6 +307,7 @@ void HttpHandler::handle_post_commit(Connection &conn, if (!valid) { send_error_response(conn, 400, error_msg, std::move(state.arena), state.http_request_id, state.connection_close); + state.reset(); return; } @@ -299,6 +323,7 @@ void HttpHandler::handle_get_subscribe(Connection &conn, conn, 200, R"({"message":"Subscription endpoint - streaming not yet implemented"})", std::move(state.arena), state.http_request_id, state.connection_close); + state.reset(); } void HttpHandler::handle_get_status(Connection &conn, @@ -316,6 +341,7 @@ void HttpHandler::handle_get_status(Connection &conn, send_error_response( conn, 400, "Missing required query parameter: request_id", std::move(state.arena), state.http_request_id, state.connection_close); + state.reset(); return; } @@ -323,6 +349,7 @@ void HttpHandler::handle_get_status(Connection &conn, send_error_response(conn, 400, "Empty request_id parameter", std::move(state.arena), state.http_request_id, state.connection_close); + state.reset(); return; } @@ -339,6 +366,7 @@ void HttpHandler::handle_put_retention(Connection &conn, send_json_response(conn, 200, R"({"policy_id":"example","status":"created"})", std::move(state.arena), state.http_request_id, state.connection_close); + state.reset(); } void HttpHandler::handle_get_retention(Connection &conn, @@ -347,6 +375,7 @@ void HttpHandler::handle_get_retention(Connection &conn, // TODO: Extract policy_id from URL or return all policies send_json_response(conn, 200, R"({"policies":[]})", std::move(state.arena), state.http_request_id, state.connection_close); + state.reset(); } void HttpHandler::handle_delete_retention(Connection &conn, @@ -356,6 +385,7 @@ void HttpHandler::handle_delete_retention(Connection &conn, send_json_response(conn, 200, R"({"policy_id":"example","status":"deleted"})", std::move(state.arena), state.http_request_id, state.connection_close); + state.reset(); } void HttpHandler::handle_get_metrics(Connection &conn, @@ -397,6 +427,7 @@ void HttpHandler::handle_get_metrics(Connection &conn, *out++ = sv; } conn.append_message(result, std::move(state.arena)); + state.reset(); } void HttpHandler::handle_get_ok(Connection &, HttpConnectionState &) { diff --git a/src/http_handler.hpp b/src/http_handler.hpp index 34b57f6..0b38b88 100644 --- a/src/http_handler.hpp +++ b/src/http_handler.hpp @@ -60,6 +60,7 @@ struct HttpConnectionState { false; // Set to true if basic validation passes HttpConnectionState(); + void reset(); // Reset state for next HTTP request (keeps arena) }; /** @@ -133,7 +134,6 @@ struct HttpHandler : ConnectionHandler { void on_connection_closed(Connection &conn) override; void on_data_arrived(std::string_view data, Connection &conn) override; void on_batch_complete(std::span batch) override; - void on_write_buffer_drained(Connection &conn) override; // llhttp callbacks (public for HttpConnectionState access) static int onUrl(llhttp_t *parser, const char *at, size_t length); diff --git a/src/server.cpp b/src/server.cpp index 233da48..76fcd96 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -446,11 +446,6 @@ void Server::process_connection_writes(Ref &conn, int /*events*/) { handler_.on_write_progress(*conn); } - if (result & Connection::WriteBytesResult::Drained) { - // Call handler with connection reference - server retains ownership - handler_.on_write_buffer_drained(*conn); - } - // Check if we should close the connection according to application if (result & Connection::WriteBytesResult::Close) { close_connection(conn);