From ee721c7753ef103c51b45a322293003b68c6043e Mon Sep 17 00:00:00 2001 From: Andrew Noyes Date: Sun, 24 Aug 2025 16:21:01 -0400 Subject: [PATCH] Use snake_case for Connection etc methods --- design.md | 28 +++++++++++----------- src/arena_allocator.hpp | 2 +- src/connection.cpp | 4 ++-- src/connection.hpp | 32 ++++++++++++------------- src/connection_handler.hpp | 4 ++-- src/http_handler.cpp | 10 ++++---- src/http_handler.hpp | 2 +- src/server.cpp | 12 +++++----- src/server.hpp | 4 ++-- style.md | 2 +- tests/test_http_handler.cpp | 4 ++-- tests/test_server_connection_return.cpp | 6 ++--- todo.md | 21 ++++++++++++++-- 13 files changed, 74 insertions(+), 57 deletions(-) diff --git a/design.md b/design.md index 2a70143..0bf6e5c 100644 --- a/design.md +++ b/design.md @@ -193,14 +193,14 @@ CommitRequest { 1. **Creation**: Accept threads create connections, transfer to epoll as raw pointers 2. **Processing**: Network threads claim ownership by wrapping in unique_ptr 3. **Handler Transfer**: Handlers can take ownership for async processing via unique_ptr.release() -4. **Return Path**: Handlers use Server::releaseBackToServer() to return connections +4. **Return Path**: Handlers use Server::release_back_to_server() to return connections 5. **Safety**: All transfers use weak_ptr to server for safe cleanup 6. **Cleanup**: RAII ensures proper resource cleanup in all scenarios #### Arena Memory Lifecycle -1. **Request Processing**: Handler uses `conn->getArena()` to allocate memory for parsing request data +1. **Request Processing**: Handler uses `conn->get_arena()` to allocate memory for parsing request data 2. **Response Generation**: Handler uses arena for temporary response construction (headers, JSON, etc.) -3. **Response Queuing**: Handler calls `conn->appendMessage()` which copies data to arena-backed message queue +3. **Response Queuing**: Handler calls `conn->append_message()` which copies data to arena-backed message queue 4. **Response Writing**: Server writes all queued messages to socket via `writeBytes()` > **Note**: Call `conn->reset()` periodically to reclaim arena memory. Best practice is after all outgoing bytes have been written. @@ -262,7 +262,7 @@ See [style.md](style.md) for comprehensive C++ coding standards and conventions. - **Connection Ownership**: Use unique_ptr semantics for safe ownership transfer between components - **Arena Allocator Pattern**: Always use `ArenaAllocator` for temporary allocations within request processing - **String View Usage**: Prefer `std::string_view` over `std::string` when pointing to arena-allocated memory -- **Ownership Transfer**: Use `Server::releaseBackToServer()` for returning connections to server from handlers +- **Ownership Transfer**: Use `Server::release_back_to_server()` for returning connections to server from handlers - **JSON Token Lookup**: Use the gperf-generated perfect hash table in `json_tokens.hpp` for O(1) key recognition - **Base64 Handling**: Always use simdutf for base64 encoding/decoding for performance - **Thread Safety**: Connection ownership transfers are designed to be thread-safe with proper RAII cleanup @@ -280,9 +280,9 @@ See [style.md](style.md) for comprehensive C++ coding standards and conventions. #### Adding New Protocol Handlers 1. Inherit from `ConnectionHandler` in `src/connection_handler.hpp` 2. Implement `on_data_arrived()` with proper ownership semantics -3. Use connection's arena allocator for temporary allocations: `conn->getArena()` +3. Use connection's arena allocator for temporary allocations: `conn->get_arena()` 4. Handle partial messages and streaming protocols appropriately -5. Use `Server::releaseBackToServer()` if taking ownership for async processing +5. Use `Server::release_back_to_server()` if taking ownership for async processing 6. Add corresponding test cases and integration tests 7. Consider performance implications of ownership transfers @@ -345,10 +345,10 @@ class HttpHandler : public ConnectionHandler { public: void on_data_arrived(std::string_view data, std::unique_ptr& conn_ptr) override { // Parse HTTP request using connection's arena - ArenaAllocator& arena = conn_ptr->getArena(); + ArenaAllocator& arena = conn_ptr->get_arena(); // Generate response - conn_ptr->appendMessage("HTTP/1.1 200 OK\r\n\r\nHello World"); + conn_ptr->append_message("HTTP/1.1 200 OK\r\n\r\nHello World"); // Server retains ownership } @@ -365,10 +365,10 @@ public: work_queue.push([connection = std::move(connection)](std::string_view data) mutable { // Process asynchronously - connection->appendMessage("Async response"); + connection->append_message("Async response"); // Return ownership to server when done - Server::releaseBackToServer(std::move(connection)); + Server::release_back_to_server(std::move(connection)); }); } }; @@ -419,7 +419,7 @@ class YesHandler : public ConnectionHandler { public: void on_connection_established(Connection &conn) override { // Write an initial "y\n" - conn.appendMessage("y\n"); + conn.append_message("y\n"); } void on_write_progress(std::unique_ptr &conn) override { @@ -427,7 +427,7 @@ public: // Don't use an unbounded amount of memory conn->reset(); // Write "y\n" repeatedly - conn->appendMessage("y\n"); + conn->append_message("y\n"); } } }; @@ -452,10 +452,10 @@ Connection* raw_conn = conn_ptr.release(); // Process on worker thread background_processor.submit([raw_conn]() { // Do work... - raw_conn->appendMessage("Background result"); + raw_conn->append_message("Background result"); // Return to server safely (handles server destruction) - Server::releaseBackToServer(std::unique_ptr(raw_conn)); + Server::release_back_to_server(std::unique_ptr(raw_conn)); }); ``` diff --git a/src/arena_allocator.hpp b/src/arena_allocator.hpp index 2c1eb5f..5ee50b5 100644 --- a/src/arena_allocator.hpp +++ b/src/arena_allocator.hpp @@ -71,7 +71,7 @@ * 3. **Background Thread**: Can receive ownership for async processing, uses * arena for temporary data structures * 4. **Return Path**: Connection (and its arena) safely returned via - * Server::releaseBackToServer() + * Server::release_back_to_server() * * ### Why This Design is Thread-Safe: * - **Exclusive Access**: Only the current owner thread should access the arena diff --git a/src/connection.cpp b/src/connection.cpp index b2353ae..ff14a7d 100644 --- a/src/connection.cpp +++ b/src/connection.cpp @@ -5,7 +5,7 @@ #include #include -#include "server.hpp" // Need this for releaseBackToServer implementation +#include "server.hpp" // Need this for release_back_to_server implementation // Static thread-local storage for iovec buffer static thread_local std::vector g_iovec_buffer{IOV_MAX}; @@ -35,7 +35,7 @@ Connection::~Connection() { // EINTR ignored - fd is guaranteed closed on Linux } -void Connection::appendMessage(std::string_view s, bool copy_to_arena) { +void Connection::append_message(std::string_view s, bool copy_to_arena) { if (copy_to_arena) { char *arena_str = arena_.allocate(s.size()); std::memcpy(arena_str, s.data(), s.size()); diff --git a/src/connection.hpp b/src/connection.hpp index d88bdf4..42b223e 100644 --- a/src/connection.hpp +++ b/src/connection.hpp @@ -83,12 +83,12 @@ struct Connection { * * Example usage: * ```cpp - * conn->appendMessage("HTTP/1.1 200 OK\r\n\r\n", false); // Static string - * conn->appendMessage(dynamic_response, true); // Dynamic data - * conn->appendMessage(arena_allocated_data, false); // Arena data + * conn->append_message("HTTP/1.1 200 OK\r\n\r\n", false); // Static string + * conn->append_message(dynamic_response, true); // Dynamic data + * conn->append_message(arena_allocated_data, false); // Arena data * ``` */ - void appendMessage(std::string_view s, bool copy_to_arena = true); + void append_message(std::string_view s, bool copy_to_arena = true); /** * @brief Mark the connection to be closed after sending all queued messages. @@ -108,11 +108,11 @@ struct Connection { * * Typical usage: * ```cpp - * conn->appendMessage("HTTP/1.1 200 OK\r\n\r\nBye!"); - * conn->closeAfterSend(); // Close after sending response + * conn->append_message("HTTP/1.1 200 OK\r\n\r\nBye!"); + * conn->close_after_send(); // Close after sending response * ``` */ - void closeAfterSend() { closeConnection_ = true; } + void close_after_send() { closeConnection_ = true; } /** * @brief Get access to the connection's arena allocator. @@ -135,7 +135,7 @@ struct Connection { * * Best practices: * ```cpp - * ArenaAllocator& arena = conn->getArena(); + * ArenaAllocator& arena = conn->get_arena(); * * // Allocate temporary parsing buffers * char* buffer = arena.allocate(1024); @@ -147,7 +147,7 @@ struct Connection { * std::vector> tokens{&arena}; * ``` */ - ArenaAllocator &getArena() { return arena_; } + ArenaAllocator &get_arena() { return arena_; } /** * @brief Get the unique identifier for this connection. @@ -168,19 +168,19 @@ struct Connection { * * Typical usage: * ```cpp - * std::cout << "Processing request on connection " << conn->getId() << - * std::endl; logger.info("Connection {} sent {} bytes", conn->getId(), + * std::cout << "Processing request on connection " << conn->get_id() << + * std::endl; logger.info("Connection {} sent {} bytes", conn->get_id(), * response.size()); * ``` */ - int64_t getId() const { return id_; } + int64_t get_id() const { return id_; } /** * @brief Get the number of bytes queued for transmission. * * Returns the total number of bytes in all messages currently * queued for transmission to the client. This includes all data added via - * appendMessage() that has not yet been sent over the network. + * append_message() that has not yet been sent over the network. * * @return Total bytes queued for transmission * @@ -206,7 +206,7 @@ struct Connection { * } * * // Logging/monitoring - * metrics.recordQueueDepth(conn->getId(), conn->outgoingBytesQueued()); + * metrics.recordQueueDepth(conn->get_id(), conn->outgoingBytesQueued()); * ``` */ int64_t outgoingBytesQueued() const { @@ -248,7 +248,7 @@ struct Connection { * class HttpHandler : public ConnectionHandler { * void on_connection_established(Connection& conn) override { * // Allocate HTTP state in connection's arena or heap - * auto* state = conn.getArena().construct(); + * auto* state = conn.get_arena().construct(); * conn.user_data = state; * } * @@ -299,7 +299,7 @@ struct Connection { * @note Ownership Transfer: To release a connection back to the server for * continued processing, use the static method: * ```cpp - * Server::releaseBackToServer(std::move(connection_ptr)); + * Server::release_back_to_server(std::move(connection_ptr)); * ``` * * This is the correct way to return connection ownership when: diff --git a/src/connection_handler.hpp b/src/connection_handler.hpp index 9a992fe..8491255 100644 --- a/src/connection_handler.hpp +++ b/src/connection_handler.hpp @@ -30,10 +30,10 @@ public: * * Implementation should: * - Parse incoming data using arena allocator when needed - * - Use conn_ptr->appendMessage() to queue response data to be sent + * - Use conn_ptr->append_message() to queue response data to be sent * - Handle partial messages and streaming protocols appropriately * - Can take ownership by calling conn_ptr.release() to pass to other threads - * - If ownership is taken, handler must call Server::releaseBackToServer() + * - If ownership is taken, handler must call Server::release_back_to_server() * when done * @note `data` is *not* owned by the connection arena, and its lifetime ends * after the call to on_data_arrived. diff --git a/src/http_handler.cpp b/src/http_handler.cpp index d582a5e..1426a62 100644 --- a/src/http_handler.cpp +++ b/src/http_handler.cpp @@ -28,7 +28,7 @@ HttpConnectionState::HttpConnectionState(ArenaAllocator &arena) // HttpHandler implementation void HttpHandler::on_connection_established(Connection &conn) { // Allocate HTTP state in connection's arena - ArenaAllocator &arena = conn.getArena(); + ArenaAllocator &arena = conn.get_arena(); void *mem = arena.allocate_raw(sizeof(HttpConnectionState), alignof(HttpConnectionState)); auto *state = new (mem) HttpConnectionState(arena); @@ -249,7 +249,7 @@ void HttpHandler::handleNotFound(Connection &conn, void HttpHandler::sendResponse(Connection &conn, int status_code, std::string_view content_type, std::string_view body, bool close_connection) { - [[maybe_unused]] ArenaAllocator &arena = conn.getArena(); + [[maybe_unused]] ArenaAllocator &arena = conn.get_arena(); // Build HTTP response using arena std::string response; @@ -293,7 +293,7 @@ void HttpHandler::sendResponse(Connection &conn, int status_code, if (close_connection) { response += "Connection: close\r\n"; - conn.closeAfterSend(); // Signal connection should be closed after sending + conn.close_after_send(); // Signal connection should be closed after sending } else { response += "Connection: keep-alive\r\n"; } @@ -301,7 +301,7 @@ void HttpHandler::sendResponse(Connection &conn, int status_code, response += "\r\n"; response += body; - conn.appendMessage(response); + conn.append_message(response); } void HttpHandler::sendJsonResponse(Connection &conn, int status_code, @@ -313,7 +313,7 @@ void HttpHandler::sendJsonResponse(Connection &conn, int status_code, void HttpHandler::sendErrorResponse(Connection &conn, int status_code, std::string_view message, bool close_connection) { - [[maybe_unused]] ArenaAllocator &arena = conn.getArena(); + [[maybe_unused]] ArenaAllocator &arena = conn.get_arena(); std::string json = R"({"error":")"; json += message; diff --git a/src/http_handler.hpp b/src/http_handler.hpp index 009f869..59cdba7 100644 --- a/src/http_handler.hpp +++ b/src/http_handler.hpp @@ -79,7 +79,7 @@ struct HttpHandler : ConnectionHandler { auto *state = static_cast(c->user_data); TRACE_EVENT("http", "pipeline thread", perfetto::Flow::Global(state->request_id)); - Server::releaseBackToServer(std::move(c)); + Server::release_back_to_server(std::move(c)); } } } diff --git a/src/server.cpp b/src/server.cpp index 36cea2c..3a02a77 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -50,11 +50,11 @@ Server::Server(const weaseldb::Config &config, ConnectionHandler &handler, // Setup shutdown pipe for graceful shutdown setup_shutdown_pipe(); - // Create epoll instances immediately for createLocalConnection() support + // Create epoll instances immediately for create_local_connection() support create_epoll_instances(); // If empty vector provided, listen_fds_ will be empty (no listening) - // Server works purely with createLocalConnection() + // Server works purely with create_local_connection() } Server::~Server() { @@ -137,7 +137,7 @@ void Server::shutdown() { } } -void Server::releaseBackToServer(std::unique_ptr connection) { +void Server::release_back_to_server(std::unique_ptr connection) { if (!connection) { return; // Nothing to release } @@ -182,7 +182,7 @@ void Server::receiveConnectionBack(std::unique_ptr connection) { } } -int Server::createLocalConnection() { +int Server::create_local_connection() { int sockets[2]; if (socketpair(AF_UNIX, SOCK_STREAM, 0, sockets) != 0) { perror("socketpair"); @@ -195,12 +195,12 @@ int Server::createLocalConnection() { int flags = fcntl(server_fd, F_GETFL, 0); if (flags == -1) { std::fprintf(stderr, - "Server::createLocalConnection: fcntl F_GETFL failed\n"); + "Server::create_local_connection: fcntl F_GETFL failed\n"); std::abort(); } if (fcntl(server_fd, F_SETFL, flags | O_NONBLOCK) == -1) { std::fprintf(stderr, - "Server::createLocalConnection: fcntl F_SETFL failed\n"); + "Server::create_local_connection: fcntl F_SETFL failed\n"); std::abort(); } diff --git a/src/server.hpp b/src/server.hpp index d69f490..2db4710 100644 --- a/src/server.hpp +++ b/src/server.hpp @@ -92,7 +92,7 @@ struct Server : std::enable_shared_from_this { * @return File descriptor for the client end of the socketpair, or -1 on * error */ - int createLocalConnection(); + int create_local_connection(); /** * Release a connection back to its server for continued processing. @@ -105,7 +105,7 @@ struct Server : std::enable_shared_from_this { * * @param connection unique_ptr to the connection being released back */ - static void releaseBackToServer(std::unique_ptr connection); + static void release_back_to_server(std::unique_ptr connection); private: friend struct Connection; diff --git a/style.md b/style.md index aba5420..7a65358 100644 --- a/style.md +++ b/style.md @@ -278,7 +278,7 @@ auto connection = Connection::createForServer(addr, fd, connection_id, handler, // Friend-based factory for access control struct Connection { - void appendMessage(std::string_view message_data); + void append_message(std::string_view message_data); private: Connection(struct sockaddr_storage client_addr, int file_descriptor, int64_t connection_id, ConnectionHandler* request_handler, diff --git a/tests/test_http_handler.cpp b/tests/test_http_handler.cpp index d21b603..530c0d8 100644 --- a/tests/test_http_handler.cpp +++ b/tests/test_http_handler.cpp @@ -16,9 +16,9 @@ struct TestConnectionData { std::string message_buffer; void *user_data = nullptr; - void appendMessage(std::string_view data) { message_buffer += data; } + void append_message(std::string_view data) { message_buffer += data; } - ArenaAllocator &getArena() { return arena; } + ArenaAllocator &get_arena() { return arena; } const std::string &getResponse() const { return message_buffer; } void clearResponse() { message_buffer.clear(); } void reset() { diff --git a/tests/test_server_connection_return.cpp b/tests/test_server_connection_return.cpp index fe2d829..9b51ef7 100644 --- a/tests/test_server_connection_return.cpp +++ b/tests/test_server_connection_return.cpp @@ -53,8 +53,8 @@ TEST_CASE( return; } assert(message.conn); - message.conn->appendMessage(message.data); - Server::releaseBackToServer(std::move(message.conn)); + message.conn->append_message(message.data); + Server::release_back_to_server(std::move(message.conn)); } } }}; @@ -65,7 +65,7 @@ TEST_CASE( std::thread server_thread([&server]() { server->run(); }); // Create local connection - int client_fd = server->createLocalConnection(); + int client_fd = server->create_local_connection(); REQUIRE(client_fd > 0); // Write some test data diff --git a/todo.md b/todo.md index 51d363e..14a46c6 100644 --- a/todo.md +++ b/todo.md @@ -26,8 +26,25 @@ - [ ] Implement thread-safe Prometheus metrics library and serve `GET /metrics` endpoint - [ ] Implement gperf-based HTTP routing for efficient request dispatching - [ ] Implement HTTP client for S3 interactions + - [ ] Design `HttpClient` class following WeaselDB patterns (factory creation, arena allocation, RAII) + - [ ] Implement connection pool with configurable limits (max connections, idle timeout) + - [ ] Support HTTP/1.1 with Keep-Alive for connection reuse + - [ ] Use epoll-based async I/O model similar to Server architecture + - [ ] Implement request pipeline: `HttpRequest` -> `HttpResponse` with arena-backed memory + - [ ] Support streaming request bodies (chunked encoding) for large S3 uploads + - [ ] Support streaming response bodies with callback-based data consumption + - [ ] Add timeout handling (connect timeout, read timeout, total request timeout) + - [ ] Implement retry logic with exponential backoff and jitter + - [ ] Support custom headers and authentication (AWS Signature V4 for S3) + - [ ] Single-threaded event-driven design for integration with persistence pipeline + - [ ] Integration with WeaselDB's ThreadPipeline for request batching + - [ ] Comprehensive error handling with detailed error codes and descriptions + - [ ] Support for HTTP redirects (3xx responses) with redirect limits + - [ ] SSL/TLS support using OpenSSL for HTTPS connections + - [ ] Request/response logging and metrics integration + - [ ] Memory-efficient design with zero-copy where possible - [ ] Implement fake in-process S3 service using separate Server instance with S3 ConnectionHandler - - [ ] Use createLocalConnection to get fd for in-process communication + - [ ] Use create_local_connection to get fd for in-process communication - [ ] Implement `ListObjectsV2` API for object enumeration - [ ] Implement `PutObject` with chunked encoding support for streaming uploads - [ ] Add `If-None-Match` conditional header handling for `PutObject` @@ -52,7 +69,7 @@ - [ ] Test retention policy endpoints (`/v1/retention/*`) - [ ] Test `/metrics` Prometheus endpoint - [ ] Test error conditions and edge cases - - [ ] Test concurrent request handling and threading model + - [ ] Test single-threaded event-driven processing model ## ✅ Completed Tasks