#include "json_commit_request_parser.hpp"
#include "json_token_enum.hpp"

#include <cassert>
#include <charconv>
#include <cstdint>
#include <memory>
#include <string_view>
#include <system_error>
#include <utility>

#include <simdutf.h>
#include <weaseljson.h>

namespace {

// Parses `text` as a base-10 signed 64-bit integer.
//
// Returns true only when std::from_chars reports success AND every character
// of `text` was consumed. Checking the error code alone is not enough:
// from_chars stops at the first character that cannot be part of an integer,
// so a JSON number such as "1.5" or "1e3" would otherwise be accepted as 1.
// `out` should be treated as unspecified when false is returned.
bool parse_int64_strict(std::string_view text, int64_t &out) {
  const char *const first = text.data();
  const char *const last = first + text.size();
  const auto [ptr, ec] = std::from_chars(first, last, out);
  return ec == std::errc{} && ptr == last;
}

} // namespace

// Callback table handed to every WeaselJson parser instance created by this
// class. Each entry forwards to the static member function of the same name.
const WeaselJsonCallbacks JsonCommitRequestParser::json_callbacks = {
    .on_begin_object = JsonCommitRequestParser::on_begin_object,
    .on_end_object = JsonCommitRequestParser::on_end_object,
    .on_string_data = JsonCommitRequestParser::on_string_data,
    .on_key_data = JsonCommitRequestParser::on_key_data,
    .on_begin_array = JsonCommitRequestParser::on_begin_array,
    .on_end_array = JsonCommitRequestParser::on_end_array,
    .on_number_data = JsonCommitRequestParser::on_number_data,
    .on_true_literal = JsonCommitRequestParser::on_true_literal,
    .on_false_literal = JsonCommitRequestParser::on_false_literal,
    .on_null_literal = JsonCommitRequestParser::on_null_literal,
};

// Creates the underlying WeaselJson parser with `this` registered as the
// callback userdata. The parse context is created lazily by
// begin_streaming_parse().
// NOTE(review): the leading 64 is presumably the maximum nesting depth (see
// the "nested too deep" overflow error in parse_chunk) - confirm against the
// WeaselJson API.
JsonCommitRequestParser::JsonCommitRequestParser()
    : json_parser_(WeaselJsonParser_create(64, &json_callbacks, this, 0)),
      parser_context_(nullptr), current_request_(nullptr) {}

// Releases the WeaselJson parser, if this object still owns one.
JsonCommitRequestParser::~JsonCommitRequestParser() {
  if (json_parser_) {
    WeaselJsonParser_destroy(json_parser_);
  }
}

// Move constructor.
//
// A WeaselJson handle is created with the owning object's `this` as callback
// userdata, so a handle must never change owners: callbacks fired through a
// transferred handle would land on the moved-from object, whose
// parser_context_ is null after the move. The handle is therefore NOT taken
// from `other`; this object starts without one and begin_streaming_parse()
// creates a handle bound to the correct `this`. `other` keeps its own handle,
// which stays correctly bound to `other` and is released by its destructor.
//
// Any parse that was in flight on `other` cannot be continued on the new
// object; call begin_streaming_parse() (or parse()) to start a new one.
JsonCommitRequestParser::JsonCommitRequestParser(
    JsonCommitRequestParser &&other) noexcept
    : json_parser_(nullptr),
      parser_context_(std::move(other.parser_context_)),
      current_request_(std::exchange(other.current_request_, nullptr)) {}

// Move assignment.
//
// Takes over `other`'s parse context and request pointer. For the same reason
// as in the move constructor, the WeaselJson handle is not transferred. This
// object's own handle is destroyed so that no stale tokenizer state is paired
// with the incoming context; begin_streaming_parse() recreates it on demand.
JsonCommitRequestParser &
JsonCommitRequestParser::operator=(JsonCommitRequestParser &&other) noexcept {
  if (this == &other) {
    return *this;
  }
  if (json_parser_) {
    WeaselJsonParser_destroy(json_parser_);
    json_parser_ = nullptr;
  }
  parser_context_ = std::move(other.parser_context_);
  current_request_ = std::exchange(other.current_request_, nullptr);
  return *this;
}

// Invoked when the root object closes; finalizes the request being built.
void JsonCommitRequestParser::on_complete() {
  if (current_request_) {
    current_request_->finalize();
  }
}

