Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add support for serializing/deserializing user-defined stream-level metadata. #133

Merged
merged 9 commits into from
Jan 26, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion clp_ffi_py/ir/native.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,12 @@ class KeyValuePairLogEvent:
) -> Tuple[Dict[Any, Any], Dict[Any, Any]]: ...

class Serializer:
def __init__(self, output_stream: IO[bytes], buffer_size_limit: int = 65536): ...
def __init__(
self,
output_stream: IO[bytes],
buffer_size_limit: int = 65536,
user_defined_metadata: Optional[Dict[str, Any]] = None,
): ...
def __enter__(self) -> Serializer: ...
def __exit__(
self,
Expand All @@ -115,5 +120,6 @@ class Deserializer:
allow_incomplete_stream: bool = False,
): ...
def deserialize_log_event(self) -> Optional[KeyValuePairLogEvent]: ...
def get_user_defined_metadata(self) -> Optional[Dict[str, Any]]: ...

class IncompleteStreamError(Exception): ...
13 changes: 13 additions & 0 deletions clp_ffi_py/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,19 @@ def serialize_dict_to_msgpack(dictionary: Dict[Any, Any]) -> bytes:
return msgpack.packb(dictionary)


def serialize_dict_to_json_str(dictionary: Dict[Any, Any]) -> str:
"""
Serializes the given dictionary into a JSON string.

:param dictionary: The dictionary to serialize.
:return: JSON string of the serialized dictionary.
:raises: TypeError The given input is not a dictionary.
"""
if not isinstance(dictionary, dict):
raise TypeError("The type of the input object must be a dictionary.")
return json.dumps(dictionary)


