Decouple parser from CommitRequest

This commit is contained in:
2025-08-17 13:36:53 -04:00
parent db2285dfda
commit fa2a2e4427
10 changed files with 636 additions and 460 deletions

View File

@@ -0,0 +1,547 @@
#include "json_commit_request_parser.hpp"
#include "json_token_enum.hpp"
#include <charconv>
#include <cstring>
#include <simdutf.h>
#include <string_view>
// Global callbacks for JSON parsing
const WeaselJsonCallbacks JsonCommitRequestParser::json_callbacks = {
.on_begin_object = JsonCommitRequestParser::on_begin_object,
.on_end_object = JsonCommitRequestParser::on_end_object,
.on_string_data = JsonCommitRequestParser::on_string_data,
.on_key_data = JsonCommitRequestParser::on_key_data,
.on_begin_array = JsonCommitRequestParser::on_begin_array,
.on_end_array = JsonCommitRequestParser::on_end_array,
.on_number_data = JsonCommitRequestParser::on_number_data,
.on_true_literal = JsonCommitRequestParser::on_true_literal,
.on_false_literal = JsonCommitRequestParser::on_false_literal,
.on_null_literal = JsonCommitRequestParser::on_null_literal,
};
JsonCommitRequestParser::JsonCommitRequestParser()
: json_parser_(WeaselJsonParser_create(64, &json_callbacks, this, 0)),
parser_context_(nullptr), current_request_(nullptr) {}
JsonCommitRequestParser::~JsonCommitRequestParser() {
if (json_parser_) {
WeaselJsonParser_destroy(json_parser_);
}
}
JsonCommitRequestParser::JsonCommitRequestParser(
JsonCommitRequestParser &&other) noexcept
: json_parser_(other.json_parser_),
parser_context_(std::move(other.parser_context_)),
current_request_(other.current_request_) {
other.json_parser_ = nullptr;
other.current_request_ = nullptr;
}
JsonCommitRequestParser &
JsonCommitRequestParser::operator=(JsonCommitRequestParser &&other) noexcept {
if (this != &other) {
if (json_parser_) {
WeaselJsonParser_destroy(json_parser_);
}
json_parser_ = other.json_parser_;
parser_context_ = std::move(other.parser_context_);
current_request_ = other.current_request_;
other.json_parser_ = nullptr;
other.current_request_ = nullptr;
}
return *this;
}
void JsonCommitRequestParser::on_complete() {
if (current_request_) {
current_request_->finalize();
}
}
std::string_view
JsonCommitRequestParser::decode_base64(std::string_view base64_str) {
if (base64_str.empty() || !current_request_) {
return {};
}
// Calculate maximum possible output size
size_t max_output_len = simdutf::maximal_binary_length_from_base64(
base64_str.data(), base64_str.size());
if (max_output_len == 0) {
return {};
}
char *output = current_request_->arena().allocate<char>(max_output_len);
if (!output) {
return {};
}
// Use simdutf to decode base64
simdutf::result result = simdutf::base64_to_binary(
base64_str.data(), base64_str.size(), output, simdutf::base64_default);
if (result.error != simdutf::error_code::SUCCESS) {
parser_context_->parse_error = "Decoding base64 failed";
return {};
}
return std::string_view(output, result.count);
}
void JsonCommitRequestParser::on_begin_object(void *userdata) {
auto *self = static_cast<JsonCommitRequestParser *>(userdata);
auto &ctx = *self->parser_context_;
if (ctx.parse_error)
return;
ParseState current_state = ctx.current_state;
switch (current_state) {
case ParseState::Root:
// Expected - this is the main object
break;
case ParseState::PreconditionsArray:
ctx.current_state = ParseState::PreconditionObject;
ctx.current_precondition = PreconditionParseState{};
break;
case ParseState::OperationsArray:
ctx.current_state = ParseState::OperationObject;
ctx.current_operation = OperationParseState{};
break;
default:
ctx.parse_error = "Unexpected object in invalid parse state";
break;
}
}
void JsonCommitRequestParser::on_end_object(void *userdata) {
auto *self = static_cast<JsonCommitRequestParser *>(userdata);
auto &ctx = *self->parser_context_;
if (ctx.parse_error) {
return;
}
ParseState current_state = ctx.current_state;
// Handle state transitions on object end
if (current_state == ParseState::PreconditionObject) {
ctx.current_state = ParseState::PreconditionsArray;
} else if (current_state == ParseState::OperationObject) {
ctx.current_state = ParseState::OperationsArray;
}
switch (current_state) {
case ParseState::Root:
// We're done parsing the root object
ctx.parse_complete = true;
self->on_complete();
break;
case ParseState::PreconditionObject:
switch (ctx.current_precondition.type) {
case Precondition::Type::PointRead:
if (!ctx.current_precondition.key.has_value()) {
ctx.parse_error =
"point_read precondition missing required 'key' field";
} else {
self->current_request_->add_precondition(
ctx.current_precondition.type,
ctx.current_precondition.version.value_or(0),
ctx.current_precondition.key.value(), {});
}
break;
case Precondition::Type::RangeRead:
if (!ctx.current_precondition.begin.has_value() ||
!ctx.current_precondition.end.has_value()) {
ctx.parse_error = "range_read precondition missing required 'begin' "
"and/or 'end' fields";
} else {
self->current_request_->add_precondition(
ctx.current_precondition.type,
ctx.current_precondition.version.value_or(0),
ctx.current_precondition.begin.value(),
ctx.current_precondition.end.value());
}
break;
}
break;
case ParseState::OperationObject:
switch (ctx.current_operation.type) {
case Operation::Type::Write:
if (!ctx.current_operation.key.has_value() ||
!ctx.current_operation.value.has_value()) {
ctx.parse_error =
"write operation missing required 'key' and/or 'value' fields";
} else {
self->current_request_->add_operation(
ctx.current_operation.type, ctx.current_operation.key.value(),
ctx.current_operation.value.value());
}
break;
case Operation::Type::Delete:
if (!ctx.current_operation.key.has_value()) {
ctx.parse_error = "delete operation missing required 'key' field";
} else {
self->current_request_->add_operation(
ctx.current_operation.type, ctx.current_operation.key.value(), {});
}
break;
case Operation::Type::RangeDelete:
if (!ctx.current_operation.begin.has_value() ||
!ctx.current_operation.end.has_value()) {
ctx.parse_error = "range_delete operation missing required 'begin' "
"and/or 'end' fields";
} else {
self->current_request_->add_operation(
ctx.current_operation.type, ctx.current_operation.begin.value(),
ctx.current_operation.end.value());
}
break;
}
break;
default:
break;
}
}
void JsonCommitRequestParser::on_string_data(void *userdata, const char *buf,
int len, int done) {
auto *self = static_cast<JsonCommitRequestParser *>(userdata);
auto &ctx = *self->parser_context_;
if (ctx.parse_error)
return;
if (done && ctx.current_string.empty()) {
self->handle_completed_string(std::string_view(buf, len));
} else {
ctx.current_string.append(buf, len);
if (done) {
self->handle_completed_string(ctx.current_string);
ctx.current_string.clear();
}
}
}
void JsonCommitRequestParser::on_key_data(void *userdata, const char *buf,
int len, int done) {
auto *self = static_cast<JsonCommitRequestParser *>(userdata);
auto &ctx = *self->parser_context_;
if (ctx.parse_error)
return;
if (done && ctx.current_key.empty()) {
ctx.current_key_token = get_json_token_type(std::string_view(buf, len));
} else {
ctx.current_key.append(buf, len);
if (done) {
ctx.current_key_token = get_json_token_type(ctx.current_key);
ctx.current_key.clear();
}
}
}
void JsonCommitRequestParser::on_begin_array(void *userdata) {
auto *self = static_cast<JsonCommitRequestParser *>(userdata);
auto &ctx = *self->parser_context_;
if (ctx.parse_error)
return;
switch (ctx.current_key_token) {
case JsonTokenType::Preconditions:
ctx.current_state = ParseState::PreconditionsArray;
break;
case JsonTokenType::Operations:
ctx.current_state = ParseState::OperationsArray;
break;
default:
ctx.parse_error = "Invalid array field - only 'preconditions' and "
"'operations' arrays are allowed";
break;
}
}
void JsonCommitRequestParser::on_end_array(void *userdata) {
auto *self = static_cast<JsonCommitRequestParser *>(userdata);
auto &ctx = *self->parser_context_;
if (ctx.parse_error) {
return;
}
// Transition back to Root state when arrays end
if (ctx.current_state == ParseState::PreconditionsArray ||
ctx.current_state == ParseState::OperationsArray) {
ctx.current_state = ParseState::Root;
}
}
void JsonCommitRequestParser::on_number_data(void *userdata, const char *buf,
int len, int done) {
auto *self = static_cast<JsonCommitRequestParser *>(userdata);
auto &ctx = *self->parser_context_;
if (ctx.parse_error)
return;
if (done && ctx.current_number.empty()) {
self->handle_completed_number(std::string_view(buf, len));
} else {
ctx.current_number.append(buf, len);
if (done) {
self->handle_completed_number(ctx.current_number);
ctx.current_number.clear();
}
}
}
void JsonCommitRequestParser::on_true_literal(void *) {
// Not used in this API
}
void JsonCommitRequestParser::on_false_literal(void *) {
// Not used in this API
}
void JsonCommitRequestParser::on_null_literal(void *) {
// Not used in this API
}
void JsonCommitRequestParser::handle_completed_string(std::string_view s) {
auto &ctx = *parser_context_;
ParseState current_state = ctx.current_state;
switch (current_state) {
case ParseState::Root: {
switch (ctx.current_key_token) {
case JsonTokenType::RequestId:
current_request_->set_request_id(current_request_->copy_to_arena(s));
break;
case JsonTokenType::LeaderId:
current_request_->set_leader_id(current_request_->copy_to_arena(s));
break;
case JsonTokenType::ReadVersion:
// read_version should be a number, not a string
ctx.parse_error = "read_version field must be a number, not a string";
break;
default:
break;
}
break;
}
case ParseState::PreconditionObject: {
switch (ctx.current_key_token) {
case JsonTokenType::Type: {
JsonTokenType type_token = get_json_token_type(s);
switch (type_token) {
case JsonTokenType::PointRead:
ctx.current_precondition.type = Precondition::Type::PointRead;
break;
case JsonTokenType::RangeRead:
ctx.current_precondition.type = Precondition::Type::RangeRead;
break;
default:
ctx.parse_error =
"Invalid precondition type - must be 'point_read' or 'range_read'";
break;
}
break;
}
case JsonTokenType::Key:
ctx.current_precondition.key = decode_base64(s);
break;
case JsonTokenType::Begin:
ctx.current_precondition.begin = decode_base64(s);
break;
case JsonTokenType::End:
ctx.current_precondition.end = decode_base64(s);
break;
default:
break;
}
break;
}
case ParseState::OperationObject: {
switch (ctx.current_key_token) {
case JsonTokenType::Type: {
JsonTokenType type_token = get_json_token_type(s);
switch (type_token) {
case JsonTokenType::Write:
ctx.current_operation.type = Operation::Type::Write;
break;
case JsonTokenType::Delete:
ctx.current_operation.type = Operation::Type::Delete;
break;
case JsonTokenType::RangeDelete:
ctx.current_operation.type = Operation::Type::RangeDelete;
break;
default:
ctx.parse_error = "Invalid operation type - must be 'write', 'delete', "
"or 'range_delete'";
break;
}
break;
}
case JsonTokenType::Key:
ctx.current_operation.key = decode_base64(s);
break;
case JsonTokenType::Value:
ctx.current_operation.value = decode_base64(s);
break;
case JsonTokenType::Begin:
ctx.current_operation.begin = decode_base64(s);
break;
case JsonTokenType::End:
ctx.current_operation.end = decode_base64(s);
break;
default:
break;
}
break;
}
default:
break;
}
}
void JsonCommitRequestParser::handle_completed_number(std::string_view s) {
auto &ctx = *parser_context_;
ParseState current_state = ctx.current_state;
switch (current_state) {
case ParseState::Root: {
if (ctx.current_key_token == JsonTokenType::ReadVersion) {
uint64_t version;
auto result = std::from_chars(s.data(), s.data() + s.size(), version);
if (result.ec == std::errc{}) {
current_request_->set_read_version(version);
} else {
ctx.parse_error = "Invalid number format for read_version field";
}
}
break;
}
case ParseState::PreconditionObject: {
if (ctx.current_key_token == JsonTokenType::Version) {
uint64_t version;
auto result = std::from_chars(s.data(), s.data() + s.size(), version);
if (result.ec == std::errc{}) {
ctx.current_precondition.version = version;
} else {
ctx.parse_error =
"Invalid number format for precondition version field";
}
}
break;
}
default:
break;
}
}
bool JsonCommitRequestParser::parse(CommitRequest &request, char *data,
size_t len) {
if (!begin_streaming_parse(request)) {
return false;
}
parse_chunk(request, data, len);
finish_streaming_parse(request);
return !has_parse_error() && !request.leader_id().empty() &&
request.has_read_version_been_set();
}
bool JsonCommitRequestParser::begin_streaming_parse(CommitRequest &request) {
request.reset();
current_request_ = &request;
if (!parser_context_) {
parser_context_ = std::make_unique<ParserContext>(&request.arena());
} else {
parser_context_->reset_arena_memory(&request.arena());
parser_context_->parse_error = nullptr;
parser_context_->parse_complete = false;
}
if (json_parser_) {
WeaselJsonParser_reset(json_parser_);
}
return json_parser_ != nullptr;
}
JsonCommitRequestParser::ParseStatus
JsonCommitRequestParser::parse_chunk(CommitRequest &request, char *data,
size_t len) {
if (!json_parser_ || !parser_context_) {
return ParseStatus::Error;
}
if (parser_context_->parse_error) {
return ParseStatus::Error;
}
if (parser_context_->parse_complete) {
return ParseStatus::Complete;
}
current_request_ = &request;
WeaselJsonStatus status = WeaselJsonParser_parse(json_parser_, data, len);
switch (status) {
case WeaselJson_OK:
// WeaselJson_OK means parsing is complete
return ParseStatus::Complete;
case WeaselJson_AGAIN:
return ParseStatus::Incomplete;
case WeaselJson_REJECT:
case WeaselJson_OVERFLOW:
default:
parser_context_->parse_error =
"JSON parsing failed - invalid or oversized JSON";
return ParseStatus::Error;
}
}
JsonCommitRequestParser::ParseStatus
JsonCommitRequestParser::finish_streaming_parse(CommitRequest &request) {
if (!json_parser_ || !parser_context_) {
return ParseStatus::Error;
}
if (parser_context_->parse_error) {
return ParseStatus::Error;
}
current_request_ = &request;
// Signal end of input
WeaselJsonStatus status = WeaselJsonParser_parse(json_parser_, nullptr, 0);
if (status == WeaselJson_OK && parser_context_->parse_complete &&
!parser_context_->parse_error) {
// Clear the memory used only during parsing
parser_context_->reset_arena_memory(&request.arena());
return ParseStatus::Complete;
} else {
parser_context_->parse_error =
"JSON parsing incomplete or failed during finalization";
return ParseStatus::Error;
}
}
bool JsonCommitRequestParser::has_parse_error() const {
return parser_context_ && parser_context_->parse_error != nullptr;
}
const char *JsonCommitRequestParser::get_parse_error() const {
return parser_context_ ? parser_context_->parse_error : nullptr;
}