// Decodes `base64_str` into storage obtained from the current request's arena
// and returns a view of the decoded bytes.
//
// Returns an empty view when the input is empty, when no request is attached,
// when the computed maximum output size is zero, or when the arena returns a
// null pointer. On a decoding failure the context's parse_error is set and an
// empty view is returned.
std::string_view
JsonCommitRequestParser::decode_base64(std::string_view base64_str) {
  if (base64_str.empty() || !current_request_) {
    return {};
  }

  // Upper bound on the decoded size; the actual size comes from the decoder.
  const size_t max_output_len = simdutf::maximal_binary_length_from_base64(
      base64_str.data(), base64_str.size());
  if (max_output_len == 0) {
    return {};
  }

  char *output = current_request_->arena().allocate(max_output_len);
  if (!output) {
    return {};
  }

  const simdutf::result result = simdutf::base64_to_binary(
      base64_str.data(), base64_str.size(), output, simdutf::base64_default);
  if (result.error != simdutf::error_code::SUCCESS) {
    parser_context_->parse_error = "Decoding base64 failed";
    return {};
  }

  return std::string_view(output, result.count);
}

// WeaselJson callback: an object has started.
// Inside the preconditions/operations arrays this begins a fresh element;
// in Root it is accepted without a state change; anywhere else it is an error.
void JsonCommitRequestParser::on_begin_object(void *userdata) {
  auto *self = static_cast<JsonCommitRequestParser *>(userdata);
  auto &ctx = *self->parser_context_;
  if (ctx.parse_error) {
    return;
  }

  switch (ctx.current_state) {
  case ParseState::Root:
    // Expected - this is the main object
    break;
  case ParseState::PreconditionsArray:
    ctx.current_state = ParseState::PreconditionObject;
    ctx.current_precondition = PreconditionParseState{};
    break;
  case ParseState::OperationsArray:
    ctx.current_state = ParseState::OperationObject;
    ctx.current_operation = OperationParseState{};
    break;
  default:
    ctx.parse_error = "Unexpected object in invalid parse state";
    break;
  }
}

// WeaselJson callback: an object has ended.
// For a precondition/operation element this returns to the enclosing array
// state, validates that the fields required by the element's type are
// present, and appends the element to the request. For the root object it
// marks the parse complete and finalizes the request.
//
// NOTE(review): on_begin_object leaves the state at Root for any object that
// starts while in Root, so an object nested under an unrecognised root key
// also ends in Root and triggers completion early. Fixing that needs a depth
// counter in the parse context.
void JsonCommitRequestParser::on_end_object(void *userdata) {
  auto *self = static_cast<JsonCommitRequestParser *>(userdata);
  auto &ctx = *self->parser_context_;
  if (ctx.parse_error) {
    return;
  }

  switch (ctx.current_state) {
  case ParseState::Root:
    // We're done parsing the root object
    ctx.parse_complete = true;
    self->on_complete();
    break;

  case ParseState::PreconditionObject: {
    ctx.current_state = ParseState::PreconditionsArray;
    auto &pre = ctx.current_precondition;
    switch (pre.type) {
    case Precondition::Type::PointRead:
      if (!pre.key.has_value()) {
        ctx.parse_error =
            "point_read precondition missing required 'key' field";
      } else {
        self->current_request_->add_precondition(
            pre.type, pre.version.value_or(0), pre.key.value(), {});
      }
      break;
    case Precondition::Type::RangeRead:
      if (!pre.begin.has_value() || !pre.end.has_value()) {
        ctx.parse_error = "range_read precondition missing required 'begin' "
                          "and/or 'end' fields";
      } else {
        self->current_request_->add_precondition(
            pre.type, pre.version.value_or(0), pre.begin.value(),
            pre.end.value());
      }
      break;
    }
    break;
  }

  case ParseState::OperationObject: {
    ctx.current_state = ParseState::OperationsArray;
    auto &op = ctx.current_operation;
    switch (op.type) {
    case Operation::Type::Write:
      if (!op.key.has_value() || !op.value.has_value()) {
        ctx.parse_error =
            "write operation missing required 'key' and/or 'value' fields";
      } else {
        self->current_request_->add_operation(op.type, op.key.value(),
                                              op.value.value());
      }
      break;
    case Operation::Type::Delete:
      if (!op.key.has_value()) {
        ctx.parse_error = "delete operation missing required 'key' field";
      } else {
        self->current_request_->add_operation(op.type, op.key.value(), {});
      }
      break;
    case Operation::Type::RangeDelete:
      if (!op.begin.has_value() || !op.end.has_value()) {
        ctx.parse_error = "range_delete operation missing required 'begin' "
                          "and/or 'end' fields";
      } else {
        self->current_request_->add_operation(op.type, op.begin.value(),
                                              op.end.value());
      }
      break;
    }
    break;
  }

  default:
    break;
  }
}

