diff --git a/src/ThreadPipeline.h b/src/ThreadPipeline.h index 4d74390..63e79e6 100644 --- a/src/ThreadPipeline.h +++ b/src/ThreadPipeline.h @@ -83,13 +83,13 @@ template struct ThreadPipeline { using reference = value_type &; reference operator*() const { - return (*ring)[index & (ring->size() - 1)]; + return (*ring)[index_ & (ring->size() - 1)]; } pointer operator->() const { - return &(*ring)[index & (ring->size() - 1)]; + return &(*ring)[index_ & (ring->size() - 1)]; } Iterator &operator++() { - ++index; + ++index_; return *this; } Iterator operator++(int) { @@ -98,7 +98,7 @@ template struct ThreadPipeline { return tmp; } Iterator &operator--() { - --index; + --index_; return *this; } Iterator operator--(int) { @@ -107,61 +107,66 @@ template struct ThreadPipeline { return tmp; } Iterator &operator+=(difference_type n) { - index += n; + index_ += n; return *this; } Iterator &operator-=(difference_type n) { - index -= n; + index_ -= n; return *this; } Iterator operator+(difference_type n) const { - return Iterator(index + n, ring); + return Iterator(index_ + n, ring); } Iterator operator-(difference_type n) const { - return Iterator(index - n, ring); + return Iterator(index_ - n, ring); } difference_type operator-(const Iterator &rhs) const { assert(ring == rhs.ring); - return static_cast(index) - - static_cast(rhs.index); + return static_cast(index_) - + static_cast(rhs.index_); } reference operator[](difference_type n) const { - return (*ring)[(index + n) & (ring->size() - 1)]; + return (*ring)[(index_ + n) & (ring->size() - 1)]; } friend Iterator operator+(difference_type n, const Iterator &iter) { return iter + n; } friend bool operator==(const Iterator &lhs, const Iterator &rhs) { assert(lhs.ring == rhs.ring); - return lhs.index == rhs.index; + return lhs.index_ == rhs.index_; } friend bool operator!=(const Iterator &lhs, const Iterator &rhs) { assert(lhs.ring == rhs.ring); - return lhs.index != rhs.index; + return lhs.index_ != rhs.index_; } friend bool operator<(const Iterator &lhs, const Iterator &rhs) { assert(lhs.ring == rhs.ring); // Handle potential uint32_t wraparound by using signed difference - return static_cast(lhs.index - rhs.index) < 0; + return static_cast(lhs.index_ - rhs.index_) < 0; } friend bool operator<=(const Iterator &lhs, const Iterator &rhs) { assert(lhs.ring == rhs.ring); - return static_cast(lhs.index - rhs.index) <= 0; + return static_cast(lhs.index_ - rhs.index_) <= 0; } friend bool operator>(const Iterator &lhs, const Iterator &rhs) { assert(lhs.ring == rhs.ring); - return static_cast(lhs.index - rhs.index) > 0; + return static_cast(lhs.index_ - rhs.index_) > 0; } friend bool operator>=(const Iterator &lhs, const Iterator &rhs) { assert(lhs.ring == rhs.ring); - return static_cast(lhs.index - rhs.index) >= 0; + return static_cast(lhs.index_ - rhs.index_) >= 0; } + /// Returns the ring buffer index (0 to ring->size()-1) for this iterator + /// position. Useful for distributing work across multiple threads by + /// using modulo operations. + uint32_t index() const { return index_ & (ring->size() - 1); } + private: Iterator(uint32_t index, std::vector *const ring) - : index(index), ring(ring) {} + : index_(index), ring(ring) {} friend struct Batch; - uint32_t index; + uint32_t index_; std::vector *const ring; }; diff --git a/src/http_handler.cpp b/src/http_handler.cpp index 5a7f3a4..ea8c61e 100644 --- a/src/http_handler.cpp +++ b/src/http_handler.cpp @@ -51,6 +51,22 @@ void HttpHandler::on_write_progress(std::unique_ptr &conn_ptr) { } } +void HttpHandler::on_post_batch(std::span> batch) { + int readyCount = 0; + for (int i = 0; i < int(batch.size()); ++i) { + readyCount += batch[i] && batch[i]->outgoingBytesQueued() > 0; + } + if (readyCount > 0) { + auto guard = pipeline.push(readyCount, /*block=*/true); + auto outIter = guard.batch.begin(); + for (int i = 0; i < int(batch.size()); ++i) { + if (batch[i] && batch[i]->outgoingBytesQueued() > 0) { + *outIter++ = std::move(batch[i]); + } + } + } +} + void HttpHandler::on_data_arrived(std::string_view data, std::unique_ptr &conn_ptr) { auto *state = static_cast(conn_ptr->user_data); @@ -112,13 +128,6 @@ void HttpHandler::on_data_arrived(std::string_view data, handleNotFound(*conn_ptr, *state); break; } - - { - auto guard = ok_pipeline.push(1, true); - for (auto &c : guard.batch) { - c = std::move(conn_ptr); - } - } } } diff --git a/src/http_handler.hpp b/src/http_handler.hpp index 1f2555e..01af22c 100644 --- a/src/http_handler.hpp +++ b/src/http_handler.hpp @@ -61,33 +61,46 @@ struct HttpConnectionState { * Supports the WeaselDB REST API endpoints with enum-based routing. */ class HttpHandler : public ConnectionHandler { - ThreadPipeline> ok_pipeline{10, {1}}; - std::thread ok_thread{[this]() { - pthread_setname_np(pthread_self(), "stage-0"); - for (;;) { - auto guard = ok_pipeline.acquire(0, 0); - for (auto &c : guard.batch) { - if (!c) { - return; - } - auto *state = static_cast(c->user_data); - TRACE_EVENT("http", "pipeline thread", - perfetto::Flow::Global(state->request_id)); - Server::releaseBackToServer(std::move(c)); - } - } - }}; + static constexpr int kFinalStageThreads = 2; + static constexpr int kLogSize = 12; + ThreadPipeline> pipeline{kLogSize, + {kFinalStageThreads}}; + std::vector finalStageThreads; public: - HttpHandler() = default; + HttpHandler() { + for (int threadId = 0; threadId < kFinalStageThreads; ++threadId) { + finalStageThreads.emplace_back([this, threadId]() { + pthread_setname_np(pthread_self(), + ("stage-0-" + std::to_string(threadId)).c_str()); + for (;;) { + auto guard = pipeline.acquire(0, threadId); + for (auto it = guard.batch.begin(); it != guard.batch.end(); ++it) { + if ((it.index() % kFinalStageThreads) == threadId) { + auto &c = *it; + if (!c) { + return; + } + auto *state = static_cast(c->user_data); + TRACE_EVENT("http", "pipeline thread", + perfetto::Flow::Global(state->request_id)); + Server::releaseBackToServer(std::move(c)); + } + } + } + }); + } + } ~HttpHandler() { { - auto guard = ok_pipeline.push(1, true); + auto guard = pipeline.push(kFinalStageThreads, true); for (auto &c : guard.batch) { c = {}; } } - ok_thread.join(); + for (auto &thread : finalStageThreads) { + thread.join(); + } } void on_connection_established(Connection &conn) override; @@ -95,6 +108,7 @@ public: void on_data_arrived(std::string_view data, std::unique_ptr &conn_ptr) override; void on_write_progress(std::unique_ptr &conn_ptr) override; + void on_post_batch(std::span> /*batch*/) override; // Route parsing (public for testing) static HttpRoute parseRoute(std::string_view method, std::string_view url); diff --git a/src/server.cpp b/src/server.cpp index 99fd688..6ae6637 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -411,8 +411,8 @@ void Server::start_io_threads(std::vector &threads) { } } -void Server::process_connection_io(std::unique_ptr &conn, - int events) { +void Server::process_connection_reads(std::unique_ptr &conn, + int events) { assert(conn); // Handle EPOLLIN - read data and process it if (events & EPOLLIN) { @@ -440,7 +440,11 @@ void Server::process_connection_io(std::unique_ptr &conn, return; } } +} +void Server::process_connection_writes(std::unique_ptr &conn, + int events) { + assert(conn); // Send immediately if we have outgoing messages (either from EPOLLOUT or // after reading) if ((events & EPOLLOUT) || ((events & EPOLLIN) && conn->hasMessages())) { @@ -469,10 +473,18 @@ void Server::process_connection_io(std::unique_ptr &conn, void Server::process_connection_batch( int epollfd, std::span> batch, std::span events) { - // First process I/O for each connection + + // First process writes for each connection for (size_t i = 0; i < batch.size(); ++i) { if (batch[i]) { - process_connection_io(batch[i], events[i]); + process_connection_writes(batch[i], events[i]); + } + } + + // Then process reads for each connection + for (size_t i = 0; i < batch.size(); ++i) { + if (batch[i]) { + process_connection_reads(batch[i], events[i]); } } diff --git a/src/server.hpp b/src/server.hpp index 268ec51..33afae7 100644 --- a/src/server.hpp +++ b/src/server.hpp @@ -153,7 +153,10 @@ private: int get_epoll_for_thread(int thread_id) const; // Helper for processing connection I/O - void process_connection_io(std::unique_ptr &conn_ptr, int events); + void process_connection_reads(std::unique_ptr &conn_ptr, + int events); + void process_connection_writes(std::unique_ptr &conn_ptr, + int events); // Helper for processing a batch of connections with their events void process_connection_batch(int epollfd, diff --git a/test_config.toml b/test_config.toml index 924ba30..2915869 100644 --- a/test_config.toml +++ b/test_config.toml @@ -7,9 +7,10 @@ port = 8080 # Maximum request size in bytes (for 413 Content Too Large responses) max_request_size_bytes = 1048576 # 1MB # Number of I/O threads for handling connections and network events -io_threads = 12 +io_threads = 8 +epoll_instances = 8 # Event batch size for epoll processing -event_batch_size = 32 +event_batch_size = 64 [commit] # Minimum length for request_id to ensure sufficient entropy