Add one stage pipeline to /ok

This commit is contained in:
2025-08-22 14:28:17 -04:00
parent c536522f21
commit f43e623a7e
5 changed files with 52 additions and 12 deletions

View File

@@ -113,8 +113,12 @@ void HttpHandler::on_data_arrived(std::string_view data,
break; break;
} }
// Connection state will be reset when appropriate by the server {
// For now, we don't reset immediately since messages may still be queued auto guard = ok_pipeline.push(1, true);
for (auto &c : guard.batch) {
c = std::move(conn_ptr);
}
}
} }
} }

View File

@@ -1,10 +1,14 @@
#pragma once #pragma once
#include "ThreadPipeline.h"
#include "connection.hpp" #include "connection.hpp"
#include "connection_handler.hpp" #include "connection_handler.hpp"
#include "perfetto_categories.hpp"
#include "server.hpp"
#include <llhttp.h> #include <llhttp.h>
#include <memory> #include <memory>
#include <string_view> #include <string_view>
#include <thread>
/** /**
* HTTP routes supported by WeaselDB server. * HTTP routes supported by WeaselDB server.
@@ -57,8 +61,34 @@ struct HttpConnectionState {
* Supports the WeaselDB REST API endpoints with enum-based routing. * Supports the WeaselDB REST API endpoints with enum-based routing.
*/ */
class HttpHandler : public ConnectionHandler { class HttpHandler : public ConnectionHandler {
ThreadPipeline<std::unique_ptr<Connection>> ok_pipeline{10, {1}};
std::thread ok_thread{[this]() {
pthread_setname_np(pthread_self(), "stage-0");
for (;;) {
auto guard = ok_pipeline.acquire(0, 0);
for (auto &c : guard.batch) {
if (!c) {
return;
}
auto *state = static_cast<HttpConnectionState *>(c->user_data);
TRACE_EVENT("http", "pipeline thread",
perfetto::Flow::Global(state->request_id));
Server::releaseBackToServer(std::move(c));
}
}
}};
public: public:
HttpHandler() = default; HttpHandler() = default;
~HttpHandler() {
{
auto guard = ok_pipeline.push(1, true);
for (auto &c : guard.batch) {
c = {};
}
}
ok_thread.join();
}
void on_connection_established(Connection &conn) override; void on_connection_established(Connection &conn) override;
void on_connection_closed(Connection &conn) override; void on_connection_closed(Connection &conn) override;

View File

@@ -258,6 +258,5 @@ int main(int argc, char *argv[]) {
server->run(); server->run();
std::cout << "Server shutdown complete." << std::endl; std::cout << "Server shutdown complete." << std::endl;
g_server = nullptr;
return 0; return 0;
} }

View File

@@ -337,7 +337,7 @@ void Server::start_io_threads(std::vector<std::thread> &threads) {
// Process existing connections in batch // Process existing connections in batch
if (batch_count > 0) { if (batch_count > 0) {
process_connection_batch(epollfd, {batch, (size_t)batch_count}, process_connection_batch(epollfd, {batch, (size_t)batch_count},
{batch_events, (size_t)batch_count}, false); {batch_events, (size_t)batch_count});
} }
// Only accept on listen sockets that epoll indicates are ready // Only accept on listen sockets that epoll indicates are ready
@@ -372,6 +372,15 @@ void Server::start_io_threads(std::vector<std::thread> &threads) {
perror("setsockopt SO_KEEPALIVE"); perror("setsockopt SO_KEEPALIVE");
} }
// Add to epoll with no interests
struct epoll_event event{};
event.events = 0;
event.data.fd = fd;
if (epoll_ctl(epollfd, EPOLL_CTL_ADD, fd, &event) == -1) {
perror("epoll_ctl ADD");
(void)connection_registry_.remove(fd);
}
// Transfer ownership from registry to batch processing // Transfer ownership from registry to batch processing
size_t epoll_index = thread_id % epoll_fds_.size(); size_t epoll_index = thread_id % epoll_fds_.size();
batch[batch_count] = std::unique_ptr<Connection>(new Connection( batch[batch_count] = std::unique_ptr<Connection>(new Connection(
@@ -385,8 +394,7 @@ void Server::start_io_threads(std::vector<std::thread> &threads) {
// Process batch if full // Process batch if full
if (batch_count == config_.server.event_batch_size) { if (batch_count == config_.server.event_batch_size) {
process_connection_batch(epollfd, {batch, (size_t)batch_count}, process_connection_batch(epollfd, {batch, (size_t)batch_count},
{batch_events, (size_t)batch_count}, {batch_events, (size_t)batch_count});
true);
batch_count = 0; batch_count = 0;
} }
} // End inner accept loop } // End inner accept loop
@@ -395,7 +403,7 @@ void Server::start_io_threads(std::vector<std::thread> &threads) {
// Process remaining accepted connections // Process remaining accepted connections
if (batch_count > 0) { if (batch_count > 0) {
process_connection_batch(epollfd, {batch, (size_t)batch_count}, process_connection_batch(epollfd, {batch, (size_t)batch_count},
{batch_events, (size_t)batch_count}, true); {batch_events, (size_t)batch_count});
batch_count = 0; batch_count = 0;
} }
} }
@@ -460,7 +468,7 @@ void Server::process_connection_io(std::unique_ptr<Connection> &conn,
void Server::process_connection_batch( void Server::process_connection_batch(
int epollfd, std::span<std::unique_ptr<Connection>> batch, int epollfd, std::span<std::unique_ptr<Connection>> batch,
std::span<const int> events, bool is_new) { std::span<const int> events) {
// First process I/O for each connection // First process I/O for each connection
for (size_t i = 0; i < batch.size(); ++i) { for (size_t i = 0; i < batch.size(); ++i) {
if (batch[i]) { if (batch[i]) {
@@ -487,9 +495,8 @@ void Server::process_connection_batch(
// Put connection back in registry since handler didn't take ownership. // Put connection back in registry since handler didn't take ownership.
// Must happen before epoll_ctl // Must happen before epoll_ctl
connection_registry_.store(fd, std::move(conn_ptr)); connection_registry_.store(fd, std::move(conn_ptr));
int epoll_op = is_new ? EPOLL_CTL_ADD : EPOLL_CTL_MOD; if (epoll_ctl(epollfd, EPOLL_CTL_MOD, fd, &event) == -1) {
if (epoll_ctl(epollfd, epoll_op, fd, &event) == -1) { perror("epoll_ctl MOD");
perror(is_new ? "epoll_ctl ADD" : "epoll_ctl MOD");
(void)connection_registry_.remove(fd); (void)connection_registry_.remove(fd);
} }
} }

View File

@@ -158,7 +158,7 @@ private:
// Helper for processing a batch of connections with their events // Helper for processing a batch of connections with their events
void process_connection_batch(int epollfd, void process_connection_batch(int epollfd,
std::span<std::unique_ptr<Connection>> batch, std::span<std::unique_ptr<Connection>> batch,
std::span<const int> events, bool is_new); std::span<const int> events);
/** /**
* Called internally to return ownership to the server. * Called internally to return ownership to the server.