Skip to content

Commit

Permalink
feat: Add support for serializing/deserializing user-defined stream-l…
Browse files Browse the repository at this point in the history
…evel metadata. (#133)

Co-authored-by: Junhao Liao <[email protected]>
  • Loading branch information
LinZhihao-723 and junhaoliao authored Jan 26, 2025
1 parent 2a02ccd commit eb59339
Show file tree
Hide file tree
Showing 10 changed files with 231 additions and 6 deletions.
8 changes: 7 additions & 1 deletion clp_ffi_py/ir/native.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,12 @@ class KeyValuePairLogEvent:
) -> Tuple[Dict[Any, Any], Dict[Any, Any]]: ...

class Serializer:
def __init__(self, output_stream: IO[bytes], buffer_size_limit: int = 65536): ...
def __init__(
self,
output_stream: IO[bytes],
buffer_size_limit: int = 65536,
user_defined_metadata: Optional[Dict[str, Any]] = None,
): ...
def __enter__(self) -> Serializer: ...
def __exit__(
self,
Expand All @@ -115,5 +120,6 @@ class Deserializer:
allow_incomplete_stream: bool = False,
): ...
def deserialize_log_event(self) -> Optional[KeyValuePairLogEvent]: ...
def get_user_defined_metadata(self) -> Optional[Dict[str, Any]]: ...

class IncompleteStreamError(Exception): ...
13 changes: 13 additions & 0 deletions clp_ffi_py/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,19 @@ def serialize_dict_to_msgpack(dictionary: Dict[Any, Any]) -> bytes:
return msgpack.packb(dictionary)


def serialize_dict_to_json_str(dictionary: Dict[Any, Any]) -> str:
"""
Serializes the given dictionary into a JSON string.
:param dictionary: The dictionary to serialize.
:return: JSON string of the serialized dictionary.
:raises: TypeError The given input is not a dictionary.
"""
if not isinstance(dictionary, dict):
raise TypeError("The type of the input object must be a dictionary.")
return json.dumps(dictionary)


