Configurable read buffer size
This commit is contained in:
@@ -86,6 +86,27 @@ FetchContent_MakeAvailable(llhttp)
|
|||||||
|
|
||||||
include_directories(src)
|
include_directories(src)
|
||||||
|
|
||||||
|
# Check for VLA (Variable Length Array) support
|
||||||
|
include(CheckCXXSourceCompiles)
|
||||||
|
check_cxx_source_compiles(
|
||||||
|
"
|
||||||
|
int main() {
|
||||||
|
int n = 10;
|
||||||
|
char arr[n];
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
"
|
||||||
|
HAVE_VLA_SUPPORT)
|
||||||
|
|
||||||
|
if(NOT HAVE_VLA_SUPPORT)
|
||||||
|
message(
|
||||||
|
FATAL_ERROR
|
||||||
|
"Compiler must support Variable Length Arrays (VLA). Please use GCC or Clang."
|
||||||
|
)
|
||||||
|
endif()
|
||||||
|
|
||||||
|
message(STATUS "Compiler supports Variable Length Arrays")
|
||||||
|
|
||||||
find_package(weaseljson REQUIRED)
|
find_package(weaseljson REQUIRED)
|
||||||
|
|
||||||
# Generate JSON token hash table using gperf
|
# Generate JSON token hash table using gperf
|
||||||
|
|||||||
@@ -86,6 +86,7 @@ void ConfigParser::parse_server_config(const auto &toml_data,
|
|||||||
parse_field(srv, "network_threads", config.network_threads);
|
parse_field(srv, "network_threads", config.network_threads);
|
||||||
parse_field(srv, "event_batch_size", config.event_batch_size);
|
parse_field(srv, "event_batch_size", config.event_batch_size);
|
||||||
parse_field(srv, "max_connections", config.max_connections);
|
parse_field(srv, "max_connections", config.max_connections);
|
||||||
|
parse_field(srv, "read_buffer_size", config.read_buffer_size);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -165,6 +166,14 @@ bool ConfigParser::validate_config(const Config &config) {
|
|||||||
valid = false;
|
valid = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (config.server.read_buffer_size < 1024 ||
|
||||||
|
config.server.read_buffer_size > 1024 * 1024) { // 1KB to 1MB
|
||||||
|
std::cerr << "Configuration error: server.read_buffer_size must be between "
|
||||||
|
"1024 and 1048576 bytes, got "
|
||||||
|
<< config.server.read_buffer_size << std::endl;
|
||||||
|
valid = false;
|
||||||
|
}
|
||||||
|
|
||||||
// Validate commit configuration
|
// Validate commit configuration
|
||||||
if (config.commit.min_request_id_length < 8 ||
|
if (config.commit.min_request_id_length < 8 ||
|
||||||
config.commit.min_request_id_length > 256) {
|
config.commit.min_request_id_length > 256) {
|
||||||
|
|||||||
@@ -24,6 +24,8 @@ struct ServerConfig {
|
|||||||
int event_batch_size = 32;
|
int event_batch_size = 32;
|
||||||
/// Maximum number of concurrent connections (0 = unlimited)
|
/// Maximum number of concurrent connections (0 = unlimited)
|
||||||
int max_connections = 1000;
|
int max_connections = 1000;
|
||||||
|
/// Buffer size for reading from socket connections (default: 16KB)
|
||||||
|
size_t read_buffer_size = 16 * 1024;
|
||||||
};
|
};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|||||||
64
src/main.cpp
64
src/main.cpp
@@ -161,13 +161,12 @@ struct Connection {
|
|||||||
// response
|
// response
|
||||||
bool closeConnection{false};
|
bool closeConnection{false};
|
||||||
|
|
||||||
bool readBytes(size_t max_request_size) {
|
bool readBytes(size_t max_request_size, size_t buffer_size) {
|
||||||
// Use smaller buffer size but respect max request size
|
// Use Variable Length Array for optimal stack allocation
|
||||||
// TODO revisit
|
char buf[buffer_size];
|
||||||
size_t buf_size = std::min(size_t(4096), max_request_size);
|
|
||||||
std::vector<char> buf(buf_size);
|
|
||||||
for (;;) {
|
for (;;) {
|
||||||
int r = read(fd, buf.data(), buf.size());
|
int r = read(fd, buf, buffer_size);
|
||||||
if (r == -1) {
|
if (r == -1) {
|
||||||
if (errno == EINTR) {
|
if (errno == EINTR) {
|
||||||
continue;
|
continue;
|
||||||
@@ -183,7 +182,7 @@ struct Connection {
|
|||||||
}
|
}
|
||||||
// "pump parser"
|
// "pump parser"
|
||||||
// TODO revisit
|
// TODO revisit
|
||||||
appendMessage({buf.data(), size_t(r)});
|
appendMessage({buf, size_t(r)});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -261,11 +260,53 @@ struct Connection {
|
|||||||
#endif
|
#endif
|
||||||
};
|
};
|
||||||
|
|
||||||
|
void print_help(const char *program_name) {
|
||||||
|
std::cout << "WeaselDB - High-performance write-side database component\n\n";
|
||||||
|
std::cout << "Usage: " << program_name << " [OPTIONS]\n\n";
|
||||||
|
std::cout << "Options:\n";
|
||||||
|
std::cout << " --config <file> Path to TOML configuration file (default: "
|
||||||
|
"config.toml)\n";
|
||||||
|
std::cout << " --help Show this help message and exit\n";
|
||||||
|
std::cout << " -h Show this help message and exit\n\n";
|
||||||
|
std::cout << "Examples:\n";
|
||||||
|
std::cout << " " << program_name
|
||||||
|
<< " # Use default config.toml\n";
|
||||||
|
std::cout << " " << program_name
|
||||||
|
<< " --config my.toml # Use custom config file\n";
|
||||||
|
std::cout << " " << program_name
|
||||||
|
<< " --help # Show this help\n\n";
|
||||||
|
std::cout << "For configuration documentation, see config.md\n";
|
||||||
|
}
|
||||||
|
|
||||||
int main(int argc, char *argv[]) {
|
int main(int argc, char *argv[]) {
|
||||||
std::string config_file = "config.toml";
|
std::string config_file = "config.toml";
|
||||||
|
|
||||||
if (argc > 1) {
|
// Parse command line arguments
|
||||||
config_file = argv[1];
|
for (int i = 1; i < argc; i++) {
|
||||||
|
std::string arg = argv[i];
|
||||||
|
|
||||||
|
if (arg == "--help" || arg == "-h") {
|
||||||
|
print_help(argv[0]);
|
||||||
|
return 0;
|
||||||
|
} else if (arg == "--config") {
|
||||||
|
if (i + 1 >= argc) {
|
||||||
|
std::cerr << "Error: --config requires a filename argument\n";
|
||||||
|
print_help(argv[0]);
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
config_file = argv[++i];
|
||||||
|
} else if (arg.starts_with("--config=")) {
|
||||||
|
config_file = arg.substr(9);
|
||||||
|
} else {
|
||||||
|
// For backward compatibility, treat lone argument as config file
|
||||||
|
if (argc == 2) {
|
||||||
|
config_file = arg;
|
||||||
|
} else {
|
||||||
|
std::cerr << "Error: Unknown argument '" << arg << "'\n";
|
||||||
|
print_help(argv[0]);
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
auto config = weaseldb::ConfigParser::load_from_file(config_file);
|
auto config = weaseldb::ConfigParser::load_from_file(config_file);
|
||||||
@@ -289,6 +330,8 @@ int main(int argc, char *argv[]) {
|
|||||||
<< std::endl;
|
<< std::endl;
|
||||||
std::cout << "Max connections: " << config->server.max_connections
|
std::cout << "Max connections: " << config->server.max_connections
|
||||||
<< std::endl;
|
<< std::endl;
|
||||||
|
std::cout << "Read buffer size: " << config->server.read_buffer_size
|
||||||
|
<< " bytes" << std::endl;
|
||||||
std::cout << "Min request ID length: " << config->commit.min_request_id_length
|
std::cout << "Min request ID length: " << config->commit.min_request_id_length
|
||||||
<< std::endl;
|
<< std::endl;
|
||||||
std::cout << "Request ID retention: "
|
std::cout << "Request ID retention: "
|
||||||
@@ -345,6 +388,7 @@ int main(int argc, char *argv[]) {
|
|||||||
threads.emplace_back(
|
threads.emplace_back(
|
||||||
[network_epollfd, networkThreadId,
|
[network_epollfd, networkThreadId,
|
||||||
max_request_size = config->server.max_request_size_bytes,
|
max_request_size = config->server.max_request_size_bytes,
|
||||||
|
read_buffer_size = config->server.read_buffer_size,
|
||||||
event_batch_size = config->server.event_batch_size]() {
|
event_batch_size = config->server.event_batch_size]() {
|
||||||
pthread_setname_np(
|
pthread_setname_np(
|
||||||
pthread_self(),
|
pthread_self(),
|
||||||
@@ -388,7 +432,7 @@ int main(int argc, char *argv[]) {
|
|||||||
(events[i].events & EPOLLOUT)));
|
(events[i].events & EPOLLOUT)));
|
||||||
|
|
||||||
if (events[i].events & EPOLLIN) {
|
if (events[i].events & EPOLLIN) {
|
||||||
bool done = conn->readBytes(max_request_size);
|
bool done = conn->readBytes(max_request_size, read_buffer_size);
|
||||||
if (done) {
|
if (done) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user