def parse_json_str(json_str: str) -> Any:
"""
Wrapper of `json.loads`, which parses a JSON string into a Python object.
Expand Down
2 changes: 1 addition & 1 deletion src/clp
1 change: 1 addition & 0 deletions src/clp_ffi_py/PyObjectCast.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ CLP_FFI_PY_MARK_AS_PYOBJECT(ir::native::PySerializer);
CLP_FFI_PY_MARK_AS_PYOBJECT(PyBytesObject);
CLP_FFI_PY_MARK_AS_PYOBJECT(PyDictObject);
CLP_FFI_PY_MARK_AS_PYOBJECT(PyTypeObject);
CLP_FFI_PY_MARK_AS_PYOBJECT(PyUnicodeObject);
} // namespace clp_ffi_py

#endif // CLP_FFI_PY_PY_OBJECT_CAST_HPP
35 changes: 35 additions & 0 deletions src/clp_ffi_py/Py_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@ namespace {
constexpr std::string_view cPyFuncNameGetFormattedTimestamp{"get_formatted_timestamp"};
constexpr std::string_view cPyFuncNameGetTimezoneFromTimezoneId{"get_timezone_from_timezone_id"};
constexpr std::string_view cPyFuncNameSerializeDictToMsgpack{"serialize_dict_to_msgpack"};
constexpr std::string_view cPyFuncNameSerializeDictToJsonStr{"serialize_dict_to_json_str"};
constexpr std::string_view cPyFuncNameParseJsonStr{"parse_json_str"};

// NOLINTBEGIN(cppcoreguidelines-avoid-non-const-global-variables)
PyObjectStaticPtr<PyObject> Py_func_get_formatted_timestamp{nullptr};
PyObjectStaticPtr<PyObject> Py_func_get_timezone_from_timezone_id{nullptr};
PyObjectStaticPtr<PyObject> Py_func_serialize_dict_to_msgpack{nullptr};
PyObjectStaticPtr<PyObject> Py_func_serialize_dict_to_json_str{nullptr};
PyObjectStaticPtr<PyObject> Py_func_parse_json_str{nullptr};

// NOLINTEND(cppcoreguidelines-avoid-non-const-global-variables)
Expand Down Expand Up @@ -68,6 +70,14 @@ auto py_utils_init() -> bool {
return false;
}

Py_func_serialize_dict_to_json_str.reset(PyObject_GetAttrString(
py_utils,
get_c_str_from_constexpr_string_view(cPyFuncNameSerializeDictToJsonStr)
));
if (nullptr == Py_func_serialize_dict_to_json_str.get()) {
return false;
}

Py_func_parse_json_str.reset(PyObject_GetAttrString(
py_utils,
get_c_str_from_constexpr_string_view(cPyFuncNameParseJsonStr)
Expand Down Expand Up @@ -122,6 +132,31 @@ auto py_utils_serialize_dict_to_msgpack(PyDictObject* py_dict) -> PyBytesObject*
return py_reinterpret_cast<PyBytesObject>(result);
}

auto py_utils_serialize_dict_to_json_str(PyDictObject* py_dict) -> PyUnicodeObject* {
PyObjectPtr<PyObject> const func_args_ptr{
Py_BuildValue("(O)", py_reinterpret_cast<PyObject>(py_dict))
};
auto* func_args{func_args_ptr.get()};
if (nullptr == func_args) {
return nullptr;
}
auto* result{py_utils_function_call_wrapper(Py_func_serialize_dict_to_json_str.get(), func_args)
};
if (nullptr == result) {
return nullptr;
}
if (false == static_cast<bool>(PyUnicode_Check(result))) {
PyErr_Format(
PyExc_TypeError,
"`%s` is supposed to return a `str` object",
cPyFuncNameSerializeDictToJsonStr
);
return nullptr;
}

return py_reinterpret_cast<PyUnicodeObject>(result);
}

auto py_utils_parse_json_str(std::string_view json_str) -> PyObject* {
PyObjectPtr<PyObject> const func_args_ptr{
Py_BuildValue("(s#)", json_str.data(), static_cast<Py_ssize_t>(json_str.size()))
Expand Down
9 changes: 9 additions & 0 deletions src/clp_ffi_py/Py_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,15 @@ py_utils_get_formatted_timestamp(clp::ir::epoch_time_ms_t timestamp, PyObject* t
*/
[[nodiscard]] auto py_utils_serialize_dict_to_msgpack(PyDictObject* py_dict) -> PyBytesObject*;

/**
* CPython wrapper of `clp_ffi_py.utils.serialize_dict_to_json_str`.
* @param py_dict
* @return a new reference of a Python Unicode object containing JSON string representation of the
* dictionary.
* @return nullptr on failure with the relevant Python exception and error set.
*/
[[nodiscard]] auto py_utils_serialize_dict_to_json_str(PyDictObject* py_dict) -> PyUnicodeObject*;

/**
* CPython wrapper of `clp_ffi_py.utils.parse_json_str_to_dict`.
* @param json_str
Expand Down
72 changes: 72 additions & 0 deletions src/clp_ffi_py/ir/native/PyDeserializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,29 @@
#include "PyDeserializer.hpp"

#include <new>
#include <string>
#include <system_error>
#include <type_traits>
#include <utility>

#include <clp/ffi/ir_stream/decoding_methods.hpp>
#include <clp/ffi/ir_stream/Deserializer.hpp>
#include <clp/ffi/ir_stream/IrUnitType.hpp>
#include <clp/ffi/ir_stream/protocol_constants.hpp>
#include <clp/ffi/KeyValuePairLogEvent.hpp>
#include <clp/ffi/SchemaTree.hpp>
#include <clp/time_types.hpp>
#include <clp/TraceableException.hpp>
#include <json/single_include/nlohmann/json.hpp>

#include <clp_ffi_py/api_decoration.hpp>
#include <clp_ffi_py/error_messages.hpp>
#include <clp_ffi_py/ir/native/DeserializerBufferReader.hpp>
#include <clp_ffi_py/ir/native/error_messages.hpp>
#include <clp_ffi_py/ir/native/PyKeyValuePairLogEvent.hpp>
#include <clp_ffi_py/Py_utils.hpp>
#include <clp_ffi_py/PyObjectCast.hpp>
#include <clp_ffi_py/PyObjectUtils.hpp>
#include <clp_ffi_py/utils.hpp>

namespace clp_ffi_py::ir::native {
Expand Down Expand Up @@ -67,6 +72,23 @@ PyDoc_STRVAR(
);
CLP_FFI_PY_METHOD auto PyDeserializer_deserialize_log_event(PyDeserializer* self) -> PyObject*;

/**
* Callback of `PyDeserializer`'s `get_user_defined_metadata`.
*/
PyDoc_STRVAR(
cPyDeserializerGetUserDefinedMetadataDoc,
"get_user_defined_metadata(self)\n"
"--\n\n"
"Gets the user-defined stream-level metadata.\n\n"
":return:\n"
" - The deserialized user-defined stream-level metadata, loaded as a"
" dictionary.\n"
" - None if user-defined stream-level metadata was not given in the deserialized"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would there be less handling for API users if we return an empty dictionary if "user-defined stream-level metadata was not given in the deserialized IR stream"?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Though, it does seem in the Serializer we do permit users to provide a None or "{}" value. I haven't traced down to the CLP code but I think the given metadata string will be encoded as-is?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would prefer to return None indicating that the user-defined metadata wasn't specified in the source.
It is possible to encode an empty dictionary as the metadata though.

" IR stream.\n"
":rtype: dict | None\n"
);
CLP_FFI_PY_METHOD auto PyDeserializer_get_user_defined_metadata(PyDeserializer* self) -> PyObject*;

/**
* Callback of `PyDeserializer`'s deallocator.
*/
Expand All @@ -79,6 +101,11 @@ PyMethodDef PyDeserializer_method_table[]{
METH_NOARGS,
static_cast<char const*>(cPyDeserializerDeserializeLogEventDoc)},

{"get_user_defined_metadata",
py_c_function_cast(PyDeserializer_get_user_defined_metadata),
METH_NOARGS,
static_cast<char const*>(cPyDeserializerGetUserDefinedMetadataDoc)},

{nullptr}
};

Expand Down Expand Up @@ -153,6 +180,40 @@ CLP_FFI_PY_METHOD auto PyDeserializer_deserialize_log_event(PyDeserializer* self
return self->deserialize_log_event();
}

CLP_FFI_PY_METHOD auto PyDeserializer_get_user_defined_metadata(PyDeserializer* self) -> PyObject* {
auto const* user_defined_metadata{self->get_user_defined_metadata()};
if (nullptr == user_defined_metadata) {
Py_RETURN_NONE;
}

std::string json_str;
try {
json_str = user_defined_metadata->dump();
} catch (nlohmann::json::exception const& ex) {
PyErr_Format(
PyExc_RuntimeError,
"Failed to serialize the user-defined stream-level metadata into a JSON string."
" Error: %s",
ex.what()
);
return nullptr;
}

PyObjectPtr<PyObject> py_metadata_dict{py_utils_parse_json_str(json_str)};
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry for not being able find any previous discussions on this - we have considered the https://github.com/pybind/pybind11_json and other alternatives discussed at https://www.github.com/nlohmann/json/issues/1014 , right?

just curious, for more efficient conversions, do we have plans to make use of those or create our own bindings?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this isn't in any of the critical path anyways, I would probably defer it to future release

if (nullptr == py_metadata_dict) {
return nullptr;
}
if (false == static_cast<bool>(PyDict_Check(py_metadata_dict.get()))) {
PyErr_SetString(
PyExc_TypeError,
"Failed to convert the user-defined stream-level metadata into a dictionary."
);
return nullptr;
}

return py_metadata_dict.release();
}

CLP_FFI_PY_METHOD auto PyDeserializer_dealloc(PyDeserializer* self) -> void {
self->clean();
Py_TYPE(self)->tp_free(py_reinterpret_cast<PyObject>(self));
Expand Down Expand Up @@ -282,6 +343,17 @@ auto PyDeserializer::deserialize_log_event() -> PyObject* {
Py_RETURN_NONE;
}

auto PyDeserializer::get_user_defined_metadata() const -> nlohmann::json const* {
auto const& metadata{m_deserializer->get_metadata()};
std::string const user_defined_metadata_key{
clp::ffi::ir_stream::cProtocol::Metadata::UserDefinedMetadataKey
};
if (false == metadata.contains(user_defined_metadata_key)) {
return nullptr;
}
return &metadata.at(user_defined_metadata_key);
}

auto PyDeserializer::handle_log_event(clp::ffi::KeyValuePairLogEvent&& log_event) -> IRErrorCode {
if (has_unreleased_deserialized_log_event()) {
// This situation may occur if the deserializer methods return an error after the last
Expand Down
8 changes: 8 additions & 0 deletions src/clp_ffi_py/ir/native/PyDeserializer.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include <clp/ffi/SchemaTree.hpp>
#include <clp/time_types.hpp>
#include <gsl/gsl>
#include <json/single_include/nlohmann/json.hpp>

#include <clp_ffi_py/ir/native/DeserializerBufferReader.hpp>
#include <clp_ffi_py/PyObjectUtils.hpp>
Expand Down Expand Up @@ -109,6 +110,13 @@ class PyDeserializer {
*/
[[nodiscard]] auto deserialize_log_event() -> PyObject*;

/**
* @return A pointer to the user-defined stream-level metadata, deserialized from the stream's
* preamble, if defined.
* @return std::nullptr if the user-defined stream-level metadata is not defined.
*/
[[nodiscard]] auto get_user_defined_metadata() const -> nlohmann::json const*;

private:
/**
* Class that implements `clp::ffi::ir_stream::IrUnitHandlerInterface` for deserializing
Expand Down
60 changes: 56 additions & 4 deletions src/clp_ffi_py/ir/native/PySerializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@
#include <utility>

#include <clp/ffi/ir_stream/protocol_constants.hpp>
#include <json/single_include/nlohmann/json.hpp>
#include <wrapped_facade_headers/msgpack.hpp>

#include <clp_ffi_py/api_decoration.hpp>
#include <clp_ffi_py/error_messages.hpp>
#include <clp_ffi_py/ir/native/error_messages.hpp>
#include <clp_ffi_py/Py_utils.hpp>
#include <clp_ffi_py/PyObjectCast.hpp>
#include <clp_ffi_py/PyObjectUtils.hpp>
#include <clp_ffi_py/utils.hpp>
Expand All @@ -32,7 +34,7 @@ PyDoc_STRVAR(
"Serializer for serializing CLP key-value pair IR streams.\n"
"This class serializes log events into the CLP key-value pair IR format and writes the"
" serialized data to a specified byte stream object.\n\n"
"__init__(self, output_stream, buffer_size_limit=65536)\n\n"
"__init__(self, output_stream, buffer_size_limit=65536, user_defined_metadata=None)\n\n"
"Initializes a :class:`Serializer` instance with the given output stream. Note that each"
" object should only be initialized once. Double initialization will result in a memory"
" leak.\n\n"
Expand All @@ -42,6 +44,11 @@ PyDoc_STRVAR(
":param buffer_size_limit: The maximum amount of serialized data to buffer before flushing"
" it to `output_stream`. Defaults to 64 KiB.\n"
":type buffer_size_limit: int\n"
":param user_defined_metadata: A dictionary representing user-defined stream-level"
" metadata, or None to indicate the absence of such metadata. If a dictionary is provided,"
" it must be valid for serialization as a string using the `Python Standard JSON library"
" <https://docs.python.org/3/library/json.html>`_\.\n"
":type user_defined_metadata: dict | None\n"
);
CLP_FFI_PY_METHOD auto PySerializer_init(PySerializer* self, PyObject* args, PyObject* keywords)
-> int;
Expand Down Expand Up @@ -218,9 +225,11 @@ CLP_FFI_PY_METHOD auto PySerializer_init(PySerializer* self, PyObject* args, PyO
-> int {
static char keyword_output_stream[]{"output_stream"};
static char keyword_buffer_size_limit[]{"buffer_size_limit"};
static char keyword_user_defined_metadata[]{"user_defined_metadata"};
static char* keyword_table[]{
static_cast<char*>(keyword_output_stream),
static_cast<char*>(keyword_buffer_size_limit),
static_cast<char*>(keyword_user_defined_metadata),
nullptr
};

Expand All @@ -229,15 +238,17 @@ CLP_FFI_PY_METHOD auto PySerializer_init(PySerializer* self, PyObject* args, PyO
self->default_init();

PyObject* output_stream{Py_None};
PyObject* py_user_defined_metadata{Py_None};
Py_ssize_t buffer_size_limit{PySerializer::cDefaultBufferSizeLimit};
if (false
== static_cast<bool>(PyArg_ParseTupleAndKeywords(
args,
keywords,
"O|n",
"O|nO",
static_cast<char**>(keyword_table),
&output_stream,
&buffer_size_limit
&buffer_size_limit,
&py_user_defined_metadata
)))
{
return -1;
Expand Down Expand Up @@ -276,7 +287,48 @@ CLP_FFI_PY_METHOD auto PySerializer_init(PySerializer* self, PyObject* args, PyO
return -1;
}

auto serializer_result{PySerializer::ClpIrSerializer::create()};
std::optional<nlohmann::json> optional_user_defined_metadata;
if (Py_None != py_user_defined_metadata) {
if (false == static_cast<bool>(PyDict_Check(py_user_defined_metadata))) {
PyErr_Format(
PyExc_TypeError,
"`%s` must be a dictionary, if not None.",
static_cast<char const*>(keyword_user_defined_metadata)
);
return -1;
}
auto* py_serialized_json_str{py_utils_serialize_dict_to_json_str(
py_reinterpret_cast<PyDictObject>(py_user_defined_metadata)
)};
if (nullptr == py_serialized_json_str) {
return -1;
}
Py_ssize_t json_str_size{};
auto const* json_str_data{PyUnicode_AsUTF8AndSize(
py_reinterpret_cast<PyObject>(py_serialized_json_str),
&json_str_size
)};
if (nullptr == json_str_data) {
return -1;
}
auto parsed_user_defined_metadata = nlohmann::json::parse(
std::string_view{json_str_data, static_cast<size_t>(json_str_size)},
nullptr,
false
);
if (parsed_user_defined_metadata.is_discarded()) {
PyErr_Format(
PyExc_RuntimeError,
"Failed to parse `%s`: %s",
static_cast<char const*>(keyword_user_defined_metadata),
json_str_data
);
return -1;
}
optional_user_defined_metadata = std::move(parsed_user_defined_metadata);
}

auto serializer_result{PySerializer::ClpIrSerializer::create(optional_user_defined_metadata)};
if (serializer_result.has_error()) {
PyErr_Format(
PyExc_RuntimeError,
Expand Down
Loading
Loading