Files
weaseljson/weaseljson.py
Andrew Noyes f6cd807da3 Remove on_{begin,end}_{string,number}
And add `done` arg to data callback
2025-05-25 21:01:37 -04:00

216 lines
5.7 KiB
Python

import ctypes
import json
import enum
import os
from typing import Optional
event_callback = ctypes.CFUNCTYPE(None, ctypes.c_void_p)
data_callback = ctypes.CFUNCTYPE(
None, ctypes.c_void_p, ctypes.c_void_p, ctypes.c_int, ctypes.c_int
)
class WeaselJsonCallbacks(ctypes.Structure):
_fields_ = [
("on_begin_object", event_callback),
("on_end_object", event_callback),
("on_string_data", data_callback),
("on_begin_array", event_callback),
("on_end_array", event_callback),
("on_number_data", data_callback),
("on_true_literal", event_callback),
("on_false_literal", event_callback),
("on_null_literal", event_callback),
]
class WeaselJsonStatus(enum.Enum):
OK = 0
AGAIN = 1
REJECT = 2
OVERFLOW = 3
class WeaselJsonCallbacksBase:
def on_begin_object(self):
pass
def on_end_object(self):
pass
def on_string_data(self, data, done):
pass
def on_begin_array(self):
pass
def on_end_array(self):
pass
def on_number_data(self, data, done):
pass
def on_true_literal(self):
pass
def on_false_literal(self):
pass
def on_null_literal(self):
pass
class WeaselJsonParser:
def __init__(
self,
callbacks: WeaselJsonCallbacksBase,
build_dir: Optional[str] = None,
stackSize: int = 1024,
) -> None:
self._lib = None
if build_dir is None:
build_dir = os.path.dirname(__file__) + "/build"
for f in (build_dir + "/" + "libweaseljson.so",):
try:
self._lib = ctypes.cdll.LoadLibrary(f)
except OSError:
print("Could not load " + f)
pass
if self._lib is None:
import sys
print(
"Could not find libweaseljson implementation",
file=sys.stderr,
)
sys.exit(1)
self._lib.WeaselJsonParser_create.argtypes = (
ctypes.c_int,
ctypes.POINTER(WeaselJsonCallbacks),
ctypes.c_void_p,
)
self._lib.WeaselJsonParser_create.restype = ctypes.c_void_p
self._lib.WeaselJsonParser_reset.argtypes = (ctypes.c_void_p,)
self._lib.WeaselJsonParser_destroy.argtypes = (ctypes.c_void_p,)
self._lib.WeaselJsonParser_parse.argtypes = (
ctypes.c_void_p,
ctypes.c_void_p,
ctypes.c_int,
)
self._lib.WeaselJsonParser_parse.restype = WeaselJsonStatus
self.voidp_callbacks = ctypes.cast(
ctypes.pointer(ctypes.py_object(callbacks)), ctypes.c_void_p
)
self.p = self._lib.WeaselJsonParser_create(
stackSize,
c_callbacks,
self.voidp_callbacks,
)
def parse(self, data: bytes) -> WeaselJsonStatus:
buf = (ctypes.c_ubyte * len(data)).from_buffer(bytearray(data))
return self._lib.WeaselJsonParser_parse(self.p, buf, len(data))
def reset(self):
self._lib.WeaselJsonParser_reset(self.p)
def __enter__(self):
return self
def close(self) -> None:
if self.p is not None:
self._lib.WeaselJsonParser_destroy(self.p)
self.p = None
def __exit__(self, exception_type, exception_value, exception_traceback):
self.close()
@ctypes.CFUNCTYPE(None, ctypes.c_void_p)
def on_begin_object(p):
self = ctypes.cast(p, ctypes.POINTER(ctypes.py_object)).contents.value
self.on_begin_object()
@ctypes.CFUNCTYPE(None, ctypes.c_void_p)
def on_end_object(p):
self = ctypes.cast(p, ctypes.POINTER(ctypes.py_object)).contents.value
self.on_end_object()
@ctypes.CFUNCTYPE(None, ctypes.c_void_p, ctypes.c_void_p, ctypes.c_int, ctypes.c_int)
def on_string_data(p, buf, len, done):
self = ctypes.cast(p, ctypes.POINTER(ctypes.py_object)).contents.value
self.on_string_data(bytes(ctypes.string_at(buf, len)), bool(done))
@ctypes.CFUNCTYPE(None, ctypes.c_void_p)
def on_begin_array(p):
self = ctypes.cast(p, ctypes.POINTER(ctypes.py_object)).contents.value
self.on_begin_array()
@ctypes.CFUNCTYPE(None, ctypes.c_void_p)
def on_end_array(p):
self = ctypes.cast(p, ctypes.POINTER(ctypes.py_object)).contents.value
self.on_end_array()
@ctypes.CFUNCTYPE(None, ctypes.c_void_p, ctypes.c_void_p, ctypes.c_int, ctypes.c_int)
def on_number_data(p, buf, len, done):
self = ctypes.cast(p, ctypes.POINTER(ctypes.py_object)).contents.value
self.on_number_data(bytes(ctypes.string_at(buf, len)), bool(done))
@ctypes.CFUNCTYPE(None, ctypes.c_void_p)
def on_true_literal(p):
self = ctypes.cast(p, ctypes.POINTER(ctypes.py_object)).contents.value
self.on_true_literal()
@ctypes.CFUNCTYPE(None, ctypes.c_void_p)
def on_false_literal(p):
self = ctypes.cast(p, ctypes.POINTER(ctypes.py_object)).contents.value
self.on_false_literal()
@ctypes.CFUNCTYPE(None, ctypes.c_void_p)
def on_null_literal(p):
self = ctypes.cast(p, ctypes.POINTER(ctypes.py_object)).contents.value
self.on_null_literal()
c_callbacks = WeaselJsonCallbacks(
on_begin_object,
on_end_object,
on_string_data,
on_begin_array,
on_end_array,
on_number_data,
on_true_literal,
on_false_literal,
on_null_literal,
)
class MyCallbacks(WeaselJsonCallbacksBase):
# override callbacks
def on_string_data(self, data, done):
print(data)
with WeaselJsonParser(MyCallbacks()) as parser:
raw = json.dumps({"hello": "world", "foo": 42}).encode()
i = 0
stride = 1
while True:
slice = raw[i : i + stride]
s = parser.parse(slice)
if s != WeaselJsonStatus.AGAIN:
break
i += stride
print(s)