diff --git a/CMakeLists.txt b/CMakeLists.txt index c611bf7..f0c0211 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -250,9 +250,6 @@ target_link_libraries(load_tester Threads::Threads llhttp_static perfetto) add_test(NAME arena_tests COMMAND test_arena) add_test(NAME commit_request_tests COMMAND test_commit_request) -add_test(NAME http_handler_tests COMMAND test_http_handler) -add_test(NAME server_connection_return_tests - COMMAND test_server_connection_return) add_test(NAME arena_benchmarks COMMAND bench_arena) add_test(NAME commit_request_benchmarks COMMAND bench_commit_request) add_test(NAME parser_comparison_benchmarks COMMAND bench_parser_comparison) diff --git a/src/arena.hpp b/src/arena.hpp index 87ce709..5acbf53 100644 --- a/src/arena.hpp +++ b/src/arena.hpp @@ -406,6 +406,7 @@ public: * ## Note: * This method only allocates memory - it does not construct objects. * Use placement new or other initialization methods as needed. + * TODO should this return a std::span ? */ template T *allocate(uint32_t size) { static_assert( diff --git a/src/connection.cpp b/src/connection.cpp index d036822..f1d49ac 100644 --- a/src/connection.cpp +++ b/src/connection.cpp @@ -59,25 +59,29 @@ Connection::Connection(struct sockaddr_storage addr, int fd, int64_t id, } Connection::~Connection() { - if (handler_) { - handler_->on_connection_closed(*this); - } - // Server may legitimately be gone now - if (auto server_ptr = server_.lock()) { - server_ptr->active_connections_.fetch_sub(1, std::memory_order_relaxed); - } + handler_->on_connection_closed(*this); + assert(fd_ < 0 && "Connection fd was not closed before ~Connection"); +} - // Decrement active connections gauge - connections_active.dec(); - - int e = close(fd_); +void Connection::close() { + std::lock_guard lock{mutex_}; + auto server_ptr = server_.lock(); + // Should only be called from the io thread + assert(server_ptr); + server_ptr->active_connections_.fetch_sub(1, std::memory_order_relaxed); + assert(fd_ >= 0); + int e = ::close(fd_); if (e == -1 && errno != EINTR) { perror("close"); std::abort(); } // EINTR ignored - fd is guaranteed closed on Linux + fd_ = -1; + // Decrement active connections gauge + connections_active.dec(); } +// May be called off the io thread! void Connection::append_message(std::span data_parts, Arena arena, bool close_after_send) { // Calculate total bytes for this message. Don't need to hold the lock yet. @@ -86,11 +90,7 @@ void Connection::append_message(std::span data_parts, total_bytes += part.size(); } - std::unique_lock lock(mutex_); - - if (is_closed_) { - return; // Connection is closed, ignore message - } + std::unique_lock lock(mutex_); // Check if queue was empty to determine if we need to enable EPOLLOUT bool was_empty = message_queue_.empty(); @@ -100,22 +100,15 @@ void Connection::append_message(std::span data_parts, Message{std::move(arena), data_parts, close_after_send}); outgoing_bytes_queued_ += total_bytes; - // If this message has close_after_send flag, set connection flag - if (close_after_send) { - close_after_send_ = true; - } - - lock.unlock(); - - // If queue was empty, we need to add EPOLLOUT interest. We don't need to hold - // the lock - if (was_empty) { + // If queue was empty, we need to add EPOLLOUT interest. + if (was_empty && fd_ >= 0) { auto server = server_.lock(); if (server) { // Add EPOLLOUT interest - pipeline thread manages epoll struct epoll_event event; event.data.fd = fd_; event.events = EPOLLIN | EPOLLOUT; + tsan_release(); epoll_ctl(server->epoll_fds_[epoll_index_], EPOLL_CTL_MOD, fd_, &event); } } @@ -148,16 +141,18 @@ int Connection::readBytes(char *buf, size_t buffer_size) { } } -bool Connection::writeBytes() { +uint32_t Connection::write_bytes() { ssize_t total_bytes_written = 0; + uint32_t result = 0; + while (true) { // Build iovec array while holding mutex using thread-local buffer int iov_count = 0; { std::lock_guard lock(mutex_); - if (is_closed_ || message_queue_.empty()) { + if (message_queue_.empty()) { break; } @@ -204,14 +199,17 @@ bool Connection::writeBytes() { if (total_bytes_written > 0) { bytes_written.inc(total_bytes_written); } - return false; + return result; } perror("sendmsg"); - return true; + result |= Error; + return result; } break; } + result |= Progress; + assert(w > 0); total_bytes_written += w; @@ -244,9 +242,15 @@ bool Connection::writeBytes() { } if (message_complete) { + if (front_message.close_after_send) { + result |= Close; + } // Move arena to thread-local vector for deferred cleanup g_arenas_to_free.emplace_back(std::move(front_message.arena)); message_queue_.pop_front(); + if (result & Close) { + break; + } } else { break; } @@ -258,11 +262,13 @@ bool Connection::writeBytes() { { std::lock_guard lock(mutex_); if (message_queue_.empty()) { + result |= Drained; auto server = server_.lock(); if (server) { struct epoll_event event; event.data.fd = fd_; event.events = EPOLLIN; // Remove EPOLLOUT + tsan_release(); epoll_ctl(server->epoll_fds_[epoll_index_], EPOLL_CTL_MOD, fd_, &event); } } @@ -277,5 +283,5 @@ bool Connection::writeBytes() { // This avoids holding the connection mutex while free() potentially contends g_arenas_to_free.clear(); - return false; + return result; } diff --git a/src/connection.hpp b/src/connection.hpp index 73265fc..22ce42c 100644 --- a/src/connection.hpp +++ b/src/connection.hpp @@ -20,6 +20,40 @@ // Forward declaration struct Server; +/** + * Base interface for sending messages to a connection. + * This restricted interface is safe for use by pipeline threads, + * containing only the append_message method needed for responses. + * Pipeline threads should use WeakRef to safely + * send responses without accessing other connection functionality + * that should only be used by the I/O thread. + */ +struct MessageSender { + /** + * @brief Append message data to connection's outgoing message queue. + * + * Thread-safe method that can be called from any thread, including + * pipeline processing threads. The arena is moved into the connection + * to maintain data lifetime until the message is sent. + * + * @param data_parts Span of string_view parts to send (arena-allocated) + * @param arena Arena containing the memory for data_parts string_views + * @param close_after_send Whether to close connection after sending + * + * Example usage: + * ```cpp + * auto response_parts = std::span{arena.allocate(2), 2}; + * response_parts[0] = "HTTP/1.1 200 OK\r\n\r\n"; + * response_parts[1] = "Hello World"; + * conn.append_message(response_parts, std::move(arena)); + * ``` + */ + virtual void append_message(std::span data_parts, + Arena arena, bool close_after_send = false) = 0; + + virtual ~MessageSender() = default; +}; + /** * Represents a single client connection with thread-safe concurrent access. * @@ -42,7 +76,7 @@ struct Server; * - No connection-owned arena for parsing/response generation * - Message queue stores spans + owning arenas until I/O completion */ -struct Connection { +struct Connection : MessageSender { // No public constructor or factory method - only Server can create // connections @@ -91,7 +125,7 @@ struct Connection { * ``` */ void append_message(std::span data_parts, Arena arena, - bool close_after_send = false); + bool close_after_send = false) override; /** * @brief Get a WeakRef to this connection for async operations. @@ -120,7 +154,10 @@ struct Connection { * }); * ``` */ - WeakRef get_weak_ref() const { return self_ref_.copy(); } + WeakRef get_weak_ref() const { + assert(self_ref_.lock()); + return self_ref_.copy(); + } /** * @brief Get the unique identifier for this connection. @@ -278,23 +315,26 @@ private: // Networking interface - only accessible by Server int readBytes(char *buf, size_t buffer_size); - bool writeBytes(); + enum WriteBytesResult { + Error = 1 << 0, + Progress = 1 << 1, + Drained = 1 << 2, + Close = 1 << 3, + }; + uint32_t write_bytes(); // Direct access methods for Server (must hold mutex) int getFd() const { return fd_; } bool has_messages() const { return !message_queue_.empty(); } - bool should_close() const { return close_after_send_; } size_t getEpollIndex() const { return epoll_index_; } - - // Server can set self-reference after creation - void setSelfRef(WeakRef self) { self_ref_ = std::move(self); } + void close(); // Immutable connection properties - const int fd_; + int fd_; const int64_t id_; const size_t epoll_index_; // Index of the epoll instance this connection uses struct sockaddr_storage addr_; // sockaddr_storage handles IPv4/IPv6 - ConnectionHandler *handler_; + ConnectionHandler *const handler_; WeakRef server_; // Weak reference to server for safe cleanup WeakRef self_ref_; // WeakRef to self for get_weak_ref() @@ -305,6 +345,13 @@ private: // mutex_, but if non-empty mutex_ can be // dropped while server accesses existing elements. int64_t outgoing_bytes_queued_{0}; // Counter of queued bytes - bool close_after_send_{false}; // Close after sending all messages - bool is_closed_{false}; // Connection closed state + +#if __has_feature(thread_sanitizer) + void tsan_acquire() { tsan_sync.load(std::memory_order_acquire); } + void tsan_release() { tsan_sync.store(0, std::memory_order_release); } + std::atomic tsan_sync; +#else + void tsan_acquire() {} + void tsan_release() {} +#endif }; diff --git a/src/connection_handler.hpp b/src/connection_handler.hpp index bedc864..656aab9 100644 --- a/src/connection_handler.hpp +++ b/src/connection_handler.hpp @@ -3,8 +3,6 @@ #include #include -#include "reference.hpp" - // Forward declaration to avoid circular dependency struct Connection; @@ -36,7 +34,7 @@ public: * - Use conn.get_weak_ref() for async processing if needed * * @note `data` lifetime ends after the call to on_data_arrived. - * @note May be called from an arbitrary server thread. + * @note Called from this connection's io thread. * @note Handler can safely access connection concurrently via thread-safe * methods. */ @@ -51,7 +49,7 @@ public: * - Progress monitoring for long-running transfers * * @param conn Connection that made write progress - server retains ownership - * @note May be called from an arbitrary server thread. + * @note Called from this connection's io thread. * @note Called during writes, not necessarily when buffer becomes empty */ virtual void on_write_progress(Connection &) {} @@ -66,7 +64,7 @@ public: * - Relieving backpressure conditions * * @param conn Connection with empty write buffer - server retains ownership - * @note May be called from an arbitrary server thread. + * @note Called from this connection's io thread. * @note Only called on transitions from non-empty → empty buffer */ virtual void on_write_buffer_drained(Connection &) {} @@ -78,7 +76,7 @@ public: * * Use this for: * - Connection-specific initialization. - * @note May be called from an arbitrary server thread. + * @note Called from this connection's io thread. */ virtual void on_connection_established(Connection &) {} @@ -89,7 +87,8 @@ public: * * Use this for: * - Cleanup of connection-specific resources. - * @note May be called from an arbitrary server thread. + * @note Called from this connection's io thread, or possibly a foreign thread + * that has locked the MessageSender associated with this connection. */ virtual void on_connection_closed(Connection &) {} @@ -101,6 +100,7 @@ public: * All connections remain server-owned. * * @param batch A span of connection references in the batch. + * @note Called from this connection's io thread. */ - virtual void on_batch_complete(std::span /*batch*/) {} + virtual void on_batch_complete(std::span /*batch*/) {} }; diff --git a/src/http_handler.cpp b/src/http_handler.cpp index 6b47347..4f11f63 100644 --- a/src/http_handler.cpp +++ b/src/http_handler.cpp @@ -7,13 +7,13 @@ #include "api_url_parser.hpp" #include "arena.hpp" +#include "connection.hpp" #include "cpu_work.hpp" #include "format.hpp" #include "json_commit_request_parser.hpp" #include "metric.hpp" #include "perfetto_categories.hpp" #include "pipeline_entry.hpp" -#include "server.hpp" auto requests_counter_family = metric::create_counter( "weaseldb_http_requests_total", "Total http requests"); @@ -37,8 +37,8 @@ auto banned_request_ids_memory_gauge = .create({}); // HttpConnectionState implementation -HttpConnectionState::HttpConnectionState(Arena &arena) - : arena(arena), current_header_field_buf(ArenaStlAllocator(&arena)), +HttpConnectionState::HttpConnectionState() + : current_header_field_buf(ArenaStlAllocator(&arena)), current_header_value_buf(ArenaStlAllocator(&arena)) { llhttp_settings_init(&settings); @@ -58,62 +58,54 @@ HttpConnectionState::HttpConnectionState(Arena &arena) // HttpHandler implementation void HttpHandler::on_connection_established(Connection &conn) { - // Allocate HTTP state in connection's arena - Arena &arena = conn.get_arena(); - void *mem = arena.allocate_raw(sizeof(HttpConnectionState), - alignof(HttpConnectionState)); - auto *state = new (mem) HttpConnectionState(arena); + // Allocate HTTP state using server-provided arena for connection lifecycle + auto *state = new HttpConnectionState(); conn.user_data = state; } void HttpHandler::on_connection_closed(Connection &conn) { - // Arena cleanup happens automatically when connection is destroyed auto *state = static_cast(conn.user_data); - if (state) { - // Arena::Ptr automatically calls destructors - state->~HttpConnectionState(); - } + delete state; conn.user_data = nullptr; } -void HttpHandler::on_write_buffer_drained(Ref &conn_ptr) { - // Reset arena after all messages have been written for the next request - auto *state = static_cast(conn_ptr->user_data); +void HttpHandler::on_write_buffer_drained(Connection &conn) { + // Reset state after all messages have been written for the next request + auto *state = static_cast(conn.user_data); if (state) { TRACE_EVENT("http", "reply", perfetto::Flow::Global(state->http_request_id)); } - on_connection_closed(*conn_ptr); - conn_ptr->reset(); - on_connection_established(*conn_ptr); + // TODO we don't need this anymore. Look at removing it. + on_connection_closed(conn); + // Note: Connection reset happens at server level, not connection level + on_connection_established(conn); } -void HttpHandler::on_batch_complete(std::span> batch) { +void HttpHandler::on_batch_complete(std::span batch) { // Collect commit, status, and health check requests for pipeline processing int pipeline_count = 0; // Count commit, status, and health check requests - for (auto &conn : batch) { - if (conn && conn->user_data) { - auto *state = static_cast(conn->user_data); + for (auto conn : batch) { + auto *state = static_cast(conn->user_data); - // Count commit requests that passed basic validation - if (state->route == HttpRoute::PostCommit && state->commit_request && - state->parsing_commit && state->basic_validation_passed) { - pipeline_count++; - } - // Count status requests - else if (state->route == HttpRoute::GetStatus && - // Error message not already queued - conn->outgoing_bytes_queued() == 0) { - pipeline_count++; - } - // Count health check requests - else if (state->route == HttpRoute::GetOk && - // Error message not already queued - conn->outgoing_bytes_queued() == 0) { - pipeline_count++; - } + // Count commit requests that passed basic validation + if (state->route == HttpRoute::PostCommit && state->commit_request && + state->parsing_commit && state->basic_validation_passed) { + pipeline_count++; + } + // Count status requests + else if (state->route == HttpRoute::GetStatus && + // Error message not already queued + conn->outgoing_bytes_queued() == 0) { + pipeline_count++; + } + // Count health check requests + else if (state->route == HttpRoute::GetOk && + // Error message not already queued + conn->outgoing_bytes_queued() == 0) { + pipeline_count++; } } @@ -122,33 +114,36 @@ void HttpHandler::on_batch_complete(std::span> batch) { auto guard = commitPipeline.push(pipeline_count, true); auto out_iter = guard.batch.begin(); - for (auto &conn : batch) { - if (conn && conn->user_data) { - auto *state = static_cast(conn->user_data); + for (auto conn : batch) { + auto *state = static_cast(conn->user_data); - // Create CommitEntry for commit requests - if (state->route == HttpRoute::PostCommit && state->commit_request && - state->parsing_commit && state->basic_validation_passed) { - *out_iter++ = CommitEntry{std::move(conn)}; - } - // Create StatusEntry for status requests - else if (state->route == HttpRoute::GetStatus) { - *out_iter++ = StatusEntry{std::move(conn)}; - } - // Create HealthCheckEntry for health check requests - else if (state->route == HttpRoute::GetOk) { - *out_iter++ = HealthCheckEntry{std::move(conn)}; - } + // Create CommitEntry for commit requests + if (state->route == HttpRoute::PostCommit && state->commit_request && + state->parsing_commit && state->basic_validation_passed) { + *out_iter++ = + CommitEntry{conn->get_weak_ref(), state->http_request_id, + state->connection_close, state->commit_request.get()}; + } + // Create StatusEntry for status requests + else if (state->route == HttpRoute::GetStatus) { + *out_iter++ = + StatusEntry{conn->get_weak_ref(), state->http_request_id, + state->connection_close, state->status_request_id}; + } + // Create HealthCheckEntry for health check requests + else if (state->route == HttpRoute::GetOk) { + *out_iter++ = + HealthCheckEntry{conn->get_weak_ref(), state->http_request_id, + state->connection_close}; } } } } -void HttpHandler::on_data_arrived(std::string_view data, - Ref &conn_ptr) { - auto *state = static_cast(conn_ptr->user_data); +void HttpHandler::on_data_arrived(std::string_view data, Connection &conn) { + auto *state = static_cast(conn.user_data); if (!state) { - send_error_response(*conn_ptr, 500, "Internal server error", true); + send_error_response(conn, 500, "Internal server error", Arena{}, 0, true); return; } @@ -162,15 +157,18 @@ void HttpHandler::on_data_arrived(std::string_view data, llhttp_execute(&state->parser, data.data(), data.size()); if (err != HPE_OK) { - send_error_response(*conn_ptr, 400, "Bad request", true); + send_error_response(conn, 400, "Bad request", Arena{}, 0, true); return; } // If message is complete, route and handle the request if (state->message_complete) { - // Copy URL to arena for in-place decoding - Arena &arena = conn_ptr->get_arena(); - char *url_buffer = arena.allocate(state->url.size()); + // Create request-scoped arena for URL parsing + // FIX: request_arena lifetime ends too soon. Should move arena into + // individual handlers and propagate it all the way through to + // append_message + Arena request_arena; + char *url_buffer = request_arena.allocate(state->url.size()); std::memcpy(url_buffer, state->url.data(), state->url.size()); RouteMatch route_match; @@ -180,7 +178,8 @@ void HttpHandler::on_data_arrived(std::string_view data, if (parse_result != ParseResult::Success) { // Handle malformed URL encoding - send_error_response(*conn_ptr, 400, "Malformed URL encoding", true); + send_error_response(conn, 400, "Malformed URL encoding", Arena{}, 0, + true); return; } @@ -189,35 +188,36 @@ void HttpHandler::on_data_arrived(std::string_view data, // Route to appropriate handler switch (state->route) { case HttpRoute::GetVersion: - handle_get_version(*conn_ptr, *state); + handle_get_version(conn, *state, std::move(request_arena)); break; case HttpRoute::PostCommit: - handle_post_commit(*conn_ptr, *state); + handle_post_commit(conn, *state, std::move(request_arena)); break; case HttpRoute::GetSubscribe: - handle_get_subscribe(*conn_ptr, *state); + handle_get_subscribe(conn, *state, std::move(request_arena)); break; case HttpRoute::GetStatus: - handle_get_status(*conn_ptr, *state, route_match); + handle_get_status(conn, *state, route_match, std::move(request_arena)); break; case HttpRoute::PutRetention: - handle_put_retention(*conn_ptr, *state, route_match); + handle_put_retention(conn, *state, route_match, std::move(request_arena)); break; case HttpRoute::GetRetention: - handle_get_retention(*conn_ptr, *state, route_match); + handle_get_retention(conn, *state, route_match, std::move(request_arena)); break; case HttpRoute::DeleteRetention: - handle_delete_retention(*conn_ptr, *state, route_match); + handle_delete_retention(conn, *state, route_match, + std::move(request_arena)); break; case HttpRoute::GetMetrics: - handle_get_metrics(*conn_ptr, *state); + handle_get_metrics(conn, *state, std::move(request_arena)); break; case HttpRoute::GetOk: - handle_get_ok(*conn_ptr, *state); + handle_get_ok(conn, *state, std::move(request_arena)); break; case HttpRoute::NotFound: default: - handle_not_found(*conn_ptr, *state); + handle_not_found(conn, *state, std::move(request_arena)); break; } } @@ -225,27 +225,29 @@ void HttpHandler::on_data_arrived(std::string_view data, // Route handlers (basic implementations) void HttpHandler::handle_get_version(Connection &conn, - const HttpConnectionState &state) { + const HttpConnectionState &state, + Arena request_arena) { version_counter.inc(); send_json_response( conn, 200, - format(conn.get_arena(), R"({"version":%ld,"leader":""})", + format(request_arena, R"({"version":%ld,"leader":""})", this->committed_version.load(std::memory_order_seq_cst)), - state.connection_close); + std::move(request_arena), state.http_request_id, state.connection_close); } void HttpHandler::handle_post_commit(Connection &conn, - const HttpConnectionState &state) { + const HttpConnectionState &state, + Arena request_arena) { commit_counter.inc(); // Check if streaming parse was successful if (!state.commit_request || !state.parsing_commit) { const char *error = state.commit_parser ? state.commit_parser->get_parse_error() : "No parser initialized"; - Arena &arena = conn.get_arena(); - std::string_view error_msg = - format(arena, "Parse failed: %s", error ? error : "Unknown error"); - send_error_response(conn, 400, error_msg, state.connection_close); + std::string_view error_msg = format(request_arena, "Parse failed: %s", + error ? error : "Unknown error"); + send_error_response(conn, 400, error_msg, std::move(request_arena), + state.http_request_id, state.connection_close); return; } @@ -285,7 +287,8 @@ void HttpHandler::handle_post_commit(Connection &conn, } if (!valid) { - send_error_response(conn, 400, error_msg, state.connection_close); + send_error_response(conn, 400, error_msg, std::move(request_arena), + state.http_request_id, state.connection_close); return; } @@ -295,17 +298,18 @@ void HttpHandler::handle_post_commit(Connection &conn, } void HttpHandler::handle_get_subscribe(Connection &conn, - const HttpConnectionState &state) { + const HttpConnectionState &state, + Arena) { // TODO: Implement subscription streaming send_json_response( conn, 200, R"({"message":"Subscription endpoint - streaming not yet implemented"})", - state.connection_close); + Arena{}, state.http_request_id, state.connection_close); } void HttpHandler::handle_get_status(Connection &conn, HttpConnectionState &state, - const RouteMatch &route_match) { + const RouteMatch &route_match, Arena) { status_counter.inc(); // Status requests are processed through the pipeline // Response will be generated in the sequence stage @@ -316,14 +320,14 @@ void HttpHandler::handle_get_status(Connection &conn, route_match.params[static_cast(ApiParameterKey::RequestId)]; if (!request_id) { send_error_response(conn, 400, - "Missing required query parameter: request_id", - state.connection_close); + "Missing required query parameter: request_id", Arena{}, + state.http_request_id, state.connection_close); return; } if (request_id->empty()) { - send_error_response(conn, 400, "Empty request_id parameter", - state.connection_close); + send_error_response(conn, 400, "Empty request_id parameter", Arena{}, + state.http_request_id, state.connection_close); return; } @@ -335,32 +339,33 @@ void HttpHandler::handle_get_status(Connection &conn, void HttpHandler::handle_put_retention(Connection &conn, const HttpConnectionState &state, - const RouteMatch &route_match) { + const RouteMatch &, Arena) { // TODO: Parse retention policy from body and store send_json_response(conn, 200, R"({"policy_id":"example","status":"created"})", - state.connection_close); + Arena{}, state.http_request_id, state.connection_close); } void HttpHandler::handle_get_retention(Connection &conn, const HttpConnectionState &state, - const RouteMatch &route_match) { + const RouteMatch &, Arena) { // TODO: Extract policy_id from URL or return all policies - send_json_response(conn, 200, R"({"policies":[]})", state.connection_close); + send_json_response(conn, 200, R"({"policies":[]})", Arena{}, + state.http_request_id, state.connection_close); } void HttpHandler::handle_delete_retention(Connection &conn, const HttpConnectionState &state, - const RouteMatch &route_match) { + const RouteMatch &, Arena) { // TODO: Extract policy_id from URL and delete send_json_response(conn, 200, R"({"policy_id":"example","status":"deleted"})", - state.connection_close); + Arena{}, state.http_request_id, state.connection_close); } void HttpHandler::handle_get_metrics(Connection &conn, - const HttpConnectionState &state) { + const HttpConnectionState &state, + Arena request_arena) { metrics_counter.inc(); - Arena &arena = conn.get_arena(); - auto metrics_span = metric::render(arena); + auto metrics_span = metric::render(request_arena); // Calculate total size for the response body size_t total_size = 0; @@ -374,32 +379,33 @@ void HttpHandler::handle_get_metrics(Connection &conn, std::string_view headers; if (state.connection_close) { headers = static_format( - arena, "HTTP/1.1 200 OK\r\n", + request_arena, "HTTP/1.1 200 OK\r\n", "Content-Type: text/plain; version=0.0.4\r\n", "Content-Length: ", static_cast(total_size), "\r\n", "X-Response-ID: ", static_cast(http_state->http_request_id), "\r\n", "Connection: close\r\n", "\r\n"); - conn.close_after_send(); } else { headers = static_format( - arena, "HTTP/1.1 200 OK\r\n", + request_arena, "HTTP/1.1 200 OK\r\n", "Content-Type: text/plain; version=0.0.4\r\n", "Content-Length: ", static_cast(total_size), "\r\n", "X-Response-ID: ", static_cast(http_state->http_request_id), "\r\n", "Connection: keep-alive\r\n", "\r\n"); } - // Send headers - conn.append_message(headers, false); - - // Send body in chunks - for (const auto &sv : metrics_span) { - conn.append_message(sv, false); + auto result = std::span{ + request_arena.allocate(metrics_span.size() + 1), + metrics_span.size() + 1}; + auto out = result.begin(); + *out++ = headers; + for (auto sv : metrics_span) { + *out++ = sv; } + conn.append_message(result, std::move(request_arena)); } -void HttpHandler::handle_get_ok(Connection &conn, - const HttpConnectionState &state) { +void HttpHandler::handle_get_ok(Connection &, const HttpConnectionState &state, + Arena) { ok_counter.inc(); TRACE_EVENT("http", "GET /ok", perfetto::Flow::Global(state.http_request_id)); @@ -408,16 +414,17 @@ void HttpHandler::handle_get_ok(Connection &conn, } void HttpHandler::handle_not_found(Connection &conn, - const HttpConnectionState &state) { - send_error_response(conn, 404, "Not found", state.connection_close); + const HttpConnectionState &state, Arena) { + send_error_response(conn, 404, "Not found", Arena{}, state.http_request_id, + state.connection_close); } // HTTP utility methods -void HttpHandler::send_response(Connection &conn, int status_code, +void HttpHandler::send_response(MessageSender &conn, int status_code, std::string_view content_type, - std::string_view body, bool close_connection) { - Arena &arena = conn.get_arena(); - auto *state = static_cast(conn.user_data); + std::string_view body, Arena response_arena, + int64_t http_request_id, + bool close_connection) { // Status text std::string_view status_text; @@ -441,8 +448,10 @@ void HttpHandler::send_response(Connection &conn, int status_code, const char *connection_header = close_connection ? "close" : "keep-alive"; - std::string_view response = - format(arena, + auto response = std::span{response_arena.allocate(1), 1}; + + response[0] = + format(response_arena, "HTTP/1.1 %d %.*s\r\n" "Content-Type: %.*s\r\n" "Content-Length: %zu\r\n" @@ -451,32 +460,32 @@ void HttpHandler::send_response(Connection &conn, int status_code, "\r\n%.*s", status_code, static_cast(status_text.size()), status_text.data(), static_cast(content_type.size()), - content_type.data(), body.size(), state->http_request_id, + content_type.data(), body.size(), http_request_id, connection_header, static_cast(body.size()), body.data()); - if (close_connection) { - conn.close_after_send(); - } - - conn.append_message(response); + conn.append_message(response, std::move(response_arena), close_connection); } -void HttpHandler::send_json_response(Connection &conn, int status_code, +void HttpHandler::send_json_response(MessageSender &conn, int status_code, std::string_view json, + Arena response_arena, + int64_t http_request_id, bool close_connection) { - send_response(conn, status_code, "application/json", json, close_connection); + send_response(conn, status_code, "application/json", json, + std::move(response_arena), http_request_id, close_connection); } -void HttpHandler::send_error_response(Connection &conn, int status_code, +void HttpHandler::send_error_response(MessageSender &conn, int status_code, std::string_view message, + Arena response_arena, + int64_t http_request_id, bool close_connection) { - Arena &arena = conn.get_arena(); - std::string_view json = - format(arena, R"({"error":"%.*s"})", static_cast(message.size()), - message.data()); + format(response_arena, R"({"error":"%.*s"})", + static_cast(message.size()), message.data()); - send_json_response(conn, status_code, json, close_connection); + send_json_response(conn, status_code, json, std::move(response_arena), + http_request_id, close_connection); } // llhttp callbacks @@ -623,29 +632,35 @@ bool HttpHandler::process_sequence_batch(BatchType &batch) { } else if constexpr (std::is_same_v) { // Process commit entry: check banned list, assign version auto &commit_entry = e; - auto *state = static_cast( - commit_entry.connection->user_data); + auto conn_ref = commit_entry.connection.lock(); + if (!conn_ref) { + // Connection is gone, drop the entry silently and increment + // metric + // TODO: Add dropped_pipeline_entries metric + return false; // Skip this entry and continue processing + } - if (!state || !state->commit_request) { + if (!commit_entry.commit_request) { // Should not happen - basic validation was done on I/O thread - send_error_response(*commit_entry.connection, 500, - "Internal server error", true); + send_error_response(*conn_ref, 500, "Internal server error", + Arena{}, commit_entry.http_request_id, true); return false; } // Check if request_id is banned (for status queries) // Only check CommitRequest request_id, not HTTP header - if (state->commit_request && - state->commit_request->request_id().has_value()) { + if (commit_entry.commit_request && + commit_entry.commit_request->request_id().has_value()) { auto commit_request_id = - state->commit_request->request_id().value(); + commit_entry.commit_request->request_id().value(); if (banned_request_ids.find(commit_request_id) != banned_request_ids.end()) { // Request ID is banned, this commit should fail send_json_response( - *commit_entry.connection, 409, + *conn_ref, 409, R"({"status": "not_committed", "error": "request_id_banned"})", - state->connection_close); + Arena{}, commit_entry.http_request_id, + commit_entry.connection_close); return false; } } @@ -654,25 +669,30 @@ bool HttpHandler::process_sequence_batch(BatchType &batch) { commit_entry.assigned_version = next_version++; TRACE_EVENT("http", "sequence_commit", - perfetto::Flow::Global(state->http_request_id)); + perfetto::Flow::Global(commit_entry.http_request_id)); return false; // Continue processing } else if constexpr (std::is_same_v) { // Process status entry: add request_id to banned list, get version // upper bound auto &status_entry = e; - auto *state = static_cast( - status_entry.connection->user_data); + auto conn_ref = status_entry.connection.lock(); + if (!conn_ref) { + // Connection is gone, drop the entry silently and increment + // metric + // TODO: Add dropped_pipeline_entries metric + return false; // Skip this entry and continue processing + } - if (state && !state->status_request_id.empty()) { + if (!status_entry.status_request_id.empty()) { // Add request_id to banned list - store the string in arena and // use string_view char *arena_chars = banned_request_arena.allocate( - state->status_request_id.size()); - std::memcpy(arena_chars, state->status_request_id.data(), - state->status_request_id.size()); - std::string_view request_id_view(arena_chars, - state->status_request_id.size()); + status_entry.status_request_id.size()); + std::memcpy(arena_chars, status_entry.status_request_id.data(), + status_entry.status_request_id.size()); + std::string_view request_id_view( + arena_chars, status_entry.status_request_id.size()); banned_request_ids.insert(request_id_view); // Update memory usage metric @@ -684,26 +704,30 @@ bool HttpHandler::process_sequence_batch(BatchType &batch) { } TRACE_EVENT("http", "sequence_status", - perfetto::Flow::Global(state->http_request_id)); + perfetto::Flow::Global(status_entry.http_request_id)); // TODO: Transfer to status threadpool - for now just respond // not_committed - send_json_response(*status_entry.connection, 200, - R"({"status": "not_committed"})", - state->connection_close); + send_json_response(*conn_ref, 200, R"({"status": "not_committed"})", + Arena{}, status_entry.http_request_id, + status_entry.connection_close); return false; // Continue processing } else if constexpr (std::is_same_v) { // Process health check entry: noop in sequence stage auto &health_check_entry = e; - auto *state = static_cast( - health_check_entry.connection->user_data); - - if (state) { - TRACE_EVENT("http", "sequence_health_check", - perfetto::Flow::Global(state->http_request_id)); + auto conn_ref = health_check_entry.connection.lock(); + if (!conn_ref) { + // Connection is gone, drop the entry silently and increment + // metric + // TODO: Add dropped_pipeline_entries metric + return false; // Skip this entry and continue processing } + TRACE_EVENT( + "http", "sequence_health_check", + perfetto::Flow::Global(health_check_entry.http_request_id)); + return false; // Continue processing } @@ -736,10 +760,15 @@ bool HttpHandler::process_resolve_batch(BatchType &batch) { // Process commit entry: accept all commits (simplified // implementation) auto &commit_entry = e; - auto *state = static_cast( - commit_entry.connection->user_data); + auto conn_ref = commit_entry.connection.lock(); + if (!conn_ref) { + // Connection is gone, drop the entry silently and increment + // metric + // TODO: Add dropped_pipeline_entries metric + return false; // Skip this entry and continue processing + } - if (!state || !state->commit_request) { + if (!commit_entry.commit_request) { // Skip processing for failed sequence stage return false; } @@ -748,7 +777,7 @@ bool HttpHandler::process_resolve_batch(BatchType &batch) { commit_entry.resolve_success = true; TRACE_EVENT("http", "resolve_commit", - perfetto::Flow::Global(state->http_request_id)); + perfetto::Flow::Global(commit_entry.http_request_id)); return false; // Continue processing } else if constexpr (std::is_same_v) { @@ -758,14 +787,18 @@ bool HttpHandler::process_resolve_batch(BatchType &batch) { } else if constexpr (std::is_same_v) { // Process health check entry: perform configurable CPU work auto &health_check_entry = e; - auto *state = static_cast( - health_check_entry.connection->user_data); - - if (state) { - TRACE_EVENT("http", "resolve_health_check", - perfetto::Flow::Global(state->http_request_id)); + auto conn_ref = health_check_entry.connection.lock(); + if (!conn_ref) { + // Connection is gone, drop the entry silently and increment + // metric + // TODO: Add dropped_pipeline_entries metric + return false; // Skip this entry and continue processing } + TRACE_EVENT( + "http", "resolve_health_check", + perfetto::Flow::Global(health_check_entry.http_request_id)); + // Perform configurable CPU-intensive work for benchmarking spend_cpu_cycles(config_.benchmark.ok_resolve_iterations); @@ -799,12 +832,17 @@ bool HttpHandler::process_persist_batch(BatchType &batch) { } else if constexpr (std::is_same_v) { // Process commit entry: mark as durable, generate response auto &commit_entry = e; - auto *state = static_cast( - commit_entry.connection->user_data); + // Check if connection is still alive first + auto conn_ref = commit_entry.connection.lock(); + if (!conn_ref) { + // Connection is gone, drop the entry silently and increment + // metric + // TODO: Add dropped_pipeline_entries metric + return false; // Skip this entry and continue processing + } // Skip if resolve failed or connection is in error state - if (!state || !state->commit_request || - !commit_entry.resolve_success) { + if (!commit_entry.commit_request || !commit_entry.resolve_success) { return false; } @@ -814,29 +852,31 @@ bool HttpHandler::process_persist_batch(BatchType &batch) { std::memory_order_seq_cst); TRACE_EVENT("http", "persist_commit", - perfetto::Flow::Global(state->http_request_id)); + perfetto::Flow::Global(commit_entry.http_request_id)); - const CommitRequest &commit_request = *state->commit_request; - Arena &arena = commit_entry.connection->get_arena(); + const CommitRequest &commit_request = *commit_entry.commit_request; + + Arena response_arena; std::string_view response; // Generate success response with actual assigned version if (commit_request.request_id().has_value()) { response = format( - arena, + response_arena, R"({"request_id":"%.*s","status":"committed","version":%ld,"leader_id":"leader123"})", static_cast(commit_request.request_id().value().size()), commit_request.request_id().value().data(), commit_entry.assigned_version); } else { response = format( - arena, + response_arena, R"({"status":"committed","version":%ld,"leader_id":"leader123"})", commit_entry.assigned_version); } - send_json_response(*commit_entry.connection, 200, response, - state->connection_close); + send_json_response( + *conn_ref, 200, response, std::move(response_arena), + commit_entry.http_request_id, commit_entry.connection_close); return false; // Continue processing } else if constexpr (std::is_same_v) { @@ -846,18 +886,23 @@ bool HttpHandler::process_persist_batch(BatchType &batch) { } else if constexpr (std::is_same_v) { // Process health check entry: generate OK response auto &health_check_entry = e; - auto *state = static_cast( - health_check_entry.connection->user_data); - - if (state) { - TRACE_EVENT("http", "persist_health_check", - perfetto::Flow::Global(state->http_request_id)); - - // Generate OK response - send_response(*health_check_entry.connection, 200, "text/plain", - "OK", state->connection_close); + auto conn_ref = health_check_entry.connection.lock(); + if (!conn_ref) { + // Connection is gone, drop the entry silently and increment + // metric + // TODO: Add dropped_pipeline_entries metric + return false; // Skip this entry and continue processing } + TRACE_EVENT( + "http", "persist_health_check", + perfetto::Flow::Global(health_check_entry.http_request_id)); + + // Generate OK response + send_response(*conn_ref, 200, "text/plain", "OK", Arena{}, + health_check_entry.http_request_id, + health_check_entry.connection_close); + return false; // Continue processing } @@ -888,38 +933,48 @@ bool HttpHandler::process_release_batch(BatchType &batch) { } else if constexpr (std::is_same_v) { // Process commit entry: return connection to server auto &commit_entry = e; - auto *state = static_cast( - commit_entry.connection->user_data); - - if (state) { - TRACE_EVENT("http", "release_commit", - perfetto::Flow::Global(state->http_request_id)); + auto conn_ref = commit_entry.connection.lock(); + if (!conn_ref) { + // Connection is gone, drop the entry silently and increment + // metric + // TODO: Add dropped_pipeline_entries metric + return false; // Skip this entry and continue processing } + TRACE_EVENT("http", "release_commit", + perfetto::Flow::Global(commit_entry.http_request_id)); + return false; // Continue processing } else if constexpr (std::is_same_v) { // Process status entry: return connection to server auto &status_entry = e; - auto *state = static_cast( - status_entry.connection->user_data); - - if (state) { - TRACE_EVENT("http", "release_status", - perfetto::Flow::Global(state->http_request_id)); + auto conn_ref = status_entry.connection.lock(); + if (!conn_ref) { + // Connection is gone, drop the entry silently and increment + // metric + // TODO: Add dropped_pipeline_entries metric + return false; // Skip this entry and continue processing } + TRACE_EVENT("http", "release_status", + perfetto::Flow::Global(status_entry.http_request_id)); + return false; // Continue processing } else if constexpr (std::is_same_v) { // Process health check entry: return connection to server auto &health_check_entry = e; - auto *state = static_cast( - health_check_entry.connection->user_data); - - if (state) { - TRACE_EVENT("http", "release_health_check", - perfetto::Flow::Global(state->http_request_id)); + auto conn_ref = health_check_entry.connection.lock(); + if (!conn_ref) { + // Connection is gone, drop the entry silently and increment + // metric + // TODO: Add dropped_pipeline_entries metric + return false; // Skip this entry and continue processing } + TRACE_EVENT( + "http", "release_health_check", + perfetto::Flow::Global(health_check_entry.http_request_id)); + return false; // Continue processing } diff --git a/src/http_handler.hpp b/src/http_handler.hpp index b80bd9e..9620a24 100644 --- a/src/http_handler.hpp +++ b/src/http_handler.hpp @@ -1,7 +1,6 @@ #pragma once #include -#include #include #include #include @@ -13,9 +12,7 @@ #include "config.hpp" #include "connection.hpp" #include "connection_handler.hpp" -#include "perfetto_categories.hpp" #include "pipeline_entry.hpp" -#include "server.hpp" #include "thread_pipeline.hpp" // Forward declarations @@ -28,7 +25,7 @@ struct RouteMatch; * Manages llhttp parser state and request data. */ struct HttpConnectionState { - Arena &arena; + Arena arena; // Request-scoped arena for parsing state llhttp_t parser; llhttp_settings_t settings; @@ -62,7 +59,7 @@ struct HttpConnectionState { bool basic_validation_passed = false; // Set to true if basic validation passes - explicit HttpConnectionState(Arena &arena); + HttpConnectionState(); }; /** @@ -134,10 +131,9 @@ struct HttpHandler : ConnectionHandler { void on_connection_established(Connection &conn) override; void on_connection_closed(Connection &conn) override; - void on_data_arrived(std::string_view data, - Ref &conn_ptr) override; - void on_write_buffer_drained(Ref &conn_ptr) override; - void on_batch_complete(std::span> /*batch*/) override; + void on_data_arrived(std::string_view data, Connection &conn) override; + void on_batch_complete(std::span batch) override; + void on_write_buffer_drained(Connection &conn_ptr) override; // llhttp callbacks (public for HttpConnectionState access) static int onUrl(llhttp_t *parser, const char *at, size_t length); @@ -193,31 +189,40 @@ private: bool process_release_batch(BatchType &batch); // Route handlers - void handle_get_version(Connection &conn, const HttpConnectionState &state); - void handle_post_commit(Connection &conn, const HttpConnectionState &state); - void handle_get_subscribe(Connection &conn, const HttpConnectionState &state); + void handle_get_version(Connection &conn, const HttpConnectionState &state, + Arena request_arena); + void handle_post_commit(Connection &conn, const HttpConnectionState &state, + Arena request_arena); + void handle_get_subscribe(Connection &conn, const HttpConnectionState &state, + Arena request_arena); void handle_get_status(Connection &conn, HttpConnectionState &state, - const RouteMatch &route_match); + const RouteMatch &route_match, Arena request_arena); void handle_put_retention(Connection &conn, const HttpConnectionState &state, - const RouteMatch &route_match); + const RouteMatch &route_match, Arena request_arena); void handle_get_retention(Connection &conn, const HttpConnectionState &state, - const RouteMatch &route_match); + const RouteMatch &route_match, Arena request_arena); void handle_delete_retention(Connection &conn, const HttpConnectionState &state, - const RouteMatch &route_match); - void handle_get_metrics(Connection &conn, const HttpConnectionState &state); - void handle_get_ok(Connection &conn, const HttpConnectionState &state); - void handle_not_found(Connection &conn, const HttpConnectionState &state); + const RouteMatch &route_match, + Arena request_arena); + void handle_get_metrics(Connection &conn, const HttpConnectionState &state, + Arena request_arena); + void handle_get_ok(Connection &conn, const HttpConnectionState &state, + Arena request_arena); + void handle_not_found(Connection &conn, const HttpConnectionState &state, + Arena request_arena); // HTTP utilities - static void send_response(Connection &conn, int status_code, + static void send_response(MessageSender &conn, int status_code, std::string_view content_type, - std::string_view body, - bool close_connection = false); - static void send_json_response(Connection &conn, int status_code, - std::string_view json, - bool close_connection = false); - static void send_error_response(Connection &conn, int status_code, + std::string_view body, Arena response_arena, + int64_t http_request_id, bool close_connection); + static void send_json_response(MessageSender &conn, int status_code, + std::string_view json, Arena response_arena, + int64_t http_request_id, + bool close_connection); + static void send_error_response(MessageSender &conn, int status_code, std::string_view message, - bool close_connection = false); + Arena response_arena, int64_t http_request_id, + bool close_connection); }; diff --git a/src/metric.cpp b/src/metric.cpp index e1194f5..7e90b01 100644 --- a/src/metric.cpp +++ b/src/metric.cpp @@ -454,7 +454,7 @@ struct Metric { Arena arena; ThreadInit() { // Register this thread's arena for memory tracking - std::unique_lock _{mutex}; + std::unique_lock _{mutex}; get_thread_arenas()[std::this_thread::get_id()] = &arena; } ~ThreadInit() { @@ -462,7 +462,7 @@ struct Metric { // THREAD SAFETY: All operations below are protected by the global mutex, // including writes to global accumulated state, preventing races with // render thread - std::unique_lock _{mutex}; + std::unique_lock _{mutex}; // NOTE: registration_version increment is REQUIRED here because: // - Cached render plan has pre-resolved pointers to thread-local state // - When threads disappear, these pointers become invalid @@ -501,7 +501,7 @@ struct Metric { if (thread_it != family->per_thread_state.end()) { for (auto &[labels_key, instance] : thread_it->second.instances) { // Acquire lock to get consistent snapshot - std::lock_guard lock(instance->mutex); + std::lock_guard lock(instance->mutex); // Global accumulator should have been created when we made the // histogram @@ -592,7 +592,7 @@ struct Metric { // Force thread_local initialization (void)thread_init; - std::unique_lock _{mutex}; + std::unique_lock _{mutex}; ++Metric::registration_version; const LabelsKey &key = intern_labels(labels); @@ -633,7 +633,7 @@ struct Metric { static Gauge create_gauge_instance( Family *family, std::span> labels) { - std::unique_lock _{mutex}; + std::unique_lock _{mutex}; ++Metric::registration_version; const LabelsKey &key = intern_labels(labels); @@ -659,7 +659,7 @@ struct Metric { // Force thread_local initialization (void)thread_init; - std::unique_lock _{mutex}; + std::unique_lock _{mutex}; ++Metric::registration_version; const LabelsKey &key = intern_labels(labels); @@ -1137,7 +1137,7 @@ struct Metric { uint64_t observations_snapshot; { - std::lock_guard lock(instance->mutex); + std::lock_guard lock(instance->mutex); for (size_t i = 0; i < instance->counts.size(); ++i) { counts_snapshot[i] = instance->counts[i]; } @@ -1423,7 +1423,7 @@ update_histogram_buckets_simd(std::span thresholds, void Histogram::observe(double x) { assert(p->thresholds.size() == p->counts.size()); - std::lock_guard lock(p->mutex); + std::lock_guard lock(p->mutex); // Update bucket counts using SIMD update_histogram_buckets_simd(p->thresholds, p->counts, x, 0); @@ -1458,7 +1458,7 @@ Histogram Family::create( Family create_counter(std::string_view name, std::string_view help) { validate_or_abort(is_valid_metric_name(name), "invalid counter name", name); - std::unique_lock _{Metric::mutex}; + std::unique_lock _{Metric::mutex}; ++Metric::registration_version; auto &global_arena = Metric::get_global_arena(); auto name_view = arena_copy_string(name, global_arena); @@ -1480,7 +1480,7 @@ Family create_counter(std::string_view name, std::string_view help) { Family create_gauge(std::string_view name, std::string_view help) { validate_or_abort(is_valid_metric_name(name), "invalid gauge name", name); - std::unique_lock _{Metric::mutex}; + std::unique_lock _{Metric::mutex}; ++Metric::registration_version; auto &global_arena = Metric::get_global_arena(); auto name_view = arena_copy_string(name, global_arena); @@ -1504,7 +1504,7 @@ Family create_histogram(std::string_view name, std::string_view help, std::span buckets) { validate_or_abort(is_valid_metric_name(name), "invalid histogram name", name); - std::unique_lock _{Metric::mutex}; + std::unique_lock _{Metric::mutex}; ++Metric::registration_version; auto &global_arena = Metric::get_global_arena(); auto name_view = arena_copy_string(name, global_arena); @@ -1693,7 +1693,7 @@ std::span render(Arena &arena) { // Hold lock throughout all phases to prevent registry changes // THREAD SAFETY: Global mutex protects cached_plan initialization and access, // prevents races during static member initialization at program startup - std::unique_lock _{Metric::mutex}; + std::unique_lock _{Metric::mutex}; // Call all registered collectors to update their metrics for (const auto &collector : Metric::get_collectors()) { @@ -1723,7 +1723,7 @@ template <> void Family::register_callback( std::span> labels, MetricCallback callback) { - std::unique_lock _{Metric::mutex}; + std::unique_lock _{Metric::mutex}; ++Metric::registration_version; const LabelsKey &key = Metric::intern_labels(labels); @@ -1748,7 +1748,7 @@ template <> void Family::register_callback( std::span> labels, MetricCallback callback) { - std::unique_lock _{Metric::mutex}; + std::unique_lock _{Metric::mutex}; ++Metric::registration_version; const LabelsKey &key = Metric::intern_labels(labels); @@ -1804,7 +1804,7 @@ void reset_metrics_for_testing() { } void register_collector(Ref collector) { - std::unique_lock _{Metric::mutex}; + std::unique_lock _{Metric::mutex}; ++Metric::registration_version; Metric::get_collectors().push_back(std::move(collector)); } diff --git a/src/perfetto_categories.hpp b/src/perfetto_categories.hpp index 882f86b..cc75a65 100644 --- a/src/perfetto_categories.hpp +++ b/src/perfetto_categories.hpp @@ -1,11 +1,15 @@ #pragma once -#define ENABLE_PERFETTO 1 +#ifndef ENABLE_PERFETTO +#define ENABLE_PERFETTO 0 +#endif #if ENABLE_PERFETTO #include #else #define PERFETTO_DEFINE_CATEGORIES(...) +#define PERFETTO_TRACK_EVENT_STATIC_STORAGE \ + void perfetto_track_event_static_storage #define TRACE_EVENT(...) #endif diff --git a/src/pipeline_entry.hpp b/src/pipeline_entry.hpp index 0b78359..bffd8da 100644 --- a/src/pipeline_entry.hpp +++ b/src/pipeline_entry.hpp @@ -1,21 +1,32 @@ #pragma once #include "connection.hpp" -#include #include +// Forward declarations +struct CommitRequest; + /** * Pipeline entry for commit requests that need full 4-stage processing. * Contains connection with parsed CommitRequest. */ struct CommitEntry { - Ref connection; + WeakRef connection; int64_t assigned_version = 0; // Set by sequence stage bool resolve_success = false; // Set by resolve stage bool persist_success = false; // Set by persist stage + // Copied HTTP state (pipeline threads cannot access connection user_data) + int64_t http_request_id = 0; + bool connection_close = false; + const CommitRequest *commit_request = + nullptr; // Points to connection's arena data + CommitEntry() = default; // Default constructor for variant - explicit CommitEntry(Ref conn) : connection(std::move(conn)) {} + explicit CommitEntry(WeakRef conn, int64_t req_id, + bool close_conn, const CommitRequest *req) + : connection(std::move(conn)), http_request_id(req_id), + connection_close(close_conn), commit_request(req) {} }; /** @@ -23,11 +34,19 @@ struct CommitEntry { * then transfer to status threadpool. */ struct StatusEntry { - Ref connection; + WeakRef connection; int64_t version_upper_bound = 0; // Set by sequence stage + // Copied HTTP state + int64_t http_request_id = 0; + bool connection_close = false; + std::string_view status_request_id; // Points to connection's arena data + StatusEntry() = default; // Default constructor for variant - explicit StatusEntry(Ref conn) : connection(std::move(conn)) {} + explicit StatusEntry(WeakRef conn, int64_t req_id, + bool close_conn, std::string_view request_id) + : connection(std::move(conn)), http_request_id(req_id), + connection_close(close_conn), status_request_id(request_id) {} }; /** @@ -36,11 +55,17 @@ struct StatusEntry { * Resolve stage can perform configurable CPU work for benchmarking. */ struct HealthCheckEntry { - Ref connection; + WeakRef connection; + + // Copied HTTP state + int64_t http_request_id = 0; + bool connection_close = false; HealthCheckEntry() = default; // Default constructor for variant - explicit HealthCheckEntry(Ref conn) - : connection(std::move(conn)) {} + explicit HealthCheckEntry(WeakRef conn, int64_t req_id, + bool close_conn) + : connection(std::move(conn)), http_request_id(req_id), + connection_close(close_conn) {} }; /** diff --git a/src/server.cpp b/src/server.cpp index 8a53e64..835ec2b 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -173,6 +173,8 @@ int Server::create_local_connection() { auto connection = make_ref( addr, server_fd, connection_id_.fetch_add(1, std::memory_order_relaxed), epoll_index, &handler_, self_.copy()); + connection->self_ref_ = connection.as_weak(); + connection->tsan_release(); // Store in registry connection_registry_.store(server_fd, std::move(connection)); @@ -305,10 +307,11 @@ void Server::start_io_threads(std::vector &threads) { // Handle existing connection events int fd = events[i].data.fd; Ref conn = connection_registry_.remove(fd); + conn->tsan_acquire(); assert(conn); if (events[i].events & (EPOLLERR | EPOLLHUP)) { - // Connection will be destroyed on scope exit + close_connection(conn); continue; } @@ -361,9 +364,9 @@ void Server::start_io_threads(std::vector &threads) { perror("setsockopt SO_KEEPALIVE"); } - // Add to epoll with no interests + // Add to epoll struct epoll_event event{}; - event.events = 0; + event.events = EPOLLIN; event.data.fd = fd; if (epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event) == -1) { perror("epoll_ctl ADD"); @@ -376,6 +379,8 @@ void Server::start_io_threads(std::vector &threads) { addr, fd, connection_id_.fetch_add(1, std::memory_order_relaxed), epoll_index, &handler_, self_.copy()); + batch[batch_count]->self_ref_ = batch[batch_count].as_weak(); + batch[batch_count]->tsan_release(); batch_events[batch_count] = EPOLLIN; // New connections always start with read batch_count++; @@ -413,7 +418,7 @@ void Server::process_connection_reads(Ref &conn, int events) { if (r < 0) { // Error or EOF - connection should be closed - conn.reset(); + close_connection(conn); return; } @@ -429,34 +434,37 @@ void Server::process_connection_reads(Ref &conn, int events) { void Server::process_connection_writes(Ref &conn, int /*events*/) { assert(conn); - // For simplicity, we always attempt to write when an event fires. We could be - // more precise and skip the write if we detect that we've already seen EAGAIN - // on this connection and we don't have EPOLLOUT. - if (conn->has_messages()) { - bool had_messages = conn->has_messages(); - bool error = conn->writeBytes(); + auto result = conn->write_bytes(); - if (error) { - conn.reset(); // Connection should be closed - return; - } + if (result & Connection::WriteBytesResult::Error) { + close_connection(conn); + return; + } + if (result & Connection::WriteBytesResult::Progress) { // Call handler with connection reference - server retains ownership handler_.on_write_progress(*conn); + } - // Check if buffer became empty (transition from non-empty -> empty) - if (had_messages && !conn->has_messages()) { - handler_.on_write_buffer_drained(*conn); - } + if (result & Connection::WriteBytesResult::Drained) { + // Call handler with connection reference - server retains ownership + handler_.on_write_buffer_drained(*conn); + } - // Check if we should close the connection according to application - if (!conn->has_messages() && conn->should_close()) { - conn.reset(); // Connection should be closed - return; - } + // Check if we should close the connection according to application + if (result & Connection::WriteBytesResult::Close) { + close_connection(conn); + return; } } +void Server::close_connection(Ref &conn) { + conn->close(); + conn.reset(); +} + +static thread_local std::vector conn_ptrs; + void Server::process_connection_batch(int epollfd, std::span> batch, std::span events) { @@ -476,8 +484,7 @@ void Server::process_connection_batch(int epollfd, } // Call batch complete handler with connection pointers - std::vector conn_ptrs; - conn_ptrs.reserve(batch.size()); + conn_ptrs.clear(); for (auto &conn_ref : batch) { if (conn_ref) { conn_ptrs.push_back(conn_ref.get()); @@ -485,26 +492,11 @@ void Server::process_connection_batch(int epollfd, } handler_.on_batch_complete(conn_ptrs); - // Transfer all remaining connections back to epoll + // Transfer all remaining connections back to registry for (auto &conn_ptr : batch) { if (conn_ptr) { int fd = conn_ptr->getFd(); - - struct epoll_event event{}; - if (!conn_ptr->has_messages()) { - event.events = EPOLLIN; - } else { - event.events = EPOLLIN | EPOLLOUT; - } - - event.data.fd = fd; - // Put connection back in registry since handler didn't take ownership - // Must happen before epoll_ctl connection_registry_.store(fd, std::move(conn_ptr)); - if (epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &event) == -1) { - perror("epoll_ctl MOD"); - (void)connection_registry_.remove(fd); - } } } } diff --git a/src/server.hpp b/src/server.hpp index a9a0cfa..581b1a4 100644 --- a/src/server.hpp +++ b/src/server.hpp @@ -148,6 +148,8 @@ private: void process_connection_reads(Ref &conn_ptr, int events); void process_connection_writes(Ref &conn_ptr, int events); + void close_connection(Ref &conn); + // Helper for processing a batch of connections with their events void process_connection_batch(int epollfd, std::span> batch, std::span events); diff --git a/tests/test_server.cpp b/tests/test_server.cpp index 4c6fd2a..e614a86 100644 --- a/tests/test_server.cpp +++ b/tests/test_server.cpp @@ -4,7 +4,7 @@ #include "server.hpp" #include -#include +#include #include #include @@ -19,19 +19,16 @@ static std::string_view arena_copy_string(std::string_view str, Arena &arena) { } struct EchoHandler : ConnectionHandler { - std::future f; + Arena arena; + std::span reply; + WeakRef wconn; + std::latch done{1}; void on_data_arrived(std::string_view data, Connection &conn) override { - Arena arena; - auto reply = std::span{arena.allocate(1), 1}; + reply = std::span{arena.allocate(1), 1}; reply[0] = arena_copy_string(data, arena); - f = std::async(std::launch::async, [wconn = conn.get_weak_ref(), reply, - arena = std::move(arena)]() mutable { - if (auto conn = wconn.lock()) { - conn->append_message(reply, std::move(arena)); - } else { - REQUIRE(false); - } - }); + wconn = conn.get_weak_ref(); + CHECK(wconn.lock()); + done.count_down(); } }; @@ -44,16 +41,23 @@ TEST_CASE("Echo test") { auto runThread = std::thread{[&]() { server->run(); }}; - SUBCASE("writes hello back") { - int w = write(fd, "hello", 5); - REQUIRE(w == 5); - char buf[6]; - buf[5] = 0; - int r = read(fd, buf, 5); - REQUIRE(r == 5); - CHECK(std::string(buf) == "hello"); + int w = write(fd, "hello", 5); + REQUIRE(w == 5); + + handler.done.wait(); + if (auto conn = handler.wconn.lock()) { + conn->append_message(std::exchange(handler.reply, {}), + std::move(handler.arena)); + } else { + REQUIRE(false); } + char buf[6]; + buf[5] = 0; + int r = read(fd, buf, 5); + REQUIRE(r == 5); + CHECK(std::string(buf) == "hello"); + close(fd); server->shutdown(); diff --git a/tools/load_tester.cpp b/tools/load_tester.cpp index 5df9210..9339db1 100644 --- a/tools/load_tester.cpp +++ b/tools/load_tester.cpp @@ -297,7 +297,7 @@ struct Connection { } } - bool writeBytes() { + bool write_bytes() { for (;;) { assert(!request.empty()); int w = send(fd, request.data(), request.size(), MSG_NOSIGNAL); @@ -672,7 +672,7 @@ int main(int argc, char *argv[]) { continue; // Let unique_ptr destructor clean up } if (events[i].events & EPOLLOUT) { - bool finished = conn->writeBytes(); + bool finished = conn->write_bytes(); if (conn->error) { continue; } @@ -748,14 +748,14 @@ int main(int argc, char *argv[]) { // Try to write once in the connect thread before handing off to network // threads assert(conn->has_messages()); - bool writeFinished = conn->writeBytes(); + bool write_finished = conn->write_bytes(); if (conn->error) { continue; // Connection failed, destructor will clean up } // Determine the appropriate epoll events based on write result struct epoll_event event{}; - if (writeFinished) { + if (write_finished) { // All data was written, wait for response int shutdown_result = shutdown(conn->fd, SHUT_WR); if (shutdown_result == -1) {