Measure latency
This commit is contained in:
@@ -145,6 +145,12 @@ sem_t connectionLimit;
|
||||
std::atomic<bool> g_shutdown{false};
|
||||
std::atomic<int> g_connect_threads{0};
|
||||
|
||||
// Latency statistics
|
||||
std::atomic<uint64_t> g_total_requests{0};
|
||||
std::atomic<double> g_total_latency{0.0};
|
||||
std::atomic<double> g_min_latency{1e9};
|
||||
std::atomic<double> g_max_latency{0.0};
|
||||
|
||||
void signal_handler(int sig) {
|
||||
if (sig == SIGTERM || sig == SIGINT) {
|
||||
g_shutdown.store(true, std::memory_order_relaxed);
|
||||
@@ -215,6 +221,7 @@ struct Connection {
|
||||
Connection(int fd, int64_t id) : fd(fd), id(id) {
|
||||
llhttp_init(&parser, HTTP_RESPONSE, &settings);
|
||||
parser.data = this;
|
||||
requestStartTime = now(); // Record when first request starts
|
||||
initRequest();
|
||||
}
|
||||
|
||||
@@ -297,6 +304,9 @@ struct Connection {
|
||||
const int fd;
|
||||
const int64_t id;
|
||||
|
||||
// Latency tracking
|
||||
double requestStartTime = 0.0;
|
||||
|
||||
#if __has_feature(thread_sanitizer)
|
||||
void tsan_acquire() { tsan_sync.load(std::memory_order_acquire); }
|
||||
void tsan_release() { tsan_sync.store(0, std::memory_order_release); }
|
||||
@@ -355,8 +365,38 @@ private:
|
||||
int on_message_complete() {
|
||||
assert(responseId == requestId);
|
||||
TRACE_EVENT("http", "Receive response", perfetto::Flow::Global(responseId));
|
||||
|
||||
// Calculate and record latency
|
||||
if (requestStartTime > 0.0) {
|
||||
double responseTime = now();
|
||||
double latency = responseTime - requestStartTime;
|
||||
|
||||
// Update global statistics atomically
|
||||
g_total_requests.fetch_add(1, std::memory_order_relaxed);
|
||||
g_total_latency.fetch_add(latency, std::memory_order_relaxed);
|
||||
|
||||
// Update min/max latency with compare-and-swap
|
||||
double current_min = g_min_latency.load(std::memory_order_relaxed);
|
||||
while (latency < current_min &&
|
||||
!g_min_latency.compare_exchange_weak(current_min, latency,
|
||||
std::memory_order_relaxed)) {
|
||||
// Retry if another thread updated min_latency
|
||||
}
|
||||
|
||||
double current_max = g_max_latency.load(std::memory_order_relaxed);
|
||||
while (latency > current_max &&
|
||||
!g_max_latency.compare_exchange_weak(current_max, latency,
|
||||
std::memory_order_relaxed)) {
|
||||
// Retry if another thread updated max_latency
|
||||
}
|
||||
}
|
||||
|
||||
responseId = 0;
|
||||
++responsesReceived;
|
||||
|
||||
// For subsequent requests, start timing from when the last response was
|
||||
// received
|
||||
requestStartTime = now();
|
||||
initRequest();
|
||||
return 0;
|
||||
}
|
||||
@@ -709,9 +749,26 @@ int main(int argc, char *argv[]) {
|
||||
|
||||
double currTime = now();
|
||||
double currConnections = connectionId.load(std::memory_order_relaxed);
|
||||
printf("req/s: %f\n", (currConnections - prevConnections) /
|
||||
(currTime - prevTime) *
|
||||
g_config.requests_per_connection);
|
||||
double throughput = (currConnections - prevConnections) /
|
||||
(currTime - prevTime) *
|
||||
g_config.requests_per_connection;
|
||||
|
||||
// Get latency statistics
|
||||
uint64_t total_requests = g_total_requests.load(std::memory_order_relaxed);
|
||||
double total_latency = g_total_latency.load(std::memory_order_relaxed);
|
||||
double min_latency = g_min_latency.load(std::memory_order_relaxed);
|
||||
double max_latency = g_max_latency.load(std::memory_order_relaxed);
|
||||
|
||||
printf("req/s: %.2f", throughput);
|
||||
|
||||
if (total_requests > 0) {
|
||||
double avg_latency = total_latency / total_requests;
|
||||
printf(" | latency: avg=%.3fms min=%.3fms max=%.3fms (n=%lu)",
|
||||
avg_latency * 1000.0, min_latency * 1000.0, max_latency * 1000.0,
|
||||
total_requests);
|
||||
}
|
||||
|
||||
printf("\n");
|
||||
|
||||
// Check if we should exit based on duration
|
||||
if (g_config.duration > 0 && (currTime - startTime) >= g_config.duration) {
|
||||
|
||||
Reference in New Issue
Block a user