Add unix socket listening mode
This commit is contained in:
@@ -2,6 +2,7 @@
|
||||
#include "arena_allocator.hpp"
|
||||
#include <cstring>
|
||||
#include <string>
|
||||
#include <strings.h>
|
||||
|
||||
// HttpConnectionState implementation
|
||||
HttpConnectionState::HttpConnectionState(
|
||||
@@ -45,7 +46,7 @@ void HttpHandler::on_data_arrived(std::string_view data,
|
||||
std::unique_ptr<Connection> &conn_ptr) {
|
||||
auto *state = static_cast<HttpConnectionState *>(conn_ptr->user_data);
|
||||
if (!state) {
|
||||
sendErrorResponse(*conn_ptr, 500, "Internal server error");
|
||||
sendErrorResponse(*conn_ptr, 500, "Internal server error", true);
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -59,7 +60,7 @@ void HttpHandler::on_data_arrived(std::string_view data,
|
||||
llhttp_execute(&state->parser, data.data(), data.size());
|
||||
|
||||
if (err != HPE_OK) {
|
||||
sendErrorResponse(*conn_ptr, 400, "Bad request");
|
||||
sendErrorResponse(*conn_ptr, 400, "Bad request", true);
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -94,6 +95,9 @@ void HttpHandler::on_data_arrived(std::string_view data,
|
||||
case HttpRoute::GET_metrics:
|
||||
handleGetMetrics(*conn_ptr, *state);
|
||||
break;
|
||||
case HttpRoute::GET_ok:
|
||||
handleGetOk(*conn_ptr, *state);
|
||||
break;
|
||||
case HttpRoute::NotFound:
|
||||
default:
|
||||
handleNotFound(*conn_ptr, *state);
|
||||
@@ -127,6 +131,8 @@ HttpRoute HttpHandler::parseRoute(std::string_view method,
|
||||
}
|
||||
if (url == "/metrics")
|
||||
return HttpRoute::GET_metrics;
|
||||
if (url == "/ok")
|
||||
return HttpRoute::GET_ok;
|
||||
} else if (method == "POST") {
|
||||
if (url == "/v1/commit")
|
||||
return HttpRoute::POST_commit;
|
||||
@@ -142,71 +148,83 @@ HttpRoute HttpHandler::parseRoute(std::string_view method,
|
||||
}
|
||||
|
||||
// Route handlers (basic implementations)
|
||||
void HttpHandler::handleGetVersion(
|
||||
Connection &conn, [[maybe_unused]] const HttpConnectionState &state) {
|
||||
void HttpHandler::handleGetVersion(Connection &conn,
|
||||
const HttpConnectionState &state) {
|
||||
sendJsonResponse(
|
||||
conn, 200,
|
||||
R"({"version":"0.0.1","leader":"node-1","committed_version":42})");
|
||||
R"({"version":"0.0.1","leader":"node-1","committed_version":42})",
|
||||
state.connection_close);
|
||||
}
|
||||
|
||||
void HttpHandler::handlePostCommit(
|
||||
Connection &conn, [[maybe_unused]] const HttpConnectionState &state) {
|
||||
void HttpHandler::handlePostCommit(Connection &conn,
|
||||
const HttpConnectionState &state) {
|
||||
// TODO: Parse commit request from state.body and process
|
||||
sendJsonResponse(
|
||||
conn, 200,
|
||||
R"({"request_id":"example","status":"committed","version":43})");
|
||||
R"({"request_id":"example","status":"committed","version":43})",
|
||||
state.connection_close);
|
||||
}
|
||||
|
||||
void HttpHandler::handleGetSubscribe(
|
||||
Connection &conn, [[maybe_unused]] const HttpConnectionState &state) {
|
||||
void HttpHandler::handleGetSubscribe(Connection &conn,
|
||||
const HttpConnectionState &state) {
|
||||
// TODO: Implement subscription streaming
|
||||
sendJsonResponse(
|
||||
conn, 200,
|
||||
R"({"message":"Subscription endpoint - streaming not yet implemented"})");
|
||||
R"({"message":"Subscription endpoint - streaming not yet implemented"})",
|
||||
state.connection_close);
|
||||
}
|
||||
|
||||
void HttpHandler::handleGetStatus(
|
||||
Connection &conn, [[maybe_unused]] const HttpConnectionState &state) {
|
||||
void HttpHandler::handleGetStatus(Connection &conn,
|
||||
const HttpConnectionState &state) {
|
||||
// TODO: Extract request_id from URL and check status
|
||||
sendJsonResponse(
|
||||
conn, 200,
|
||||
R"({"request_id":"example","status":"committed","version":43})");
|
||||
R"({"request_id":"example","status":"committed","version":43})",
|
||||
state.connection_close);
|
||||
}
|
||||
|
||||
void HttpHandler::handlePutRetention(
|
||||
Connection &conn, [[maybe_unused]] const HttpConnectionState &state) {
|
||||
void HttpHandler::handlePutRetention(Connection &conn,
|
||||
const HttpConnectionState &state) {
|
||||
// TODO: Parse retention policy from body and store
|
||||
sendJsonResponse(conn, 200, R"({"policy_id":"example","status":"created"})");
|
||||
sendJsonResponse(conn, 200, R"({"policy_id":"example","status":"created"})",
|
||||
state.connection_close);
|
||||
}
|
||||
|
||||
void HttpHandler::handleGetRetention(
|
||||
Connection &conn, [[maybe_unused]] const HttpConnectionState &state) {
|
||||
void HttpHandler::handleGetRetention(Connection &conn,
|
||||
const HttpConnectionState &state) {
|
||||
// TODO: Extract policy_id from URL or return all policies
|
||||
sendJsonResponse(conn, 200, R"({"policies":[]})");
|
||||
sendJsonResponse(conn, 200, R"({"policies":[]})", state.connection_close);
|
||||
}
|
||||
|
||||
void HttpHandler::handleDeleteRetention(
|
||||
Connection &conn, [[maybe_unused]] const HttpConnectionState &state) {
|
||||
void HttpHandler::handleDeleteRetention(Connection &conn,
|
||||
const HttpConnectionState &state) {
|
||||
// TODO: Extract policy_id from URL and delete
|
||||
sendJsonResponse(conn, 200, R"({"policy_id":"example","status":"deleted"})");
|
||||
sendJsonResponse(conn, 200, R"({"policy_id":"example","status":"deleted"})",
|
||||
state.connection_close);
|
||||
}
|
||||
|
||||
void HttpHandler::handleGetMetrics(
|
||||
Connection &conn, [[maybe_unused]] const HttpConnectionState &state) {
|
||||
void HttpHandler::handleGetMetrics(Connection &conn,
|
||||
const HttpConnectionState &state) {
|
||||
// TODO: Implement metrics collection and formatting
|
||||
sendResponse(conn, 200, "text/plain",
|
||||
"# WeaselDB metrics\nweaseldb_requests_total 0\n");
|
||||
"# WeaselDB metrics\nweaseldb_requests_total 0\n",
|
||||
state.connection_close);
|
||||
}
|
||||
|
||||
void HttpHandler::handleNotFound(
|
||||
Connection &conn, [[maybe_unused]] const HttpConnectionState &state) {
|
||||
sendErrorResponse(conn, 404, "Not found");
|
||||
void HttpHandler::handleGetOk(Connection &conn,
|
||||
const HttpConnectionState &state) {
|
||||
sendResponse(conn, 200, "text/plain", "OK", state.connection_close);
|
||||
}
|
||||
|
||||
void HttpHandler::handleNotFound(Connection &conn,
|
||||
const HttpConnectionState &state) {
|
||||
sendErrorResponse(conn, 404, "Not found", state.connection_close);
|
||||
}
|
||||
|
||||
// HTTP utility methods
|
||||
void HttpHandler::sendResponse(Connection &conn, int status_code,
|
||||
std::string_view content_type,
|
||||
std::string_view body) {
|
||||
std::string_view body, bool close_connection) {
|
||||
[[maybe_unused]] ArenaAllocator &arena = conn.getArena();
|
||||
|
||||
// Build HTTP response using arena
|
||||
@@ -243,7 +261,14 @@ void HttpHandler::sendResponse(Connection &conn, int status_code,
|
||||
response += "Content-Length: ";
|
||||
response += std::to_string(body.size());
|
||||
response += "\r\n";
|
||||
response += "Connection: keep-alive\r\n";
|
||||
|
||||
if (close_connection) {
|
||||
response += "Connection: close\r\n";
|
||||
conn.closeAfterSend(); // Signal connection should be closed after sending
|
||||
} else {
|
||||
response += "Connection: keep-alive\r\n";
|
||||
}
|
||||
|
||||
response += "\r\n";
|
||||
response += body;
|
||||
|
||||
@@ -251,19 +276,21 @@ void HttpHandler::sendResponse(Connection &conn, int status_code,
|
||||
}
|
||||
|
||||
void HttpHandler::sendJsonResponse(Connection &conn, int status_code,
|
||||
std::string_view json) {
|
||||
sendResponse(conn, status_code, "application/json", json);
|
||||
std::string_view json,
|
||||
bool close_connection) {
|
||||
sendResponse(conn, status_code, "application/json", json, close_connection);
|
||||
}
|
||||
|
||||
void HttpHandler::sendErrorResponse(Connection &conn, int status_code,
|
||||
std::string_view message) {
|
||||
std::string_view message,
|
||||
bool close_connection) {
|
||||
[[maybe_unused]] ArenaAllocator &arena = conn.getArena();
|
||||
|
||||
std::string json = R"({"error":")";
|
||||
json += message;
|
||||
json += R"("})";
|
||||
|
||||
sendJsonResponse(conn, status_code, json);
|
||||
sendJsonResponse(conn, status_code, json, close_connection);
|
||||
}
|
||||
|
||||
// llhttp callbacks
|
||||
@@ -274,17 +301,27 @@ int HttpHandler::onUrl(llhttp_t *parser, const char *at, size_t length) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
int HttpHandler::onHeaderField([[maybe_unused]] llhttp_t *parser,
|
||||
[[maybe_unused]] const char *at,
|
||||
[[maybe_unused]] size_t length) {
|
||||
// TODO: Store headers if needed
|
||||
int HttpHandler::onHeaderField(llhttp_t *parser, const char *at,
|
||||
size_t length) {
|
||||
auto *state = static_cast<HttpConnectionState *>(parser->data);
|
||||
// Store current header field name for processing in onHeaderValue
|
||||
state->current_header_field = std::string_view(at, length);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int HttpHandler::onHeaderValue([[maybe_unused]] llhttp_t *parser,
|
||||
[[maybe_unused]] const char *at,
|
||||
[[maybe_unused]] size_t length) {
|
||||
// TODO: Store header values if needed
|
||||
int HttpHandler::onHeaderValue(llhttp_t *parser, const char *at,
|
||||
size_t length) {
|
||||
auto *state = static_cast<HttpConnectionState *>(parser->data);
|
||||
std::string_view value(at, length);
|
||||
|
||||
// Check for Connection header
|
||||
if (state->current_header_field.size() == 10 &&
|
||||
strncasecmp(state->current_header_field.data(), "connection", 10) == 0) {
|
||||
if (value.size() == 5 && strncasecmp(value.data(), "close", 5) == 0) {
|
||||
state->connection_close = true;
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user