Use snake_case for Connection etc methods
This commit is contained in:
28
design.md
28
design.md
@@ -193,14 +193,14 @@ CommitRequest {
|
||||
1. **Creation**: Accept threads create connections, transfer to epoll as raw pointers
|
||||
2. **Processing**: Network threads claim ownership by wrapping in unique_ptr
|
||||
3. **Handler Transfer**: Handlers can take ownership for async processing via unique_ptr.release()
|
||||
4. **Return Path**: Handlers use Server::releaseBackToServer() to return connections
|
||||
4. **Return Path**: Handlers use Server::release_back_to_server() to return connections
|
||||
5. **Safety**: All transfers use weak_ptr to server for safe cleanup
|
||||
6. **Cleanup**: RAII ensures proper resource cleanup in all scenarios
|
||||
|
||||
#### Arena Memory Lifecycle
|
||||
1. **Request Processing**: Handler uses `conn->getArena()` to allocate memory for parsing request data
|
||||
1. **Request Processing**: Handler uses `conn->get_arena()` to allocate memory for parsing request data
|
||||
2. **Response Generation**: Handler uses arena for temporary response construction (headers, JSON, etc.)
|
||||
3. **Response Queuing**: Handler calls `conn->appendMessage()` which copies data to arena-backed message queue
|
||||
3. **Response Queuing**: Handler calls `conn->append_message()` which copies data to arena-backed message queue
|
||||
4. **Response Writing**: Server writes all queued messages to socket via `writeBytes()`
|
||||
|
||||
> **Note**: Call `conn->reset()` periodically to reclaim arena memory. Best practice is after all outgoing bytes have been written.
|
||||
@@ -262,7 +262,7 @@ See [style.md](style.md) for comprehensive C++ coding standards and conventions.
|
||||
- **Connection Ownership**: Use unique_ptr semantics for safe ownership transfer between components
|
||||
- **Arena Allocator Pattern**: Always use `ArenaAllocator` for temporary allocations within request processing
|
||||
- **String View Usage**: Prefer `std::string_view` over `std::string` when pointing to arena-allocated memory
|
||||
- **Ownership Transfer**: Use `Server::releaseBackToServer()` for returning connections to server from handlers
|
||||
- **Ownership Transfer**: Use `Server::release_back_to_server()` for returning connections to server from handlers
|
||||
- **JSON Token Lookup**: Use the gperf-generated perfect hash table in `json_tokens.hpp` for O(1) key recognition
|
||||
- **Base64 Handling**: Always use simdutf for base64 encoding/decoding for performance
|
||||
- **Thread Safety**: Connection ownership transfers are designed to be thread-safe with proper RAII cleanup
|
||||
@@ -280,9 +280,9 @@ See [style.md](style.md) for comprehensive C++ coding standards and conventions.
|
||||
#### Adding New Protocol Handlers
|
||||
1. Inherit from `ConnectionHandler` in `src/connection_handler.hpp`
|
||||
2. Implement `on_data_arrived()` with proper ownership semantics
|
||||
3. Use connection's arena allocator for temporary allocations: `conn->getArena()`
|
||||
3. Use connection's arena allocator for temporary allocations: `conn->get_arena()`
|
||||
4. Handle partial messages and streaming protocols appropriately
|
||||
5. Use `Server::releaseBackToServer()` if taking ownership for async processing
|
||||
5. Use `Server::release_back_to_server()` if taking ownership for async processing
|
||||
6. Add corresponding test cases and integration tests
|
||||
7. Consider performance implications of ownership transfers
|
||||
|
||||
@@ -345,10 +345,10 @@ class HttpHandler : public ConnectionHandler {
|
||||
public:
|
||||
void on_data_arrived(std::string_view data, std::unique_ptr<Connection>& conn_ptr) override {
|
||||
// Parse HTTP request using connection's arena
|
||||
ArenaAllocator& arena = conn_ptr->getArena();
|
||||
ArenaAllocator& arena = conn_ptr->get_arena();
|
||||
|
||||
// Generate response
|
||||
conn_ptr->appendMessage("HTTP/1.1 200 OK\r\n\r\nHello World");
|
||||
conn_ptr->append_message("HTTP/1.1 200 OK\r\n\r\nHello World");
|
||||
|
||||
// Server retains ownership
|
||||
}
|
||||
@@ -365,10 +365,10 @@ public:
|
||||
|
||||
work_queue.push([connection = std::move(connection)](std::string_view data) mutable {
|
||||
// Process asynchronously
|
||||
connection->appendMessage("Async response");
|
||||
connection->append_message("Async response");
|
||||
|
||||
// Return ownership to server when done
|
||||
Server::releaseBackToServer(std::move(connection));
|
||||
Server::release_back_to_server(std::move(connection));
|
||||
});
|
||||
}
|
||||
};
|
||||
@@ -419,7 +419,7 @@ class YesHandler : public ConnectionHandler {
|
||||
public:
|
||||
void on_connection_established(Connection &conn) override {
|
||||
// Write an initial "y\n"
|
||||
conn.appendMessage("y\n");
|
||||
conn.append_message("y\n");
|
||||
}
|
||||
|
||||
void on_write_progress(std::unique_ptr<Connection> &conn) override {
|
||||
@@ -427,7 +427,7 @@ public:
|
||||
// Don't use an unbounded amount of memory
|
||||
conn->reset();
|
||||
// Write "y\n" repeatedly
|
||||
conn->appendMessage("y\n");
|
||||
conn->append_message("y\n");
|
||||
}
|
||||
}
|
||||
};
|
||||
@@ -452,10 +452,10 @@ Connection* raw_conn = conn_ptr.release();
|
||||
// Process on worker thread
|
||||
background_processor.submit([raw_conn]() {
|
||||
// Do work...
|
||||
raw_conn->appendMessage("Background result");
|
||||
raw_conn->append_message("Background result");
|
||||
|
||||
// Return to server safely (handles server destruction)
|
||||
Server::releaseBackToServer(std::unique_ptr<Connection>(raw_conn));
|
||||
Server::release_back_to_server(std::unique_ptr<Connection>(raw_conn));
|
||||
});
|
||||
```
|
||||
|
||||
|
||||
@@ -71,7 +71,7 @@
|
||||
* 3. **Background Thread**: Can receive ownership for async processing, uses
|
||||
* arena for temporary data structures
|
||||
* 4. **Return Path**: Connection (and its arena) safely returned via
|
||||
* Server::releaseBackToServer()
|
||||
* Server::release_back_to_server()
|
||||
*
|
||||
* ### Why This Design is Thread-Safe:
|
||||
* - **Exclusive Access**: Only the current owner thread should access the arena
|
||||
|
||||
@@ -5,7 +5,7 @@
|
||||
#include <cstdio>
|
||||
#include <cstdlib>
|
||||
|
||||
#include "server.hpp" // Need this for releaseBackToServer implementation
|
||||
#include "server.hpp" // Need this for release_back_to_server implementation
|
||||
|
||||
// Static thread-local storage for iovec buffer
|
||||
static thread_local std::vector<struct iovec> g_iovec_buffer{IOV_MAX};
|
||||
@@ -35,7 +35,7 @@ Connection::~Connection() {
|
||||
// EINTR ignored - fd is guaranteed closed on Linux
|
||||
}
|
||||
|
||||
void Connection::appendMessage(std::string_view s, bool copy_to_arena) {
|
||||
void Connection::append_message(std::string_view s, bool copy_to_arena) {
|
||||
if (copy_to_arena) {
|
||||
char *arena_str = arena_.allocate<char>(s.size());
|
||||
std::memcpy(arena_str, s.data(), s.size());
|
||||
|
||||
@@ -83,12 +83,12 @@ struct Connection {
|
||||
*
|
||||
* Example usage:
|
||||
* ```cpp
|
||||
* conn->appendMessage("HTTP/1.1 200 OK\r\n\r\n", false); // Static string
|
||||
* conn->appendMessage(dynamic_response, true); // Dynamic data
|
||||
* conn->appendMessage(arena_allocated_data, false); // Arena data
|
||||
* conn->append_message("HTTP/1.1 200 OK\r\n\r\n", false); // Static string
|
||||
* conn->append_message(dynamic_response, true); // Dynamic data
|
||||
* conn->append_message(arena_allocated_data, false); // Arena data
|
||||
* ```
|
||||
*/
|
||||
void appendMessage(std::string_view s, bool copy_to_arena = true);
|
||||
void append_message(std::string_view s, bool copy_to_arena = true);
|
||||
|
||||
/**
|
||||
* @brief Mark the connection to be closed after sending all queued messages.
|
||||
@@ -108,11 +108,11 @@ struct Connection {
|
||||
*
|
||||
* Typical usage:
|
||||
* ```cpp
|
||||
* conn->appendMessage("HTTP/1.1 200 OK\r\n\r\nBye!");
|
||||
* conn->closeAfterSend(); // Close after sending response
|
||||
* conn->append_message("HTTP/1.1 200 OK\r\n\r\nBye!");
|
||||
* conn->close_after_send(); // Close after sending response
|
||||
* ```
|
||||
*/
|
||||
void closeAfterSend() { closeConnection_ = true; }
|
||||
void close_after_send() { closeConnection_ = true; }
|
||||
|
||||
/**
|
||||
* @brief Get access to the connection's arena allocator.
|
||||
@@ -135,7 +135,7 @@ struct Connection {
|
||||
*
|
||||
* Best practices:
|
||||
* ```cpp
|
||||
* ArenaAllocator& arena = conn->getArena();
|
||||
* ArenaAllocator& arena = conn->get_arena();
|
||||
*
|
||||
* // Allocate temporary parsing buffers
|
||||
* char* buffer = arena.allocate<char>(1024);
|
||||
@@ -147,7 +147,7 @@ struct Connection {
|
||||
* std::vector<Token, ArenaStlAllocator<Token>> tokens{&arena};
|
||||
* ```
|
||||
*/
|
||||
ArenaAllocator &getArena() { return arena_; }
|
||||
ArenaAllocator &get_arena() { return arena_; }
|
||||
|
||||
/**
|
||||
* @brief Get the unique identifier for this connection.
|
||||
@@ -168,19 +168,19 @@ struct Connection {
|
||||
*
|
||||
* Typical usage:
|
||||
* ```cpp
|
||||
* std::cout << "Processing request on connection " << conn->getId() <<
|
||||
* std::endl; logger.info("Connection {} sent {} bytes", conn->getId(),
|
||||
* std::cout << "Processing request on connection " << conn->get_id() <<
|
||||
* std::endl; logger.info("Connection {} sent {} bytes", conn->get_id(),
|
||||
* response.size());
|
||||
* ```
|
||||
*/
|
||||
int64_t getId() const { return id_; }
|
||||
int64_t get_id() const { return id_; }
|
||||
|
||||
/**
|
||||
* @brief Get the number of bytes queued for transmission.
|
||||
*
|
||||
* Returns the total number of bytes in all messages currently
|
||||
* queued for transmission to the client. This includes all data added via
|
||||
* appendMessage() that has not yet been sent over the network.
|
||||
* append_message() that has not yet been sent over the network.
|
||||
*
|
||||
* @return Total bytes queued for transmission
|
||||
*
|
||||
@@ -206,7 +206,7 @@ struct Connection {
|
||||
* }
|
||||
*
|
||||
* // Logging/monitoring
|
||||
* metrics.recordQueueDepth(conn->getId(), conn->outgoingBytesQueued());
|
||||
* metrics.recordQueueDepth(conn->get_id(), conn->outgoingBytesQueued());
|
||||
* ```
|
||||
*/
|
||||
int64_t outgoingBytesQueued() const {
|
||||
@@ -248,7 +248,7 @@ struct Connection {
|
||||
* class HttpHandler : public ConnectionHandler {
|
||||
* void on_connection_established(Connection& conn) override {
|
||||
* // Allocate HTTP state in connection's arena or heap
|
||||
* auto* state = conn.getArena().construct<HttpConnectionState>();
|
||||
* auto* state = conn.get_arena().construct<HttpConnectionState>();
|
||||
* conn.user_data = state;
|
||||
* }
|
||||
*
|
||||
@@ -299,7 +299,7 @@ struct Connection {
|
||||
* @note Ownership Transfer: To release a connection back to the server for
|
||||
* continued processing, use the static method:
|
||||
* ```cpp
|
||||
* Server::releaseBackToServer(std::move(connection_ptr));
|
||||
* Server::release_back_to_server(std::move(connection_ptr));
|
||||
* ```
|
||||
*
|
||||
* This is the correct way to return connection ownership when:
|
||||
|
||||
@@ -30,10 +30,10 @@ public:
|
||||
*
|
||||
* Implementation should:
|
||||
* - Parse incoming data using arena allocator when needed
|
||||
* - Use conn_ptr->appendMessage() to queue response data to be sent
|
||||
* - Use conn_ptr->append_message() to queue response data to be sent
|
||||
* - Handle partial messages and streaming protocols appropriately
|
||||
* - Can take ownership by calling conn_ptr.release() to pass to other threads
|
||||
* - If ownership is taken, handler must call Server::releaseBackToServer()
|
||||
* - If ownership is taken, handler must call Server::release_back_to_server()
|
||||
* when done
|
||||
* @note `data` is *not* owned by the connection arena, and its lifetime ends
|
||||
* after the call to on_data_arrived.
|
||||
|
||||
@@ -28,7 +28,7 @@ HttpConnectionState::HttpConnectionState(ArenaAllocator &arena)
|
||||
// HttpHandler implementation
|
||||
void HttpHandler::on_connection_established(Connection &conn) {
|
||||
// Allocate HTTP state in connection's arena
|
||||
ArenaAllocator &arena = conn.getArena();
|
||||
ArenaAllocator &arena = conn.get_arena();
|
||||
void *mem = arena.allocate_raw(sizeof(HttpConnectionState),
|
||||
alignof(HttpConnectionState));
|
||||
auto *state = new (mem) HttpConnectionState(arena);
|
||||
@@ -249,7 +249,7 @@ void HttpHandler::handleNotFound(Connection &conn,
|
||||
void HttpHandler::sendResponse(Connection &conn, int status_code,
|
||||
std::string_view content_type,
|
||||
std::string_view body, bool close_connection) {
|
||||
[[maybe_unused]] ArenaAllocator &arena = conn.getArena();
|
||||
[[maybe_unused]] ArenaAllocator &arena = conn.get_arena();
|
||||
|
||||
// Build HTTP response using arena
|
||||
std::string response;
|
||||
@@ -293,7 +293,7 @@ void HttpHandler::sendResponse(Connection &conn, int status_code,
|
||||
|
||||
if (close_connection) {
|
||||
response += "Connection: close\r\n";
|
||||
conn.closeAfterSend(); // Signal connection should be closed after sending
|
||||
conn.close_after_send(); // Signal connection should be closed after sending
|
||||
} else {
|
||||
response += "Connection: keep-alive\r\n";
|
||||
}
|
||||
@@ -301,7 +301,7 @@ void HttpHandler::sendResponse(Connection &conn, int status_code,
|
||||
response += "\r\n";
|
||||
response += body;
|
||||
|
||||
conn.appendMessage(response);
|
||||
conn.append_message(response);
|
||||
}
|
||||
|
||||
void HttpHandler::sendJsonResponse(Connection &conn, int status_code,
|
||||
@@ -313,7 +313,7 @@ void HttpHandler::sendJsonResponse(Connection &conn, int status_code,
|
||||
void HttpHandler::sendErrorResponse(Connection &conn, int status_code,
|
||||
std::string_view message,
|
||||
bool close_connection) {
|
||||
[[maybe_unused]] ArenaAllocator &arena = conn.getArena();
|
||||
[[maybe_unused]] ArenaAllocator &arena = conn.get_arena();
|
||||
|
||||
std::string json = R"({"error":")";
|
||||
json += message;
|
||||
|
||||
@@ -79,7 +79,7 @@ struct HttpHandler : ConnectionHandler {
|
||||
auto *state = static_cast<HttpConnectionState *>(c->user_data);
|
||||
TRACE_EVENT("http", "pipeline thread",
|
||||
perfetto::Flow::Global(state->request_id));
|
||||
Server::releaseBackToServer(std::move(c));
|
||||
Server::release_back_to_server(std::move(c));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -50,11 +50,11 @@ Server::Server(const weaseldb::Config &config, ConnectionHandler &handler,
|
||||
// Setup shutdown pipe for graceful shutdown
|
||||
setup_shutdown_pipe();
|
||||
|
||||
// Create epoll instances immediately for createLocalConnection() support
|
||||
// Create epoll instances immediately for create_local_connection() support
|
||||
create_epoll_instances();
|
||||
|
||||
// If empty vector provided, listen_fds_ will be empty (no listening)
|
||||
// Server works purely with createLocalConnection()
|
||||
// Server works purely with create_local_connection()
|
||||
}
|
||||
|
||||
Server::~Server() {
|
||||
@@ -137,7 +137,7 @@ void Server::shutdown() {
|
||||
}
|
||||
}
|
||||
|
||||
void Server::releaseBackToServer(std::unique_ptr<Connection> connection) {
|
||||
void Server::release_back_to_server(std::unique_ptr<Connection> connection) {
|
||||
if (!connection) {
|
||||
return; // Nothing to release
|
||||
}
|
||||
@@ -182,7 +182,7 @@ void Server::receiveConnectionBack(std::unique_ptr<Connection> connection) {
|
||||
}
|
||||
}
|
||||
|
||||
int Server::createLocalConnection() {
|
||||
int Server::create_local_connection() {
|
||||
int sockets[2];
|
||||
if (socketpair(AF_UNIX, SOCK_STREAM, 0, sockets) != 0) {
|
||||
perror("socketpair");
|
||||
@@ -195,12 +195,12 @@ int Server::createLocalConnection() {
|
||||
int flags = fcntl(server_fd, F_GETFL, 0);
|
||||
if (flags == -1) {
|
||||
std::fprintf(stderr,
|
||||
"Server::createLocalConnection: fcntl F_GETFL failed\n");
|
||||
"Server::create_local_connection: fcntl F_GETFL failed\n");
|
||||
std::abort();
|
||||
}
|
||||
if (fcntl(server_fd, F_SETFL, flags | O_NONBLOCK) == -1) {
|
||||
std::fprintf(stderr,
|
||||
"Server::createLocalConnection: fcntl F_SETFL failed\n");
|
||||
"Server::create_local_connection: fcntl F_SETFL failed\n");
|
||||
std::abort();
|
||||
}
|
||||
|
||||
|
||||
@@ -92,7 +92,7 @@ struct Server : std::enable_shared_from_this<Server> {
|
||||
* @return File descriptor for the client end of the socketpair, or -1 on
|
||||
* error
|
||||
*/
|
||||
int createLocalConnection();
|
||||
int create_local_connection();
|
||||
|
||||
/**
|
||||
* Release a connection back to its server for continued processing.
|
||||
@@ -105,7 +105,7 @@ struct Server : std::enable_shared_from_this<Server> {
|
||||
*
|
||||
* @param connection unique_ptr to the connection being released back
|
||||
*/
|
||||
static void releaseBackToServer(std::unique_ptr<Connection> connection);
|
||||
static void release_back_to_server(std::unique_ptr<Connection> connection);
|
||||
|
||||
private:
|
||||
friend struct Connection;
|
||||
|
||||
2
style.md
2
style.md
@@ -278,7 +278,7 @@ auto connection = Connection::createForServer(addr, fd, connection_id, handler,
|
||||
|
||||
// Friend-based factory for access control
|
||||
struct Connection {
|
||||
void appendMessage(std::string_view message_data);
|
||||
void append_message(std::string_view message_data);
|
||||
private:
|
||||
Connection(struct sockaddr_storage client_addr, int file_descriptor,
|
||||
int64_t connection_id, ConnectionHandler* request_handler,
|
||||
|
||||
@@ -16,9 +16,9 @@ struct TestConnectionData {
|
||||
std::string message_buffer;
|
||||
void *user_data = nullptr;
|
||||
|
||||
void appendMessage(std::string_view data) { message_buffer += data; }
|
||||
void append_message(std::string_view data) { message_buffer += data; }
|
||||
|
||||
ArenaAllocator &getArena() { return arena; }
|
||||
ArenaAllocator &get_arena() { return arena; }
|
||||
const std::string &getResponse() const { return message_buffer; }
|
||||
void clearResponse() { message_buffer.clear(); }
|
||||
void reset() {
|
||||
|
||||
@@ -53,8 +53,8 @@ TEST_CASE(
|
||||
return;
|
||||
}
|
||||
assert(message.conn);
|
||||
message.conn->appendMessage(message.data);
|
||||
Server::releaseBackToServer(std::move(message.conn));
|
||||
message.conn->append_message(message.data);
|
||||
Server::release_back_to_server(std::move(message.conn));
|
||||
}
|
||||
}
|
||||
}};
|
||||
@@ -65,7 +65,7 @@ TEST_CASE(
|
||||
std::thread server_thread([&server]() { server->run(); });
|
||||
|
||||
// Create local connection
|
||||
int client_fd = server->createLocalConnection();
|
||||
int client_fd = server->create_local_connection();
|
||||
REQUIRE(client_fd > 0);
|
||||
|
||||
// Write some test data
|
||||
|
||||
21
todo.md
21
todo.md
@@ -26,8 +26,25 @@
|
||||
- [ ] Implement thread-safe Prometheus metrics library and serve `GET /metrics` endpoint
|
||||
- [ ] Implement gperf-based HTTP routing for efficient request dispatching
|
||||
- [ ] Implement HTTP client for S3 interactions
|
||||
- [ ] Design `HttpClient` class following WeaselDB patterns (factory creation, arena allocation, RAII)
|
||||
- [ ] Implement connection pool with configurable limits (max connections, idle timeout)
|
||||
- [ ] Support HTTP/1.1 with Keep-Alive for connection reuse
|
||||
- [ ] Use epoll-based async I/O model similar to Server architecture
|
||||
- [ ] Implement request pipeline: `HttpRequest` -> `HttpResponse` with arena-backed memory
|
||||
- [ ] Support streaming request bodies (chunked encoding) for large S3 uploads
|
||||
- [ ] Support streaming response bodies with callback-based data consumption
|
||||
- [ ] Add timeout handling (connect timeout, read timeout, total request timeout)
|
||||
- [ ] Implement retry logic with exponential backoff and jitter
|
||||
- [ ] Support custom headers and authentication (AWS Signature V4 for S3)
|
||||
- [ ] Single-threaded event-driven design for integration with persistence pipeline
|
||||
- [ ] Integration with WeaselDB's ThreadPipeline for request batching
|
||||
- [ ] Comprehensive error handling with detailed error codes and descriptions
|
||||
- [ ] Support for HTTP redirects (3xx responses) with redirect limits
|
||||
- [ ] SSL/TLS support using OpenSSL for HTTPS connections
|
||||
- [ ] Request/response logging and metrics integration
|
||||
- [ ] Memory-efficient design with zero-copy where possible
|
||||
- [ ] Implement fake in-process S3 service using separate Server instance with S3 ConnectionHandler
|
||||
- [ ] Use createLocalConnection to get fd for in-process communication
|
||||
- [ ] Use create_local_connection to get fd for in-process communication
|
||||
- [ ] Implement `ListObjectsV2` API for object enumeration
|
||||
- [ ] Implement `PutObject` with chunked encoding support for streaming uploads
|
||||
- [ ] Add `If-None-Match` conditional header handling for `PutObject`
|
||||
@@ -52,7 +69,7 @@
|
||||
- [ ] Test retention policy endpoints (`/v1/retention/*`)
|
||||
- [ ] Test `/metrics` Prometheus endpoint
|
||||
- [ ] Test error conditions and edge cases
|
||||
- [ ] Test concurrent request handling and threading model
|
||||
- [ ] Test single-threaded event-driven processing model
|
||||
|
||||
## ✅ Completed Tasks
|
||||
|
||||
|
||||
Reference in New Issue
Block a user