Files
weaseljson/weaseljson.py
Andrew Noyes 5971ad2ef0 Fix segfault issue
I think we need to keep a reference to the ctypes voidp object that we
pass as userdata alive
2025-05-24 18:20:36 -04:00

258 lines
6.6 KiB
Python

import ctypes
import json
import enum
import os
from typing import Optional
# Native callback prototype for events without a payload: the only argument
# is the opaque userdata pointer.
event_callback = ctypes.CFUNCTYPE(None, ctypes.c_void_p)
# Native callback prototype for events that deliver a chunk of data:
# (userdata pointer, buffer pointer, int length).
data_callback = ctypes.CFUNCTYPE(
    None, ctypes.c_void_p, ctypes.c_void_p, ctypes.c_int
)


class WeaselJsonCallbacks(ctypes.Structure):
    """Table of native callback pointers, one slot per parser event.

    A pointer to this structure is the second argument declared for
    ``WeaselJsonParser_create``. The two ``*_data`` slots use
    ``data_callback``; every other slot uses ``event_callback``.
    """

    # NOTE(review): the slot order presumably has to match the C struct in
    # libweaseljson -- confirm against the header before reordering.
    _fields_ = [
        (event, data_callback if event.endswith("_data") else event_callback)
        for event in (
            "on_begin_object",
            "on_end_object",
            "on_begin_string",
            "on_string_data",
            "on_end_string",
            "on_begin_array",
            "on_end_array",
            "on_begin_number",
            "on_number_data",
            "on_end_number",
            "on_true_literal",
            "on_false_literal",
            "on_null_literal",
        )
    ]
class WeaselJsonStatus(enum.Enum):
    """Status codes produced by ``WeaselJsonParser.parse``.

    The integer result of the native ``WeaselJsonParser_parse`` call is
    converted into a member of this enum.
    """

    # NOTE(review): the meanings below are inferred from the member names
    # only -- confirm against the libweaseljson header.
    OK = 0  # presumably: the document was accepted
    AGAIN = 1  # the demo at the end of this file keeps feeding data while it sees this
    REJECT = 2  # presumably: the input is not valid JSON
    OVERFLOW = 3  # presumably: the parser's stack was exhausted
class WeaselJsonCallbacksBase:
    """Base class for parse-event handlers; every handler is a no-op.

    Subclass and override only the events of interest. The ``*_data``
    handlers receive a ``bytes`` chunk; all other handlers take no
    arguments.
    """

    def on_begin_object(self) -> None:
        """Handle the begin-object event (no-op by default)."""

    def on_end_object(self) -> None:
        """Handle the end-object event (no-op by default)."""

    def on_begin_string(self) -> None:
        """Handle the begin-string event (no-op by default)."""

    def on_string_data(self, data: bytes) -> None:
        """Handle a chunk of string bytes (no-op by default)."""

    def on_end_string(self) -> None:
        """Handle the end-string event (no-op by default)."""

    def on_begin_array(self) -> None:
        """Handle the begin-array event (no-op by default)."""

    def on_end_array(self) -> None:
        """Handle the end-array event (no-op by default)."""

    def on_begin_number(self) -> None:
        """Handle the begin-number event (no-op by default)."""

    def on_number_data(self, data: bytes) -> None:
        """Handle a chunk of number bytes (no-op by default)."""

    def on_end_number(self) -> None:
        """Handle the end-number event (no-op by default)."""

    def on_true_literal(self) -> None:
        """Handle the ``true`` literal event (no-op by default)."""

    def on_false_literal(self) -> None:
        """Handle the ``false`` literal event (no-op by default)."""

    def on_null_literal(self) -> None:
        """Handle the ``null`` literal event (no-op by default)."""
