Only pass CommitRequest to begin_streaming_parse

This commit is contained in:
2025-08-17 15:20:37 -04:00
parent 032a4184cc
commit 519c9457cd
6 changed files with 27 additions and 36 deletions

View File

@@ -93,13 +93,12 @@ int main() {
status == CommitRequestParser::ParseStatus::Incomplete) { status == CommitRequestParser::ParseStatus::Incomplete) {
size_t len = std::min(static_cast<size_t>(chunk_size), size_t len = std::min(static_cast<size_t>(chunk_size),
mutable_json.size() - offset); mutable_json.size() - offset);
status = status = parser.parse_chunk(mutable_json.data() + offset, len);
parser.parse_chunk(request, mutable_json.data() + offset, len);
offset += len; offset += len;
} }
if (status == CommitRequestParser::ParseStatus::Incomplete) { if (status == CommitRequestParser::ParseStatus::Incomplete) {
status = parser.finish_streaming_parse(request); status = parser.finish_streaming_parse();
} }
ankerl::nanobench::doNotOptimizeAway(status); ankerl::nanobench::doNotOptimizeAway(status);

View File

@@ -453,8 +453,8 @@ bool JsonCommitRequestParser::parse(CommitRequest &request, char *data,
if (!begin_streaming_parse(request)) { if (!begin_streaming_parse(request)) {
return false; return false;
} }
parse_chunk(request, data, len); parse_chunk(data, len);
finish_streaming_parse(request); finish_streaming_parse();
return !has_parse_error() && !request.leader_id().empty() && return !has_parse_error() && !request.leader_id().empty() &&
parser_context_->has_read_version_been_set; parser_context_->has_read_version_been_set;
@@ -481,8 +481,7 @@ bool JsonCommitRequestParser::begin_streaming_parse(CommitRequest &request) {
} }
JsonCommitRequestParser::ParseStatus JsonCommitRequestParser::ParseStatus
JsonCommitRequestParser::parse_chunk(CommitRequest &request, char *data, JsonCommitRequestParser::parse_chunk(char *data, size_t len) {
size_t len) {
if (!json_parser_ || !parser_context_) { if (!json_parser_ || !parser_context_) {
return ParseStatus::Error; return ParseStatus::Error;
} }
@@ -495,7 +494,6 @@ JsonCommitRequestParser::parse_chunk(CommitRequest &request, char *data,
return ParseStatus::Complete; return ParseStatus::Complete;
} }
current_request_ = &request;
WeaselJsonStatus status = WeaselJsonParser_parse(json_parser_, data, len); WeaselJsonStatus status = WeaselJsonParser_parse(json_parser_, data, len);
switch (status) { switch (status) {
@@ -514,7 +512,7 @@ JsonCommitRequestParser::parse_chunk(CommitRequest &request, char *data,
} }
JsonCommitRequestParser::ParseStatus JsonCommitRequestParser::ParseStatus
JsonCommitRequestParser::finish_streaming_parse(CommitRequest &request) { JsonCommitRequestParser::finish_streaming_parse() {
if (!json_parser_ || !parser_context_) { if (!json_parser_ || !parser_context_) {
return ParseStatus::Error; return ParseStatus::Error;
} }
@@ -523,15 +521,13 @@ JsonCommitRequestParser::finish_streaming_parse(CommitRequest &request) {
return ParseStatus::Error; return ParseStatus::Error;
} }
current_request_ = &request;
// Signal end of input // Signal end of input
WeaselJsonStatus status = WeaselJsonParser_parse(json_parser_, nullptr, 0); WeaselJsonStatus status = WeaselJsonParser_parse(json_parser_, nullptr, 0);
if (status == WeaselJson_OK && parser_context_->parse_complete && if (status == WeaselJson_OK && parser_context_->parse_complete &&
!parser_context_->parse_error) { !parser_context_->parse_error) {
// Clear the memory used only during parsing // Clear the memory used only during parsing
parser_context_->reset_arena_memory(&request.arena()); parser_context_->reset_arena_memory(&current_request_->arena());
return ParseStatus::Complete; return ParseStatus::Complete;
} else { } else {
parser_context_->parse_error = parser_context_->parse_error =

View File

@@ -116,9 +116,8 @@ public:
// CommitRequestParser interface implementation // CommitRequestParser interface implementation
bool parse(CommitRequest &request, char *data, size_t len) override; bool parse(CommitRequest &request, char *data, size_t len) override;
bool begin_streaming_parse(CommitRequest &request) override; bool begin_streaming_parse(CommitRequest &request) override;
ParseStatus parse_chunk(CommitRequest &request, char *data, ParseStatus parse_chunk(char *data, size_t len) override;
size_t len) override; ParseStatus finish_streaming_parse() override;
ParseStatus finish_streaming_parse(CommitRequest &request) override;
bool has_parse_error() const override; bool has_parse_error() const override;
const char *get_parse_error() const override; const char *get_parse_error() const override;

View File

@@ -122,15 +122,15 @@ int main(int argc, char *argv[]) {
// Need mutable data for weaseljson // Need mutable data for weaseljson
std::string mutable_chunk = chunk; std::string mutable_chunk = chunk;
status = streaming_parser.parse_chunk( status = streaming_parser.parse_chunk(mutable_chunk.data(),
streaming_request, mutable_chunk.data(), mutable_chunk.size()); mutable_chunk.size());
offset += len; offset += len;
} }
if (status == CommitRequestParser::ParseStatus::Incomplete) { if (status == CommitRequestParser::ParseStatus::Incomplete) {
std::cout << " Finalizing parse..." << std::endl; std::cout << " Finalizing parse..." << std::endl;
status = streaming_parser.finish_streaming_parse(streaming_request); status = streaming_parser.finish_streaming_parse();
} }
if (status == CommitRequestParser::ParseStatus::Complete) { if (status == CommitRequestParser::ParseStatus::Complete) {

View File

@@ -37,20 +37,17 @@ public:
/** /**
* @brief Parse additional data incrementally. * @brief Parse additional data incrementally.
* @param request The CommitRequest object to populate
* @param data Pointer to the data buffer * @param data Pointer to the data buffer
* @param len Length of the data * @param len Length of the data
* @return ParseStatus indicating current parse state * @return ParseStatus indicating current parse state
*/ */
virtual ParseStatus parse_chunk(CommitRequest &request, char *data, virtual ParseStatus parse_chunk(char *data, size_t len) = 0;
size_t len) = 0;
/** /**
* @brief Finish streaming parse (call when no more data is available). * @brief Finish streaming parse (call when no more data is available).
* @param request The CommitRequest object to populate
* @return ParseStatus indicating final parse result * @return ParseStatus indicating final parse result
*/ */
virtual ParseStatus finish_streaming_parse(CommitRequest &request) = 0; virtual ParseStatus finish_streaming_parse() = 0;
/** /**
* @brief Check if there was a parse error. * @brief Check if there was a parse error.

View File

@@ -499,12 +499,12 @@ TEST_CASE("CommitRequest streaming parsing") {
while (offset < mutable_json.size() && while (offset < mutable_json.size() &&
status == CommitRequestParser::ParseStatus::Incomplete) { status == CommitRequestParser::ParseStatus::Incomplete) {
size_t len = std::min(chunk_size, mutable_json.size() - offset); size_t len = std::min(chunk_size, mutable_json.size() - offset);
status = parser.parse_chunk(request, mutable_json.data() + offset, len); status = parser.parse_chunk(mutable_json.data() + offset, len);
offset += len; offset += len;
} }
if (status == CommitRequestParser::ParseStatus::Incomplete) { if (status == CommitRequestParser::ParseStatus::Incomplete) {
status = parser.finish_streaming_parse(request); status = parser.finish_streaming_parse();
} }
REQUIRE(status == CommitRequestParser::ParseStatus::Complete); REQUIRE(status == CommitRequestParser::ParseStatus::Complete);
@@ -551,11 +551,11 @@ TEST_CASE("CommitRequest streaming parsing") {
for (size_t i = 0; i < mutable_json.size() && for (size_t i = 0; i < mutable_json.size() &&
status == CommitRequestParser::ParseStatus::Incomplete; status == CommitRequestParser::ParseStatus::Incomplete;
++i) { ++i) {
status = parser.parse_chunk(request, mutable_json.data() + i, 1); status = parser.parse_chunk(mutable_json.data() + i, 1);
} }
if (status == CommitRequestParser::ParseStatus::Incomplete) { if (status == CommitRequestParser::ParseStatus::Incomplete) {
status = parser.finish_streaming_parse(request); status = parser.finish_streaming_parse();
} }
REQUIRE(status == CommitRequestParser::ParseStatus::Complete); REQUIRE(status == CommitRequestParser::ParseStatus::Complete);
@@ -593,10 +593,10 @@ TEST_CASE("CommitRequest streaming parsing") {
std::string mutable_json = invalid_json; std::string mutable_json = invalid_json;
CommitRequestParser::ParseStatus status = CommitRequestParser::ParseStatus status =
parser.parse_chunk(request, mutable_json.data(), mutable_json.size()); parser.parse_chunk(mutable_json.data(), mutable_json.size());
if (status == CommitRequestParser::ParseStatus::Incomplete) { if (status == CommitRequestParser::ParseStatus::Incomplete) {
status = parser.finish_streaming_parse(request); status = parser.finish_streaming_parse();
} }
REQUIRE(status == CommitRequestParser::ParseStatus::Error); REQUIRE(status == CommitRequestParser::ParseStatus::Error);
@@ -610,14 +610,14 @@ TEST_CASE("CommitRequest streaming parsing") {
std::string mutable_json = json; std::string mutable_json = json;
CommitRequestParser::ParseStatus status = CommitRequestParser::ParseStatus status =
parser.parse_chunk(request, mutable_json.data(), mutable_json.size()); parser.parse_chunk(mutable_json.data(), mutable_json.size());
// Should still be incomplete (streaming parser doesn't know if more data is // Should still be incomplete (streaming parser doesn't know if more data is
// coming) // coming)
REQUIRE(status == CommitRequestParser::ParseStatus::Incomplete); REQUIRE(status == CommitRequestParser::ParseStatus::Incomplete);
// Signal end of input to complete parsing // Signal end of input to complete parsing
status = parser.finish_streaming_parse(request); status = parser.finish_streaming_parse();
REQUIRE(status == CommitRequestParser::ParseStatus::Complete); REQUIRE(status == CommitRequestParser::ParseStatus::Complete);
REQUIRE(request.leader_id() == "test"); REQUIRE(request.leader_id() == "test");
REQUIRE(request.read_version() == 123); REQUIRE(request.read_version() == 123);
@@ -630,10 +630,10 @@ TEST_CASE("CommitRequest streaming parsing") {
std::string mutable_json = json; std::string mutable_json = json;
CommitRequestParser::ParseStatus status = CommitRequestParser::ParseStatus status =
parser.parse_chunk(request, mutable_json.data(), mutable_json.size()); parser.parse_chunk(mutable_json.data(), mutable_json.size());
if (status == CommitRequestParser::ParseStatus::Incomplete) { if (status == CommitRequestParser::ParseStatus::Incomplete) {
status = parser.finish_streaming_parse(request); status = parser.finish_streaming_parse();
} }
REQUIRE(status == CommitRequestParser::ParseStatus::Complete); REQUIRE(status == CommitRequestParser::ParseStatus::Complete);
@@ -648,10 +648,10 @@ TEST_CASE("CommitRequest streaming parsing") {
std::string mutable_json = json; std::string mutable_json = json;
CommitRequestParser::ParseStatus status = CommitRequestParser::ParseStatus status =
parser.parse_chunk(request, mutable_json.data(), mutable_json.size()); parser.parse_chunk(mutable_json.data(), mutable_json.size());
if (status == CommitRequestParser::ParseStatus::Incomplete) { if (status == CommitRequestParser::ParseStatus::Incomplete) {
status = parser.finish_streaming_parse(request); status = parser.finish_streaming_parse();
} }
REQUIRE(status == CommitRequestParser::ParseStatus::Complete); REQUIRE(status == CommitRequestParser::ParseStatus::Complete);