diff --git a/src/http_handler.cpp b/src/http_handler.cpp index b16ddc0..5a7f3a4 100644 --- a/src/http_handler.cpp +++ b/src/http_handler.cpp @@ -113,8 +113,12 @@ void HttpHandler::on_data_arrived(std::string_view data, break; } - // Connection state will be reset when appropriate by the server - // For now, we don't reset immediately since messages may still be queued + { + auto guard = ok_pipeline.push(1, true); + for (auto &c : guard.batch) { + c = std::move(conn_ptr); + } + } } } diff --git a/src/http_handler.hpp b/src/http_handler.hpp index ca5ff62..1f2555e 100644 --- a/src/http_handler.hpp +++ b/src/http_handler.hpp @@ -1,10 +1,14 @@ #pragma once +#include "ThreadPipeline.h" #include "connection.hpp" #include "connection_handler.hpp" +#include "perfetto_categories.hpp" +#include "server.hpp" #include #include #include +#include /** * HTTP routes supported by WeaselDB server. @@ -57,8 +61,34 @@ struct HttpConnectionState { * Supports the WeaselDB REST API endpoints with enum-based routing. */ class HttpHandler : public ConnectionHandler { + ThreadPipeline> ok_pipeline{10, {1}}; + std::thread ok_thread{[this]() { + pthread_setname_np(pthread_self(), "stage-0"); + for (;;) { + auto guard = ok_pipeline.acquire(0, 0); + for (auto &c : guard.batch) { + if (!c) { + return; + } + auto *state = static_cast(c->user_data); + TRACE_EVENT("http", "pipeline thread", + perfetto::Flow::Global(state->request_id)); + Server::releaseBackToServer(std::move(c)); + } + } + }}; + public: HttpHandler() = default; + ~HttpHandler() { + { + auto guard = ok_pipeline.push(1, true); + for (auto &c : guard.batch) { + c = {}; + } + } + ok_thread.join(); + } void on_connection_established(Connection &conn) override; void on_connection_closed(Connection &conn) override; diff --git a/src/main.cpp b/src/main.cpp index d317036..7a686f7 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -258,6 +258,5 @@ int main(int argc, char *argv[]) { server->run(); std::cout << "Server shutdown complete." << std::endl; - g_server = nullptr; return 0; } diff --git a/src/server.cpp b/src/server.cpp index 099f78f..99fd688 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -337,7 +337,7 @@ void Server::start_io_threads(std::vector &threads) { // Process existing connections in batch if (batch_count > 0) { process_connection_batch(epollfd, {batch, (size_t)batch_count}, - {batch_events, (size_t)batch_count}, false); + {batch_events, (size_t)batch_count}); } // Only accept on listen sockets that epoll indicates are ready @@ -372,6 +372,15 @@ void Server::start_io_threads(std::vector &threads) { perror("setsockopt SO_KEEPALIVE"); } + // Add to epoll with no interests + struct epoll_event event{}; + event.events = 0; + event.data.fd = fd; + if (epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event) == -1) { + perror("epoll_ctl ADD"); + (void)connection_registry_.remove(fd); + } + // Transfer ownership from registry to batch processing size_t epoll_index = thread_id % epoll_fds_.size(); batch[batch_count] = std::unique_ptr(new Connection( @@ -385,8 +394,7 @@ void Server::start_io_threads(std::vector &threads) { // Process batch if full if (batch_count == config_.server.event_batch_size) { process_connection_batch(epollfd, {batch, (size_t)batch_count}, - {batch_events, (size_t)batch_count}, - true); + {batch_events, (size_t)batch_count}); batch_count = 0; } } // End inner accept loop @@ -395,7 +403,7 @@ void Server::start_io_threads(std::vector &threads) { // Process remaining accepted connections if (batch_count > 0) { process_connection_batch(epollfd, {batch, (size_t)batch_count}, - {batch_events, (size_t)batch_count}, true); + {batch_events, (size_t)batch_count}); batch_count = 0; } } @@ -460,7 +468,7 @@ void Server::process_connection_io(std::unique_ptr &conn, void Server::process_connection_batch( int epollfd, std::span> batch, - std::span events, bool is_new) { + std::span events) { // First process I/O for each connection for (size_t i = 0; i < batch.size(); ++i) { if (batch[i]) { @@ -487,9 +495,8 @@ void Server::process_connection_batch( // Put connection back in registry since handler didn't take ownership. // Must happen before epoll_ctl connection_registry_.store(fd, std::move(conn_ptr)); - int epoll_op = is_new ? EPOLL_CTL_ADD : EPOLL_CTL_MOD; - if (epoll_ctl(epollfd, epoll_op, fd, &event) == -1) { - perror(is_new ? "epoll_ctl ADD" : "epoll_ctl MOD"); + if (epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &event) == -1) { + perror("epoll_ctl MOD"); (void)connection_registry_.remove(fd); } } diff --git a/src/server.hpp b/src/server.hpp index c5dd44e..268ec51 100644 --- a/src/server.hpp +++ b/src/server.hpp @@ -158,7 +158,7 @@ private: // Helper for processing a batch of connections with their events void process_connection_batch(int epollfd, std::span> batch, - std::span events, bool is_new); + std::span events); /** * Called internally to return ownership to the server.