From 729fe5c0334df5d2eca01258c146680118767b4e Mon Sep 17 00:00:00 2001
From: Andrew Noyes
Date: Wed, 20 Aug 2025 15:45:38 -0400
Subject: [PATCH] Measure latency

---
 tools/load_tester.cpp | 63 ++++++++++++++++++++++++++++++++++++++++---
 1 file changed, 60 insertions(+), 3 deletions(-)

diff --git a/tools/load_tester.cpp b/tools/load_tester.cpp
index ece392d..bffcc08 100644
--- a/tools/load_tester.cpp
+++ b/tools/load_tester.cpp
@@ -145,6 +145,12 @@ sem_t connectionLimit;
 std::atomic<bool> g_shutdown{false};
 std::atomic<int> g_connect_threads{0};
 
+// Latency statistics
+std::atomic<uint64_t> g_total_requests{0};
+std::atomic<double> g_total_latency{0.0};
+std::atomic<double> g_min_latency{1e9};
+std::atomic<double> g_max_latency{0.0};
+
 void signal_handler(int sig) {
   if (sig == SIGTERM || sig == SIGINT) {
     g_shutdown.store(true, std::memory_order_relaxed);
@@ -215,6 +221,7 @@ struct Connection {
   Connection(int fd, int64_t id) : fd(fd), id(id) {
     llhttp_init(&parser, HTTP_RESPONSE, &settings);
     parser.data = this;
+    requestStartTime = now(); // Record when first request starts
     initRequest();
   }
 
@@ -297,6 +304,9 @@ struct Connection {
   const int fd;
   const int64_t id;
 
+  // Latency tracking
+  double requestStartTime = 0.0;
+
 #if __has_feature(thread_sanitizer)
   void tsan_acquire() { tsan_sync.load(std::memory_order_acquire); }
   void tsan_release() { tsan_sync.store(0, std::memory_order_release); }
@@ -355,8 +365,38 @@ private:
   int on_message_complete() {
     assert(responseId == requestId);
     TRACE_EVENT("http", "Receive response", perfetto::Flow::Global(responseId));
+
+    // Calculate and record latency
+    if (requestStartTime > 0.0) {
+      double responseTime = now();
+      double latency = responseTime - requestStartTime;
+
+      // Update global statistics atomically
+      g_total_requests.fetch_add(1, std::memory_order_relaxed);
+      g_total_latency.fetch_add(latency, std::memory_order_relaxed);
+
+      // Update min/max latency with compare-and-swap
+      double current_min = g_min_latency.load(std::memory_order_relaxed);
+      while (latency < current_min &&
+             !g_min_latency.compare_exchange_weak(current_min, latency,
+                                                  std::memory_order_relaxed)) {
+        // Retry if another thread updated min_latency
+      }
+
+      double current_max = g_max_latency.load(std::memory_order_relaxed);
+      while (latency > current_max &&
+             !g_max_latency.compare_exchange_weak(current_max, latency,
+                                                  std::memory_order_relaxed)) {
+        // Retry if another thread updated max_latency
+      }
+    }
+
     responseId = 0;
     ++responsesReceived;
+
+    // For subsequent requests, start timing from when the last response was
+    // received
+    requestStartTime = now();
     initRequest();
     return 0;
   }
@@ -709,9 +749,26 @@ int main(int argc, char *argv[]) {
     double currTime = now();
     double currConnections = connectionId.load(std::memory_order_relaxed);
 
-    printf("req/s: %f\n", (currConnections - prevConnections) /
-                               (currTime - prevTime) *
-                               g_config.requests_per_connection);
+    double throughput = (currConnections - prevConnections) /
+                        (currTime - prevTime) *
+                        g_config.requests_per_connection;
+
+    // Get latency statistics
+    uint64_t total_requests = g_total_requests.load(std::memory_order_relaxed);
+    double total_latency = g_total_latency.load(std::memory_order_relaxed);
+    double min_latency = g_min_latency.load(std::memory_order_relaxed);
+    double max_latency = g_max_latency.load(std::memory_order_relaxed);
+
+    printf("req/s: %.2f", throughput);
+
+    if (total_requests > 0) {
+      double avg_latency = total_latency / total_requests;
+      printf(" | latency: avg=%.3fms min=%.3fms max=%.3fms (n=%lu)",
+             avg_latency * 1000.0, min_latency * 1000.0, max_latency * 1000.0,
+             total_requests);
+    }
+
+    printf("\n");
 
     // Check if we should exit based on duration
     if (g_config.duration > 0 && (currTime - startTime) >= g_config.duration) {
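
Below is a minimal standalone sketch of the lock-free min/max update pattern used in
on_message_complete() above, assuming only <atomic> and <cstdio>; record_latency() and
main() are illustrative scaffolding and are not part of load_tester.cpp.

#include <atomic>
#include <cstdio>

// Globals mirroring the patch; 1e9 seconds serves as the "no samples yet" minimum.
std::atomic<double> g_min_latency{1e9};
std::atomic<double> g_max_latency{0.0};

// Lower g_min_latency / raise g_max_latency to `latency` without a lock.
// compare_exchange_weak reloads the current value into current_min/current_max
// on failure, so each loop exits as soon as the stored value is already at
// least as tight as `latency`.
void record_latency(double latency) {
  double current_min = g_min_latency.load(std::memory_order_relaxed);
  while (latency < current_min &&
         !g_min_latency.compare_exchange_weak(current_min, latency,
                                              std::memory_order_relaxed)) {
  }
  double current_max = g_max_latency.load(std::memory_order_relaxed);
  while (latency > current_max &&
         !g_max_latency.compare_exchange_weak(current_max, latency,
                                              std::memory_order_relaxed)) {
  }
}

int main() {
  record_latency(0.002); // 2 ms
  record_latency(0.015); // 15 ms
  printf("min=%.3fms max=%.3fms\n", g_min_latency.load() * 1000.0,
         g_max_latency.load() * 1000.0);
  return 0;
}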