// WeaselJson callback: a fragment of a string value.
// `done` is non-zero on the last fragment. A value delivered in a single
// fragment is handled directly from `buf` without copying; otherwise the
// fragments are accumulated in the context until the last one arrives.
void JsonCommitRequestParser::on_string_data(void *userdata, const char *buf,
                                             int len, int done) {
  auto *self = static_cast<JsonCommitRequestParser *>(userdata);
  auto &ctx = *self->parser_context_;
  if (ctx.parse_error) {
    return;
  }

  if (done && ctx.current_string.empty()) {
    self->handle_completed_string(std::string_view(buf, len));
    return;
  }

  ctx.current_string.append(buf, len);
  if (done) {
    self->handle_completed_string(ctx.current_string);
    ctx.current_string.clear();
  }
}

// WeaselJson callback: a fragment of an object key.
// Once the key is complete it is classified with get_json_token_type() and
// the result is remembered as the key the next value belongs to.
void JsonCommitRequestParser::on_key_data(void *userdata, const char *buf,
                                          int len, int done) {
  auto *self = static_cast<JsonCommitRequestParser *>(userdata);
  auto &ctx = *self->parser_context_;
  if (ctx.parse_error) {
    return;
  }

  if (done && ctx.current_key.empty()) {
    ctx.current_key_token = get_json_token_type(std::string_view(buf, len));
    return;
  }

  ctx.current_key.append(buf, len);
  if (done) {
    ctx.current_key_token = get_json_token_type(ctx.current_key);
    ctx.current_key.clear();
  }
}

// WeaselJson callback: an array has started.
// Only the arrays under the 'preconditions' and 'operations' keys are
// accepted; the decision is based on the most recently seen key.
void JsonCommitRequestParser::on_begin_array(void *userdata) {
  auto *self = static_cast<JsonCommitRequestParser *>(userdata);
  auto &ctx = *self->parser_context_;
  if (ctx.parse_error) {
    return;
  }

  switch (ctx.current_key_token) {
  case JsonTokenType::Preconditions:
    ctx.current_state = ParseState::PreconditionsArray;
    break;
  case JsonTokenType::Operations:
    ctx.current_state = ParseState::OperationsArray;
    break;
  default:
    ctx.parse_error = "Invalid array field - only 'preconditions' and "
                      "'operations' arrays are allowed";
    break;
  }
}

// WeaselJson callback: an array has ended.
// Leaving the preconditions or operations array returns to the Root state.
void JsonCommitRequestParser::on_end_array(void *userdata) {
  auto *self = static_cast<JsonCommitRequestParser *>(userdata);
  auto &ctx = *self->parser_context_;
  if (ctx.parse_error) {
    return;
  }

  const bool in_known_array =
      ctx.current_state == ParseState::PreconditionsArray ||
      ctx.current_state == ParseState::OperationsArray;
  if (in_known_array) {
    ctx.current_state = ParseState::Root;
  }
}

// WeaselJson callback: a fragment of a number token.
// Uses the same single-fragment fast path / accumulate strategy as
// on_string_data.
void JsonCommitRequestParser::on_number_data(void *userdata, const char *buf,
                                             int len, int done) {
  auto *self = static_cast<JsonCommitRequestParser *>(userdata);
  auto &ctx = *self->parser_context_;
  if (ctx.parse_error) {
    return;
  }

  if (done && ctx.current_number.empty()) {
    self->handle_completed_number(std::string_view(buf, len));
    return;
  }

  ctx.current_number.append(buf, len);
  if (done) {
    self->handle_completed_number(ctx.current_number);
    ctx.current_number.clear();
  }
}

void JsonCommitRequestParser::on_true_literal(void *) {
  // Not used in this API
}

void JsonCommitRequestParser::on_false_literal(void *) {
  // Not used in this API
}

void JsonCommitRequestParser::on_null_literal(void *) {
  // Not used in this API
}

