Let WaitIfUpstreamIdle spin long enough to stay saturated by load_tester

This commit is contained in:
2025-08-26 15:37:04 -04:00
parent 6dbf29d1e1
commit 6e48a0ff9a
4 changed files with 24 additions and 11 deletions

View File

@@ -1,12 +1,14 @@
#include <nanobench.h> #include <nanobench.h>
#include "../src/loop_iterations.h"
int main() { int main() {
constexpr int loopIterations = 800; ankerl::nanobench::Bench bench;
ankerl::nanobench::Bench().run( bench.minEpochIterations(100000);
"volatile loop to " + std::to_string(loopIterations), [&] { bench.run("volatile loop to " + std::to_string(loopIterations), [&] {
for (volatile int i = 0; i < loopIterations; i = i + 1) for (volatile int i = 0; i < loopIterations; i = i + 1)
; ;
}); });
return 0; return 0;
} }

View File

@@ -8,6 +8,7 @@
#include "connection.hpp" #include "connection.hpp"
#include "connection_handler.hpp" #include "connection_handler.hpp"
#include "loop_iterations.h"
#include "perfetto_categories.hpp" #include "perfetto_categories.hpp"
#include "server.hpp" #include "server.hpp"
#include "thread_pipeline.hpp" #include "thread_pipeline.hpp"
@@ -108,7 +109,7 @@ struct HttpHandler : ConnectionHandler {
if (!c) { if (!c) {
return; return;
} }
for (volatile int i = 0; i < 1200; i = i + 1) for (volatile int i = 0; i < loopIterations; i = i + 1)
; ;
} }
} }
@@ -152,7 +153,7 @@ private:
static constexpr int kFinalStageThreads = 2; static constexpr int kFinalStageThreads = 2;
static constexpr int kLogSize = 12; static constexpr int kLogSize = 12;
StaticThreadPipeline<std::unique_ptr<Connection>, StaticThreadPipeline<std::unique_ptr<Connection>,
WaitStrategy::WaitIfStageEmpty, 1, 2> WaitStrategy::WaitIfUpstreamIdle, 1, 2>
pipeline{kLogSize}; pipeline{kLogSize};
std::thread stage0Thread; std::thread stage0Thread;
std::vector<std::thread> finalStageThreads; std::vector<std::thread> finalStageThreads;

3
src/loop_iterations.h Normal file
View File

@@ -0,0 +1,3 @@
#pragma once
constexpr int loopIterations = 1550;

View File

@@ -114,10 +114,17 @@ uint32_t calculate_safe_len(
// Empty - busy wait // Empty - busy wait
} else if constexpr (wait_strategy == } else if constexpr (wait_strategy ==
WaitStrategy::WaitIfUpstreamIdle) { WaitStrategy::WaitIfUpstreamIdle) {
auto push = pushes.load(std::memory_order_relaxed); // We're allowed to spin as long as we eventually go to 0% cpu
if (push == thread.local_pops) { // usage on idle
pushes.wait(push, std::memory_order_relaxed); uint32_t push;
for (int i = 0; i < 100000; ++i) {
push = pushes.load(std::memory_order_relaxed);
if (push != thread.local_pops) {
goto dont_wait;
}
} }
pushes.wait(push, std::memory_order_relaxed);
dont_wait:;
} else { } else {
static_assert(wait_strategy == WaitStrategy::WaitIfStageEmpty); static_assert(wait_strategy == WaitStrategy::WaitIfStageEmpty);
last_push.wait(thread.last_push_read[Is], last_push.wait(thread.last_push_read[Is],