Skip accumulating if all data is available

This commit is contained in:
2025-08-15 19:19:49 -04:00
parent a119f5232b
commit 9d18c0fcda
2 changed files with 38 additions and 46 deletions

View File

@@ -2,6 +2,7 @@
#include <charconv>
#include <cstring>
#include <simdutf.h>
#include <string_view>
// Global callbacks for JSON parsing
const WeaselJsonCallbacks CommitRequest::json_callbacks = {
@@ -195,13 +196,16 @@ void CommitRequest::on_string_data(void *userdata, const char *buf, int len,
if (ctx.parse_error)
return;
if (ctx.current_string.size() == 0 && done) {
self->handle_completed_string(std::string_view(buf, len));
} else {
ctx.current_string.append(buf, len);
if (done) {
self->handle_completed_string();
self->handle_completed_string(ctx.current_string);
ctx.current_string.clear();
}
}
}
void CommitRequest::on_key_data(void *userdata, const char *buf, int len,
int done) {
@@ -211,12 +215,7 @@ void CommitRequest::on_key_data(void *userdata, const char *buf, int len,
if (ctx.parse_error)
return;
if (!done) {
ctx.current_key.append(buf, len);
} else {
ctx.current_key.append(buf, len);
self->handle_completed_key();
}
}
void CommitRequest::on_begin_array(void *userdata) {
@@ -260,13 +259,16 @@ void CommitRequest::on_number_data(void *userdata, const char *buf, int len,
if (ctx.parse_error)
return;
if (ctx.current_number.size() == 0 && done) {
self->handle_completed_number(std::string_view(buf, len));
} else {
ctx.current_number.append(buf, len);
if (done) {
self->handle_completed_number();
self->handle_completed_number(ctx.current_number);
ctx.current_number.clear();
}
}
}
void CommitRequest::on_true_literal(void *) {
// Not used in this API
@@ -280,7 +282,7 @@ void CommitRequest::on_null_literal(void *) {
// Not used in this API
}
void CommitRequest::handle_completed_string() {
void CommitRequest::handle_completed_string(std::string_view s) {
auto &ctx = parser_context_;
if (ctx.state_stack.empty()) {
@@ -294,10 +296,10 @@ void CommitRequest::handle_completed_string() {
case ParseState::Root:
if (ctx.current_key == "request_id") {
ctx.current_key.clear();
request_id_ = store_string(ctx.current_string);
request_id_ = store_string(s);
} else if (ctx.current_key == "leader_id") {
ctx.current_key.clear();
leader_id_ = store_string(ctx.current_string);
leader_id_ = store_string(s);
} else if (ctx.current_key == "read_version") {
ctx.current_key.clear();
// read_version should be a number, not a string
@@ -307,9 +309,9 @@ void CommitRequest::handle_completed_string() {
case ParseState::PreconditionObject:
if (ctx.current_key == "type") {
ctx.current_key.clear();
if (ctx.current_string == "point_read") {
if (s == "point_read") {
ctx.current_precondition.type = Precondition::Type::PointRead;
} else if (ctx.current_string == "range_read") {
} else if (s == "range_read") {
ctx.current_precondition.type = Precondition::Type::RangeRead;
} else {
ctx.parse_error =
@@ -317,23 +319,23 @@ void CommitRequest::handle_completed_string() {
}
} else if (ctx.current_key == "key") {
ctx.current_key.clear();
ctx.current_precondition.key = decode_base64(ctx.current_string);
ctx.current_precondition.key = decode_base64(s);
} else if (ctx.current_key == "begin") {
ctx.current_key.clear();
ctx.current_precondition.begin = decode_base64(ctx.current_string);
ctx.current_precondition.begin = decode_base64(s);
} else if (ctx.current_key == "end") {
ctx.current_key.clear();
ctx.current_precondition.end = decode_base64(ctx.current_string);
ctx.current_precondition.end = decode_base64(s);
}
break;
case ParseState::OperationObject:
if (ctx.current_key == "type") {
ctx.current_key.clear();
if (ctx.current_string == "write") {
if (s == "write") {
ctx.current_operation.type = Operation::Type::Write;
} else if (ctx.current_string == "delete") {
} else if (s == "delete") {
ctx.current_operation.type = Operation::Type::Delete;
} else if (ctx.current_string == "range_delete") {
} else if (s == "range_delete") {
ctx.current_operation.type = Operation::Type::RangeDelete;
} else {
ctx.parse_error = "Invalid operation type - must be 'write', 'delete', "
@@ -341,16 +343,16 @@ void CommitRequest::handle_completed_string() {
}
} else if (ctx.current_key == "key") {
ctx.current_key.clear();
ctx.current_operation.key = decode_base64(ctx.current_string);
ctx.current_operation.key = decode_base64(s);
} else if (ctx.current_key == "value") {
ctx.current_key.clear();
ctx.current_operation.value = decode_base64(ctx.current_string);
ctx.current_operation.value = decode_base64(s);
} else if (ctx.current_key == "begin") {
ctx.current_key.clear();
ctx.current_operation.begin = decode_base64(ctx.current_string);
ctx.current_operation.begin = decode_base64(s);
} else if (ctx.current_key == "end") {
ctx.current_key.clear();
ctx.current_operation.end = decode_base64(ctx.current_string);
ctx.current_operation.end = decode_base64(s);
}
break;
default:
@@ -358,7 +360,7 @@ void CommitRequest::handle_completed_string() {
}
}
void CommitRequest::handle_completed_number() {
void CommitRequest::handle_completed_number(std::string_view s) {
auto &ctx = parser_context_;
if (ctx.state_stack.empty()) {
@@ -373,9 +375,7 @@ void CommitRequest::handle_completed_number() {
if (ctx.current_key == "read_version") {
ctx.current_key.clear();
uint64_t version;
auto result = std::from_chars(
ctx.current_number.data(),
ctx.current_number.data() + ctx.current_number.size(), version);
auto result = std::from_chars(s.data(), s.data() + s.size(), version);
if (result.ec == std::errc{}) {
read_version_ = version;
has_read_version_been_set_ = true;
@@ -388,9 +388,7 @@ void CommitRequest::handle_completed_number() {
if (ctx.current_key == "version") {
ctx.current_key.clear();
uint64_t version;
auto result = std::from_chars(
ctx.current_number.data(),
ctx.current_number.data() + ctx.current_number.size(), version);
auto result = std::from_chars(s.data(), s.data() + s.size(), version);
if (result.ec == std::errc{}) {
ctx.current_precondition.version = version;
} else {
@@ -404,11 +402,6 @@ void CommitRequest::handle_completed_number() {
}
}
void CommitRequest::handle_completed_key() {
// Key is now available in current_key for the next value
// No immediate action needed as we'll use it when processing values
}
bool CommitRequest::parse_json(char *data, size_t len) {
if (!begin_streaming_parse()) {
return false;

View File

@@ -349,7 +349,6 @@ private:
*/
std::string_view decode_base64(std::string_view base64_str);
void handle_completed_string();
void handle_completed_number();
void handle_completed_key();
void handle_completed_string(std::string_view s);
void handle_completed_number(std::string_view s);
};