// Routes a fully assembled string value to the field selected by the current
// parse state and the most recently seen key. Values under keys that are not
// handled in the current state are ignored.
void JsonCommitRequestParser::handle_completed_string(std::string_view s) {
  auto &ctx = *parser_context_;

  switch (ctx.current_state) {
  case ParseState::Root: {
    switch (ctx.current_key_token) {
    case JsonTokenType::RequestId:
      current_request_->set_request_id(current_request_->copy_to_arena(s));
      break;
    case JsonTokenType::LeaderId:
      current_request_->set_leader_id(current_request_->copy_to_arena(s));
      break;
    case JsonTokenType::ReadVersion:
      // read_version should be a number, not a string
      ctx.parse_error = "read_version field must be a number, not a string";
      break;
    default:
      break;
    }
    break;
  }

  case ParseState::PreconditionObject: {
    auto &pre = ctx.current_precondition;
    switch (ctx.current_key_token) {
    case JsonTokenType::Type:
      switch (get_json_token_type(s)) {
      case JsonTokenType::PointRead:
        pre.type = Precondition::Type::PointRead;
        break;
      case JsonTokenType::RangeRead:
        pre.type = Precondition::Type::RangeRead;
        break;
      default:
        ctx.parse_error =
            "Invalid precondition type - must be 'point_read' or 'range_read'";
        break;
      }
      break;
    case JsonTokenType::Key:
      pre.key = decode_base64(s);
      break;
    case JsonTokenType::Begin:
      pre.begin = decode_base64(s);
      break;
    case JsonTokenType::End:
      pre.end = decode_base64(s);
      break;
    default:
      break;
    }
    break;
  }

  case ParseState::OperationObject: {
    auto &op = ctx.current_operation;
    switch (ctx.current_key_token) {
    case JsonTokenType::Type:
      switch (get_json_token_type(s)) {
      case JsonTokenType::Write:
        op.type = Operation::Type::Write;
        break;
      case JsonTokenType::Delete:
        op.type = Operation::Type::Delete;
        break;
      case JsonTokenType::RangeDelete:
        op.type = Operation::Type::RangeDelete;
        break;
      default:
        ctx.parse_error = "Invalid operation type - must be 'write', 'delete', "
                          "or 'range_delete'";
        break;
      }
      break;
    case JsonTokenType::Key:
      op.key = decode_base64(s);
      break;
    case JsonTokenType::Value:
      op.value = decode_base64(s);
      break;
    case JsonTokenType::Begin:
      op.begin = decode_base64(s);
      break;
    case JsonTokenType::End:
      op.end = decode_base64(s);
      break;
    default:
      break;
    }
    break;
  }

  default:
    break;
  }
}

// Routes a fully assembled number token to the field selected by the current
// parse state and key: the root 'read_version' or a precondition's 'version'.
// The token must be a plain base-10 integer that fits in int64_t; fractional
// or exponent forms are rejected rather than silently truncated. Numbers
// under any other key are ignored.
void JsonCommitRequestParser::handle_completed_number(std::string_view s) {
  auto &ctx = *parser_context_;

  switch (ctx.current_state) {
  case ParseState::Root:
    if (ctx.current_key_token == JsonTokenType::ReadVersion) {
      int64_t version = 0;
      if (parse_int64_strict(s, version)) {
        current_request_->set_read_version(version);
        ctx.has_read_version_been_set = true;
      } else {
        ctx.parse_error = "Invalid number format for read_version field";
      }
    }
    break;

  case ParseState::PreconditionObject:
    if (ctx.current_key_token == JsonTokenType::Version) {
      int64_t version = 0;
      if (parse_int64_strict(s, version)) {
        ctx.current_precondition.version = version;
      } else {
        ctx.parse_error =
            "Invalid number format for precondition version field";
      }
    }
    break;

  default:
    break;
  }
}

// One-shot parse of a complete JSON document into `request`.
//
// Returns OutOfMemory when the underlying parser could not be created,
// InvalidJson/InvalidField when parsing fails (InvalidJson if a parse error
// message was recorded), MissingField when 'leader_id' is empty or
// 'read_version' was never set, and Success otherwise.
// `len` must be non-zero (asserted in parse_chunk).
CommitRequestParser::ParseResult
JsonCommitRequestParser::parse(CommitRequest &request, char *data,
                               int64_t len) {
  if (!begin_streaming_parse(request)) {
    return ParseResult::OutOfMemory;
  }

  if (parse_chunk(data, len) == ParseStatus::Error) {
    return has_parse_error() ? ParseResult::InvalidJson
                             : ParseResult::InvalidField;
  }

  if (finish_streaming_parse() == ParseStatus::Error) {
    return has_parse_error() ? ParseResult::InvalidJson
                             : ParseResult::InvalidField;
  }

  if (request.leader_id().empty()) {
    return ParseResult::MissingField;
  }
  if (!parser_context_->has_read_version_been_set) {
    return ParseResult::MissingField;
  }

  return ParseResult::Success;
}

