diff --git a/src/http_handler.cpp b/src/http_handler.cpp index 2498c39..c5904ba 100644 --- a/src/http_handler.cpp +++ b/src/http_handler.cpp @@ -6,14 +6,17 @@ #include // HttpConnectionState implementation -HttpConnectionState::HttpConnectionState( - [[maybe_unused]] ArenaAllocator &arena) { +HttpConnectionState::HttpConnectionState(ArenaAllocator &arena) + : current_header_field_buf(ArenaStlAllocator(&arena)), + current_header_value_buf(ArenaStlAllocator(&arena)) { llhttp_settings_init(&settings); // Set up llhttp callbacks settings.on_url = HttpHandler::onUrl; settings.on_header_field = HttpHandler::onHeaderField; + settings.on_header_field_complete = HttpHandler::onHeaderFieldComplete; settings.on_header_value = HttpHandler::onHeaderValue; + settings.on_header_value_complete = HttpHandler::onHeaderValueComplete; settings.on_headers_complete = HttpHandler::onHeadersComplete; settings.on_body = HttpHandler::onBody; settings.on_message_complete = HttpHandler::onMessageComplete; @@ -26,18 +29,23 @@ HttpConnectionState::HttpConnectionState( void HttpHandler::on_connection_established(Connection &conn) { // Allocate HTTP state in connection's arena ArenaAllocator &arena = conn.getArena(); - auto *state = arena.construct(arena); + void *mem = arena.allocate_raw(sizeof(HttpConnectionState), + alignof(HttpConnectionState)); + auto *state = new (mem) HttpConnectionState(arena); conn.user_data = state; } void HttpHandler::on_connection_closed(Connection &conn) { // Arena cleanup happens automatically when connection is destroyed + auto *state = static_cast(conn.user_data); + state->~HttpConnectionState(); conn.user_data = nullptr; } void HttpHandler::on_write_progress(std::unique_ptr &conn_ptr) { // Reset arena after all messages have been written for the next request if (conn_ptr->outgoingBytesQueued() == 0) { + on_connection_closed(*conn_ptr); conn_ptr->reset(); on_connection_established(*conn_ptr); } @@ -312,31 +320,49 @@ int HttpHandler::onUrl(llhttp_t *parser, const char *at, size_t length) { int HttpHandler::onHeaderField(llhttp_t *parser, const char *at, size_t length) { auto *state = static_cast(parser->data); - // Store current header field name for processing in onHeaderValue - state->current_header_field = std::string_view(at, length); + // Accumulate header field data + state->current_header_field_buf.append(at, length); + return 0; +} + +int HttpHandler::onHeaderFieldComplete(llhttp_t *parser) { + auto *state = static_cast(parser->data); + state->header_field_complete = true; return 0; } int HttpHandler::onHeaderValue(llhttp_t *parser, const char *at, size_t length) { auto *state = static_cast(parser->data); - std::string_view value(at, length); + // Accumulate header value data + state->current_header_value_buf.append(at, length); + return 0; +} + +int HttpHandler::onHeaderValueComplete(llhttp_t *parser) { + auto *state = static_cast(parser->data); + + if (!state->header_field_complete) { + // Field is not complete yet, wait + return 0; + } + + // Now we have complete header field and value + const auto &field = state->current_header_field_buf; + const auto &value = state->current_header_value_buf; // Check for Connection header - if (state->current_header_field.size() == 10 && - strncasecmp(state->current_header_field.data(), "connection", 10) == 0) { + if (field.size() == 10 && strncasecmp(field.data(), "connection", 10) == 0) { if (value.size() == 5 && strncasecmp(value.data(), "close", 5) == 0) { state->connection_close = true; } } // Check for X-Request-Id header - if (state->current_header_field.size() == 12 && - strncasecmp(state->current_header_field.data(), "x-request-id", 12) == - 0) { + if (field.size() == 12 && + strncasecmp(field.data(), "x-request-id", 12) == 0) { uint64_t id = 0; - for (int i = 0; i < int(length); ++i) { - auto c = at[i]; + for (char c : value) { if (c >= '0' && c <= '9') { id = id * 10 + (c - '0'); } @@ -344,6 +370,11 @@ int HttpHandler::onHeaderValue(llhttp_t *parser, const char *at, state->request_id = id; } + // Clear buffers for next header + state->current_header_field_buf.clear(); + state->current_header_value_buf.clear(); + state->header_field_complete = false; + return 0; } diff --git a/src/http_handler.hpp b/src/http_handler.hpp index e1d5603..20a16cc 100644 --- a/src/http_handler.hpp +++ b/src/http_handler.hpp @@ -41,8 +41,14 @@ struct HttpConnectionState { bool message_complete = false; bool connection_close = false; // Client requested connection close HttpRoute route = HttpRoute::NotFound; - std::string_view current_header_field; // Current header being parsed - uint64_t request_id = 0; // X-Request-Id header value + + // Header accumulation buffers (arena-allocated) + using ArenaString = + std::basic_string, ArenaStlAllocator>; + ArenaString current_header_field_buf; + ArenaString current_header_value_buf; + bool header_field_complete = false; + uint64_t request_id = 0; // X-Request-Id header value explicit HttpConnectionState(ArenaAllocator &arena); }; @@ -67,7 +73,9 @@ public: // llhttp callbacks (public for HttpConnectionState access) static int onUrl(llhttp_t *parser, const char *at, size_t length); static int onHeaderField(llhttp_t *parser, const char *at, size_t length); + static int onHeaderFieldComplete(llhttp_t *parser); static int onHeaderValue(llhttp_t *parser, const char *at, size_t length); + static int onHeaderValueComplete(llhttp_t *parser); static int onHeadersComplete(llhttp_t *parser); static int onBody(llhttp_t *parser, const char *at, size_t length); static int onMessageComplete(llhttp_t *parser); diff --git a/tools/load_tester.cpp b/tools/load_tester.cpp index c27e833..c5f7187 100644 --- a/tools/load_tester.cpp +++ b/tools/load_tester.cpp @@ -159,17 +159,17 @@ void signal_handler(int sig) { struct Connection { static const llhttp_settings_t settings; - static std::atomic requestId; + static std::atomic nextRequestId; - char buf[1024]; // Increased size for dynamic request format + char buf[1024]; std::string_view request; - uint64_t currentRequestId; + uint64_t requestId = 0; void initRequest() { - currentRequestId = requestId.fetch_add(1, std::memory_order_relaxed); + requestId = nextRequestId.fetch_add(1, std::memory_order_relaxed); int len = snprintf(buf, sizeof(buf), "GET /ok HTTP/1.1\r\nX-Request-Id: %" PRIu64 "\r\n\r\n", - currentRequestId); + requestId); if (len == -1 || len > int(sizeof(buf))) { abort(); } @@ -203,7 +203,7 @@ struct Connection { bool readBytes() { for (;;) { - char buf[1024]; // Use a reasonable default, configurable via g_config + char buf[64 * (1 << 10)]; // 64 KiB int buf_size = std::min(int(sizeof(buf)), g_config.connection_buf_size); int r = read(fd, buf, buf_size); if (r == -1) { @@ -214,6 +214,7 @@ struct Connection { return false; } } + // printf("read: %.*s\n", r, buf); if (r == 0) { llhttp_finish(&parser); return true; @@ -233,8 +234,8 @@ struct Connection { bool writeBytes() { for (;;) { - int w; - w = write(fd, request.data(), request.size()); + assert(!request.empty()); + int w = write(fd, request.data(), request.size()); if (w == -1) { if (errno == EINTR) { continue; @@ -247,14 +248,12 @@ struct Connection { return true; } assert(w != 0); + // printf("write: %.*s\n", w, request.data()); request = request.substr(w, request.size() - w); if (request.empty()) { ++requestsSent; - TRACE_EVENT("http", "Send request", - perfetto::Flow::Global(currentRequestId)); - if (requestsSent == g_config.requests_per_connection) { - return true; - } + TRACE_EVENT("http", "Send request", perfetto::Flow::Global(requestId)); + return requestsSent == g_config.requests_per_connection; } } } @@ -285,14 +284,40 @@ private: } uint64_t responseId = 0; + std::string headerValueBuffer; + bool expectingResponseId = false; + + int on_header_field(const char *data, size_t s) { + std::string_view field(data, s); + expectingResponseId = (field == "X-Response-ID"); + if (expectingResponseId) { + headerValueBuffer.clear(); + } + return 0; + } + int on_header_value(const char *data, size_t s) { - for (int i = 0; i < int(s); ++i) { - responseId = responseId * 10 + data[i] - '0'; + if (expectingResponseId) { + headerValueBuffer.append(data, s); + } + return 0; + } + + int on_header_value_complete() { + if (expectingResponseId) { + responseId = 0; + for (char c : headerValueBuffer) { + if (c >= '0' && c <= '9') { + responseId = responseId * 10 + (c - '0'); + } + } + expectingResponseId = false; } return 0; } int on_message_complete() { + assert(responseId == requestId); TRACE_EVENT("http", "Receive response", perfetto::Flow::Global(responseId)); responseId = 0; ++responsesReceived; @@ -303,13 +328,16 @@ private: llhttp_t parser; }; -std::atomic Connection::requestId = {}; +std::atomic Connection::nextRequestId = {}; const llhttp_settings_t Connection::settings = []() { llhttp_settings_t settings; llhttp_settings_init(&settings); settings.on_message_complete = callback<&Connection::on_message_complete>; + settings.on_header_field = callback<&Connection::on_header_field>; settings.on_header_value = callback<&Connection::on_header_value>; + settings.on_header_value_complete = + callback<&Connection::on_header_value_complete>; return settings; }(); @@ -514,7 +542,7 @@ int main(int argc, char *argv[]) { pthread_setname_np(pthread_self(), ("network-" + std::to_string(i)).c_str()); while (!g_shutdown.load(std::memory_order_relaxed)) { - struct epoll_event events[64]; // Use a reasonable max size + struct epoll_event events[256]; // Use a reasonable max size int batch_size = std::min(int(sizeof(events) / sizeof(events[0])), g_config.event_batch_size); int eventCount; @@ -528,10 +556,6 @@ int main(int argc, char *argv[]) { perror("epoll_wait"); abort(); // Keep abort for critical errors like server does } - if (eventCount == 0) { - // Timeout - check shutdown flag - break; - } break; } @@ -624,6 +648,7 @@ int main(int argc, char *argv[]) { // Add to epoll with proper events matching server pattern struct epoll_event event{}; + assert(conn->hasMessages()); event.events = EPOLLOUT | EPOLLONESHOT | EPOLLRDHUP; conn->tsan_release(); Connection *raw_conn = conn.release();