import ctypes import json import enum import os from typing import Optional event_callback = ctypes.CFUNCTYPE(None, ctypes.c_void_p) data_callback = ctypes.CFUNCTYPE(None, ctypes.c_void_p, ctypes.c_void_p, ctypes.c_int) class WeaselJsonCallbacks(ctypes.Structure): _fields_ = [ ("on_begin_object", event_callback), ("on_end_object", event_callback), ("on_begin_string", event_callback), ("on_string_data", data_callback), ("on_end_string", event_callback), ("on_begin_array", event_callback), ("on_end_array", event_callback), ("on_begin_number", event_callback), ("on_number_data", data_callback), ("on_end_number", event_callback), ("on_true_literal", event_callback), ("on_false_literal", event_callback), ("on_null_literal", event_callback), ] class WeaselJsonStatus(enum.Enum): OK = 0 AGAIN = 1 REJECT = 2 OVERFLOW = 3 class WeaselJsonCallbacksBase: def on_begin_object(self): pass def on_end_object(self): pass def on_begin_string(self): pass def on_string_data(self, data): pass def on_end_string(self): pass def on_begin_array(self): pass def on_end_array(self): pass def on_begin_number(self): pass def on_number_data(self, data): pass def on_end_number(self): pass def on_true_literal(self): pass def on_false_literal(self): pass def on_null_literal(self): pass class WeaselJsonParser: def __init__( self, callbacks: WeaselJsonCallbacksBase, build_dir: Optional[str] = None, stackSize: int = 1024, ) -> None: self._lib = None if build_dir is None: build_dir = os.path.dirname(__file__) + "/build" for f in (build_dir + "/" + "libweaseljson.so",): try: self._lib = ctypes.cdll.LoadLibrary(f) except OSError: print("Could not load " + f) pass if self._lib is None: import sys print( "Could not find libweaseljson implementation", file=sys.stderr, ) sys.exit(1) self._lib.WeaselJsonParser_create.argtypes = ( ctypes.c_int, ctypes.POINTER(WeaselJsonCallbacks), ctypes.c_void_p, ) self._lib.WeaselJsonParser_create.restype = ctypes.c_void_p self._lib.WeaselJsonParser_reset.argtypes = (ctypes.c_void_p,) self._lib.WeaselJsonParser_destroy.argtypes = (ctypes.c_void_p,) self._lib.WeaselJsonParser_parse.argtypes = ( ctypes.c_void_p, ctypes.c_void_p, ctypes.c_int, ) self._lib.WeaselJsonParser_parse.restype = WeaselJsonStatus self.voidp_callbacks = ctypes.cast( ctypes.pointer(ctypes.py_object(callbacks)), ctypes.c_void_p ) self.p = self._lib.WeaselJsonParser_create( stackSize, c_callbacks, self.voidp_callbacks, ) def parse(self, data: bytes) -> WeaselJsonStatus: buf = (ctypes.c_ubyte * len(data)).from_buffer(bytearray(data)) return self._lib.WeaselJsonParser_parse(self.p, buf, len(data)) def reset(self): self._lib.WeaselJsonParser_reset(self.p) def __enter__(self): return self def close(self) -> None: if self.p is not None: self._lib.WeaselJsonParser_destroy(self.p) self.p = None def __exit__(self, exception_type, exception_value, exception_traceback): self.close() @ctypes.CFUNCTYPE(None, ctypes.c_void_p) def on_begin_object(p): self = ctypes.cast(p, ctypes.POINTER(ctypes.py_object)).contents.value self.on_begin_object() @ctypes.CFUNCTYPE(None, ctypes.c_void_p) def on_end_object(p): self = ctypes.cast(p, ctypes.POINTER(ctypes.py_object)).contents.value self.on_end_object() @ctypes.CFUNCTYPE(None, ctypes.c_void_p) def on_begin_string(p): self = ctypes.cast(p, ctypes.POINTER(ctypes.py_object)).contents.value self.on_begin_string() @ctypes.CFUNCTYPE(None, ctypes.c_void_p, ctypes.c_void_p, ctypes.c_int) def on_string_data(p, buf, len): self = ctypes.cast(p, ctypes.POINTER(ctypes.py_object)).contents.value self.on_string_data(bytes(ctypes.string_at(buf, len))) @ctypes.CFUNCTYPE(None, ctypes.c_void_p) def on_end_string(p): self = ctypes.cast(p, ctypes.POINTER(ctypes.py_object)).contents.value self.on_end_string() @ctypes.CFUNCTYPE(None, ctypes.c_void_p) def on_begin_array(p): self = ctypes.cast(p, ctypes.POINTER(ctypes.py_object)).contents.value self.on_begin_array() @ctypes.CFUNCTYPE(None, ctypes.c_void_p) def on_end_array(p): self = ctypes.cast(p, ctypes.POINTER(ctypes.py_object)).contents.value self.on_end_array() @ctypes.CFUNCTYPE(None, ctypes.c_void_p) def on_begin_number(p): self = ctypes.cast(p, ctypes.POINTER(ctypes.py_object)).contents.value self.on_begin_number() @ctypes.CFUNCTYPE(None, ctypes.c_void_p, ctypes.c_void_p, ctypes.c_int) def on_number_data(p, buf, len): self = ctypes.cast(p, ctypes.POINTER(ctypes.py_object)).contents.value self.on_number_data(bytes(ctypes.string_at(buf, len))) @ctypes.CFUNCTYPE(None, ctypes.c_void_p) def on_end_number(p): self = ctypes.cast(p, ctypes.POINTER(ctypes.py_object)).contents.value self.on_end_number() @ctypes.CFUNCTYPE(None, ctypes.c_void_p) def on_true_literal(p): self = ctypes.cast(p, ctypes.POINTER(ctypes.py_object)).contents.value self.on_true_literal() @ctypes.CFUNCTYPE(None, ctypes.c_void_p) def on_false_literal(p): self = ctypes.cast(p, ctypes.POINTER(ctypes.py_object)).contents.value self.on_false_literal() @ctypes.CFUNCTYPE(None, ctypes.c_void_p) def on_null_literal(p): self = ctypes.cast(p, ctypes.POINTER(ctypes.py_object)).contents.value self.on_null_literal() c_callbacks = WeaselJsonCallbacks( on_begin_object, on_end_object, on_begin_string, on_string_data, on_end_string, on_begin_array, on_end_array, on_begin_number, on_number_data, on_end_number, on_true_literal, on_false_literal, on_null_literal, ) class MyCallbacks(WeaselJsonCallbacksBase): # override callbacks def on_string_data(self, data): print(data) with WeaselJsonParser(MyCallbacks()) as parser: raw = json.dumps({"hello": "world", "foo": 42}).encode() i = 0 stride = 1 while True: slice = raw[i : i + stride] s = parser.parse(slice) if s != WeaselJsonStatus.AGAIN: break i += stride print(s)