diff --git a/CMakeLists.txt b/CMakeLists.txt index 437ffe3..d5e705f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -38,10 +38,12 @@ FetchContent_MakeAvailable(nanobench) include_directories(src) -set(SOURCES src/main.cpp src/config.cpp) +find_package(weaseljson REQUIRED) + +set(SOURCES src/main.cpp src/config.cpp src/commit_request.cpp) add_executable(weaseldb ${SOURCES}) -target_link_libraries(weaseldb Threads::Threads toml11::toml11) +target_link_libraries(weaseldb Threads::Threads toml11::toml11 weaseljson) enable_testing() @@ -49,9 +51,15 @@ add_executable(test_arena_allocator tests/test_arena_allocator.cpp) target_link_libraries(test_arena_allocator doctest::doctest) target_include_directories(test_arena_allocator PRIVATE src) +add_executable(test_commit_request tests/test_commit_request.cpp + src/commit_request.cpp) +target_link_libraries(test_commit_request doctest::doctest weaseljson) +target_include_directories(test_commit_request PRIVATE src) + add_executable(bench_arena_allocator benchmarks/bench_arena_allocator.cpp) target_link_libraries(bench_arena_allocator nanobench) target_include_directories(bench_arena_allocator PRIVATE src) add_test(NAME arena_allocator_tests COMMAND test_arena_allocator) +add_test(NAME commit_request_tests COMMAND test_commit_request) add_test(NAME arena_allocator_benchmarks COMMAND bench_arena_allocator) diff --git a/src/commit_request.cpp b/src/commit_request.cpp new file mode 100644 index 0000000..7c0706b --- /dev/null +++ b/src/commit_request.cpp @@ -0,0 +1,443 @@ +#include "commit_request.hpp" +#include +#include +#include + +namespace { +// Global callbacks for JSON parsing +const WeaselJsonCallbacks json_callbacks = { + .on_begin_object = CommitRequest::on_begin_object, + .on_end_object = CommitRequest::on_end_object, + .on_string_data = CommitRequest::on_string_data, + .on_key_data = CommitRequest::on_key_data, + .on_begin_array = CommitRequest::on_begin_array, + .on_end_array = CommitRequest::on_end_array, + .on_number_data = CommitRequest::on_number_data, + .on_true_literal = CommitRequest::on_true_literal, + .on_false_literal = CommitRequest::on_false_literal, + .on_null_literal = CommitRequest::on_null_literal, +}; +// Base64 decoding table +constexpr std::array make_base64_decode_table() { + std::array table{}; + for (int i = 0; i < 256; ++i) { + table[i] = -1; + } + for (int i = 0; i < 26; ++i) { + table['A' + i] = i; + table['a' + i] = i + 26; + } + for (int i = 0; i < 10; ++i) { + table['0' + i] = i + 52; + } + table['+'] = 62; + table['/'] = 63; + table['='] = -2; // Padding + return table; +} + +constexpr auto base64_decode_table = make_base64_decode_table(); +} // namespace + +std::string_view CommitRequest::store_string(std::string_view str) { + if (str.empty()) { + return {}; + } + + char *arena_str = static_cast(arena_.allocate(str.size())); + std::memcpy(arena_str, str.data(), str.size()); + + return std::string_view(arena_str, str.size()); +} + +std::string_view CommitRequest::decode_base64(std::string_view base64_str) { + if (base64_str.empty()) { + return {}; + } + + // Remove padding for size calculation + size_t input_len = base64_str.size(); + while (input_len > 0 && base64_str[input_len - 1] == '=') { + input_len--; + } + + // Calculate output size + size_t output_len = (input_len * 3) / 4; + + if (output_len == 0) { + return {}; + } + + char *output = static_cast(arena_.allocate(output_len)); + if (!output) { + return {}; + } + + size_t out_pos = 0; + int bits_collected = 0; + int accumulator = 0; + + for (char c : base64_str.substr(0, input_len)) { + int value = base64_decode_table[static_cast(c)]; + if (value < 0) { + return {}; // Invalid character + } + + accumulator = (accumulator << 6) | value; + bits_collected += 6; + + if (bits_collected >= 8) { + bits_collected -= 8; + if (out_pos < output_len) { + output[out_pos++] = + static_cast((accumulator >> bits_collected) & 0xFF); + } + } + } + + return std::string_view(output, out_pos); +} + +void CommitRequest::on_begin_object(void *userdata) { + auto *self = static_cast(userdata); + auto &ctx = self->parser_context_; + + if (ctx.parse_error) + return; + + if (ctx.state_stack.empty()) { + ctx.parse_error = true; + return; + } + + ParseState current_state = ctx.state_stack.top(); + + switch (current_state) { + case ParseState::Root: + // Expected - this is the main object + break; + case ParseState::PreconditionsArray: + ctx.state_stack.push(ParseState::PreconditionObject); + ctx.current_precondition = Precondition{}; + break; + case ParseState::OperationsArray: + ctx.state_stack.push(ParseState::OperationObject); + ctx.current_operation = Operation{}; + break; + default: + ctx.parse_error = true; + break; + } +} + +void CommitRequest::on_end_object(void *userdata) { + auto *self = static_cast(userdata); + auto &ctx = self->parser_context_; + + if (ctx.parse_error || ctx.state_stack.empty()) { + ctx.parse_error = true; + return; + } + + ParseState current_state = ctx.state_stack.top(); + ctx.state_stack.pop(); + + switch (current_state) { + case ParseState::Root: + // We're done parsing the root object + ctx.parse_complete = true; + break; + case ParseState::PreconditionObject: + self->preconditions_.push_back(ctx.current_precondition); + break; + case ParseState::OperationObject: + self->operations_.push_back(ctx.current_operation); + break; + default: + break; + } +} + +void CommitRequest::on_string_data(void *userdata, const char *buf, int len, + int done) { + auto *self = static_cast(userdata); + auto &ctx = self->parser_context_; + + if (ctx.parse_error) + return; + + ctx.current_string.append(buf, len); + + if (done) { + self->handle_completed_string(); + ctx.current_string.clear(); + ctx.current_key.clear(); // Clear key after processing value + } +} + +void CommitRequest::on_key_data(void *userdata, const char *buf, int len, + int done) { + auto *self = static_cast(userdata); + auto &ctx = self->parser_context_; + + if (ctx.parse_error) + return; + + if (!done) { + ctx.current_key.append(buf, len); + } else { + ctx.current_key.append(buf, len); + self->handle_completed_key(); + } +} + +void CommitRequest::on_begin_array(void *userdata) { + auto *self = static_cast(userdata); + auto &ctx = self->parser_context_; + + if (ctx.parse_error) + return; + + if (ctx.current_key == "preconditions") { + ctx.state_stack.push(ParseState::PreconditionsArray); + } else if (ctx.current_key == "operations") { + ctx.state_stack.push(ParseState::OperationsArray); + } else { + ctx.parse_error = true; + } +} + +void CommitRequest::on_end_array(void *userdata) { + auto *self = static_cast(userdata); + auto &ctx = self->parser_context_; + + if (ctx.parse_error || ctx.state_stack.empty()) { + ctx.parse_error = true; + return; + } + + ctx.state_stack.pop(); +} + +void CommitRequest::on_number_data(void *userdata, const char *buf, int len, + int done) { + auto *self = static_cast(userdata); + auto &ctx = self->parser_context_; + + if (ctx.parse_error) + return; + + ctx.current_number.append(buf, len); + + if (done) { + self->handle_completed_number(); + ctx.current_number.clear(); + ctx.current_key.clear(); // Clear key after processing value + } +} + +void CommitRequest::on_true_literal(void *userdata) { + // Not used in this API +} + +void CommitRequest::on_false_literal(void *userdata) { + // Not used in this API +} + +void CommitRequest::on_null_literal(void *userdata) { + // Not used in this API +} + +void CommitRequest::handle_completed_string() { + auto &ctx = parser_context_; + + if (ctx.state_stack.empty()) { + ctx.parse_error = true; + return; + } + + ParseState current_state = ctx.state_stack.top(); + + switch (current_state) { + case ParseState::Root: + if (ctx.current_key == "request_id") { + request_id_ = store_string(ctx.current_string); + } else if (ctx.current_key == "leader_id") { + leader_id_ = store_string(ctx.current_string); + } else if (ctx.current_key == "read_version") { + // read_version should be a number, not a string + ctx.parse_error = true; + } + break; + case ParseState::PreconditionObject: + if (ctx.current_key == "type") { + if (ctx.current_string == "point_read") { + ctx.current_precondition.type = Precondition::Type::PointRead; + } else if (ctx.current_string == "range_read") { + ctx.current_precondition.type = Precondition::Type::RangeRead; + } else { + ctx.parse_error = true; + } + } else if (ctx.current_key == "key") { + ctx.current_precondition.key = decode_base64(ctx.current_string); + } else if (ctx.current_key == "begin") { + ctx.current_precondition.begin = decode_base64(ctx.current_string); + } else if (ctx.current_key == "end") { + ctx.current_precondition.end = decode_base64(ctx.current_string); + } + break; + case ParseState::OperationObject: + if (ctx.current_key == "type") { + if (ctx.current_string == "write") { + ctx.current_operation.type = Operation::Type::Write; + } else if (ctx.current_string == "delete") { + ctx.current_operation.type = Operation::Type::Delete; + } else if (ctx.current_string == "range_delete") { + ctx.current_operation.type = Operation::Type::RangeDelete; + } else { + ctx.parse_error = true; + } + } else if (ctx.current_key == "key") { + ctx.current_operation.key = decode_base64(ctx.current_string); + } else if (ctx.current_key == "value") { + ctx.current_operation.value = decode_base64(ctx.current_string); + } else if (ctx.current_key == "begin") { + ctx.current_operation.begin = decode_base64(ctx.current_string); + } else if (ctx.current_key == "end") { + ctx.current_operation.end = decode_base64(ctx.current_string); + } + break; + default: + break; + } +} + +void CommitRequest::handle_completed_number() { + auto &ctx = parser_context_; + + if (ctx.state_stack.empty()) { + ctx.parse_error = true; + return; + } + + ParseState current_state = ctx.state_stack.top(); + + switch (current_state) { + case ParseState::Root: + if (ctx.current_key == "read_version") { + uint64_t version; + auto result = std::from_chars( + ctx.current_number.data(), + ctx.current_number.data() + ctx.current_number.size(), version); + if (result.ec == std::errc{}) { + read_version_ = version; + } else { + ctx.parse_error = true; + } + } + break; + case ParseState::PreconditionObject: + if (ctx.current_key == "version") { + uint64_t version; + auto result = std::from_chars( + ctx.current_number.data(), + ctx.current_number.data() + ctx.current_number.size(), version); + if (result.ec == std::errc{}) { + ctx.current_precondition.version = version; + } else { + ctx.parse_error = true; + } + } + break; + default: + break; + } +} + +void CommitRequest::handle_completed_key() { + // Key is now available in current_key for the next value + // No immediate action needed as we'll use it when processing values +} + +bool CommitRequest::parse_json(std::string_view json_str) { + reset(); + + WeaselJsonParser *parser = + WeaselJsonParser_create(64, &json_callbacks, this, 0); + if (!parser) { + return false; + } + + // Make a mutable copy of the JSON string for parsing + std::string mutable_json(json_str); + + WeaselJsonStatus status = + WeaselJsonParser_parse(parser, mutable_json.data(), mutable_json.size()); + if (status == WeaselJson_OK || status == WeaselJson_AGAIN) { + // End of input + status = WeaselJsonParser_parse(parser, nullptr, 0); + } + + bool success = (status == WeaselJson_OK) && !parser_context_.parse_error; + + WeaselJsonParser_destroy(parser); + + return success; +} + +bool CommitRequest::begin_streaming_parse() { + reset(); + + json_parser_ = WeaselJsonParser_create(64, &json_callbacks, this, 0); + return json_parser_ != nullptr; +} + +CommitRequest::ParseStatus CommitRequest::parse_chunk(char *data, size_t len) { + if (!json_parser_) { + return ParseStatus::Error; + } + + if (parser_context_.parse_error) { + return ParseStatus::Error; + } + + if (parser_context_.parse_complete) { + return ParseStatus::Complete; + } + + WeaselJsonStatus status = WeaselJsonParser_parse(json_parser_, data, len); + + switch (status) { + case WeaselJson_OK: + // WeaselJson_OK means parsing is complete + return ParseStatus::Complete; + case WeaselJson_AGAIN: + return ParseStatus::Incomplete; + case WeaselJson_REJECT: + case WeaselJson_OVERFLOW: + default: + parser_context_.parse_error = true; + return ParseStatus::Error; + } +} + +CommitRequest::ParseStatus CommitRequest::finish_streaming_parse() { + if (!json_parser_) { + return ParseStatus::Error; + } + + if (parser_context_.parse_error) { + return ParseStatus::Error; + } + + // Signal end of input + WeaselJsonStatus status = WeaselJsonParser_parse(json_parser_, nullptr, 0); + + if (status == WeaselJson_OK && parser_context_.parse_complete && + !parser_context_.parse_error) { + return ParseStatus::Complete; + } else { + parser_context_.parse_error = true; + return ParseStatus::Error; + } +} \ No newline at end of file diff --git a/src/commit_request.hpp b/src/commit_request.hpp new file mode 100644 index 0000000..511c7e5 --- /dev/null +++ b/src/commit_request.hpp @@ -0,0 +1,282 @@ +#pragma once + +#include "arena_allocator.hpp" +#include +#include +#include +#include +#include +#include + +/** + * @brief Represents a precondition for a commit request. + */ +struct Precondition { + enum class Type { PointRead, RangeRead }; + + Type type; + std::optional version; + std::string_view key; + std::optional begin; + std::optional end; +}; + +/** + * @brief Represents an operation in a commit request. + */ +struct Operation { + enum class Type { Write, Delete, RangeDelete }; + + Type type; + std::string_view key; + std::optional value; + std::optional begin; + std::optional end; +}; + +/** + * @brief Represents a commit request as described in the API specification. + * + * All string data is stored in the arena allocator to ensure efficient + * memory management and ownership. + */ +class CommitRequest { +public: + // Parser state + enum class ParseState { + Root, + RequestId, + LeaderId, + ReadVersion, + PreconditionsArray, + PreconditionObject, + OperationsArray, + OperationObject + }; + + enum class ParseStatus { + Incomplete, // Still need more data + Complete, // Successfully parsed complete JSON + Error // Parse error occurred + }; + + struct ParserContext { + std::stack state_stack; + std::string current_key; + std::string current_string; + std::string current_number; + bool in_key = false; + bool parse_error = false; + bool parse_complete = false; + + // Current objects being parsed + Precondition current_precondition{}; + Operation current_operation{}; + + // Parsing state for nested structures + std::string precondition_type; + std::string operation_type; + }; + +private: + ArenaAllocator arena_; + std::optional request_id_; + std::string_view leader_id_; + uint64_t read_version_ = 0; + std::vector preconditions_; + std::vector operations_; + ParserContext parser_context_; + WeaselJsonParser *json_parser_ = nullptr; + +public: + /** + * @brief Construct a new CommitRequest with the given initial arena size. + * @param arena_size Initial size for the arena allocator + */ + explicit CommitRequest(size_t arena_size = 4096) : arena_(arena_size) {} + + /** + * @brief Destructor - cleans up any active parser. + */ + ~CommitRequest() { + if (json_parser_) { + WeaselJsonParser_destroy(json_parser_); + } + } + + // Move constructor + CommitRequest(CommitRequest &&other) noexcept + : arena_(std::move(other.arena_)), request_id_(other.request_id_), + leader_id_(other.leader_id_), read_version_(other.read_version_), + preconditions_(std::move(other.preconditions_)), + operations_(std::move(other.operations_)), + parser_context_(std::move(other.parser_context_)), + json_parser_(other.json_parser_) { + other.json_parser_ = nullptr; + } + + // Move assignment operator + CommitRequest &operator=(CommitRequest &&other) noexcept { + if (this != &other) { + if (json_parser_) { + WeaselJsonParser_destroy(json_parser_); + } + + arena_ = std::move(other.arena_); + request_id_ = other.request_id_; + leader_id_ = other.leader_id_; + read_version_ = other.read_version_; + preconditions_ = std::move(other.preconditions_); + operations_ = std::move(other.operations_); + parser_context_ = std::move(other.parser_context_); + json_parser_ = other.json_parser_; + + other.json_parser_ = nullptr; + } + return *this; + } + + // Copy constructor and assignment are deleted (not safe with parser state) + CommitRequest(const CommitRequest &) = delete; + CommitRequest &operator=(const CommitRequest &) = delete; + + /** + * @brief Parse a JSON string into a CommitRequest object (one-shot parsing). + * @param json_str The JSON string to parse + * @return true if parsing succeeded, false otherwise + */ + bool parse_json(std::string_view json_str); + + /** + * @brief Initialize streaming JSON parsing. + * @return true if initialization succeeded, false otherwise + */ + bool begin_streaming_parse(); + + /** + * @brief Parse additional JSON data incrementally. + * @param data Pointer to the data buffer + * @param len Length of the data + * @return ParseStatus indicating current parse state + */ + ParseStatus parse_chunk(char *data, size_t len); + + /** + * @brief Finish streaming parse (call when no more data is available). + * @return ParseStatus indicating final parse result + */ + ParseStatus finish_streaming_parse(); + + /** + * @brief Check if parsing is complete and successful. + * @return true if parsing is complete and successful + */ + bool is_parse_complete() const { + return parser_context_.parse_complete && !parser_context_.parse_error; + } + + /** + * @brief Check if there was a parse error. + * @return true if there was a parse error + */ + bool has_parse_error() const { return parser_context_.parse_error; } + + /** + * @brief Get the request ID if present. + * @return Optional request ID + */ + const std::optional &request_id() const { + return request_id_; + } + + /** + * @brief Get the leader ID. + * @return Leader ID string view + */ + std::string_view leader_id() const { return leader_id_; } + + /** + * @brief Get the read version. + * @return Read version number + */ + uint64_t read_version() const { return read_version_; } + + /** + * @brief Get the preconditions. + * @return Vector of preconditions + */ + const std::vector &preconditions() const { + return preconditions_; + } + + /** + * @brief Get the operations. + * @return Vector of operations + */ + const std::vector &operations() const { return operations_; } + + /** + * @brief Get the total allocated bytes in the arena. + * @return Total allocated bytes + */ + size_t total_allocated() const { return arena_.total_allocated(); } + + /** + * @brief Get the used bytes in the arena. + * @return Used bytes + */ + size_t used_bytes() const { return arena_.used_bytes(); } + + /** + * @brief Reset the commit request for reuse. + */ + void reset() { + arena_.reset(); + request_id_.reset(); + leader_id_ = {}; + read_version_ = 0; + preconditions_.clear(); + operations_.clear(); + + // Reset parser state + if (json_parser_) { + WeaselJsonParser_destroy(json_parser_); + json_parser_ = nullptr; + } + parser_context_ = ParserContext{}; + parser_context_.state_stack.push(ParseState::Root); + } + + // Weaseljson callbacks (public for global callbacks) + static void on_begin_object(void *userdata); + static void on_end_object(void *userdata); + static void on_string_data(void *userdata, const char *buf, int len, + int done); + static void on_key_data(void *userdata, const char *buf, int len, int done); + static void on_begin_array(void *userdata); + static void on_end_array(void *userdata); + static void on_number_data(void *userdata, const char *buf, int len, + int done); + static void on_true_literal(void *userdata); + static void on_false_literal(void *userdata); + static void on_null_literal(void *userdata); + +private: + /** + * @brief Copy a string into the arena and return a string_view. + * @param str The string to copy + * @return String view pointing to arena-allocated memory + */ + std::string_view store_string(std::string_view str); + + /** + * @brief Decode a base64 string and store it in the arena. + * @param base64_str The base64 encoded string + * @return String view of decoded data, or empty view if decoding failed + */ + std::string_view decode_base64(std::string_view base64_str); + + void handle_completed_string(); + void handle_completed_number(); + void handle_completed_key(); +}; diff --git a/src/main.cpp b/src/main.cpp index 6057e86..bc99b87 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -1,3 +1,4 @@ +#include "commit_request.hpp" #include "config.hpp" #include @@ -34,5 +35,109 @@ int main(int argc, char *argv[]) { << config->subscription.keepalive_interval.count() << " seconds" << std::endl; + // Demonstrate CommitRequest functionality + std::cout << "\n--- CommitRequest Demo ---" << std::endl; + + CommitRequest request; + + std::string sample_json = R"({ + "request_id": "demo-12345", + "leader_id": "leader-abc", + "read_version": 42, + "preconditions": [ + { + "type": "point_read", + "version": 41, + "key": "dGVzdEtleQ==" + } + ], + "operations": [ + { + "type": "write", + "key": "dGVzdEtleQ==", + "value": "dGVzdFZhbHVl" + } + ] + })"; + + if (request.parse_json(sample_json)) { + std::cout << "✓ Successfully parsed commit request:" << std::endl; + std::cout << " Request ID: " + << (request.request_id() ? request.request_id().value() : "none") + << std::endl; + std::cout << " Leader ID: " << request.leader_id() << std::endl; + std::cout << " Read Version: " << request.read_version() << std::endl; + std::cout << " Preconditions: " << request.preconditions().size() + << std::endl; + std::cout << " Operations: " << request.operations().size() << std::endl; + std::cout << " Arena memory used: " << request.used_bytes() << " bytes" + << std::endl; + + if (!request.operations().empty()) { + const auto &op = request.operations()[0]; + std::cout << " First operation: " + << (op.type == Operation::Type::Write ? "write" : "other") + << " key=" << op.key + << " value=" << (op.value ? op.value.value() : "none") + << std::endl; + } + } else { + std::cout << "✗ Failed to parse commit request" << std::endl; + } + + // Demonstrate streaming parsing + std::cout << "\n--- Streaming Parse Demo ---" << std::endl; + + CommitRequest streaming_request; + + if (streaming_request.begin_streaming_parse()) { + std::cout << "✓ Initialized streaming parser" << std::endl; + + // Simulate receiving data in small chunks like from a network socket + std::string streaming_json = + R"({"request_id": "stream-456", "leader_id": "stream-leader", "read_version": 999})"; + + size_t chunk_size = 15; // Small chunks to simulate network packets + size_t offset = 0; + int chunk_count = 0; + + CommitRequest::ParseStatus status = CommitRequest::ParseStatus::Incomplete; + + while (offset < streaming_json.size() && + status == CommitRequest::ParseStatus::Incomplete) { + size_t len = std::min(chunk_size, streaming_json.size() - offset); + std::string chunk = streaming_json.substr(offset, len); + + std::cout << " Chunk " << ++chunk_count << " (" << len << " bytes): \"" + << chunk << "\"" << std::endl; + + // Need mutable data for weaseljson + std::string mutable_chunk = chunk; + status = streaming_request.parse_chunk(mutable_chunk.data(), + mutable_chunk.size()); + + offset += len; + } + + if (status == CommitRequest::ParseStatus::Incomplete) { + std::cout << " Finalizing parse..." << std::endl; + status = streaming_request.finish_streaming_parse(); + } + + if (status == CommitRequest::ParseStatus::Complete) { + std::cout << "✓ Streaming parse successful:" << std::endl; + std::cout << " Request ID: " << streaming_request.request_id().value() + << std::endl; + std::cout << " Leader ID: " << streaming_request.leader_id() + << std::endl; + std::cout << " Read Version: " << streaming_request.read_version() + << std::endl; + } else { + std::cout << "✗ Streaming parse failed" << std::endl; + } + } else { + std::cout << "✗ Failed to initialize streaming parser" << std::endl; + } + return 0; } diff --git a/tests/test_commit_request.cpp b/tests/test_commit_request.cpp new file mode 100644 index 0000000..d3987b2 --- /dev/null +++ b/tests/test_commit_request.cpp @@ -0,0 +1,257 @@ +#define DOCTEST_CONFIG_IMPLEMENT_WITH_MAIN +#include "commit_request.hpp" +#include + +TEST_CASE("CommitRequest basic parsing") { + CommitRequest request; + + SUBCASE("Simple commit request") { + std::string json = R"({ + "request_id": "test123", + "leader_id": "leader456", + "read_version": 12345 + })"; + + REQUIRE(request.parse_json(json)); + REQUIRE(request.request_id().has_value()); + REQUIRE(request.request_id().value() == "test123"); + REQUIRE(request.leader_id() == "leader456"); + REQUIRE(request.read_version() == 12345); + } + + SUBCASE("With preconditions") { + std::string json = R"({ + "leader_id": "leader456", + "read_version": 12345, + "preconditions": [ + { + "type": "point_read", + "version": 12340, + "key": "dGVzdA==" + } + ] + })"; + + REQUIRE(request.parse_json(json)); + REQUIRE(request.preconditions().size() == 1); + REQUIRE(request.preconditions()[0].type == Precondition::Type::PointRead); + REQUIRE(request.preconditions()[0].version.has_value()); + REQUIRE(request.preconditions()[0].version.value() == 12340); + REQUIRE(request.preconditions()[0].key == "test"); // "dGVzdA==" decoded + } + + SUBCASE("With operations") { + std::string json = R"({ + "leader_id": "leader456", + "read_version": 12345, + "operations": [ + { + "type": "write", + "key": "dGVzdA==", + "value": "dmFsdWU=" + }, + { + "type": "delete", + "key": "dGVzdDI=" + } + ] + })"; + + REQUIRE(request.parse_json(json)); + REQUIRE(request.operations().size() == 2); + + REQUIRE(request.operations()[0].type == Operation::Type::Write); + REQUIRE(request.operations()[0].key == "test"); + REQUIRE(request.operations()[0].value.has_value()); + REQUIRE(request.operations()[0].value.value() == "value"); + + REQUIRE(request.operations()[1].type == Operation::Type::Delete); + REQUIRE(request.operations()[1].key == "test2"); + } + + SUBCASE("Invalid JSON") { + std::string json = R"({ + "leader_id": "leader456", + "read_version": "not_a_number" + })"; + + REQUIRE_FALSE(request.parse_json(json)); + } +} + +TEST_CASE("CommitRequest memory management") { + CommitRequest request; + + std::string json = R"({ + "request_id": "test123", + "leader_id": "leader456", + "read_version": 12345, + "operations": [ + { + "type": "write", + "key": "dGVzdA==", + "value": "dmFsdWU=" + } + ] + })"; + + REQUIRE(request.parse_json(json)); + + // Check that arena allocation worked + REQUIRE(request.total_allocated() > 0); + REQUIRE(request.used_bytes() > 0); + + // Test reset + request.reset(); + REQUIRE(request.request_id().has_value() == false); + REQUIRE(request.leader_id().empty()); + REQUIRE(request.read_version() == 0); + REQUIRE(request.operations().empty()); +} + +TEST_CASE("CommitRequest streaming parsing") { + CommitRequest request; + + SUBCASE("Simple streaming parse") { + std::string json = R"({ + "request_id": "test123", + "leader_id": "leader456", + "read_version": 12345 + })"; + + REQUIRE(request.begin_streaming_parse()); + + // Parse in small chunks to simulate network reception + std::string mutable_json = json; + size_t chunk_size = 10; + size_t offset = 0; + + CommitRequest::ParseStatus status = CommitRequest::ParseStatus::Incomplete; + + while (offset < mutable_json.size() && + status == CommitRequest::ParseStatus::Incomplete) { + size_t len = std::min(chunk_size, mutable_json.size() - offset); + status = request.parse_chunk(mutable_json.data() + offset, len); + offset += len; + } + + if (status == CommitRequest::ParseStatus::Incomplete) { + status = request.finish_streaming_parse(); + } + + REQUIRE(status == CommitRequest::ParseStatus::Complete); + REQUIRE(request.is_parse_complete()); + REQUIRE_FALSE(request.has_parse_error()); + + REQUIRE(request.request_id().has_value()); + REQUIRE(request.request_id().value() == "test123"); + REQUIRE(request.leader_id() == "leader456"); + REQUIRE(request.read_version() == 12345); + } + + SUBCASE("Streaming parse with complex data") { + std::string json = R"({ + "request_id": "streaming-test", + "leader_id": "leader789", + "read_version": 98765, + "preconditions": [ + { + "type": "point_read", + "version": 98764, + "key": "dGVzdEtleQ==" + } + ], + "operations": [ + { + "type": "write", + "key": "dGVzdEtleQ==", + "value": "dGVzdFZhbHVl" + }, + { + "type": "delete", + "key": "ZGVsZXRlS2V5" + } + ] + })"; + + REQUIRE(request.begin_streaming_parse()); + + // Parse one character at a time to really stress test streaming + std::string mutable_json = json; + CommitRequest::ParseStatus status = CommitRequest::ParseStatus::Incomplete; + + for (size_t i = 0; i < mutable_json.size() && + status == CommitRequest::ParseStatus::Incomplete; + ++i) { + status = request.parse_chunk(mutable_json.data() + i, 1); + } + + if (status == CommitRequest::ParseStatus::Incomplete) { + status = request.finish_streaming_parse(); + } + + REQUIRE(status == CommitRequest::ParseStatus::Complete); + REQUIRE(request.is_parse_complete()); + + REQUIRE(request.request_id().value() == "streaming-test"); + REQUIRE(request.leader_id() == "leader789"); + REQUIRE(request.read_version() == 98765); + REQUIRE(request.preconditions().size() == 1); + REQUIRE(request.operations().size() == 2); + + // Verify precondition was parsed correctly + REQUIRE(request.preconditions()[0].type == Precondition::Type::PointRead); + REQUIRE(request.preconditions()[0].version.value() == 98764); + REQUIRE(request.preconditions()[0].key == "testKey"); + + // Verify operations were parsed correctly + REQUIRE(request.operations()[0].type == Operation::Type::Write); + REQUIRE(request.operations()[0].key == "testKey"); + REQUIRE(request.operations()[0].value.value() == "testValue"); + + REQUIRE(request.operations()[1].type == Operation::Type::Delete); + REQUIRE(request.operations()[1].key == "deleteKey"); + } + + SUBCASE("Streaming parse error handling") { + std::string invalid_json = R"({ + "leader_id": "leader456", + "read_version": "invalid_number" + })"; + + REQUIRE(request.begin_streaming_parse()); + + std::string mutable_json = invalid_json; + CommitRequest::ParseStatus status = + request.parse_chunk(mutable_json.data(), mutable_json.size()); + + if (status == CommitRequest::ParseStatus::Incomplete) { + status = request.finish_streaming_parse(); + } + + REQUIRE(status == CommitRequest::ParseStatus::Error); + REQUIRE(request.has_parse_error()); + REQUIRE_FALSE(request.is_parse_complete()); + } + + SUBCASE("Complete document in single chunk") { + std::string json = R"({"leader_id": "test", "read_version": 123})"; + + REQUIRE(request.begin_streaming_parse()); + + std::string mutable_json = json; + CommitRequest::ParseStatus status = + request.parse_chunk(mutable_json.data(), mutable_json.size()); + + // Should still be incomplete (streaming parser doesn't know if more data is + // coming) + REQUIRE(status == CommitRequest::ParseStatus::Incomplete); + + // Signal end of input to complete parsing + status = request.finish_streaming_parse(); + REQUIRE(status == CommitRequest::ParseStatus::Complete); + REQUIRE(request.is_parse_complete()); + REQUIRE(request.leader_id() == "test"); + REQUIRE(request.read_version() == 123); + } +} \ No newline at end of file