def parse_json_str(json_str: str) -> Any:
"""
Wrapper of `json.loads`, which parses a JSON string into a Python object.
Expand Down
1 change: 1 addition & 0 deletions src/clp_ffi_py/PyObjectCast.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ CLP_FFI_PY_MARK_AS_PYOBJECT(ir::native::PySerializer);
CLP_FFI_PY_MARK_AS_PYOBJECT(PyBytesObject);
CLP_FFI_PY_MARK_AS_PYOBJECT(PyDictObject);
CLP_FFI_PY_MARK_AS_PYOBJECT(PyTypeObject);
CLP_FFI_PY_MARK_AS_PYOBJECT(PyUnicodeObject);
} // namespace clp_ffi_py

#endif // CLP_FFI_PY_PY_OBJECT_CAST_HPP
35 changes: 35 additions & 0 deletions src/clp_ffi_py/Py_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@ namespace {
constexpr std::string_view cPyFuncNameGetFormattedTimestamp{"get_formatted_timestamp"};
constexpr std::string_view cPyFuncNameGetTimezoneFromTimezoneId{"get_timezone_from_timezone_id"};
constexpr std::string_view cPyFuncNameSerializeDictToMsgpack{"serialize_dict_to_msgpack"};
constexpr std::string_view cPyFuncNameSerializeDictToJsonStr{"serialize_dict_to_json_str"};
constexpr std::string_view cPyFuncNameParseJsonStr{"parse_json_str"};

// NOLINTBEGIN(cppcoreguidelines-avoid-non-const-global-variables)
PyObjectStaticPtr<PyObject> Py_func_get_formatted_timestamp{nullptr};
PyObjectStaticPtr<PyObject> Py_func_get_timezone_from_timezone_id{nullptr};
PyObjectStaticPtr<PyObject> Py_func_serialize_dict_to_msgpack{nullptr};
PyObjectStaticPtr<PyObject> Py_func_serialize_dict_to_json_str{nullptr};
PyObjectStaticPtr<PyObject> Py_func_parse_json_str{nullptr};

// NOLINTEND(cppcoreguidelines-avoid-non-const-global-variables)
Expand Down Expand Up @@ -68,6 +70,14 @@ auto py_utils_init() -> bool {
return false;
}

Py_func_serialize_dict_to_json_str.reset(PyObject_GetAttrString(
py_utils,
get_c_str_from_constexpr_string_view(cPyFuncNameSerializeDictToJsonStr)
));
if (nullptr == Py_func_serialize_dict_to_json_str.get()) {
return false;
}

Py_func_parse_json_str.reset(PyObject_GetAttrString(
py_utils,
get_c_str_from_constexpr_string_view(cPyFuncNameParseJsonStr)
Expand Down Expand Up @@ -122,6 +132,31 @@ auto py_utils_serialize_dict_to_msgpack(PyDictObject* py_dict) -> PyBytesObject*
return py_reinterpret_cast<PyBytesObject>(result);
}

auto py_utils_serialize_dict_to_json_str(PyDictObject* py_dict) -> PyUnicodeObject* {
PyObjectPtr<PyObject> const func_args_ptr{
Py_BuildValue("(O)", py_reinterpret_cast<PyObject>(py_dict))
};
auto* func_args{func_args_ptr.get()};
if (nullptr == func_args) {
return nullptr;
}
auto* result{py_utils_function_call_wrapper(Py_func_serialize_dict_to_json_str.get(), func_args)
};
if (nullptr == result) {
return nullptr;
}
if (false == static_cast<bool>(PyUnicode_Check(result))) {
PyErr_Format(
PyExc_TypeError,
"`%s` is supposed to return a `str` object",
cPyFuncNameSerializeDictToJsonStr
);
return nullptr;
}

return py_reinterpret_cast<PyUnicodeObject>(result);
}

auto py_utils_parse_json_str(std::string_view json_str) -> PyObject* {
PyObjectPtr<PyObject> const func_args_ptr{
Py_BuildValue("(s#)", json_str.data(), static_cast<Py_ssize_t>(json_str.size()))
Expand Down
9 changes: 9 additions & 0 deletions src/clp_ffi_py/Py_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,15 @@ py_utils_get_formatted_timestamp(clp::ir::epoch_time_ms_t timestamp, PyObject* t
*/
[[nodiscard]] auto py_utils_serialize_dict_to_msgpack(PyDictObject* py_dict) -> PyBytesObject*;

/**
* CPython wrapper of `clp_ffi_py.utils.serialize_dict_to_json_str`.
* @param py_dict
* @return a new reference of a Python Unicode object containing JSON string representation of the
* dictionary.
* @return nullptr on failure with the relevant Python exception and error set.
*/
[[nodiscard]] auto py_utils_serialize_dict_to_json_str(PyDictObject* py_dict) -> PyUnicodeObject*;

/**
* CPython wrapper of `clp_ffi_py.utils.parse_json_str_to_dict`.
* @param json_str
Expand Down
72 changes: 72 additions & 0 deletions src/clp_ffi_py/ir/native/PyDeserializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,29 @@
#include "PyDeserializer.hpp"

#include <new>
#include <string>
#include <system_error>
#include <type_traits>
#include <utility>

#include <clp/ffi/ir_stream/decoding_methods.hpp>
#include <clp/ffi/ir_stream/Deserializer.hpp>
#include <clp/ffi/ir_stream/IrUnitType.hpp>
#include <clp/ffi/ir_stream/protocol_constants.hpp>
#include <clp/ffi/KeyValuePairLogEvent.hpp>
#include <clp/ffi/SchemaTree.hpp>
#include <clp/time_types.hpp>
#include <clp/TraceableException.hpp>
#include <json/single_include/nlohmann/json.hpp>

#include <clp_ffi_py/api_decoration.hpp>
#include <clp_ffi_py/error_messages.hpp>
#include <clp_ffi_py/ir/native/DeserializerBufferReader.hpp>
#include <clp_ffi_py/ir/native/error_messages.hpp>
#include <clp_ffi_py/ir/native/PyKeyValuePairLogEvent.hpp>
#include <clp_ffi_py/Py_utils.hpp>
#include <clp_ffi_py/PyObjectCast.hpp>
#include <clp_ffi_py/PyObjectUtils.hpp>
#include <clp_ffi_py/utils.hpp>

namespace clp_ffi_py::ir::native {
Expand Down Expand Up @@ -67,6 +72,23 @@ PyDoc_STRVAR(
);
CLP_FFI_PY_METHOD auto PyDeserializer_deserialize_log_event(PyDeserializer* self) -> PyObject*;

/**
* Callback of `PyDeserializer`'s `get_user_defined_metadata`.
*/
PyDoc_STRVAR(
cPyDeserializerGetUserDefinedMetadataDoc,
"get_user_defined_metadata(self)\n"
"--\n\n"
"Gets the user-defined stream-level metadata.\n\n"
":return:\n"
" - The deserialized user-defined stream-level metadata, loaded as a"
" dictionary.\n"
" - None if user-defined stream-level metadata was not given in the deserialized"
" IR stream.\n"
":rtype: dict | None\n"
);
CLP_FFI_PY_METHOD auto PyDeserializer_get_user_defined_metadata(PyDeserializer* self) -> PyObject*;

/**
* Callback of `PyDeserializer`'s deallocator.
*/
Expand All @@ -79,6 +101,11 @@ PyMethodDef PyDeserializer_method_table[]{
METH_NOARGS,
static_cast<char const*>(cPyDeserializerDeserializeLogEventDoc)},

{"get_user_defined_metadata",
py_c_function_cast(PyDeserializer_get_user_defined_metadata),
METH_NOARGS,
static_cast<char const*>(cPyDeserializerGetUserDefinedMetadataDoc)},

{nullptr}
};

Expand Down Expand Up @@ -153,6 +180,40 @@ CLP_FFI_PY_METHOD auto PyDeserializer_deserialize_log_event(PyDeserializer* self
return self->deserialize_log_event();
}

CLP_FFI_PY_METHOD auto PyDeserializer_get_user_defined_metadata(PyDeserializer* self) -> PyObject* {
auto const* user_defined_metadata{self->get_user_defined_metadata()};
if (nullptr == user_defined_metadata) {
Py_RETURN_NONE;
}

std::string json_str;
try {
json_str = user_defined_metadata->dump();
} catch (nlohmann::json::exception const& ex) {
PyErr_Format(
PyExc_RuntimeError,
"Failed to serialize the user-defined stream-level metadata into a JSON string."
" Error: %s",
ex.what()
);
return nullptr;
}

PyObjectPtr<PyObject> py_metadata_dict{py_utils_parse_json_str(json_str)};
if (nullptr == py_metadata_dict) {
return nullptr;
}
if (false == static_cast<bool>(PyDict_Check(py_metadata_dict.get()))) {
PyErr_SetString(
PyExc_TypeError,
"Failed to convert the user-defined stream-level metadata into a dictionary."
);
return nullptr;
}

return py_metadata_dict.release();
}

CLP_FFI_PY_METHOD auto PyDeserializer_dealloc(PyDeserializer* self) -> void {
self->clean();
Py_TYPE(self)->tp_free(py_reinterpret_cast<PyObject>(self));
Expand Down Expand Up @@ -282,6 +343,17 @@ auto PyDeserializer::deserialize_log_event() -> PyObject* {
Py_RETURN_NONE;
}

auto PyDeserializer::get_user_defined_metadata() const -> nlohmann::json const* {
auto const& metadata{m_deserializer->get_metadata()};
std::string const user_defined_metadata_key{
clp::ffi::ir_stream::cProtocol::Metadata::UserDefinedMetadataKey
};
if (false == metadata.contains(user_defined_metadata_key)) {
return nullptr;
}
return &metadata.at(user_defined_metadata_key);
}

auto PyDeserializer::handle_log_event(clp::ffi::KeyValuePairLogEvent&& log_event) -> IRErrorCode {
if (has_unreleased_deserialized_log_event()) {
// This situation may occur if the deserializer methods return an error after the last
Expand Down
8 changes: 8 additions & 0 deletions src/clp_ffi_py/ir/native/PyDeserializer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include <clp/ffi/SchemaTree.hpp>
#include <clp/time_types.hpp>
#include <gsl/gsl>
#include <json/single_include/nlohmann/json.hpp>

#include <clp_ffi_py/ir/native/DeserializerBufferReader.hpp>
#include <clp_ffi_py/PyObjectUtils.hpp>
Expand Down Expand Up @@ -109,6 +110,13 @@ class PyDeserializer {
*/
[[nodiscard]] auto deserialize_log_event() -> PyObject*;

/**
* @return A pointer to the user-defined stream-level metadata, deserialized from the stream's
* preamble, if defined.
* @return std::nullptr if the user-defined stream-level metadata is not defined.
*/
[[nodiscard]] auto get_user_defined_metadata() const -> nlohmann::json const*;

private:
/**
* Class that implements `clp::ffi::ir_stream::IrUnitHandlerInterface` for deserializing
Expand Down
60 changes: 56 additions & 4 deletions src/clp_ffi_py/ir/native/PySerializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@
#include <utility>

#include <clp/ffi/ir_stream/protocol_constants.hpp>
#include <json/single_include/nlohmann/json.hpp>
#include <wrapped_facade_headers/msgpack.hpp>

#include <clp_ffi_py/api_decoration.hpp>
#include <clp_ffi_py/error_messages.hpp>
#include <clp_ffi_py/ir/native/error_messages.hpp>
#include <clp_ffi_py/Py_utils.hpp>
#include <clp_ffi_py/PyObjectCast.hpp>
#include <clp_ffi_py/PyObjectUtils.hpp>
#include <clp_ffi_py/utils.hpp>
Expand All @@ -32,7 +34,7 @@ PyDoc_STRVAR(
"Serializer for serializing CLP key-value pair IR streams.\n"
"This class serializes log events into the CLP key-value pair IR format and writes the"
" serialized data to a specified byte stream object.\n\n"
"__init__(self, output_stream, buffer_size_limit=65536)\n\n"
"__init__(self, output_stream, buffer_size_limit=65536, user_defined_metadata=None)\n\n"
"Initializes a :class:`Serializer` instance with the given output stream. Note that each"
" object should only be initialized once. Double initialization will result in a memory"
" leak.\n\n"
Expand All @@ -42,6 +44,11 @@ PyDoc_STRVAR(
":param buffer_size_limit: The maximum amount of serialized data to buffer before flushing"
" it to `output_stream`. Defaults to 64 KiB.\n"
":type buffer_size_limit: int\n"
":param user_defined_metadata: A dictionary representing user-defined stream-level"
" metadata, or None to indicate the absence of such metadata. If a dictionary is provided,"
" it must be valid for serialization as a string using the `Python Standard JSON library"
" <https://docs.python.org/3/library/json.html>`_\.\n"
":type user_defined_metadata: dict | None\n"
);
CLP_FFI_PY_METHOD auto PySerializer_init(PySerializer* self, PyObject* args, PyObject* keywords)
-> int;
Expand Down Expand Up @@ -218,9 +225,11 @@ CLP_FFI_PY_METHOD auto PySerializer_init(PySerializer* self, PyObject* args, PyO
-> int {
static char keyword_output_stream[]{"output_stream"};
static char keyword_buffer_size_limit[]{"buffer_size_limit"};
static char keyword_user_defined_metadata[]{"user_defined_metadata"};
static char* keyword_table[]{
static_cast<char*>(keyword_output_stream),
static_cast<char*>(keyword_buffer_size_limit),
static_cast<char*>(keyword_user_defined_metadata),
nullptr
};

Expand All @@ -229,15 +238,17 @@ CLP_FFI_PY_METHOD auto PySerializer_init(PySerializer* self, PyObject* args, PyO
self->default_init();

PyObject* output_stream{Py_None};
PyObject* py_user_defined_metadata{Py_None};
Py_ssize_t buffer_size_limit{PySerializer::cDefaultBufferSizeLimit};
if (false
== static_cast<bool>(PyArg_ParseTupleAndKeywords(
args,
keywords,
"O|n",
"O|nO",
static_cast<char**>(keyword_table),
&output_stream,
&buffer_size_limit
&buffer_size_limit,
&py_user_defined_metadata
)))
{
return -1;
Expand Down Expand Up @@ -276,7 +287,48 @@ CLP_FFI_PY_METHOD auto PySerializer_init(PySerializer* self, PyObject* args, PyO
return -1;
}

auto serializer_result{PySerializer::ClpIrSerializer::create()};
std::optional<nlohmann::json> optional_user_defined_metadata;
if (Py_None != py_user_defined_metadata) {
if (false == static_cast<bool>(PyDict_Check(py_user_defined_metadata))) {
PyErr_Format(
PyExc_TypeError,
"`%s` must be a dictionary, if not None.",
static_cast<char const*>(keyword_user_defined_metadata)
);
return -1;
}
auto* py_serialized_json_str{py_utils_serialize_dict_to_json_str(
py_reinterpret_cast<PyDictObject>(py_user_defined_metadata)
)};
if (nullptr == py_serialized_json_str) {
return -1;
}
Py_ssize_t json_str_size{};
auto const* json_str_data{PyUnicode_AsUTF8AndSize(
py_reinterpret_cast<PyObject>(py_serialized_json_str),
&json_str_size
)};
if (nullptr == json_str_data) {
return -1;
}
auto parsed_user_defined_metadata = nlohmann::json::parse(
std::string_view{json_str_data, static_cast<size_t>(json_str_size)},
nullptr,
false
);
if (parsed_user_defined_metadata.is_discarded()) {
PyErr_Format(
PyExc_RuntimeError,
"Failed to parse `%s`: %s",
static_cast<char const*>(keyword_user_defined_metadata),
json_str_data
);
return -1;
}
optional_user_defined_metadata = std::move(parsed_user_defined_metadata);
}

auto serializer_result{PySerializer::ClpIrSerializer::create(optional_user_defined_metadata)};
if (serializer_result.has_error()) {
PyErr_Format(
PyExc_RuntimeError,
Expand Down
Loading

0 comments on commit eb59339

Please sign in to comment.