// Prepares this parser to fill `request`: resets the request, creates or
// re-attaches the parse context to the request's arena and clears its
// error/completion flags, and creates or resets the WeaselJson parser.
// Returns false only if the WeaselJson parser could not be created.
bool JsonCommitRequestParser::begin_streaming_parse(CommitRequest &request) {
  request.reset();
  current_request_ = &request;

  if (!parser_context_) {
    // Deduce the context type from the member so this stays in sync with the
    // declaration in the header.
    using ContextType = decltype(parser_context_)::element_type;
    parser_context_ = std::make_unique<ContextType>(&request.arena());
  } else {
    parser_context_->attach_arena(&request.arena());
    parser_context_->parse_error = nullptr;
    parser_context_->parse_complete = false;
    parser_context_->has_read_version_been_set = false;
  }

  if (!json_parser_) {
    json_parser_ = WeaselJsonParser_create(64, &json_callbacks, this, 0);
  } else {
    WeaselJsonParser_reset(json_parser_);
  }

  return json_parser_ != nullptr;
}

// Feeds one chunk of input to the parser.
//
// Returns Error if no parse is active or an error was already recorded,
// Complete if the root object has already closed or the parser reports OK,
// and Incomplete if the parser wants more input. `len` must be non-zero; a
// zero-length call is how finish_streaming_parse() signals end of input.
JsonCommitRequestParser::ParseStatus
JsonCommitRequestParser::parse_chunk(char *data, int64_t len) {
  assert(len != 0);

  if (!json_parser_ || !parser_context_) {
    return ParseStatus::Error;
  }
  if (parser_context_->parse_error) {
    return ParseStatus::Error;
  }
  if (parser_context_->parse_complete) {
    return ParseStatus::Complete;
  }

  const WeaselJsonStatus status =
      WeaselJsonParser_parse(json_parser_, data, len);
  switch (status) {
  case WeaselJson_OK:
    // WeaselJson_OK means parsing is complete
    return ParseStatus::Complete;
  case WeaselJson_AGAIN:
    return ParseStatus::Incomplete;
  case WeaselJson_REJECT:
    parser_context_->parse_error = "JSON parsing failed - Invalid JSON";
    return ParseStatus::Error;
  case WeaselJson_OVERFLOW:
    parser_context_->parse_error =
        "JSON parsing failed - JSON nested too deep";
    return ParseStatus::Error;
  }

  parser_context_->parse_error =
      "JSON parsing failed - unknown weasel json status";
  return ParseStatus::Error;
}

// Signals end of input to the parser and reports the final outcome.
//
// Returns Complete only if the parser accepts the end of input, the root
// object was closed, and no error was recorded. Otherwise returns Error; a
// specific error message recorded by a callback during the final flush is
// kept, and the generic message is used only when none exists.
JsonCommitRequestParser::ParseStatus
JsonCommitRequestParser::finish_streaming_parse() {
  if (!json_parser_ || !parser_context_) {
    return ParseStatus::Error;
  }
  if (parser_context_->parse_error) {
    return ParseStatus::Error;
  }

  // Signal end of input
  const WeaselJsonStatus status =
      WeaselJsonParser_parse(json_parser_, nullptr, 0);

  if (status == WeaselJson_OK && parser_context_->parse_complete &&
      !parser_context_->parse_error) {
    return ParseStatus::Complete;
  }

  if (!parser_context_->parse_error) {
    parser_context_->parse_error =
        "JSON parsing incomplete or failed during finalization";
  }
  return ParseStatus::Error;
}

// Returns the recorded parse error message, or nullptr if there is none or
// no parse context exists yet.
const char *JsonCommitRequestParser::get_parse_error() const {
  return parser_context_ ? parser_context_->parse_error : nullptr;
}

// Returns true if a 'read_version' number was parsed during the current parse.
bool JsonCommitRequestParser::has_read_version_been_set() const {
  return parser_context_ && parser_context_->has_read_version_been_set;
}