Call epoll_ctl in release stage

This commit is contained in:
2025-09-14 16:28:12 -04:00
parent 16c7ee0408
commit 7ef54a2d08
3 changed files with 121 additions and 26 deletions

View File

@@ -122,19 +122,21 @@ void HttpHandler::on_batch_complete(std::span<Connection *const> batch) {
state->parsing_commit && state->basic_validation_passed) {
*out_iter++ =
CommitEntry{conn->get_weak_ref(), state->http_request_id,
state->connection_close, state->commit_request.get()};
state->connection_close, state->commit_request.get(),
std::move(state->arena)};
}
// Create StatusEntry for status requests
else if (state->route == HttpRoute::GetStatus) {
*out_iter++ =
StatusEntry{conn->get_weak_ref(), state->http_request_id,
state->connection_close, state->status_request_id};
state->connection_close, state->status_request_id,
std::move(state->arena)};
}
// Create HealthCheckEntry for health check requests
else if (state->route == HttpRoute::GetOk) {
*out_iter++ =
HealthCheckEntry{conn->get_weak_ref(), state->http_request_id,
state->connection_close};
state->connection_close, std::move(state->arena)};
}
}
}
@@ -488,6 +490,57 @@ void HttpHandler::send_error_response(MessageSender &conn, int status_code,
http_request_id, close_connection);
}
std::span<std::string_view>
HttpHandler::format_response(int status_code, std::string_view content_type,
std::string_view body, Arena &response_arena,
int64_t http_request_id, bool close_connection) {
// Status text
std::string_view status_text;
switch (status_code) {
case 200:
status_text = "OK";
break;
case 400:
status_text = "Bad Request";
break;
case 404:
status_text = "Not Found";
break;
case 500:
status_text = "Internal Server Error";
break;
default:
status_text = "Unknown";
break;
}
const char *connection_header = close_connection ? "close" : "keep-alive";
auto response = std::span{response_arena.allocate<std::string_view>(1), 1};
response[0] =
format(response_arena,
"HTTP/1.1 %d %.*s\r\n"
"Content-Type: %.*s\r\n"
"Content-Length: %zu\r\n"
"X-Response-ID: %ld\r\n"
"Connection: %s\r\n"
"\r\n%.*s",
status_code, static_cast<int>(status_text.size()),
status_text.data(), static_cast<int>(content_type.size()),
content_type.data(), body.size(), http_request_id,
connection_header, static_cast<int>(body.size()), body.data());
return response;
}
std::span<std::string_view> HttpHandler::format_json_response(
int status_code, std::string_view json, Arena &response_arena,
int64_t http_request_id, bool close_connection) {
return format_response(status_code, "application/json", json, response_arena,
http_request_id, close_connection);
}
// llhttp callbacks
int HttpHandler::onUrl(llhttp_t *parser, const char *at, size_t length) {
auto *state = static_cast<HttpConnectionState *>(parser->data);
@@ -856,26 +909,26 @@ bool HttpHandler::process_persist_batch(BatchType &batch) {
const CommitRequest &commit_request = *commit_entry.commit_request;
Arena response_arena;
// Generate success response with actual assigned version using
// request arena
std::string_view response;
// Generate success response with actual assigned version
if (commit_request.request_id().has_value()) {
response = format(
response_arena,
commit_entry.request_arena,
R"({"request_id":"%.*s","status":"committed","version":%ld,"leader_id":"leader123"})",
static_cast<int>(commit_request.request_id().value().size()),
commit_request.request_id().value().data(),
commit_entry.assigned_version);
} else {
response = format(
response_arena,
commit_entry.request_arena,
R"({"status":"committed","version":%ld,"leader_id":"leader123"})",
commit_entry.assigned_version);
}
send_json_response(
*conn_ref, 200, response, std::move(response_arena),
// Format response but don't send yet - store for release stage
commit_entry.response_message = format_json_response(
200, response, commit_entry.request_arena,
commit_entry.http_request_id, commit_entry.connection_close);
return false; // Continue processing
@@ -898,10 +951,11 @@ bool HttpHandler::process_persist_batch(BatchType &batch) {
"http", "persist_health_check",
perfetto::Flow::Global(health_check_entry.http_request_id));
// Generate OK response
send_response(*conn_ref, 200, "text/plain", "OK", Arena{},
health_check_entry.http_request_id,
health_check_entry.connection_close);
// Format OK response but don't send yet - store for release stage
health_check_entry.response_message = format_response(
200, "text/plain", "OK", health_check_entry.request_arena,
health_check_entry.http_request_id,
health_check_entry.connection_close);
return false; // Continue processing
}
@@ -944,6 +998,11 @@ bool HttpHandler::process_release_batch(BatchType &batch) {
TRACE_EVENT("http", "release_commit",
perfetto::Flow::Global(commit_entry.http_request_id));
// Send the response that was formatted in persist stage
conn_ref->append_message(commit_entry.response_message,
std::move(commit_entry.request_arena),
commit_entry.connection_close);
return false; // Continue processing
} else if constexpr (std::is_same_v<T, StatusEntry>) {
// Process status entry: return connection to server
@@ -975,6 +1034,12 @@ bool HttpHandler::process_release_batch(BatchType &batch) {
"http", "release_health_check",
perfetto::Flow::Global(health_check_entry.http_request_id));
// Send the response that was formatted in persist stage
conn_ref->append_message(
health_check_entry.response_message,
std::move(health_check_entry.request_arena),
health_check_entry.connection_close);
return false; // Continue processing
}

View File

@@ -169,7 +169,7 @@ private:
// (string_views into arena)
// Main commit processing pipeline: sequence -> resolve -> persist -> release
StaticThreadPipeline<PipelineEntry, WaitStrategy::WaitIfUpstreamIdle, 1, 1, 1,
StaticThreadPipeline<PipelineEntry, WaitStrategy::WaitIfStageEmpty, 1, 1, 1,
1>
commitPipeline{lg_size};
@@ -181,8 +181,8 @@ private:
// Pipeline stage processing methods (batch-based)
using BatchType =
StaticThreadPipeline<PipelineEntry, WaitStrategy::WaitIfUpstreamIdle, 1,
1, 1, 1>::Batch;
StaticThreadPipeline<PipelineEntry, WaitStrategy::WaitIfStageEmpty, 1, 1,
1, 1>::Batch;
bool process_sequence_batch(BatchType &batch);
bool process_resolve_batch(BatchType &batch);
bool process_persist_batch(BatchType &batch);
@@ -225,4 +225,14 @@ private:
std::string_view message,
Arena response_arena, int64_t http_request_id,
bool close_connection);
// Helper functions for formatting responses without sending
static std::span<std::string_view>
format_response(int status_code, std::string_view content_type,
std::string_view body, Arena &response_arena,
int64_t http_request_id, bool close_connection);
static std::span<std::string_view>
format_json_response(int status_code, std::string_view json,
Arena &response_arena, int64_t http_request_id,
bool close_connection);
};

