Add an index to thread pipeline iterators for load balancing

This commit is contained in:
2025-08-22 16:32:48 -04:00
parent f43e623a7e
commit 12d4289568
6 changed files with 96 additions and 52 deletions

View File

@@ -83,13 +83,13 @@ template <class T> struct ThreadPipeline {
using reference = value_type &; using reference = value_type &;
reference operator*() const { reference operator*() const {
return (*ring)[index & (ring->size() - 1)]; return (*ring)[index_ & (ring->size() - 1)];
} }
pointer operator->() const { pointer operator->() const {
return &(*ring)[index & (ring->size() - 1)]; return &(*ring)[index_ & (ring->size() - 1)];
} }
Iterator &operator++() { Iterator &operator++() {
++index; ++index_;
return *this; return *this;
} }
Iterator operator++(int) { Iterator operator++(int) {
@@ -98,7 +98,7 @@ template <class T> struct ThreadPipeline {
return tmp; return tmp;
} }
Iterator &operator--() { Iterator &operator--() {
--index; --index_;
return *this; return *this;
} }
Iterator operator--(int) { Iterator operator--(int) {
@@ -107,61 +107,66 @@ template <class T> struct ThreadPipeline {
return tmp; return tmp;
} }
Iterator &operator+=(difference_type n) { Iterator &operator+=(difference_type n) {
index += n; index_ += n;
return *this; return *this;
} }
Iterator &operator-=(difference_type n) { Iterator &operator-=(difference_type n) {
index -= n; index_ -= n;
return *this; return *this;
} }
Iterator operator+(difference_type n) const { Iterator operator+(difference_type n) const {
return Iterator(index + n, ring); return Iterator(index_ + n, ring);
} }
Iterator operator-(difference_type n) const { Iterator operator-(difference_type n) const {
return Iterator(index - n, ring); return Iterator(index_ - n, ring);
} }
difference_type operator-(const Iterator &rhs) const { difference_type operator-(const Iterator &rhs) const {
assert(ring == rhs.ring); assert(ring == rhs.ring);
return static_cast<difference_type>(index) - return static_cast<difference_type>(index_) -
static_cast<difference_type>(rhs.index); static_cast<difference_type>(rhs.index_);
} }
reference operator[](difference_type n) const { reference operator[](difference_type n) const {
return (*ring)[(index + n) & (ring->size() - 1)]; return (*ring)[(index_ + n) & (ring->size() - 1)];
} }
friend Iterator operator+(difference_type n, const Iterator &iter) { friend Iterator operator+(difference_type n, const Iterator &iter) {
return iter + n; return iter + n;
} }
friend bool operator==(const Iterator &lhs, const Iterator &rhs) { friend bool operator==(const Iterator &lhs, const Iterator &rhs) {
assert(lhs.ring == rhs.ring); assert(lhs.ring == rhs.ring);
return lhs.index == rhs.index; return lhs.index_ == rhs.index_;
} }
friend bool operator!=(const Iterator &lhs, const Iterator &rhs) { friend bool operator!=(const Iterator &lhs, const Iterator &rhs) {
assert(lhs.ring == rhs.ring); assert(lhs.ring == rhs.ring);
return lhs.index != rhs.index; return lhs.index_ != rhs.index_;
} }
friend bool operator<(const Iterator &lhs, const Iterator &rhs) { friend bool operator<(const Iterator &lhs, const Iterator &rhs) {
assert(lhs.ring == rhs.ring); assert(lhs.ring == rhs.ring);
// Handle potential uint32_t wraparound by using signed difference // Handle potential uint32_t wraparound by using signed difference
return static_cast<int32_t>(lhs.index - rhs.index) < 0; return static_cast<int32_t>(lhs.index_ - rhs.index_) < 0;
} }
friend bool operator<=(const Iterator &lhs, const Iterator &rhs) { friend bool operator<=(const Iterator &lhs, const Iterator &rhs) {
assert(lhs.ring == rhs.ring); assert(lhs.ring == rhs.ring);
return static_cast<int32_t>(lhs.index - rhs.index) <= 0; return static_cast<int32_t>(lhs.index_ - rhs.index_) <= 0;
} }
friend bool operator>(const Iterator &lhs, const Iterator &rhs) { friend bool operator>(const Iterator &lhs, const Iterator &rhs) {
assert(lhs.ring == rhs.ring); assert(lhs.ring == rhs.ring);
return static_cast<int32_t>(lhs.index - rhs.index) > 0; return static_cast<int32_t>(lhs.index_ - rhs.index_) > 0;
} }
friend bool operator>=(const Iterator &lhs, const Iterator &rhs) { friend bool operator>=(const Iterator &lhs, const Iterator &rhs) {
assert(lhs.ring == rhs.ring); assert(lhs.ring == rhs.ring);
return static_cast<int32_t>(lhs.index - rhs.index) >= 0; return static_cast<int32_t>(lhs.index_ - rhs.index_) >= 0;
} }
/// Returns the ring buffer index (0 to ring->size()-1) for this iterator
/// position. Useful for distributing work across multiple threads by
/// using modulo operations.
uint32_t index() const { return index_ & (ring->size() - 1); }
private: private:
Iterator(uint32_t index, std::vector<T> *const ring) Iterator(uint32_t index, std::vector<T> *const ring)
: index(index), ring(ring) {} : index_(index), ring(ring) {}
friend struct Batch; friend struct Batch;
uint32_t index; uint32_t index_;
std::vector<T> *const ring; std::vector<T> *const ring;
}; };

View File

@@ -51,6 +51,22 @@ void HttpHandler::on_write_progress(std::unique_ptr<Connection> &conn_ptr) {
} }
} }
void HttpHandler::on_post_batch(std::span<std::unique_ptr<Connection>> batch) {
int readyCount = 0;
for (int i = 0; i < int(batch.size()); ++i) {
readyCount += batch[i] && batch[i]->outgoingBytesQueued() > 0;
}
if (readyCount > 0) {
auto guard = pipeline.push(readyCount, /*block=*/true);
auto outIter = guard.batch.begin();
for (int i = 0; i < int(batch.size()); ++i) {
if (batch[i] && batch[i]->outgoingBytesQueued() > 0) {
*outIter++ = std::move(batch[i]);
}
}
}
}
void HttpHandler::on_data_arrived(std::string_view data, void HttpHandler::on_data_arrived(std::string_view data,
std::unique_ptr<Connection> &conn_ptr) { std::unique_ptr<Connection> &conn_ptr) {
auto *state = static_cast<HttpConnectionState *>(conn_ptr->user_data); auto *state = static_cast<HttpConnectionState *>(conn_ptr->user_data);
@@ -112,13 +128,6 @@ void HttpHandler::on_data_arrived(std::string_view data,
handleNotFound(*conn_ptr, *state); handleNotFound(*conn_ptr, *state);
break; break;
} }
{
auto guard = ok_pipeline.push(1, true);
for (auto &c : guard.batch) {
c = std::move(conn_ptr);
}
}
} }
} }