class WeaselJsonParser:
    """Incremental JSON parser backed by the native ``libweaseljson`` library.

    Events produced by the native parser are delivered to the ``callbacks``
    object supplied at construction. The native parser is released by
    :meth:`close`, which also runs when the instance leaves a ``with``
    block.
    """

    def __init__(
        self,
        callbacks: "WeaselJsonCallbacksBase",
        build_dir: Optional[str] = None,
        stackSize: int = 1024,
    ) -> None:
        """Load ``libweaseljson.so`` and create a native parser.

        Args:
            callbacks: Object whose ``on_*`` methods receive parse events.
            build_dir: Directory containing ``libweaseljson.so``. Defaults
                to the ``build`` directory next to this file.
            stackSize: Passed as the first argument to
                ``WeaselJsonParser_create``.

        Raises:
            SystemExit: If the shared library cannot be loaded.
            RuntimeError: If the native library returns a NULL parser.
        """
        # Assigned first so close() is always safe to call.
        self.p = None
        self._lib = None
        if build_dir is None:
            # os.path.join keeps the default relative when dirname(__file__)
            # is empty; plain "+ '/build'" would point at the filesystem root.
            build_dir = os.path.join(os.path.dirname(__file__), "build")
        for f in (os.path.join(build_dir, "libweaseljson.so"),):
            try:
                self._lib = ctypes.cdll.LoadLibrary(f)
            except OSError:
                print("Could not load " + f)
            else:
                break
        if self._lib is None:
            import sys

            # NOTE(review): exiting the process from a constructor is harsh
            # for library use; kept so existing callers see no new exception.
            print(
                "Could not find libweaseljson implementation",
                file=sys.stderr,
            )
            sys.exit(1)

        lib = self._lib
        lib.WeaselJsonParser_create.argtypes = (
            ctypes.c_int,
            ctypes.POINTER(WeaselJsonCallbacks),
            ctypes.c_void_p,
        )
        lib.WeaselJsonParser_create.restype = ctypes.c_void_p
        lib.WeaselJsonParser_reset.argtypes = (ctypes.c_void_p,)
        lib.WeaselJsonParser_destroy.argtypes = (ctypes.c_void_p,)
        lib.WeaselJsonParser_parse.argtypes = (
            ctypes.c_void_p,
            ctypes.c_void_p,
            ctypes.c_int,
        )
        # A callable restype is invoked with the integer result, so parse()
        # hands back a WeaselJsonStatus member.
        lib.WeaselJsonParser_parse.restype = WeaselJsonStatus

        # The native side only stores a raw pointer to this py_object slot,
        # so the slot and the pointer derived from it are kept on the
        # instance for as long as the parser exists.
        self._callbacks_slot = ctypes.py_object(callbacks)
        self.voidp_callbacks = ctypes.cast(
            ctypes.pointer(self._callbacks_slot), ctypes.c_void_p
        )
        self.p = lib.WeaselJsonParser_create(
            stackSize,
            c_callbacks,
            self.voidp_callbacks,
        )
        if self.p is None:
            # A c_void_p restype yields None for NULL; such a handle must
            # never be passed back into the library.
            raise RuntimeError("WeaselJsonParser_create returned NULL")

    def _require_open(self) -> None:
        """Raise ``ValueError`` if the native parser has been destroyed."""
        if self.p is None:
            raise ValueError("operation on a closed WeaselJsonParser")

    def parse(self, data: bytes) -> "WeaselJsonStatus":
        """Feed one chunk of input to the native parser.

        Args:
            data: Next chunk of input. It is copied into a temporary mutable
                buffer before being handed to the native library.

        Returns:
            The status reported by ``WeaselJsonParser_parse``.

        Raises:
            ValueError: If the parser has already been closed.
        """
        self._require_open()
        buf = (ctypes.c_ubyte * len(data)).from_buffer(bytearray(data))
        return self._lib.WeaselJsonParser_parse(self.p, buf, len(data))

    def reset(self):
        """Call ``WeaselJsonParser_reset`` on the native parser.

        Raises:
            ValueError: If the parser has already been closed.
        """
        self._require_open()
        self._lib.WeaselJsonParser_reset(self.p)

    def __enter__(self):
        """Return the parser itself for use in a ``with`` block."""
        return self

    def close(self) -> None:
        """Destroy the native parser; calling it again is a no-op."""
        if self.p is not None:
            self._lib.WeaselJsonParser_destroy(self.p)
            self.p = None

    def __exit__(self, exception_type, exception_value, exception_traceback):
        """Close the parser when the ``with`` block ends."""
        self.close()
# ctypes prototypes for the two trampoline shapes used below.
_event_trampoline = ctypes.CFUNCTYPE(None, ctypes.c_void_p)
_data_trampoline = ctypes.CFUNCTYPE(
    None, ctypes.c_void_p, ctypes.c_void_p, ctypes.c_int
)


def _handler_from_userdata(userdata):
    """Recover the Python callbacks object from the native userdata pointer.

    ``userdata`` is treated as the address of a ``ctypes.py_object`` slot.
    """
    slot = ctypes.cast(userdata, ctypes.POINTER(ctypes.py_object))
    return slot.contents.value