View File

@@ -1,5 +1,6 @@
#pragma once
#include "arena.hpp"
#include "connection.hpp"
#include <variant>
@@ -19,14 +20,21 @@ struct CommitEntry {
// Copied HTTP state (pipeline threads cannot access connection user_data)
int64_t http_request_id = 0;
bool connection_close = false;
const CommitRequest *commit_request =
nullptr; // Points to connection's arena data
const CommitRequest *commit_request = nullptr; // Points to request_arena data
// Request arena contains parsed request data and formatted response
Arena request_arena;
// Response data (set by persist stage, consumed by release stage)
// Points to response formatted in request_arena
std::span<std::string_view> response_message;
CommitEntry() = default; // Default constructor for variant
explicit CommitEntry(WeakRef<MessageSender> conn, int64_t req_id,
bool close_conn, const CommitRequest *req)
bool close_conn, const CommitRequest *req, Arena arena)
: connection(std::move(conn)), http_request_id(req_id),
connection_close(close_conn), commit_request(req) {}
connection_close(close_conn), commit_request(req),
request_arena(std::move(arena)) {}
};
/**
@@ -40,13 +48,18 @@ struct StatusEntry {
// Copied HTTP state
int64_t http_request_id = 0;
bool connection_close = false;
std::string_view status_request_id; // Points to connection's arena data
std::string_view status_request_id; // Points to request_arena data
// Request arena for HTTP request data
Arena request_arena;
StatusEntry() = default; // Default constructor for variant
explicit StatusEntry(WeakRef<MessageSender> conn, int64_t req_id,
bool close_conn, std::string_view request_id)
bool close_conn, std::string_view request_id,
Arena arena)
: connection(std::move(conn)), http_request_id(req_id),
connection_close(close_conn), status_request_id(request_id) {}
connection_close(close_conn), status_request_id(request_id),
request_arena(std::move(arena)) {}
};
/**
@@ -61,11 +74,18 @@ struct HealthCheckEntry {
int64_t http_request_id = 0;
bool connection_close = false;
// Request arena for formatting response
Arena request_arena;
// Response data (set by persist stage, consumed by release stage)
// Points to response formatted in request_arena
std::span<std::string_view> response_message;
HealthCheckEntry() = default; // Default constructor for variant
explicit HealthCheckEntry(WeakRef<MessageSender> conn, int64_t req_id,
bool close_conn)
bool close_conn, Arena arena)
: connection(std::move(conn)), http_request_id(req_id),
connection_close(close_conn) {}
connection_close(close_conn), request_arena(std::move(arena)) {}
};
/**