Compare commits
2 Commits
5b4a28a8ca
...
7db0e331e4
| Author | SHA1 | Date | |
|---|---|---|---|
| 7db0e331e4 | |||
| a820efa2e6 |
@@ -28,7 +28,10 @@ Connection::~Connection() {
|
|||||||
if (auto server_ptr = server_.lock()) {
|
if (auto server_ptr = server_.lock()) {
|
||||||
server_ptr->active_connections_.fetch_sub(1, std::memory_order_relaxed);
|
server_ptr->active_connections_.fetch_sub(1, std::memory_order_relaxed);
|
||||||
}
|
}
|
||||||
int e = close(fd_);
|
int e;
|
||||||
|
do {
|
||||||
|
e = close(fd_);
|
||||||
|
} while (e == -1 && errno == EINTR);
|
||||||
if (e == -1) {
|
if (e == -1) {
|
||||||
perror("close");
|
perror("close");
|
||||||
std::abort();
|
std::abort();
|
||||||
|
|||||||
35
src/main.cpp
35
src/main.cpp
@@ -48,7 +48,10 @@ std::vector<int> create_listen_sockets(const weaseldb::Config &config) {
|
|||||||
addr.sun_family = AF_UNIX;
|
addr.sun_family = AF_UNIX;
|
||||||
|
|
||||||
if (config.server.unix_socket_path.length() >= sizeof(addr.sun_path)) {
|
if (config.server.unix_socket_path.length() >= sizeof(addr.sun_path)) {
|
||||||
close(sfd);
|
int e;
|
||||||
|
do {
|
||||||
|
e = close(sfd);
|
||||||
|
} while (e == -1 && errno == EINTR);
|
||||||
std::fprintf(stderr, "Unix socket path too long\n");
|
std::fprintf(stderr, "Unix socket path too long\n");
|
||||||
std::abort();
|
std::abort();
|
||||||
}
|
}
|
||||||
@@ -58,13 +61,19 @@ std::vector<int> create_listen_sockets(const weaseldb::Config &config) {
|
|||||||
|
|
||||||
if (bind(sfd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
|
if (bind(sfd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
|
||||||
perror("bind");
|
perror("bind");
|
||||||
close(sfd);
|
int e;
|
||||||
|
do {
|
||||||
|
e = close(sfd);
|
||||||
|
} while (e == -1 && errno == EINTR);
|
||||||
std::abort();
|
std::abort();
|
||||||
}
|
}
|
||||||
|
|
||||||
if (listen(sfd, SOMAXCONN) == -1) {
|
if (listen(sfd, SOMAXCONN) == -1) {
|
||||||
perror("listen");
|
perror("listen");
|
||||||
close(sfd);
|
int e;
|
||||||
|
do {
|
||||||
|
e = close(sfd);
|
||||||
|
} while (e == -1 && errno == EINTR);
|
||||||
std::abort();
|
std::abort();
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -103,7 +112,10 @@ std::vector<int> create_listen_sockets(const weaseldb::Config &config) {
|
|||||||
int val = 1;
|
int val = 1;
|
||||||
if (setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)) == -1) {
|
if (setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)) == -1) {
|
||||||
perror("setsockopt SO_REUSEADDR");
|
perror("setsockopt SO_REUSEADDR");
|
||||||
close(sfd);
|
int e;
|
||||||
|
do {
|
||||||
|
e = close(sfd);
|
||||||
|
} while (e == -1 && errno == EINTR);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -111,7 +123,10 @@ std::vector<int> create_listen_sockets(const weaseldb::Config &config) {
|
|||||||
if (rp->ai_family == AF_INET || rp->ai_family == AF_INET6) {
|
if (rp->ai_family == AF_INET || rp->ai_family == AF_INET6) {
|
||||||
if (setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val)) == -1) {
|
if (setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val)) == -1) {
|
||||||
perror("setsockopt TCP_NODELAY");
|
perror("setsockopt TCP_NODELAY");
|
||||||
close(sfd);
|
int e;
|
||||||
|
do {
|
||||||
|
e = close(sfd);
|
||||||
|
} while (e == -1 && errno == EINTR);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -120,7 +135,10 @@ std::vector<int> create_listen_sockets(const weaseldb::Config &config) {
|
|||||||
break; /* Success */
|
break; /* Success */
|
||||||
}
|
}
|
||||||
|
|
||||||
close(sfd);
|
int e;
|
||||||
|
do {
|
||||||
|
e = close(sfd);
|
||||||
|
} while (e == -1 && errno == EINTR);
|
||||||
sfd = -1;
|
sfd = -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -133,7 +151,10 @@ std::vector<int> create_listen_sockets(const weaseldb::Config &config) {
|
|||||||
|
|
||||||
if (listen(sfd, SOMAXCONN) == -1) {
|
if (listen(sfd, SOMAXCONN) == -1) {
|
||||||
perror("listen");
|
perror("listen");
|
||||||
close(sfd);
|
int e;
|
||||||
|
do {
|
||||||
|
e = close(sfd);
|
||||||
|
} while (e == -1 && errno == EINTR);
|
||||||
std::abort();
|
std::abort();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -59,18 +59,27 @@ Server::Server(const weaseldb::Config &config, ConnectionHandler &handler,
|
|||||||
|
|
||||||
Server::~Server() {
|
Server::~Server() {
|
||||||
if (shutdown_pipe_[0] != -1) {
|
if (shutdown_pipe_[0] != -1) {
|
||||||
close(shutdown_pipe_[0]);
|
int e;
|
||||||
|
do {
|
||||||
|
e = close(shutdown_pipe_[0]);
|
||||||
|
} while (e == -1 && errno == EINTR);
|
||||||
shutdown_pipe_[0] = -1;
|
shutdown_pipe_[0] = -1;
|
||||||
}
|
}
|
||||||
if (shutdown_pipe_[1] != -1) {
|
if (shutdown_pipe_[1] != -1) {
|
||||||
close(shutdown_pipe_[1]);
|
int e;
|
||||||
|
do {
|
||||||
|
e = close(shutdown_pipe_[1]);
|
||||||
|
} while (e == -1 && errno == EINTR);
|
||||||
shutdown_pipe_[1] = -1;
|
shutdown_pipe_[1] = -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close all epoll instances
|
// Close all epoll instances
|
||||||
for (int epollfd : epoll_fds_) {
|
for (int epollfd : epoll_fds_) {
|
||||||
if (epollfd != -1) {
|
if (epollfd != -1) {
|
||||||
close(epollfd);
|
int e;
|
||||||
|
do {
|
||||||
|
e = close(epollfd);
|
||||||
|
} while (e == -1 && errno == EINTR);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
epoll_fds_.clear();
|
epoll_fds_.clear();
|
||||||
@@ -78,7 +87,10 @@ Server::~Server() {
|
|||||||
// Close all listen sockets (Server always owns them)
|
// Close all listen sockets (Server always owns them)
|
||||||
for (int fd : listen_fds_) {
|
for (int fd : listen_fds_) {
|
||||||
if (fd != -1) {
|
if (fd != -1) {
|
||||||
close(fd);
|
int e;
|
||||||
|
do {
|
||||||
|
e = close(fd);
|
||||||
|
} while (e == -1 && errno == EINTR);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -214,8 +226,13 @@ int Server::createLocalConnection() {
|
|||||||
if (epoll_ctl(epollfd, EPOLL_CTL_ADD, server_fd, &event) == -1) {
|
if (epoll_ctl(epollfd, EPOLL_CTL_ADD, server_fd, &event) == -1) {
|
||||||
perror("epoll_ctl ADD local connection");
|
perror("epoll_ctl ADD local connection");
|
||||||
connection_registry_.remove(server_fd);
|
connection_registry_.remove(server_fd);
|
||||||
close(server_fd);
|
int e;
|
||||||
close(client_fd);
|
do {
|
||||||
|
e = close(server_fd);
|
||||||
|
} while (e == -1 && errno == EINTR);
|
||||||
|
do {
|
||||||
|
e = close(client_fd);
|
||||||
|
} while (e == -1 && errno == EINTR);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -367,7 +384,10 @@ void Server::start_io_threads(std::vector<std::thread> &threads) {
|
|||||||
if (config_.server.max_connections > 0 &&
|
if (config_.server.max_connections > 0 &&
|
||||||
active_connections_.load(std::memory_order_relaxed) >=
|
active_connections_.load(std::memory_order_relaxed) >=
|
||||||
config_.server.max_connections) {
|
config_.server.max_connections) {
|
||||||
close(fd);
|
int e;
|
||||||
|
do {
|
||||||
|
e = close(fd);
|
||||||
|
} while (e == -1 && errno == EINTR);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
35
style.md
35
style.md
@@ -329,6 +329,41 @@ static_assert(std::is_trivially_destructible_v<T>, "Arena requires trivially des
|
|||||||
// assert(file_exists(path)); // File might legitimately not exist - use return code instead
|
// assert(file_exists(path)); // File might legitimately not exist - use return code instead
|
||||||
```
|
```
|
||||||
|
|
||||||
|
### System Call Error Handling
|
||||||
|
|
||||||
|
When a system call is interrupted by a signal (`EINTR`), it is usually necessary to retry the call. This is especially true for "slow" system calls that can block for a long time, such as `read`, `write`, `accept`, `connect`, `close`, `sem_wait`, and `epoll_wait`.
|
||||||
|
|
||||||
|
**Rule:** Always wrap potentially interruptible system calls in a `do-while` loop that checks for `EINTR`.
|
||||||
|
|
||||||
|
**Example:**
|
||||||
|
|
||||||
|
```cpp
|
||||||
|
int fd;
|
||||||
|
do {
|
||||||
|
fd = accept(listen_fd, nullptr, nullptr);
|
||||||
|
} while (fd == -1 && errno == EINTR);
|
||||||
|
|
||||||
|
if (fd == -1) {
|
||||||
|
// Handle other errors
|
||||||
|
perror("accept");
|
||||||
|
abort();
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
|
||||||
|
**Non-interruptible calls:**
|
||||||
|
|
||||||
|
Most system calls are not interruptible in practice. For these, it is not necessary to add a retry loop. This includes:
|
||||||
|
|
||||||
|
* `fcntl` (with `F_GETFL`, `F_SETFL`, `F_GETFD`, `F_SETFD` - note: `F_SETLKW` and `F_OFD_SETLKW` CAN return EINTR)
|
||||||
|
* `epoll_ctl`
|
||||||
|
* `socketpair`
|
||||||
|
* `pipe`
|
||||||
|
* `setsockopt`
|
||||||
|
* `epoll_create1`
|
||||||
|
|
||||||
|
When in doubt, consult the `man` page for the specific system call to see if it can return `EINTR`.
|
||||||
|
|
||||||
---
|
---
|
||||||
|
|
||||||
## Documentation
|
## Documentation
|
||||||
|
|||||||
@@ -69,12 +69,18 @@ TEST_CASE(
|
|||||||
|
|
||||||
// Write some test data
|
// Write some test data
|
||||||
const char *test_message = "Hello, World!";
|
const char *test_message = "Hello, World!";
|
||||||
ssize_t bytes_written = write(client_fd, test_message, strlen(test_message));
|
ssize_t bytes_written;
|
||||||
|
do {
|
||||||
|
bytes_written = write(client_fd, test_message, strlen(test_message));
|
||||||
|
} while (bytes_written == -1 && errno == EINTR);
|
||||||
REQUIRE(bytes_written == strlen(test_message));
|
REQUIRE(bytes_written == strlen(test_message));
|
||||||
|
|
||||||
// Read the echoed response
|
// Read the echoed response
|
||||||
char buffer[1024] = {0};
|
char buffer[1024] = {0};
|
||||||
ssize_t bytes_read = read(client_fd, buffer, sizeof(buffer) - 1);
|
ssize_t bytes_read;
|
||||||
|
do {
|
||||||
|
bytes_read = read(client_fd, buffer, sizeof(buffer) - 1);
|
||||||
|
} while (bytes_read == -1 && errno == EINTR);
|
||||||
if (bytes_read == -1) {
|
if (bytes_read == -1) {
|
||||||
perror("read failed");
|
perror("read failed");
|
||||||
}
|
}
|
||||||
@@ -84,7 +90,10 @@ TEST_CASE(
|
|||||||
CHECK(std::string(buffer, bytes_read) == std::string(test_message));
|
CHECK(std::string(buffer, bytes_read) == std::string(test_message));
|
||||||
|
|
||||||
// Cleanup
|
// Cleanup
|
||||||
close(client_fd);
|
int e;
|
||||||
|
do {
|
||||||
|
e = close(client_fd);
|
||||||
|
} while (e == -1 && errno == EINTR);
|
||||||
server->shutdown();
|
server->shutdown();
|
||||||
server_thread.join();
|
server_thread.join();
|
||||||
{
|
{
|
||||||
|
|||||||
66
todo.md
Normal file
66
todo.md
Normal file
@@ -0,0 +1,66 @@
|
|||||||
|
# WeaselDB Todo List
|
||||||
|
|
||||||
|
## 📋 Planned Tasks
|
||||||
|
|
||||||
|
### Core Database Features
|
||||||
|
- [ ] Design commit pipeline architecture with three-stage processing
|
||||||
|
- [ ] Stage 1: Version assignment and precondition validation thread
|
||||||
|
- [ ] Stage 2: Transaction persistence and subscriber streaming thread
|
||||||
|
- [ ] Stage 3: Connection return to server thread
|
||||||
|
- [ ] Use ThreadPipeline for inter-stage communication
|
||||||
|
- [ ] Design persistence interface for pluggable storage backends (S3, local disk)
|
||||||
|
- [ ] Integrate https://git.weaselab.dev/weaselab/conflict-set for optimistic concurrency control
|
||||||
|
- [ ] Design and architect the subscription component for change streams
|
||||||
|
|
||||||
|
### API Endpoints Implementation
|
||||||
|
- [ ] Implement `GET /v1/version` endpoint to return latest committed version and leader
|
||||||
|
- [ ] Implement `POST /v1/commit` endpoint for transaction submission with precondition validation
|
||||||
|
- [ ] Implement `GET /v1/status` endpoint for commit request status lookup by request_id
|
||||||
|
- [ ] Implement `GET /v1/subscribe` endpoint for Server-Sent Events transaction streaming
|
||||||
|
- [ ] Implement `PUT /v1/retention/<policy_id>` endpoint for retention policy management
|
||||||
|
- [ ] Implement `GET /v1/retention/<policy_id>` endpoint for retention policy retrieval
|
||||||
|
- [ ] Implement `GET /v1/retention/` endpoint for listing all retention policies
|
||||||
|
- [ ] Implement `DELETE /v1/retention/<policy_id>` endpoint for retention policy removal
|
||||||
|
|
||||||
|
### Infrastructure & Tooling
|
||||||
|
- [ ] Implement thread-safe Prometheus metrics library and serve `GET /metrics` endpoint
|
||||||
|
- [ ] Implement gperf-based HTTP routing for efficient request dispatching
|
||||||
|
- [ ] Implement HTTP client for S3 interactions
|
||||||
|
- [ ] Implement fake in-process S3 service using separate Server instance with S3 ConnectionHandler
|
||||||
|
- [ ] Use createLocalConnection to get fd for in-process communication
|
||||||
|
- [ ] Implement `ListObjectsV2` API for object enumeration
|
||||||
|
- [ ] Implement `PutObject` with chunked encoding support for streaming uploads
|
||||||
|
- [ ] Add `If-None-Match` conditional header handling for `PutObject`
|
||||||
|
- [ ] Implement `GetObject` for object retrieval
|
||||||
|
- [ ] Add byte range support for `GetObject` (Range header handling)
|
||||||
|
- [ ] Implement `DeleteObjects` for batch object deletion
|
||||||
|
|
||||||
|
### Client Libraries
|
||||||
|
- [ ] Implement high-level Python client library for WeaselDB REST API
|
||||||
|
- [ ] Wrap `/v1/version`, `/v1/commit`, `/v1/status` endpoints
|
||||||
|
- [ ] Handle `/v1/subscribe` SSE streaming with reconnection logic
|
||||||
|
- [ ] Provide idiomatic error handling and retry logic
|
||||||
|
- [ ] Implement Python administrative/operator library for WeaselDB management
|
||||||
|
- [ ] Support retention policy management (`/v1/retention/*` endpoints)
|
||||||
|
- [ ] Include metrics querying and monitoring tools
|
||||||
|
- [ ] Provide CLI tooling for database administration
|
||||||
|
|
||||||
|
### Testing & Validation
|
||||||
|
- [ ] Build out-of-process API test suite using client library over real TCP
|
||||||
|
- [ ] Test all `/v1/version`, `/v1/commit`, `/v1/status` endpoints
|
||||||
|
- [ ] Test `/v1/subscribe` Server-Sent Events streaming
|
||||||
|
- [ ] Test retention policy endpoints (`/v1/retention/*`)
|
||||||
|
- [ ] Test `/metrics` Prometheus endpoint
|
||||||
|
- [ ] Test error conditions and edge cases
|
||||||
|
- [ ] Test concurrent request handling and threading model
|
||||||
|
|
||||||
|
## ✅ Completed Tasks
|
||||||
|
|
||||||
|
*Most recent completions at the top*
|
||||||
|
|
||||||
|
- [x] Built streaming JSON parser for commit requests with high-performance parsing
|
||||||
|
- [x] Implemented HTTP server with multi-threaded networking using multiple epoll instances
|
||||||
|
- [x] Created threading model with pipeline for serial request processing for optimistic concurrency control
|
||||||
|
- [x] Designed connection ownership transfer system to enable the serial processing model
|
||||||
|
- [x] Implemented arena-per-connection memory model for clean memory lifetime management
|
||||||
|
- [x] Built TOML configuration system for server settings
|
||||||
@@ -85,11 +85,19 @@ int getConnectFd(const char *node, const char *service) {
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (connect(sfd, rp->ai_addr, rp->ai_addrlen) == 0) {
|
int conn_result;
|
||||||
|
do {
|
||||||
|
conn_result = connect(sfd, rp->ai_addr, rp->ai_addrlen);
|
||||||
|
} while (conn_result == -1 && errno == EINTR);
|
||||||
|
|
||||||
|
if (conn_result == 0) {
|
||||||
break; /* Success */
|
break; /* Success */
|
||||||
}
|
}
|
||||||
|
|
||||||
close(sfd);
|
int e;
|
||||||
|
do {
|
||||||
|
e = close(sfd);
|
||||||
|
} while (e == -1 && errno == EINTR);
|
||||||
}
|
}
|
||||||
|
|
||||||
freeaddrinfo(result); /* No longer needed */
|
freeaddrinfo(result); /* No longer needed */
|
||||||
@@ -113,7 +121,10 @@ int getConnectFdUnix(const char *socket_name) {
|
|||||||
memset(&addr, 0, sizeof(addr));
|
memset(&addr, 0, sizeof(addr));
|
||||||
addr.sun_family = AF_UNIX;
|
addr.sun_family = AF_UNIX;
|
||||||
strncpy(addr.sun_path, socket_name, sizeof(addr.sun_path) - 1);
|
strncpy(addr.sun_path, socket_name, sizeof(addr.sun_path) - 1);
|
||||||
int e = connect(sfd, (struct sockaddr *)&addr, sizeof(addr));
|
int e;
|
||||||
|
do {
|
||||||
|
e = connect(sfd, (struct sockaddr *)&addr, sizeof(addr));
|
||||||
|
} while (e == -1 && errno == EINTR);
|
||||||
if (e == -1) {
|
if (e == -1) {
|
||||||
perror("connect");
|
perror("connect");
|
||||||
abort();
|
abort();
|
||||||
@@ -241,7 +252,10 @@ struct Connection {
|
|||||||
bool error = false;
|
bool error = false;
|
||||||
|
|
||||||
~Connection() {
|
~Connection() {
|
||||||
int e = close(fd);
|
int e;
|
||||||
|
do {
|
||||||
|
e = close(fd);
|
||||||
|
} while (e == -1 && errno == EINTR);
|
||||||
if (e == -1) {
|
if (e == -1) {
|
||||||
perror("close");
|
perror("close");
|
||||||
abort();
|
abort();
|
||||||
@@ -711,7 +725,9 @@ int main(int argc, char *argv[]) {
|
|||||||
while (!g_shutdown.load(std::memory_order_relaxed)) {
|
while (!g_shutdown.load(std::memory_order_relaxed)) {
|
||||||
int e;
|
int e;
|
||||||
{
|
{
|
||||||
|
do {
|
||||||
e = sem_wait(&connectionLimit);
|
e = sem_wait(&connectionLimit);
|
||||||
|
} while (e == -1 && errno == EINTR);
|
||||||
if (e == -1) {
|
if (e == -1) {
|
||||||
perror("sem_wait");
|
perror("sem_wait");
|
||||||
abort();
|
abort();
|
||||||
@@ -814,6 +830,9 @@ int main(int argc, char *argv[]) {
|
|||||||
|
|
||||||
// Clean up epoll file descriptors
|
// Clean up epoll file descriptors
|
||||||
for (int epollfd : g_epoll_fds) {
|
for (int epollfd : g_epoll_fds) {
|
||||||
close(epollfd);
|
int e;
|
||||||
|
do {
|
||||||
|
e = close(epollfd);
|
||||||
|
} while (e == -1 && errno == EINTR);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user