feat: Add support for serializing/deserializing user-defined stream-level metadata. #133

Merged on Jan 26, 2025 (9 commits).
Changes from 5 commits
8 changes: 7 additions & 1 deletion clp_ffi_py/ir/native.pyi
@@ -92,7 +92,12 @@ class KeyValuePairLogEvent:
     ) -> Tuple[Dict[Any, Any], Dict[Any, Any]]: ...
 
 class Serializer:
-    def __init__(self, output_stream: IO[bytes], buffer_size_limit: int = 65536): ...
+    def __init__(
+        self,
+        output_stream: IO[bytes],
+        buffer_size_limit: int = 65536,
+        user_defined_metadata: Optional[Dict[str, Any]] = None,
+    ): ...
     def __enter__(self) -> Serializer: ...
     def __exit__(
         self,
@@ -115,5 +120,6 @@ class Deserializer:
         allow_incomplete_stream: bool = False,
     ): ...
     def deserialize_log_event(self) -> Optional[KeyValuePairLogEvent]: ...
+    def get_user_defined_metadata(self) -> Optional[Dict[str, Any]]: ...
 
 class IncompleteStreamError(Exception): ...
13 changes: 13 additions & 0 deletions clp_ffi_py/utils.py
@@ -48,6 +48,19 @@ def serialize_dict_to_msgpack(dictionary: Dict[Any, Any]) -> bytes:
     return msgpack.packb(dictionary)
 
 
+def serialize_dict_to_json_str(dictionary: Dict[Any, Any]) -> str:
+    """
+    Serializes the given dictionary into a JSON string.
+
+    :param dictionary: The dictionary to serialize.
+    :return: JSON string of serialized dictionary.
+    :raises: TypeError The given input is not a dictionary.
+    """
+    if not isinstance(dictionary, dict):
+        raise TypeError("The type of the input object must be a dictionary.")
+    return json.dumps(dictionary)

@coderabbitai (bot), Jan 26, 2025:
🛠️ Refactor suggestion

Add error handling for JSON serialization failures.

The function should handle potential JSON serialization errors that could occur with complex Python objects or circular references.

Apply this diff to add error handling:

 def serialize_dict_to_json_str(dictionary: Dict[Any, Any]) -> str:
     """
     Serializes the given dictionary into a JSON string.

     :param dictionary: The dictionary to serialize.
     :return: JSON string of serialized dictionary.
     :raises: TypeError The given input is not a dictionary.
+    :raises: TypeError The dictionary contains values that are not JSON serializable.
     """
     if not isinstance(dictionary, dict):
         raise TypeError("The type of the input object must be a dictionary.")
-    return json.dumps(dictionary)
+    try:
+        return json.dumps(dictionary)
+    except TypeError as e:
+        raise TypeError(f"The dictionary contains values that are not JSON serializable: {e}")

Member Author:
Does re-throwing the same exception type here make any difference?

@LinZhihao-723 You make a good point. Re-throwing the same TypeError doesn't add significant value here since json.dumps already provides a descriptive error message indicating which value couldn't be serialized. The extra try/except block would just add complexity without meaningful benefit. The original implementation letting json.dumps' TypeError propagate naturally is cleaner and sufficient.


✏️ Learnings added
Learnt from: LinZhihao-723
PR: y-scope/clp-ffi-py#133
File: clp_ffi_py/utils.py:51-62
Timestamp: 2025-01-26T10:21:10.011Z
Learning: For the serialize_dict_to_json_str function in clp_ffi_py/utils.py, let json.dumps' TypeError propagate naturally as it already provides sufficient error context, rather than catching and re-throwing with a custom message.
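
The point settled in this thread can be checked with the standard library alone. The sketch below repeats the helper from the diff and shows that `json.dumps` already names the offending type in its `TypeError`. It also shows that a circular reference surfaces as a `ValueError`, which the suggested `except TypeError` clause would not have caught. `error_of` and the sample inputs are illustrative only and are not part of the PR.

```python
import json
from typing import Any, Dict


def serialize_dict_to_json_str(dictionary: Dict[Any, Any]) -> str:
    # Same logic as the helper added to clp_ffi_py/utils.py in this PR.
    if not isinstance(dictionary, dict):
        raise TypeError("The type of the input object must be a dictionary.")
    return json.dumps(dictionary)


def error_of(value: Any) -> str:
    """Returns "<ExceptionName>: <message>" for a failing call (illustrative helper)."""
    try:
        serialize_dict_to_json_str(value)
    except (TypeError, ValueError) as e:
        return f"{type(e).__name__}: {e}"
    return "no error"


# A dictionary that contains itself, to trigger the circular-reference check.
circular: Dict[str, Any] = {}
circular["self"] = circular

not_a_dict = error_of(["a", "b"])               # rejected by the isinstance check
unserializable = error_of({"tags": {"x", "y"}})  # a set is not JSON serializable
circular_ref = error_of(circular)               # raised by json.dumps as ValueError
```

Printing the three strings shows which exception type each case produces, which is the information a wrapper would have had to preserve anyway.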



 def parse_json_str(json_str: str) -> Any:
     """
     Wrapper of `json.loads`, which parses a JSON string into a Python object.
1 change: 1 addition & 0 deletions src/clp_ffi_py/PyObjectCast.hpp
@@ -133,6 +133,7 @@ CLP_FFI_PY_MARK_AS_PYOBJECT(ir::native::PySerializer);
CLP_FFI_PY_MARK_AS_PYOBJECT(PyBytesObject);
CLP_FFI_PY_MARK_AS_PYOBJECT(PyDictObject);
CLP_FFI_PY_MARK_AS_PYOBJECT(PyTypeObject);
CLP_FFI_PY_MARK_AS_PYOBJECT(PyUnicodeObject);
} // namespace clp_ffi_py

#endif // CLP_FFI_PY_PY_OBJECT_CAST_HPP
35 changes: 35 additions & 0 deletions src/clp_ffi_py/Py_utils.cpp
@@ -16,12 +16,14 @@ namespace {
constexpr std::string_view cPyFuncNameGetFormattedTimestamp{"get_formatted_timestamp"};
constexpr std::string_view cPyFuncNameGetTimezoneFromTimezoneId{"get_timezone_from_timezone_id"};
constexpr std::string_view cPyFuncNameSerializeDictToMsgpack{"serialize_dict_to_msgpack"};
constexpr std::string_view cPyFuncNameSerializeDictToJsonStr{"serialize_dict_to_json_str"};
constexpr std::string_view cPyFuncNameParseJsonStr{"parse_json_str"};

// NOLINTBEGIN(cppcoreguidelines-avoid-non-const-global-variables)
PyObjectStaticPtr<PyObject> Py_func_get_formatted_timestamp{nullptr};
PyObjectStaticPtr<PyObject> Py_func_get_timezone_from_timezone_id{nullptr};
PyObjectStaticPtr<PyObject> Py_func_serialize_dict_to_msgpack{nullptr};
PyObjectStaticPtr<PyObject> Py_func_serialize_dict_to_json_str{nullptr};
PyObjectStaticPtr<PyObject> Py_func_parse_json_str{nullptr};

// NOLINTEND(cppcoreguidelines-avoid-non-const-global-variables)
@@ -68,6 +70,14 @@ auto py_utils_init() -> bool {
return false;
}

Py_func_serialize_dict_to_json_str.reset(PyObject_GetAttrString(
py_utils,
get_c_str_from_constexpr_string_view(cPyFuncNameSerializeDictToJsonStr)
));
if (nullptr == Py_func_serialize_dict_to_json_str.get()) {
return false;
}

Py_func_parse_json_str.reset(PyObject_GetAttrString(
py_utils,
get_c_str_from_constexpr_string_view(cPyFuncNameParseJsonStr)
@@ -122,6 +132,31 @@ auto py_utils_serialize_dict_to_msgpack(PyDictObject* py_dict) -> PyBytesObject*
return py_reinterpret_cast<PyBytesObject>(result);
}

auto py_utils_serialize_dict_to_json_str(PyDictObject* py_dict) -> PyUnicodeObject* {
PyObjectPtr<PyObject> const func_args_ptr{
Py_BuildValue("(O)", py_reinterpret_cast<PyObject>(py_dict))
};
auto* func_args{func_args_ptr.get()};
if (nullptr == func_args) {
return nullptr;
}
auto* result{py_utils_function_call_wrapper(Py_func_serialize_dict_to_json_str.get(), func_args)
};
if (nullptr == result) {
return nullptr;
}
if (false == static_cast<bool>(PyUnicode_Check(result))) {
PyErr_Format(
PyExc_TypeError,
"`%s` is supposed to return a `str` object",
cPyFuncNameSerializeDictToJsonStr
);
return nullptr;
}

return py_reinterpret_cast<PyUnicodeObject>(result);
}

auto py_utils_parse_json_str(std::string_view json_str) -> PyObject* {
PyObjectPtr<PyObject> const func_args_ptr{
Py_BuildValue("(s#)", json_str.data(), static_cast<Py_ssize_t>(json_str.size()))
9 changes: 9 additions & 0 deletions src/clp_ffi_py/Py_utils.hpp
@@ -45,6 +45,15 @@ py_utils_get_formatted_timestamp(clp::ir::epoch_time_ms_t timestamp, PyObject* t
*/
[[nodiscard]] auto py_utils_serialize_dict_to_msgpack(PyDictObject* py_dict) -> PyBytesObject*;

/**
* CPython wrapper of `clp_ffi_py.utils.serialize_dict_to_json_str`.
* @param py_dict
* @return a new reference of a Python Unicode object containing JSON string representation of the
* dictionary.
* @return nullptr on failure with the relevant Python exception and error set.
*/
[[nodiscard]] auto py_utils_serialize_dict_to_json_str(PyDictObject* py_dict) -> PyUnicodeObject*;

/**
* CPython wrapper of `clp_ffi_py.utils.parse_json_str_to_dict`.
* @param json_str
80 changes: 80 additions & 0 deletions src/clp_ffi_py/ir/native/PyDeserializer.cpp
@@ -3,24 +3,29 @@
#include "PyDeserializer.hpp"

#include <new>
#include <string>
#include <system_error>
#include <type_traits>
#include <utility>

#include <clp/ffi/ir_stream/decoding_methods.hpp>
#include <clp/ffi/ir_stream/Deserializer.hpp>
#include <clp/ffi/ir_stream/IrUnitType.hpp>
#include <clp/ffi/ir_stream/protocol_constants.hpp>
#include <clp/ffi/KeyValuePairLogEvent.hpp>
#include <clp/ffi/SchemaTree.hpp>
#include <clp/time_types.hpp>
#include <clp/TraceableException.hpp>
#include <json/single_include/nlohmann/json.hpp>

#include <clp_ffi_py/api_decoration.hpp>
#include <clp_ffi_py/error_messages.hpp>
#include <clp_ffi_py/ir/native/DeserializerBufferReader.hpp>
#include <clp_ffi_py/ir/native/error_messages.hpp>
#include <clp_ffi_py/ir/native/PyKeyValuePairLogEvent.hpp>
#include <clp_ffi_py/Py_utils.hpp>
#include <clp_ffi_py/PyObjectCast.hpp>
#include <clp_ffi_py/PyObjectUtils.hpp>
#include <clp_ffi_py/utils.hpp>

namespace clp_ffi_py::ir::native {
@@ -67,6 +72,23 @@ PyDoc_STRVAR(
);
CLP_FFI_PY_METHOD auto PyDeserializer_deserialize_log_event(PyDeserializer* self) -> PyObject*;

/**
* Callback of `PyDeserializer`'s `get_user_defined_metadata`.
*/
PyDoc_STRVAR(
cPyDeserializerGetUserDefinedMetadataDoc,
"get_user_defined_metadata(self)\n"
"--\n\n"
"Gets the user-defined stream-level metadata.\n\n"
":return:\n"
" - The deserialized user-defined stream-level metadata, loaded as a"
" dictionary.\n"
" - None if user-defined stream-level metadata was not given in the deserialized"
Member:
Would API users need less special-case handling if we returned an empty dictionary when "user-defined stream-level metadata was not given in the deserialized IR stream"?

Member:
Though it does seem that in the Serializer we permit users to provide either None or "{}". I haven't traced this down into the CLP code, but I think the given metadata string will be encoded as-is?

Member Author:
I would prefer to return None indicating that the user-defined metadata wasn't specified in the source.
It is possible to encode an empty dictionary as the metadata though.
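
A caller-side view of the convention chosen here, as a minimal sketch: `None` means the stream carried no user-defined metadata, while `{}` means it carried an empty one. `describe_metadata` is a hypothetical helper, not part of clp-ffi-py; its argument stands for the value returned by `Deserializer.get_user_defined_metadata()`.

```python
from typing import Any, Dict, Optional


def describe_metadata(metadata: Optional[Dict[str, Any]]) -> str:
    # Hypothetical helper: `metadata` stands for the return value of
    # `Deserializer.get_user_defined_metadata()`.
    if metadata is None:
        # Compare with `is None`, not truthiness: `{}` is falsy but still "present".
        return "absent"
    if not metadata:
        return "present but empty"
    return f"present with keys {sorted(metadata)}"
```

Returning `None` keeps the two cases distinguishable; a caller who does not care about the difference can still write `metadata or {}`.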

" IR stream.\n"
":rtype: dict | None\n"
);
CLP_FFI_PY_METHOD auto PyDeserializer_get_user_defined_metadata(PyDeserializer* self) -> PyObject*;

/**
* Callback of `PyDeserializer`'s deallocator.
*/
@@ -79,6 +101,11 @@ PyMethodDef PyDeserializer_method_table[]{
METH_NOARGS,
static_cast<char const*>(cPyDeserializerDeserializeLogEventDoc)},

{"get_user_defined_metadata",
py_c_function_cast(PyDeserializer_get_user_defined_metadata),
METH_NOARGS,
static_cast<char const*>(cPyDeserializerGetUserDefinedMetadataDoc)},

{nullptr}
};

@@ -153,6 +180,48 @@ CLP_FFI_PY_METHOD auto PyDeserializer_deserialize_log_event(PyDeserializer* self
return self->deserialize_log_event();
}

CLP_FFI_PY_METHOD auto PyDeserializer_get_user_defined_metadata(PyDeserializer* self) -> PyObject* {
auto const* user_defined_metadata{self->get_user_defined_metadata()};
if (nullptr == user_defined_metadata) {
Py_RETURN_NONE;
}

if (false == user_defined_metadata->is_object()) {
PyErr_SetString(
PyExc_TypeError,
"User-defined stream-level metadata is not a JSON object."
Member:
If the type of this JSON value is not an object, it means either the file was not created using a Serializer we provide, or something went wrong in our code, right?

Though I imagine such cases are rare, would it be easier to debug them if we included the JSON value type in the error message?

Member Author:
Yeah, the only possible case is that someone created a stream that follows our format but doesn't enforce the use of a JSON object as the metadata. Technically we should check this in the ffi core. Let me think about it...

Member Author:
As improved in y-scope/clp#695, we moved this check to the ffi core. The latest commit should have this check removed (however, we need to merge #134 first).
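
The reviewer's suggestion, naming the JSON value type in the error, might look like the following in Python. This only illustrates the shape of the message: `require_json_object` and `JSON_TYPE_NAMES` are hypothetical names, and per the comment above the real check moved into the ffi core.

```python
import json
from typing import Any, Dict

# Hypothetical mapping from the Python types produced by `json.loads`
# to the corresponding JSON type names.
JSON_TYPE_NAMES = {
    dict: "object",
    list: "array",
    str: "string",
    bool: "boolean",
    int: "number",
    float: "number",
    type(None): "null",
}


def require_json_object(json_str: str) -> Dict[str, Any]:
    """Parses `json_str` and rejects any top-level value that is not a JSON object."""
    value = json.loads(json_str)
    if not isinstance(value, dict):
        type_name = JSON_TYPE_NAMES.get(type(value), type(value).__name__)
        raise TypeError(
            f"User-defined stream-level metadata is not a JSON object (got JSON {type_name})."
        )
    return value
```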

);
return nullptr;
}

std::string json_str;
try {
json_str = user_defined_metadata->dump();
} catch (nlohmann::json::exception const& ex) {
PyErr_Format(
PyExc_RuntimeError,
"Failed to serialize the user-defined stream-level metadata into a JSON string."
" Error: %s",
ex.what()
);
return nullptr;
}

PyObjectPtr<PyObject> py_metadata_dict{py_utils_parse_json_str(json_str)};
Member:
Sorry for not being able to find any previous discussions on this. We have considered https://github.com/pybind/pybind11_json and the other alternatives discussed at https://www.github.com/nlohmann/json/issues/1014, right?

Just curious: for more efficient conversions, do we have plans to make use of those or to create our own bindings?

Member Author:
Since this isn't on any critical path anyway, I would probably defer it to a future release.
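
Because the conversion goes through a JSON string in both directions, metadata comes back in its JSON-equivalent form rather than as the original Python object. A small sketch using only the standard `json` module follows; the sample dictionary is made up, and the C++ parse/dump step in between is assumed to preserve the JSON value.

```python
import json

# Made-up metadata with a tuple value and a non-string key.
original = {"host": "node-1", "ports": (8080, 8081), 1: "numeric key"}

# Serializer side: dict -> JSON string (what `serialize_dict_to_json_str` returns).
json_str = json.dumps(original)

# Deserializer side: JSON string -> dict (what `parse_json_str` does).
restored = json.loads(json_str)
```

Tuples come back as lists and non-string keys come back as strings, so `restored` is not equal to `original` here. Metadata that must compare equal after a round trip should stick to string keys, lists, and JSON scalars.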

if (nullptr == py_metadata_dict) {
return nullptr;
}
if (false == static_cast<bool>(PyDict_Check(py_metadata_dict.get()))) {
PyErr_SetString(
PyExc_TypeError,
"Failed to convert the user-defined stream-level metadata into a dictionary."
);
return nullptr;
}

return py_metadata_dict.release();
}

CLP_FFI_PY_METHOD auto PyDeserializer_dealloc(PyDeserializer* self) -> void {
self->clean();
Py_TYPE(self)->tp_free(py_reinterpret_cast<PyObject>(self));
@@ -282,6 +351,17 @@ auto PyDeserializer::deserialize_log_event() -> PyObject* {
Py_RETURN_NONE;
}

auto PyDeserializer::get_user_defined_metadata() const -> nlohmann::json const* {
auto const& metadata{m_deserializer->get_metadata()};
std::string const user_defined_metadata_key{
clp::ffi::ir_stream::cProtocol::Metadata::UserDefinedMetadataKey
};
if (false == metadata.contains(user_defined_metadata_key)) {
return nullptr;
}
return &metadata.at(user_defined_metadata_key);
}

auto PyDeserializer::handle_log_event(clp::ffi::KeyValuePairLogEvent&& log_event) -> IRErrorCode {
if (has_unreleased_deserialized_log_event()) {
// This situation may occur if the deserializer methods return an error after the last
8 changes: 8 additions & 0 deletions src/clp_ffi_py/ir/native/PyDeserializer.hpp
@@ -12,6 +12,7 @@
#include <clp/ffi/SchemaTree.hpp>
#include <clp/time_types.hpp>
#include <gsl/gsl>
#include <json/single_include/nlohmann/json.hpp>

#include <clp_ffi_py/ir/native/DeserializerBufferReader.hpp>
#include <clp_ffi_py/PyObjectUtils.hpp>
@@ -109,6 +110,13 @@ class PyDeserializer {
*/
[[nodiscard]] auto deserialize_log_event() -> PyObject*;

/**
* @return A pointer to the user-defined stream-level metadata, deserialized from the stream's
* preamble, if defined.
* @return std::nullptr if the user-defined stream-level metadata is not defined.
*/
[[nodiscard]] auto get_user_defined_metadata() const -> nlohmann::json const*;

private:
/**
* Class that implements `clp::ffi::ir_stream::IrUnitHandlerInterface` for deserializing
60 changes: 56 additions & 4 deletions src/clp_ffi_py/ir/native/PySerializer.cpp
@@ -12,11 +12,13 @@
#include <utility>

#include <clp/ffi/ir_stream/protocol_constants.hpp>
#include <json/single_include/nlohmann/json.hpp>
#include <wrapped_facade_headers/msgpack.hpp>

#include <clp_ffi_py/api_decoration.hpp>
#include <clp_ffi_py/error_messages.hpp>
#include <clp_ffi_py/ir/native/error_messages.hpp>
#include <clp_ffi_py/Py_utils.hpp>
#include <clp_ffi_py/PyObjectCast.hpp>
#include <clp_ffi_py/PyObjectUtils.hpp>
#include <clp_ffi_py/utils.hpp>
@@ -32,7 +34,7 @@ PyDoc_STRVAR(
"Serializer for serializing CLP key-value pair IR streams.\n"
"This class serializes log events into the CLP key-value pair IR format and writes the"
" serialized data to a specified byte stream object.\n\n"
-"__init__(self, output_stream, buffer_size_limit=65536)\n\n"
+"__init__(self, output_stream, buffer_size_limit=65536, user_defined_metadata=None)\n\n"
"Initializes a :class:`Serializer` instance with the given output stream. Note that each"
" object should only be initialized once. Double initialization will result in a memory"
" leak.\n\n"
@@ -42,6 +44,11 @@ PyDoc_STRVAR(
":param buffer_size_limit: The maximum amount of serialized data to buffer before flushing"
" it to `output_stream`. Defaults to 64 KiB.\n"
":type buffer_size_limit: int\n"
":param user_defined_metadata: A dictionary representing user-defined stream-level"
" metadata, or None to indicate the absence of such metadata. If a dictionary is provided,"
" it must be valid for serialization as a string using the `Python Standard JSON library"
" <https://docs.python.org/3/library/json.html>`_\.\n"
":type user_defined_metadata: dict | None\n"
);
CLP_FFI_PY_METHOD auto PySerializer_init(PySerializer* self, PyObject* args, PyObject* keywords)
-> int;
@@ -218,9 +225,11 @@ CLP_FFI_PY_METHOD auto PySerializer_init(PySerializer* self, PyObject* args, PyO
-> int {
static char keyword_output_stream[]{"output_stream"};
static char keyword_buffer_size_limit[]{"buffer_size_limit"};
static char keyword_user_defined_metadata[]{"user_defined_metadata"};
static char* keyword_table[]{
static_cast<char*>(keyword_output_stream),
static_cast<char*>(keyword_buffer_size_limit),
static_cast<char*>(keyword_user_defined_metadata),
nullptr
};

@@ -229,15 +238,17 @@ CLP_FFI_PY_METHOD auto PySerializer_init(PySerializer* self, PyObject* args, PyO
self->default_init();

PyObject* output_stream{Py_None};
PyObject* py_user_defined_metadata{Py_None};
Py_ssize_t buffer_size_limit{PySerializer::cDefaultBufferSizeLimit};
if (false
== static_cast<bool>(PyArg_ParseTupleAndKeywords(
args,
keywords,
-"O|n",
+"O|nO",
static_cast<char**>(keyword_table),
&output_stream,
-&buffer_size_limit
+&buffer_size_limit,
+&py_user_defined_metadata
)))
{
return -1;
@@ -276,7 +287,48 @@ CLP_FFI_PY_METHOD auto PySerializer_init(PySerializer* self, PyObject* args, PyO
return -1;
}

-auto serializer_result{PySerializer::ClpIrSerializer::create()};
std::optional<nlohmann::json> optional_user_defined_metadata;
if (Py_None != py_user_defined_metadata) {
if (false == static_cast<bool>(PyDict_Check(py_user_defined_metadata))) {
PyErr_Format(
PyExc_TypeError,
"`%s` must be a dictionary, if not None.",
static_cast<char const*>(keyword_user_defined_metadata)
);
return -1;
}
auto* py_serialized_json_str{py_utils_serialize_dict_to_json_str(
py_reinterpret_cast<PyDictObject>(py_user_defined_metadata)
)};
if (nullptr == py_serialized_json_str) {
return -1;
}
Py_ssize_t json_str_size{};
auto const* json_str_data{PyUnicode_AsUTF8AndSize(
py_reinterpret_cast<PyObject>(py_serialized_json_str),
&json_str_size
)};
if (nullptr == json_str_data) {
return -1;
}
auto parsed_user_defined_metadata = nlohmann::json::parse(
std::string_view{json_str_data, static_cast<size_t>(json_str_size)},
nullptr,
false
);
if (parsed_user_defined_metadata.is_discarded()) {
PyErr_Format(
PyExc_RuntimeError,
"Failed to parse `%s`: %s",
static_cast<char const*>(keyword_user_defined_metadata),
json_str_data
);
return -1;
}
optional_user_defined_metadata = std::move(parsed_user_defined_metadata);
}

+auto serializer_result{PySerializer::ClpIrSerializer::create(optional_user_defined_metadata)};
if (serializer_result.has_error()) {
PyErr_Format(
PyExc_RuntimeError,
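
The new argument handling in `PySerializer_init` can be summarised in Python as follows. This is a sketch under assumptions, not the extension's code: `prepare_user_defined_metadata` is a hypothetical name, and `json.loads` with a rejecting `parse_constant` hook stands in for the C++ parse step, on the assumption that the C++ parser rejects the non-standard `NaN` and `Infinity` literals that `json.dumps` emits by default.

```python
import json
from typing import Any, Dict, Optional


def _reject_constant(name: str) -> Any:
    # `json.loads` calls this hook for "NaN", "Infinity" and "-Infinity".
    raise ValueError(f"non-standard JSON literal: {name}")


def prepare_user_defined_metadata(metadata: Optional[Dict[str, Any]]) -> Optional[Any]:
    """Hypothetical Python rendering of the validation steps shown in the diff."""
    if metadata is None:
        return None  # no user-defined metadata is passed to the serializer
    if not isinstance(metadata, dict):
        raise TypeError("`user_defined_metadata` must be a dictionary, if not None.")
    # May raise TypeError or ValueError on its own, as in `serialize_dict_to_json_str`.
    json_str = json.dumps(metadata)
    try:
        # Stand-in for the strict C++ parse of the JSON string (an assumption).
        return json.loads(json_str, parse_constant=_reject_constant)
    except ValueError:
        # Mirrors the `is_discarded()` branch in the diff.
        raise RuntimeError(f"Failed to parse `user_defined_metadata`: {json_str}") from None
```

Under that assumption, a dictionary such as `{"ratio": float("nan")}` passes `json.dumps` but fails the parse step, which is one way the "Failed to parse" branch could be reached.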