View File

@@ -61,12 +61,23 @@ struct HttpConnectionState {
* Supports the WeaselDB REST API endpoints with enum-based routing. * Supports the WeaselDB REST API endpoints with enum-based routing.
*/ */
class HttpHandler : public ConnectionHandler { class HttpHandler : public ConnectionHandler {
ThreadPipeline<std::unique_ptr<Connection>> ok_pipeline{10, {1}}; static constexpr int kFinalStageThreads = 2;
std::thread ok_thread{[this]() { static constexpr int kLogSize = 12;
pthread_setname_np(pthread_self(), "stage-0"); ThreadPipeline<std::unique_ptr<Connection>> pipeline{kLogSize,
{kFinalStageThreads}};
std::vector<std::thread> finalStageThreads;
public:
HttpHandler() {
for (int threadId = 0; threadId < kFinalStageThreads; ++threadId) {
finalStageThreads.emplace_back([this, threadId]() {
pthread_setname_np(pthread_self(),
("stage-0-" + std::to_string(threadId)).c_str());
for (;;) { for (;;) {
auto guard = ok_pipeline.acquire(0, 0); auto guard = pipeline.acquire(0, threadId);
for (auto &c : guard.batch) { for (auto it = guard.batch.begin(); it != guard.batch.end(); ++it) {
if ((it.index() % kFinalStageThreads) == threadId) {
auto &c = *it;
if (!c) { if (!c) {
return; return;
} }
@@ -76,18 +87,20 @@ class HttpHandler : public ConnectionHandler {
Server::releaseBackToServer(std::move(c)); Server::releaseBackToServer(std::move(c));
} }
} }
}}; }
});
public: }
HttpHandler() = default; }
~HttpHandler() { ~HttpHandler() {
{ {
auto guard = ok_pipeline.push(1, true); auto guard = pipeline.push(kFinalStageThreads, true);
for (auto &c : guard.batch) { for (auto &c : guard.batch) {
c = {}; c = {};
} }
} }
ok_thread.join(); for (auto &thread : finalStageThreads) {
thread.join();
}
} }
void on_connection_established(Connection &conn) override; void on_connection_established(Connection &conn) override;
@@ -95,6 +108,7 @@ public:
void on_data_arrived(std::string_view data, void on_data_arrived(std::string_view data,
std::unique_ptr<Connection> &conn_ptr) override; std::unique_ptr<Connection> &conn_ptr) override;
void on_write_progress(std::unique_ptr<Connection> &conn_ptr) override; void on_write_progress(std::unique_ptr<Connection> &conn_ptr) override;
void on_post_batch(std::span<std::unique_ptr<Connection>> /*batch*/) override;
// Route parsing (public for testing) // Route parsing (public for testing)
static HttpRoute parseRoute(std::string_view method, std::string_view url); static HttpRoute parseRoute(std::string_view method, std::string_view url);

View File

@@ -411,7 +411,7 @@ void Server::start_io_threads(std::vector<std::thread> &threads) {
} }
} }
void Server::process_connection_io(std::unique_ptr<Connection> &conn, void Server::process_connection_reads(std::unique_ptr<Connection> &conn,
int events) { int events) {
assert(conn); assert(conn);
// Handle EPOLLIN - read data and process it // Handle EPOLLIN - read data and process it
@@ -440,7 +440,11 @@ void Server::process_connection_io(std::unique_ptr<Connection> &conn,
return; return;
} }
} }
}
void Server::process_connection_writes(std::unique_ptr<Connection> &conn,
int events) {
assert(conn);
// Send immediately if we have outgoing messages (either from EPOLLOUT or // Send immediately if we have outgoing messages (either from EPOLLOUT or
// after reading) // after reading)
if ((events & EPOLLOUT) || ((events & EPOLLIN) && conn->hasMessages())) { if ((events & EPOLLOUT) || ((events & EPOLLIN) && conn->hasMessages())) {
@@ -469,10 +473,18 @@ void Server::process_connection_io(std::unique_ptr<Connection> &conn,
void Server::process_connection_batch( void Server::process_connection_batch(
int epollfd, std::span<std::unique_ptr<Connection>> batch, int epollfd, std::span<std::unique_ptr<Connection>> batch,
std::span<const int> events) { std::span<const int> events) {
// First process I/O for each connection
// First process writes for each connection
for (size_t i = 0; i < batch.size(); ++i) { for (size_t i = 0; i < batch.size(); ++i) {
if (batch[i]) { if (batch[i]) {
process_connection_io(batch[i], events[i]); process_connection_writes(batch[i], events[i]);
}
}
// Then process reads for each connection
for (size_t i = 0; i < batch.size(); ++i) {
if (batch[i]) {
process_connection_reads(batch[i], events[i]);
} }
} }

View File

@@ -153,7 +153,10 @@ private:
int get_epoll_for_thread(int thread_id) const; int get_epoll_for_thread(int thread_id) const;
// Helper for processing connection I/O // Helper for processing connection I/O
void process_connection_io(std::unique_ptr<Connection> &conn_ptr, int events); void process_connection_reads(std::unique_ptr<Connection> &conn_ptr,
int events);
void process_connection_writes(std::unique_ptr<Connection> &conn_ptr,
int events);
// Helper for processing a batch of connections with their events // Helper for processing a batch of connections with their events
void process_connection_batch(int epollfd, void process_connection_batch(int epollfd,

View File

@@ -7,9 +7,10 @@ port = 8080
# Maximum request size in bytes (for 413 Content Too Large responses) # Maximum request size in bytes (for 413 Content Too Large responses)
max_request_size_bytes = 1048576 # 1MB max_request_size_bytes = 1048576 # 1MB
# Number of I/O threads for handling connections and network events # Number of I/O threads for handling connections and network events
io_threads = 12 io_threads = 8
epoll_instances = 8
# Event batch size for epoll processing # Event batch size for epoll processing
event_batch_size = 32 event_batch_size = 64
[commit] [commit]
# Minimum length for request_id to ensure sufficient entropy # Minimum length for request_id to ensure sufficient entropy