diff --git a/src/config.cpp b/src/config.cpp index 09e17f7..713c98e 100644 --- a/src/config.cpp +++ b/src/config.cpp @@ -85,6 +85,7 @@ void ConfigParser::parse_server_config(const auto &toml_data, parse_field(srv, "accept_threads", config.accept_threads); parse_field(srv, "network_threads", config.network_threads); parse_field(srv, "event_batch_size", config.event_batch_size); + parse_field(srv, "max_connections", config.max_connections); }); } @@ -156,6 +157,14 @@ bool ConfigParser::validate_config(const Config &config) { valid = false; } + if (config.server.max_connections < 0 || + config.server.max_connections > 100000) { + std::cerr << "Configuration error: server.max_connections must be between " + "0 and 100000, got " + << config.server.max_connections << std::endl; + valid = false; + } + // Validate commit configuration if (config.commit.min_request_id_length < 8 || config.commit.min_request_id_length > 256) { diff --git a/src/config.hpp b/src/config.hpp index 7f19b84..da1210a 100644 --- a/src/config.hpp +++ b/src/config.hpp @@ -22,6 +22,8 @@ struct ServerConfig { int network_threads = 1; /// Event batch size for epoll processing int event_batch_size = 32; + /// Maximum number of concurrent connections (0 = unlimited) + int max_connections = 1000; }; /** diff --git a/src/main.cpp b/src/main.cpp index f8f474d..f1e413e 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -20,6 +20,7 @@ #include std::atomic shutdown_requested{false}; +std::atomic activeConnections{0}; #ifndef __has_feature #define __has_feature(x) 0 @@ -124,9 +125,12 @@ struct Connection { struct sockaddr_storage addr; // sockaddr_storage handles IPv4/IPv6 Connection(struct sockaddr_storage addr, int fd, int64_t id) - : fd(fd), id(id), addr(addr) {} + : fd(fd), id(id), addr(addr) { + activeConnections.fetch_add(1, std::memory_order_relaxed); + } ~Connection() { + activeConnections.fetch_sub(1, std::memory_order_relaxed); int e = close(fd); if (e == -1) { perror("close"); @@ -237,6 +241,8 @@ int main(int argc, char *argv[]) { << std::endl; std::cout << "Event batch size: " << config->server.event_batch_size << std::endl; + std::cout << "Max connections: " << config->server.max_connections + << std::endl; std::cout << "Min request ID length: " << config->commit.min_request_id_length << std::endl; std::cout << "Request ID retention: " @@ -262,6 +268,8 @@ int main(int argc, char *argv[]) { abort(); } + std::atomic connectionId{0}; + // Network threads from configuration int networkThreads = config->server.network_threads; @@ -340,12 +348,11 @@ int main(int argc, char *argv[]) { }); } - std::atomic connectionId{0}; - // Accept threads from configuration int acceptThreads = config->server.accept_threads; for (int i = 0; i < acceptThreads; ++i) { - threads.emplace_back([epollfd, i, sockfd, &connectionId]() { + threads.emplace_back([epollfd, i, sockfd, &connectionId, + max_connections = config->server.max_connections]() { pthread_setname_np(pthread_self(), ("accept-" + std::to_string(i)).c_str()); // Call accept in a loop @@ -361,6 +368,15 @@ int main(int argc, char *argv[]) { perror("accept4"); continue; } + + // Check connection limit (0 means unlimited) + if (max_connections > 0 && + activeConnections.load(std::memory_order_relaxed) >= + max_connections) { + // Reject connection by immediately closing it + close(fd); + continue; + } auto conn = std::make_unique( addr, fd, connectionId.fetch_add(1, std::memory_order_relaxed)); // Transfer new connection to epoll ownership