Max connection limit
This commit is contained in:
@@ -85,6 +85,7 @@ void ConfigParser::parse_server_config(const auto &toml_data,
|
|||||||
parse_field(srv, "accept_threads", config.accept_threads);
|
parse_field(srv, "accept_threads", config.accept_threads);
|
||||||
parse_field(srv, "network_threads", config.network_threads);
|
parse_field(srv, "network_threads", config.network_threads);
|
||||||
parse_field(srv, "event_batch_size", config.event_batch_size);
|
parse_field(srv, "event_batch_size", config.event_batch_size);
|
||||||
|
parse_field(srv, "max_connections", config.max_connections);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -156,6 +157,14 @@ bool ConfigParser::validate_config(const Config &config) {
|
|||||||
valid = false;
|
valid = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (config.server.max_connections < 0 ||
|
||||||
|
config.server.max_connections > 100000) {
|
||||||
|
std::cerr << "Configuration error: server.max_connections must be between "
|
||||||
|
"0 and 100000, got "
|
||||||
|
<< config.server.max_connections << std::endl;
|
||||||
|
valid = false;
|
||||||
|
}
|
||||||
|
|
||||||
// Validate commit configuration
|
// Validate commit configuration
|
||||||
if (config.commit.min_request_id_length < 8 ||
|
if (config.commit.min_request_id_length < 8 ||
|
||||||
config.commit.min_request_id_length > 256) {
|
config.commit.min_request_id_length > 256) {
|
||||||
|
|||||||
@@ -22,6 +22,8 @@ struct ServerConfig {
|
|||||||
int network_threads = 1;
|
int network_threads = 1;
|
||||||
/// Event batch size for epoll processing
|
/// Event batch size for epoll processing
|
||||||
int event_batch_size = 32;
|
int event_batch_size = 32;
|
||||||
|
/// Maximum number of concurrent connections (0 = unlimited)
|
||||||
|
int max_connections = 1000;
|
||||||
};
|
};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|||||||
24
src/main.cpp
24
src/main.cpp
@@ -20,6 +20,7 @@
|
|||||||
#include <vector>
|
#include <vector>
|
||||||
|
|
||||||
std::atomic<bool> shutdown_requested{false};
|
std::atomic<bool> shutdown_requested{false};
|
||||||
|
std::atomic<int> activeConnections{0};
|
||||||
|
|
||||||
#ifndef __has_feature
|
#ifndef __has_feature
|
||||||
#define __has_feature(x) 0
|
#define __has_feature(x) 0
|
||||||
@@ -124,9 +125,12 @@ struct Connection {
|
|||||||
struct sockaddr_storage addr; // sockaddr_storage handles IPv4/IPv6
|
struct sockaddr_storage addr; // sockaddr_storage handles IPv4/IPv6
|
||||||
|
|
||||||
Connection(struct sockaddr_storage addr, int fd, int64_t id)
|
Connection(struct sockaddr_storage addr, int fd, int64_t id)
|
||||||
: fd(fd), id(id), addr(addr) {}
|
: fd(fd), id(id), addr(addr) {
|
||||||
|
activeConnections.fetch_add(1, std::memory_order_relaxed);
|
||||||
|
}
|
||||||
|
|
||||||
~Connection() {
|
~Connection() {
|
||||||
|
activeConnections.fetch_sub(1, std::memory_order_relaxed);
|
||||||
int e = close(fd);
|
int e = close(fd);
|
||||||
if (e == -1) {
|
if (e == -1) {
|
||||||
perror("close");
|
perror("close");
|
||||||
@@ -237,6 +241,8 @@ int main(int argc, char *argv[]) {
|
|||||||
<< std::endl;
|
<< std::endl;
|
||||||
std::cout << "Event batch size: " << config->server.event_batch_size
|
std::cout << "Event batch size: " << config->server.event_batch_size
|
||||||
<< std::endl;
|
<< std::endl;
|
||||||
|
std::cout << "Max connections: " << config->server.max_connections
|
||||||
|
<< std::endl;
|
||||||
std::cout << "Min request ID length: " << config->commit.min_request_id_length
|
std::cout << "Min request ID length: " << config->commit.min_request_id_length
|
||||||
<< std::endl;
|
<< std::endl;
|
||||||
std::cout << "Request ID retention: "
|
std::cout << "Request ID retention: "
|
||||||
@@ -262,6 +268,8 @@ int main(int argc, char *argv[]) {
|
|||||||
abort();
|
abort();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
std::atomic<int64_t> connectionId{0};
|
||||||
|
|
||||||
// Network threads from configuration
|
// Network threads from configuration
|
||||||
int networkThreads = config->server.network_threads;
|
int networkThreads = config->server.network_threads;
|
||||||
|
|
||||||
@@ -340,12 +348,11 @@ int main(int argc, char *argv[]) {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
std::atomic<int64_t> connectionId{0};
|
|
||||||
|
|
||||||
// Accept threads from configuration
|
// Accept threads from configuration
|
||||||
int acceptThreads = config->server.accept_threads;
|
int acceptThreads = config->server.accept_threads;
|
||||||
for (int i = 0; i < acceptThreads; ++i) {
|
for (int i = 0; i < acceptThreads; ++i) {
|
||||||
threads.emplace_back([epollfd, i, sockfd, &connectionId]() {
|
threads.emplace_back([epollfd, i, sockfd, &connectionId,
|
||||||
|
max_connections = config->server.max_connections]() {
|
||||||
pthread_setname_np(pthread_self(),
|
pthread_setname_np(pthread_self(),
|
||||||
("accept-" + std::to_string(i)).c_str());
|
("accept-" + std::to_string(i)).c_str());
|
||||||
// Call accept in a loop
|
// Call accept in a loop
|
||||||
@@ -361,6 +368,15 @@ int main(int argc, char *argv[]) {
|
|||||||
perror("accept4");
|
perror("accept4");
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Check connection limit (0 means unlimited)
|
||||||
|
if (max_connections > 0 &&
|
||||||
|
activeConnections.load(std::memory_order_relaxed) >=
|
||||||
|
max_connections) {
|
||||||
|
// Reject connection by immediately closing it
|
||||||
|
close(fd);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
auto conn = std::make_unique<Connection>(
|
auto conn = std::make_unique<Connection>(
|
||||||
addr, fd, connectionId.fetch_add(1, std::memory_order_relaxed));
|
addr, fd, connectionId.fetch_add(1, std::memory_order_relaxed));
|
||||||
// Transfer new connection to epoll ownership
|
// Transfer new connection to epoll ownership
|
||||||
|
|||||||
Reference in New Issue
Block a user