Files
weaseldb/src/main.cpp
2025-09-03 14:38:10 -04:00

271 lines
8.0 KiB
C++

#include "config.hpp"
#include "connection.hpp"
#include "http_handler.hpp"
#include "metric.hpp"
#include "perfetto_categories.hpp"
#include "process_collector.hpp"
#include "server.hpp"
#include <csignal>
#include <cstring>
#include <fcntl.h>
#include <iostream>
#include <netdb.h>
#include <netinet/tcp.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <unistd.h>
#include <vector>
PERFETTO_TRACK_EVENT_STATIC_STORAGE();
// Global server instance for signal handler access
static Server *g_server = nullptr;
void signal_handler(int sig) {
if (sig == SIGTERM || sig == SIGINT) {
if (g_server) {
g_server->shutdown();
}
}
}
std::vector<int> create_listen_sockets(const weaseldb::Config &config) {
std::vector<int> listen_fds;
// Check if unix socket path is specified
if (!config.server.unix_socket_path.empty()) {
// Create unix socket
int sfd = socket(AF_UNIX, SOCK_STREAM, 0);
if (sfd == -1) {
perror("socket");
std::abort();
}
// Remove existing socket file if it exists
unlink(config.server.unix_socket_path.c_str());
struct sockaddr_un addr;
std::memset(&addr, 0, sizeof(addr));
addr.sun_family = AF_UNIX;
if (config.server.unix_socket_path.length() >= sizeof(addr.sun_path)) {
std::fprintf(stderr, "Unix socket path too long\n");
std::abort();
}
std::strncpy(addr.sun_path, config.server.unix_socket_path.c_str(),
sizeof(addr.sun_path) - 1);
if (bind(sfd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
perror("bind");
std::abort();
}
if (listen(sfd, SOMAXCONN) == -1) {
perror("listen");
std::abort();
}
listen_fds.push_back(sfd);
return listen_fds;
}
// TCP socket creation
struct addrinfo hints;
struct addrinfo *result, *rp;
int s;
std::memset(&hints, 0, sizeof(hints));
hints.ai_family = AF_UNSPEC; /* Allow IPv4 or IPv6 */
hints.ai_socktype = SOCK_STREAM; /* stream socket */
hints.ai_flags = AI_PASSIVE; /* For wildcard IP address */
hints.ai_protocol = 0; /* Any protocol */
hints.ai_canonname = nullptr;
hints.ai_addr = nullptr;
hints.ai_next = nullptr;
s = getaddrinfo(config.server.bind_address.c_str(),
std::to_string(config.server.port).c_str(), &hints, &result);
if (s != 0) {
std::fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(s));
std::abort();
}
int sfd = -1;
for (rp = result; rp != nullptr; rp = rp->ai_next) {
sfd = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);
if (sfd == -1) {
continue;
}
int val = 1;
if (setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)) == -1) {
perror("setsockopt SO_REUSEADDR");
int e = close(sfd);
if (e == -1 && errno != EINTR) {
perror("close sfd (SO_REUSEADDR failed)");
std::abort();
}
continue;
}
// Enable TCP_NODELAY for low latency (only for TCP sockets)
if (rp->ai_family == AF_INET || rp->ai_family == AF_INET6) {
if (setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val)) == -1) {
perror("setsockopt TCP_NODELAY");
int e = close(sfd);
if (e == -1 && errno != EINTR) {
perror("close sfd (TCP_NODELAY failed)");
std::abort();
}
continue;
}
}
if (bind(sfd, rp->ai_addr, rp->ai_addrlen) == 0) {
break; /* Success */
}
int e = close(sfd);
if (e == -1 && errno != EINTR) {
perror("close sfd (bind failed)");
std::abort();
}
sfd = -1;
}
freeaddrinfo(result);
if (rp == nullptr || sfd == -1) {
std::fprintf(stderr, "Could not bind to any address\n");
std::abort();
}
if (listen(sfd, SOMAXCONN) == -1) {
perror("listen");
std::abort();
}
listen_fds.push_back(sfd);
return listen_fds;
}
void print_help(const char *program_name) {
std::cout << "WeaselDB - High-performance write-side database component\n\n";
std::cout << "Usage: " << program_name << " [OPTIONS]\n\n";
std::cout << "Options:\n";
std::cout << " --config <file> Path to TOML configuration file (default: "
"config.toml)\n";
std::cout << " --help Show this help message and exit\n";
std::cout << " -h Show this help message and exit\n\n";
std::cout << "Examples:\n";
std::cout << " " << program_name
<< " # Use default config.toml\n";
std::cout << " " << program_name
<< " --config my.toml # Use custom config file\n";
std::cout << " " << program_name
<< " --help # Show this help\n\n";
std::cout << "For configuration documentation, see config.md\n";
}
int main(int argc, char *argv[]) {
#if ENABLE_PERFETTO
perfetto::TracingInitArgs args;
args.backends |= perfetto::kSystemBackend;
perfetto::Tracing::Initialize(args);
perfetto::TrackEvent::Register();
#endif
// Register the process collector for default metrics.
metric::register_collector(std::make_shared<ProcessCollector>());
std::string config_file = "config.toml";
// Parse command line arguments
for (int i = 1; i < argc; i++) {
std::string arg = argv[i];
if (arg == "--help" || arg == "-h") {
print_help(argv[0]);
return 0;
} else if (arg == "--config") {
if (i + 1 >= argc) {
std::cerr << "Error: --config requires a filename argument\n";
print_help(argv[0]);
return 1;
}
config_file = argv[++i];
} else if (arg.starts_with("--config=")) {
config_file = arg.substr(9);
} else {
// For backward compatibility, treat lone argument as config file
if (argc == 2) {
config_file = arg;
} else {
std::cerr << "Error: Unknown argument '" << arg << "'\n";
print_help(argv[0]);
return 1;
}
}
}
auto config = weaseldb::ConfigParser::load_from_file(config_file);
if (!config) {
std::cerr << "Failed to load config from: " << config_file << std::endl;
std::cerr << "Using default configuration..." << std::endl;
config = weaseldb::Config{};
}
std::cout << "Configuration loaded successfully:" << std::endl;
if (!config->server.unix_socket_path.empty()) {
std::cout << "Unix socket path: " << config->server.unix_socket_path
<< std::endl;
} else {
std::cout << "Server bind address: " << config->server.bind_address
<< std::endl;
std::cout << "Server port: " << config->server.port << std::endl;
}
std::cout << "Max request size: " << config->server.max_request_size_bytes
<< " bytes" << std::endl;
std::cout << "I/O threads: " << config->server.io_threads << std::endl;
std::cout << "Epoll instances: " << config->server.epoll_instances
<< std::endl;
std::cout << "Event batch size: " << config->server.event_batch_size
<< std::endl;
std::cout << "Max connections: " << config->server.max_connections
<< std::endl;
std::cout << "Read buffer size: " << config->server.read_buffer_size
<< " bytes" << std::endl;
std::cout << "Min request ID length: " << config->commit.min_request_id_length
<< std::endl;
std::cout << "Request ID retention: "
<< config->commit.request_id_retention_hours.count() << " hours"
<< std::endl;
std::cout << "Subscription buffer size: "
<< config->subscription.max_buffer_size_bytes << " bytes"
<< std::endl;
std::cout << "Keepalive interval: "
<< config->subscription.keepalive_interval.count() << " seconds"
<< std::endl;
// Create listen sockets
std::vector<int> listen_fds = create_listen_sockets(*config);
// Create handler and server
HttpHandler http_handler;
auto server = Server::create(*config, http_handler, listen_fds);
g_server = server.get();
// Setup signal handling
std::signal(SIGPIPE, SIG_IGN);
std::signal(SIGTERM, signal_handler);
std::signal(SIGINT, signal_handler);
std::cout << "Starting WeaselDB HTTP server..." << std::endl;
server->run();
std::cout << "Server shutdown complete." << std::endl;
return 0;
}