@_event_trampoline
def on_begin_object(userdata):
    """Forward the event to ``on_begin_object`` of the userdata object."""
    _handler_from_userdata(userdata).on_begin_object()


@_event_trampoline
def on_end_object(userdata):
    """Forward the event to ``on_end_object`` of the userdata object."""
    _handler_from_userdata(userdata).on_end_object()


@_event_trampoline
def on_begin_string(userdata):
    """Forward the event to ``on_begin_string`` of the userdata object."""
    _handler_from_userdata(userdata).on_begin_string()


@_data_trampoline
def on_string_data(userdata, buf, size):
    """Copy ``size`` bytes at ``buf`` and pass them to ``on_string_data``."""
    _handler_from_userdata(userdata).on_string_data(ctypes.string_at(buf, size))


@_event_trampoline
def on_end_string(userdata):
    """Forward the event to ``on_end_string`` of the userdata object."""
    _handler_from_userdata(userdata).on_end_string()


@_event_trampoline
def on_begin_array(userdata):
    """Forward the event to ``on_begin_array`` of the userdata object."""
    _handler_from_userdata(userdata).on_begin_array()


@_event_trampoline
def on_end_array(userdata):
    """Forward the event to ``on_end_array`` of the userdata object."""
    _handler_from_userdata(userdata).on_end_array()


@_event_trampoline
def on_begin_number(userdata):
    """Forward the event to ``on_begin_number`` of the userdata object."""
    _handler_from_userdata(userdata).on_begin_number()


@_data_trampoline
def on_number_data(userdata, buf, size):
    """Copy ``size`` bytes at ``buf`` and pass them to ``on_number_data``."""
    _handler_from_userdata(userdata).on_number_data(ctypes.string_at(buf, size))


@_event_trampoline
def on_end_number(userdata):
    """Forward the event to ``on_end_number`` of the userdata object."""
    _handler_from_userdata(userdata).on_end_number()


@_event_trampoline
def on_true_literal(userdata):
    """Forward the event to ``on_true_literal`` of the userdata object."""
    _handler_from_userdata(userdata).on_true_literal()


@_event_trampoline
def on_false_literal(userdata):
    """Forward the event to ``on_false_literal`` of the userdata object."""
    _handler_from_userdata(userdata).on_false_literal()


@_event_trampoline
def on_null_literal(userdata):
    """Forward the event to ``on_null_literal`` of the userdata object."""
    _handler_from_userdata(userdata).on_null_literal()
# Single callback table handed to WeaselJsonParser_create by every parser.
# Keeping it at module level keeps the CFUNCTYPE trampolines referenced while
# native code may still call them. Slots are filled by keyword so each
# trampoline is visibly paired with the field of the same name.
c_callbacks = WeaselJsonCallbacks(
    on_begin_object=on_begin_object,
    on_end_object=on_end_object,
    on_begin_string=on_begin_string,
    on_string_data=on_string_data,
    on_end_string=on_end_string,
    on_begin_array=on_begin_array,
    on_end_array=on_end_array,
    on_begin_number=on_begin_number,
    on_number_data=on_number_data,
    on_end_number=on_end_number,
    on_true_literal=on_true_literal,
    on_false_literal=on_false_literal,
    on_null_literal=on_null_literal,
)
class MyCallbacks(WeaselJsonCallbacksBase):
    """Example handler that prints every chunk of string data it receives."""

    # Only on_string_data is overridden; all other events keep the inherited
    # no-op behaviour.
    def on_string_data(self, data: bytes) -> None:
        print(data)
# Demo: parse a small JSON document one byte at a time and print the final
# status. Guarded so that importing this module neither loads the shared
# library nor prints anything.
if __name__ == "__main__":
    with WeaselJsonParser(MyCallbacks()) as parser:
        raw = json.dumps({"hello": "world", "foo": 42}).encode()
        i = 0
        stride = 1
        while True:
            # Once i passes the end of raw this chunk is empty.
            # NOTE(review): presumably an empty chunk tells the parser the
            # input has ended -- confirm against the libweaseljson API.
            chunk = raw[i : i + stride]
            s = parser.parse(chunk)
            if s != WeaselJsonStatus.AGAIN:
                break
            i += stride
    print(s)