From e13f75383310a747ed43ef3a751aeb75a3ac692c Mon Sep 17 00:00:00 2001 From: Oren Leiman Date: Tue, 14 Jan 2025 20:59:51 -0800 Subject: [PATCH] iceberg: Single-value JSON serde Needed for processing default values for schema fields. As specified in https://iceberg.apache.org/spec/#json-single-value-serialization Signed-off-by: Oren Leiman --- src/v/iceberg/BUILD | 23 + src/v/iceberg/CMakeLists.txt | 1 + src/v/iceberg/tests/BUILD | 19 + src/v/iceberg/tests/CMakeLists.txt | 11 + src/v/iceberg/tests/values_json_test.cc | 911 ++++++++++++++++++++++++ src/v/iceberg/values_json.cc | 676 ++++++++++++++++++ src/v/iceberg/values_json.h | 55 ++ 7 files changed, 1696 insertions(+) create mode 100644 src/v/iceberg/tests/values_json_test.cc create mode 100644 src/v/iceberg/values_json.cc create mode 100644 src/v/iceberg/values_json.h diff --git a/src/v/iceberg/BUILD b/src/v/iceberg/BUILD index b66009289b3d3..6b3fe4a14fb86 100644 --- a/src/v/iceberg/BUILD +++ b/src/v/iceberg/BUILD @@ -194,6 +194,29 @@ redpanda_cc_library( ], ) +redpanda_cc_library( + name = "values_json", + srcs = [ + "values_json.cc", + ], + hdrs = [ + "values_json.h", + ], + include_prefix = "iceberg", + deps = [ + ":datatypes", + ":json_utils", + ":json_writer", + ":values", + "//src/v/json", + "//src/v/strings:string_switch", + "@abseil-cpp//absl/strings", + "@abseil-cpp//absl/time", + "@boost//:iostreams", + "@boost//:range", + ], +) + redpanda_cc_library( name = "field_collecting_visitor", hdrs = [ diff --git a/src/v/iceberg/CMakeLists.txt b/src/v/iceberg/CMakeLists.txt index 25aa4717efd38..a2830565d9528 100644 --- a/src/v/iceberg/CMakeLists.txt +++ b/src/v/iceberg/CMakeLists.txt @@ -73,6 +73,7 @@ v_cc_library( values.cc values_avro.cc values_bytes.cc + values_json.cc rest_client/json.cc rest_client/retry_policy.cc rest_client/catalog_client.cc diff --git a/src/v/iceberg/tests/BUILD b/src/v/iceberg/tests/BUILD index 75fe90bd7a4ec..708ce71535c5d 100644 --- a/src/v/iceberg/tests/BUILD +++ b/src/v/iceberg/tests/BUILD @@ -75,6 +75,25 @@ redpanda_cc_gtest( ], ) +redpanda_cc_gtest( + name = "values_json_test", + timeout = "short", + srcs = [ + "values_json_test.cc", + ], + deps = [ + ":test_schemas", + "//src/v/bytes:random", + "//src/v/iceberg:json_writer", + "//src/v/iceberg:values", + "//src/v/iceberg:values_json", + "//src/v/json", + "//src/v/test_utils:gtest", + "@abseil-cpp//absl/numeric:int128", + "@googletest//:gtest", + ], +) + redpanda_cc_gtest( name = "filesystem_catalog_test", timeout = "short", diff --git a/src/v/iceberg/tests/CMakeLists.txt b/src/v/iceberg/tests/CMakeLists.txt index 19ce79c01d048..532cadede1ae3 100644 --- a/src/v/iceberg/tests/CMakeLists.txt +++ b/src/v/iceberg/tests/CMakeLists.txt @@ -77,6 +77,17 @@ rp_test( ARGS "-- -c1" ) +rp_test( + UNIT_TEST + GTEST + BINARY_NAME iceberg_values_json + SOURCES + values_json_test.cc + LIBRARIES + v::gtest_main + v::iceberg_test_utils +) + rp_test( BENCHMARK_TEST BINARY_NAME iceberg_uri diff --git a/src/v/iceberg/tests/values_json_test.cc b/src/v/iceberg/tests/values_json_test.cc new file mode 100644 index 0000000000000..9839bda72a3db --- /dev/null +++ b/src/v/iceberg/tests/values_json_test.cc @@ -0,0 +1,911 @@ +// Copyright 2025 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +#include "bytes/random.h" +#include "gmock/gmock.h" +#include "iceberg/tests/test_schemas.h" +#include "iceberg/values.h" +#include "iceberg/values_json.h" +#include "json/document.h" +#include "json/json.h" + +#include +#include + +#include + +using namespace iceberg; + +namespace { + +template +concept StructuredValue = std::is_same_v + || std::is_same_v + || std::is_same_v; + +struct values_json_test_case { + values_json_test_case() = default; + values_json_test_case( + std::string description, + primitive_value val, + field_type type, + std::string expect = "") + : desc(std::move(description)) + , val(std::move(val)) + , type(std::move(type)) + , json_rep(std::move(expect)) {} + + values_json_test_case( + std::string description, field_type type, std::string expect = "") + : desc(std::move(description)) + , val(int_value{}) + , type(std::move(type)) + , json_rep(std::move(expect)) {} + template + values_json_test_case( + std::string description, + std::unique_ptr val, + field_type type, + std::string expect = "") + : desc(std::move(description)) + , val(std::move(val)) + , type(std::move(type)) + , json_rep(std::move(expect)) {} + values_json_test_case(const values_json_test_case& other) + : desc(other.desc) + , val(make_copy(other.val)) + , type(make_copy(other.type)) + , json_rep(other.json_rep) {} + values_json_test_case(values_json_test_case&& other) noexcept + : desc(std::move(other.desc)) + , val(std::move(other.val)) + , type(std::move(other.type)) + , json_rep(std::move(other.json_rep)) {} + + ~values_json_test_case() = default; + + values_json_test_case& operator=(const values_json_test_case&) = delete; + values_json_test_case& operator=(values_json_test_case&&) = delete; + + friend std::ostream& + operator<<(std::ostream& os, const values_json_test_case&); + + std::string desc; + value val; + field_type type; + std::string json_rep; +}; + +std::ostream& operator<<(std::ostream& os, const values_json_test_case& tc) { + return os << tc.desc; +} + +template +ss::sstring json_str(T v) { + json::StringBuffer buf; + json::Writer writer(buf); + rjson_serialize(writer, v); + return buf.GetString(); +} + +static auto some_uuid = uuid_t::create(); +static auto some_random_bytes = random_generators::get_bytes(); + +static const std::vector test_cases{ + values_json_test_case{ + "boolean_value false", + boolean_value{false}, + boolean_type{}, + "false", + }, + values_json_test_case{ + "boolean_value true", + boolean_value{true}, + boolean_type{}, + "true", + }, + values_json_test_case{ + "int_value max", + int_value{std::numeric_limits::max()}, + int_type{}, + json_str(std::numeric_limits::max()), + }, + values_json_test_case{ + "int_value min", + int_value{std::numeric_limits::min()}, + int_type{}, + json_str(std::numeric_limits::min()), + }, + values_json_test_case{ + "long_value max", + long_value{std::numeric_limits::max()}, + long_type{}, + json_str(std::numeric_limits::max()), + }, + values_json_test_case{ + "long_value min", + long_value{std::numeric_limits::min()}, + long_type{}, + json_str(std::numeric_limits::min()), + }, + values_json_test_case{ + "long_value (short range value)", + long_value{1}, + long_type{}, + json_str(1), + }, + values_json_test_case{ + "float_value max", + float_value{std::numeric_limits::max() / 10}, + float_type{}, + json_str(std::numeric_limits::max() / 10), + }, + values_json_test_case{ + "float_value min", + float_value{std::numeric_limits::min() * 10}, + float_type{}, + json_str(std::numeric_limits::min() * 10), + }, + values_json_test_case{ + "double_value max", + double_value{std::numeric_limits::max()}, + double_type{}, + json_str(std::numeric_limits::max()), + }, + values_json_test_case{ + "double_value min", + double_value{std::numeric_limits::min()}, + double_type{}, + json_str(std::numeric_limits::min()), + }, + values_json_test_case{ + "date_value", + date_value{365}, + date_type{}, + R"("1971-01-01")", + }, + values_json_test_case{ + "time_value", + time_value{(24L * 60L * 60L) * 1000000L - 999999L}, + time_type{}, + R"("23:59:59.000001")", + }, + values_json_test_case{ + "timestamp_value", + timestamp_value{365ul * 24ul * 60ul * 60ul * 1000000ul + 23}, + timestamp_type{}, + R"("1971-01-01T00:00:00.000023")", + }, + values_json_test_case{ + "timestamptz_value", + timestamptz_value{365ul * 24ul * 60ul * 60ul * 1000000ul + 23}, + timestamptz_type{}, + R"("1971-01-01T00:00:00.000023+00:00")", + }, + values_json_test_case{ + "string_value", + string_value{bytes_to_iobuf(bytes::from_string("foobar"))}, + string_type{}, + R"("foobar")", + }, + values_json_test_case{ + "uuid_value", + uuid_value{some_uuid}, + uuid_type{}, + fmt::format(R"("{}")", some_uuid), + }, + values_json_test_case{ + "fixed_value", + fixed_value{bytes_to_iobuf(bytes{1, 2, 3, 4, 5, 6, 7, 8, 255})}, + fixed_type{9}, + fmt::format(R"("{}")", to_hex(bytes{1, 2, 3, 4, 5, 6, 7, 8, 255})), + }, + values_json_test_case{ + "binary_value", + binary_value{bytes_to_iobuf(some_random_bytes)}, + binary_type{}, + fmt::format(R"("{}")", to_hex(some_random_bytes)), + }, + values_json_test_case{ + "decimal_value", + decimal_value{absl::int128{std::numeric_limits::max()}}, + decimal_type{21, 8}, + R"("92233720368.54775807")", + }, + values_json_test_case{ + "decimal_value (trailing decimal point)", + decimal_value{absl::int128{std::numeric_limits::max()}}, + decimal_type{21, 0}, + R"("9223372036854775807.")", + }, + values_json_test_case{ + "struct_value", + std::make_unique([]() -> decltype(struct_value::fields) { + decltype(struct_value::fields) vec; + vec.emplace_back(int_value{23}); + vec.emplace_back( + string_value{bytes_to_iobuf(bytes::from_string("foobar"))}); + vec.emplace_back( + std::make_unique([]() -> decltype(list_value::elements) { + decltype(list_value::elements) vec; + for (auto i : std::views::iota(0, 4)) { + if (i == 0) { + vec.emplace_back(std::nullopt); + } else { + vec.emplace_back(int_value{i}); + } + } + return vec; + }())); + return vec; + }()), + []() -> field_type { + struct_type s; + s.fields.emplace_back( + nested_field::create(8, "field_one", field_required::no, int_type{})); + s.fields.emplace_back(nested_field::create( + 9, "field_two", field_required::no, string_type{})); + s.fields.emplace_back(nested_field::create( + 10, + "field_three", + field_required::no, + list_type::create(4, field_required::no, int_type{}))); + return s; + }(), + R"({"8":23,"9":"foobar","10":[null,1,2,3]})", + }, + values_json_test_case{ + "list_value", + std::make_unique([]() -> decltype(list_value::elements) { + decltype(list_value::elements) vec; + for (auto i : std::views::iota(0, 3)) { + vec.emplace_back(std::make_unique( + [i]() -> decltype(struct_value::fields) { + decltype(struct_value::fields) vec; + if (i == 0) { + vec.emplace_back(string_value{ + bytes_to_iobuf(bytes::from_string("foobar"))}); + } else { + vec.emplace_back(std::nullopt); + } + return vec; + }())); + } + return vec; + }()), + list_type::create( + 1, + field_required::no, + []() -> field_type { + struct_type s; + s.fields.emplace_back( + nested_field::create(42, "key", field_required::no, string_type{})); + return s; + }()), + R"([{"42":"foobar"},{"42":null},{"42":null}])", + }, + values_json_test_case{ + "map_value", + std::make_unique([]() -> decltype(map_value::kvs) { + decltype(map_value::kvs) vec; + vec.emplace_back(kv_value{ + int_value{23}, + string_value{bytes_to_iobuf(bytes::from_string("foobar"))}}); + vec.emplace_back(kv_value{int_value{42}, std::nullopt}); + return vec; + }()), + map_type::create(1, int_type{}, 2, field_required::no, string_type{}), + R"({"keys":[23,42],"values":["foobar",null]})", + }, +}; + +struct ValuesJsonTestBase + : ::testing::Test + , testing::WithParamInterface { + const auto& get_value() const { return GetParam().val; } + const field_type& get_type() const { return GetParam().type; } + std::string_view get_json_rep() const { return GetParam().json_rep; } + std::string_view get_description() const { return GetParam().desc; } +}; + +} // namespace + +struct ValuesJsonTest : public ValuesJsonTestBase {}; + +INSTANTIATE_TEST_SUITE_P( + ValuesJsonSerde, ValuesJsonTest, ::testing::ValuesIn(test_cases)); + +std::string value_to_json_str(const value& v, const field_type& t) { + json::chunked_buffer buf; + iceberg::json_writer w(buf); + value_to_json(w, v, t); + auto p = iobuf_parser(std::move(buf).as_iobuf()); + std::string str; + str.resize(p.bytes_left()); + p.consume_to(p.bytes_left(), str.data()); + validate_utf8(str); + return str; +} + +TEST_P(ValuesJsonTest, CanSerializeValues) { + std::string s; + ASSERT_NO_THROW(s = value_to_json_str(get_value(), get_type())); + ASSERT_EQ(s, get_json_rep()); +} + +TEST_P(ValuesJsonTest, CanDeSerializeValues) { + json::Document parsed; + parsed.Parse(get_json_rep().data()); + std::optional parsed_value; + ASSERT_NO_THROW( + parsed_value = value_from_json(parsed, get_type(), field_required::yes)); + EXPECT_EQ(parsed_value, get_value()); +} + +namespace { +static const std::vector serialization_errs{ + values_json_test_case{ + "type mismatch for value", + boolean_value{false}, + struct_type{}, + }, + values_json_test_case{ + // TODO(oren): too strict maybe? + "Wrong number of fields for struct", + std::make_unique(), + []() -> field_type { + struct_type s{}; + s.fields.emplace_back( + nested_field::create(0, "foo", field_required::no, int_type{})); + return s; + }(), + }, + values_json_test_case{ + "Found null nested field", + std::make_unique([]() -> struct_value { + struct_value v; + v.fields.emplace_back(int_value{1}); + return v; + }()), + []() -> field_type { + struct_type s; + s.fields.emplace_back(nullptr); + return s; + }(), + }, + values_json_test_case{ + "Found null value for required field foo", + std::make_unique([]() -> struct_value { + struct_value v; + v.fields.emplace_back(std::nullopt); + return v; + }()), + []() -> field_type { + struct_type s; + s.fields.emplace_back( + nested_field::create(0, "foo", field_required::yes, int_type{})); + return s; + }(), + }, + values_json_test_case{ + "Malformed list_type", + std::make_unique(), + []() -> field_type { + auto l = list_type::create(0, field_required::yes, int_type{}); + l.element_field = nullptr; + return l; + }(), + }, + values_json_test_case{ + "Found null value for required list element", + std::make_unique([]() -> list_value { + list_value l{}; + l.elements.emplace_back(std::nullopt); + return l; + }()), + list_type::create(0, field_required::yes, int_type{}), + }, + values_json_test_case{ + "Malformed map_type", + std::make_unique(), + []() -> field_type { + auto m = map_type::create( + 0, int_type{}, 1, field_required::yes, int_type{}); + m.key_field = nullptr; + return m; + }(), + }, + values_json_test_case{ + "Found null value for required map value for key 'int(2)'", + std::make_unique([]() -> map_value { + map_value m; + m.kvs.emplace_back(int_value{2}, std::nullopt); + return m; + }()), + []() -> field_type { + auto m = map_type::create( + 0, int_type{}, 1, field_required::yes, int_type{}); + return m; + }(), + }, + values_json_test_case{ + "Expected fixed_value of type", + fixed_value{bytes_to_iobuf(bytes{1, 2, 3, 4, 5, 6, 7, 8, 9})}, + fixed_type{2}, + }, +}; + +} // namespace + +struct ValuesJsonSerializationErrorsTest : public ValuesJsonTestBase {}; + +INSTANTIATE_TEST_SUITE_P( + ValuesJsonSerde, + ValuesJsonSerializationErrorsTest, + ::testing::ValuesIn(serialization_errs)); + +TEST_P(ValuesJsonSerializationErrorsTest, SerializationFailsGracefully) { + ASSERT_THROW( + { + try { + value_to_json_str(get_value(), get_type()); + } catch (const std::invalid_argument& e) { + EXPECT_THAT(e.what(), testing::HasSubstr(get_description())); + throw; + } + }, + std::invalid_argument); +} + +namespace { +static const std::vector parse_errs{ + values_json_test_case{ + "Unexpected JSON null", + int_type{}, + "null", + }, + + /* LIST TESTS */ + values_json_test_case{ + "Expected a JSON array for list_value", + list_type::create(0, field_required::no, int_type{}), + "{}", + }, + values_json_test_case{ + "Unexpected JSON null for required int", + list_type::create(0, field_required::yes, int_type{}), + "[null]", + }, + + /* STRUCT TESTS */ + values_json_test_case{ + "Expected a JSON object for struct_value", + struct_type{}, + "1", + }, + values_json_test_case{ + "Expected JSON object with 0 members, got 1", + struct_type{}, + R"({"1":2})", + }, + values_json_test_case{ + "Null field in struct type", + []() -> struct_type { + struct_type s{}; + s.fields.emplace_back(nullptr); + return s; + }(), + R"({"1":2})", + }, + values_json_test_case{ + "Expected int value, got 5", + []() -> struct_type { + struct_type s{}; + s.fields.emplace_back( + nested_field::create(0, "foo", field_required::yes, int_type{})); + return s; + }(), + R"({"1":"foobar"})", + }, + + /* MAP TESTS */ + values_json_test_case{ + "No member named 'values'", + map_type::create(0, int_type{}, 1, field_required::no, int_type{}), + R"({"keys":[],"not_values":[]})", + }, + values_json_test_case{ + "Expected array for field 'keys'", + map_type::create(0, int_type{}, 1, field_required::no, int_type{}), + R"({"keys":{},"values":[]})", + }, + values_json_test_case{ + "No member named 'keys'", + map_type::create(0, int_type{}, 1, field_required::no, int_type{}), + R"({"values":[]})", + }, + values_json_test_case{ + "Expected complete key-value mapping, got 1 keys and 0 values", + map_type::create(0, int_type{}, 1, field_required::no, int_type{}), + R"({"keys":[1],"values":[]})", + }, + values_json_test_case{ + "Expected int value, got 5", + map_type::create(0, int_type{}, 1, field_required::no, int_type{}), + R"({"keys":["foobar"],"values":[3]})", + }, + values_json_test_case{ + "Expected int value, got 5", + map_type::create(0, int_type{}, 1, field_required::no, int_type{}), + R"({"keys":[1],"values":["string"]})", + }, + values_json_test_case{ + "Unexpected JSON null for required int", + map_type::create(0, int_type{}, 1, field_required::no, int_type{}), + R"({"keys":[null],"values":[2]})", + }, + values_json_test_case{ + "Unexpected JSON null for required int", + map_type::create(0, int_type{}, 1, field_required::yes, int_type{}), + R"({"keys":[1],"values":[null]})", + }, + + /* PRIMITIVE TESTS */ + values_json_test_case{ + "Expected int value, got 2", + int_type{}, + "true", + }, + values_json_test_case{ + "Expected int value, got 6", + int_type{}, + json_str(std::numeric_limits::max()), + }, + values_json_test_case{ + "Expected float value, got double", + float_type{}, + json_str(std::numeric_limits::max()), + }, + values_json_test_case{ + "Expected float value", + float_type{}, + json_str(std::numeric_limits::max()), + }, + values_json_test_case{ + "Expected JSON string for date_value, got 6", + date_type{}, + "23", + }, + values_json_test_case{ + "Failed to parse date string 'foobar', expected format '%F'", + date_type{}, + R"("foobar")", + }, + values_json_test_case{ + "Expected fractional part for time_value, got '00:00:00'", + time_type{}, + R"("00:00:00")", + }, + values_json_test_case{ + "Expected 6-digit microsecond resolution, got '111'", + time_type{}, + R"("00:00:00.111")", + }, + values_json_test_case{ + "Failed to parse microseconds: 'foobar'", + time_type{}, + R"("00:00:00.foobar")", + }, + values_json_test_case{ + "Failed to parse date string '00::00', expected format '%T'", + time_type{}, + R"("00::00.111111")", + }, + values_json_test_case{ + "Failed to parse date string '00::00', expected format '%T'", + time_type{}, + R"("00::00.111111")", + }, + values_json_test_case{ + "Expected fractional part for timestamp_value, got '1971-01-01T00:00:00'", + timestamp_type{}, + R"("1971-01-01T00:00:00")", + }, + values_json_test_case{ + "Expected 6-digit microsecond resolution, got '023'", + timestamp_type{}, + R"("1971-01-01T00:00:00.023")", + }, + values_json_test_case{ + "Failed to parse microseconds: 'foobar'", + timestamp_type{}, + R"("1971-01-01T00:00:00.foobar")", + }, + values_json_test_case{ + "Failed to parse date string 'junk1971-01-01T00:00:00', expected format " + "'%FT%T'", + timestamp_type{}, + R"("junk1971-01-01T00:00:00.000023")", + }, + values_json_test_case{ + "Expected fractional part for timestamptz_value, got '1971-01-01T00:00:00'", + timestamptz_type{}, + R"("1971-01-01T00:00:00")", + }, + values_json_test_case{ + "Expected 6-digit microsecond resolution, got '023'", + timestamptz_type{}, + R"("1971-01-01T00:00:00.023+00:00")", + }, + values_json_test_case{ + "Failed to parse microseconds: 'foobar'", + timestamptz_type{}, + R"("1971-01-01T00:00:00.foobar+00:00")", + }, + values_json_test_case{ + "Expected offset part for timestamptz_value, got " + "'1971-01-01T00:00:00.000023'", + timestamptz_type{}, + R"("1971-01-01T00:00:00.000023")", + }, + values_json_test_case{ + "Failed to parse date string 'junk1971-01-01T00:00:00', expected format " + "'%FT%T'", + timestamptz_type{}, + R"("junk1971-01-01T00:00:00.000023+00:00")", + }, + values_json_test_case{ + "Expected 8 hex digits for fixed[4]: got 4", + fixed_type{4}, + R"("ffff")", + }, + values_json_test_case{ + "Failed to parse uuid: invalid uuid string", + uuid_type{}, + R"("ffff")", + }, + values_json_test_case{ + "Expected even length hex string, got 9", + binary_type{}, + R"("123456789")", + }, + values_json_test_case{ + "Failed to parse hex byte 'GG' - ec: 'generic:22'", + binary_type{}, + R"("GGGGGGGG")", + }, + values_json_test_case{ + fmt::format( + "Expected at most 5-byte precision for {}, got 6", decimal_type{5, 2}), + decimal_type{5, 2}, + R"("1000.00")", + }, + values_json_test_case{ + "Failed to parse int128", + decimal_type{5, 2}, + R"("FuB.Ar")", + }, +}; + +} // namespace + +struct ValuesJsonParseErrorsTest : public ValuesJsonTestBase {}; + +INSTANTIATE_TEST_SUITE_P( + ValuesJsonSerde, ValuesJsonParseErrorsTest, ::testing::ValuesIn(parse_errs)); + +TEST_P(ValuesJsonParseErrorsTest, ParsingFailsGracefully) { + json::Document parsed; + parsed.Parse(get_json_rep().data()); + ASSERT_THROW( + value_from_json(parsed, get_type(), field_required::yes), + std::invalid_argument); + ASSERT_THROW( + { + try { + value_from_json(parsed, get_type(), field_required::yes); + } catch (const std::invalid_argument& e) { + EXPECT_THAT(e.what(), testing::HasSubstr(get_description())); + throw; + } + }, + std::invalid_argument); +} + +namespace { +struct decimal_parsing_test_case { + std::string description; + std::string json; + decimal_value value; + decimal_type type; + std::optional re_json = std::nullopt; +}; + +std::ostream& +operator<<(std::ostream& os, const decimal_parsing_test_case& tc) { + return os << tc.description; +} + +struct DecimalRoundTripTest + : ::testing::Test + , testing::WithParamInterface {}; + +static const std::vector decimal_cases{ + decimal_parsing_test_case{ + "simple", + R"("1234.5678")", + decimal_value{12345678}, + decimal_type{8, 4}, + }, + decimal_parsing_test_case{ + "simple (negative)", + R"("-1234.5678")", + decimal_value{-12345678}, + decimal_type{8, 4}, + }, + decimal_parsing_test_case{ + "no integral part", + R"("0.12345678")", + decimal_value{12345678}, + decimal_type{8, 8}, + R"(".12345678")", + }, + decimal_parsing_test_case{ + "no integral part (negative)", + R"("-0.12345678")", + decimal_value{-12345678}, + decimal_type{8, 8}, + R"("-.12345678")", + }, + decimal_parsing_test_case{ + "trailing decimal point", + R"("12345678.")", + decimal_value{12345678}, + decimal_type{8, 0}, + }, + decimal_parsing_test_case{ + "trailing decimal point (negative)", + R"("-12345678.")", + decimal_value{-12345678}, + decimal_type{8, 0}, + }, + decimal_parsing_test_case{ + "no trailing decimal point", + R"("12345678")", + decimal_value{12345678}, + decimal_type{8, 0}, + }, + decimal_parsing_test_case{ + "no trailing decimal point (negative)", + R"("-12345678")", + decimal_value{-12345678}, + decimal_type{8, 0}, + }, + decimal_parsing_test_case{ + "truncates trailing digits", // TODO(oren): rounding? + R"("1234.5678888")", + decimal_value(12345678), + decimal_type{8, 4}, + }, + decimal_parsing_test_case{ + "truncates trailing digits (negative)", // TODO(oren): rounding + R"("-1234.5678888")", + decimal_value(-12345678), + decimal_type{8, 4}, + }, + decimal_parsing_test_case{ + "parses left padding but DOES NOT serialize it", + R"("000000001234.5678")", + decimal_value(12345678), + decimal_type{16, 4}, + R"("1234.5678")", + }, + decimal_parsing_test_case{ + "parses left padding but DOES NOT serialize it (negative)", + R"("-000000001234.5678")", + decimal_value(-12345678), + decimal_type{16, 4}, + R"("-1234.5678")", + }, + decimal_parsing_test_case{ + "behaves sanely if we undershoot precision", + R"("1234.5678")", + decimal_value(12345678), + decimal_type{16, 4}, + R"("1234.5678")", + }, + decimal_parsing_test_case{ + "behaves sanely if we undershoot precision (negative)", + R"("-1234.5678")", + decimal_value(-12345678), + decimal_type{16, 4}, + R"("-1234.5678")", + }, + decimal_parsing_test_case{ + "parses right padding and DOES serialize it", + R"("1234.56780000")", + decimal_value(123456780000), + decimal_type{16, 8}, + R"("1234.56780000")", + }, + decimal_parsing_test_case{ + "parses right padding and DOES serialize it (negative)", + R"("-1234.56780000")", + decimal_value(-123456780000), + decimal_type{16, 8}, + R"("-1234.56780000")", + }, + decimal_parsing_test_case{ + "scale exceeds value width", + R"(".000012345678")", + decimal_value{12345678}, + decimal_type{16, 12}, + R"(".000012345678")", + }, + decimal_parsing_test_case{ + "scale exceeds value width (negative)", + R"("-.000012345678")", + decimal_value{-12345678}, + decimal_type{16, 12}, + R"("-.000012345678")", + }, +}; + +INSTANTIATE_TEST_SUITE_P( + ValuesJsonSerde, DecimalRoundTripTest, ::testing::ValuesIn(decimal_cases)); + +TEST_P(DecimalRoundTripTest, JsonToValToJsonToVal) { + const auto& [desc, json, expected_value, type, re_json] = GetParam(); + + // parse the provided JSON string, check success, extract the decimal_value, + // compare to expected + json::Document parsed; + parsed.Parse(json); + std::optional parsed_value; + ASSERT_NO_THROW( + parsed_value = value_from_json(parsed, type, field_required::yes)); + ASSERT_TRUE(parsed_value.has_value()); + ASSERT_TRUE(std::holds_alternative(parsed_value.value())); + ASSERT_TRUE(std::holds_alternative( + std::get(parsed_value.value()))); + + auto parsed_dec = std::get( + std::get(parsed_value.value())); + ASSERT_EQ(parsed_dec.val, expected_value.val); + + // then re-serialize to JSON and parse again. check success and that the + // resulting value is equal to the original parse + + std::string s; + ASSERT_NO_THROW(s = value_to_json_str(parsed_value.value(), type)); + if (re_json.has_value()) { + ASSERT_EQ(re_json, s); + } + + // note that we don't care about string equality to the original serialized + // form because we want to be flexible with parsing rules + + json::Document parsed_2; + parsed_2.Parse(json); + std::optional parsed_value_2; + ASSERT_NO_THROW( + parsed_value_2 = value_from_json(parsed_2, type, field_required::yes)); + ASSERT_TRUE(parsed_value_2.has_value()); + ASSERT_TRUE( + std::holds_alternative(parsed_value_2.value())); + ASSERT_TRUE(std::holds_alternative( + std::get(parsed_value_2.value()))); + + auto parsed_dec_2 = std::get( + std::get(parsed_value_2.value())); + ASSERT_EQ(parsed_dec_2.val, expected_value.val); + ASSERT_EQ(parsed_dec_2.val, parsed_dec.val); +} + +} // namespace diff --git a/src/v/iceberg/values_json.cc b/src/v/iceberg/values_json.cc new file mode 100644 index 0000000000000..3faea3575f3e8 --- /dev/null +++ b/src/v/iceberg/values_json.cc @@ -0,0 +1,676 @@ +// Copyright 2025 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +#include "iceberg/values_json.h" + +#include "iceberg/json_utils.h" + +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +namespace { + +using namespace iceberg; + +std::string_view +parse_string_view(const json::Value& data, std::string_view context = "") { + if (!data.IsString()) { + throw std::invalid_argument(fmt::format( + "Expected JSON string for {}, got {}", context, data.GetType())); + } + return std::string_view{data.GetString(), data.GetStringLength()}; +} + +iobuf hex_str_to_iobuf(std::string_view str) { + if (str.size() & 0x01ul) { + throw std::invalid_argument( + fmt::format("Expected even length hex string, got {}", str.size())); + } + bytes b; + b.reserve(str.size() / 2); + while (!str.empty()) { + auto sub = str.substr(0, 2); + uint8_t x{}; + auto [ptr, ec] = std::from_chars( + sub.data(), sub.data() + sub.size(), x, 16); + + if (ec != std::errc{}) { + throw std::invalid_argument(fmt::format( + "Failed to parse hex byte '{}' - ec: '{}'", + sub, + std::make_error_code(ec))); + } + + b.push_back(static_cast(x)); + str = str.substr(2); + } + return bytes_to_iobuf(b); +} + +std::chrono::system_clock::time_point +sv_to_time_point(std::string_view fmt, std::string_view str) { + // non copying input stream + using ibufstream + = boost::iostreams::stream>; + + ibufstream is(str.begin(), str.size()); + std::tm tm{}; + is >> std::get_time(&tm, fmt.data()); + if (is.fail()) { + throw std::invalid_argument(fmt::format( + "Failed to parse date string '{}', expected format '{}'", str, fmt)); + } + return absl::ToChronoTime(absl::FromTM(tm, absl::UTCTimeZone())); +} + +std::chrono::microseconds sv_to_us(std::string_view u_str) { + if (u_str.size() != 6) { + throw std::invalid_argument(fmt::format( + "Expected 6-digit microsecond resolution, got '{}'", u_str)); + } + int us_raw{}; + auto [ptr, ec] = std::from_chars( + u_str.data(), u_str.data() + u_str.size(), us_raw); + + if (ec != std::errc{}) { + throw std::invalid_argument(fmt::format( + "Failed to parse microseconds: '{}' - ec: {}", + u_str, + std::make_error_code(ec))); + } + return std::chrono::microseconds{us_raw}; +} + +struct primitive_value_parsing_visitor { + explicit primitive_value_parsing_visitor(const json::Value& d) + : data_(d) {} + + value operator()(const boolean_type&) { + if (!data_.IsBool()) { + throw std::invalid_argument( + fmt::format("Expected bool value, got {}", data_.GetType())); + } + return boolean_value{data_.GetBool()}; + } + value operator()(const int_type&) { + if (!data_.IsInt()) { + throw std::invalid_argument( + fmt::format("Expected int value, got {}", data_.GetType())); + } + return int_value{data_.GetInt()}; + } + value operator()(const long_type&) { + if (!data_.IsInt64()) { + throw std::invalid_argument( + fmt::format("Expected long value, got {}", data_.GetType())); + } + return long_value{data_.GetInt64()}; + } + value operator()(const float_type&) { + if (!data_.IsDouble()) { + throw std::invalid_argument( + fmt::format("Expected float value, got {}", data_.GetType())); + } + auto v = data_.GetDouble(); + if ( + v > std::numeric_limits::max() + || v < std::numeric_limits::min()) { + throw std::invalid_argument( + fmt::format("Expected float value, got double: {}", v)); + } + return float_value{static_cast(data_.GetDouble())}; + } + value operator()(const double_type&) { + if (!data_.IsDouble()) { + throw std::invalid_argument( + fmt::format("Expected double value, got {}", data_.GetType())); + } + return double_value{data_.GetDouble()}; + } + value operator()(const date_type&) { + using namespace std::chrono_literals; + auto str = parse_string_view(data_, "date_value"); + auto tp = sv_to_time_point("%F", str); + return date_value{ + static_cast(tp.time_since_epoch() / std::chrono::days{1})}; + } + value operator()(const time_type&) { + using namespace std::chrono_literals; + auto str = parse_string_view(data_, "time_value"); + std::vector split = absl::StrSplit(str, '.'); + if (split.size() != 2) { + throw std::invalid_argument(fmt::format( + "Expected fractional part for time_value, got '{}'", str)); + } + auto t_str = split[0]; + auto u_str = split[1]; + auto us_sub_sec = sv_to_us(u_str); + auto since_midnight = sv_to_time_point("%T", t_str) + - sv_to_time_point("%T", "00:00:00"); + return time_value{(since_midnight + us_sub_sec) / 1us}; + } + value operator()(const timestamp_type&) { + using namespace std::chrono_literals; + auto str = parse_string_view(data_, "timestamp_value"); + std::vector split = absl::StrSplit(str, '.'); + if (split.size() != 2) { + throw std::invalid_argument(fmt::format( + "Expected fractional part for timestamp_value, got '{}'", str)); + } + auto t_str = split[0]; + auto u_str = split[1]; + auto us_sub_sec = sv_to_us(u_str); + auto since_epoch = sv_to_time_point("%FT%T", t_str).time_since_epoch(); + return timestamp_value{(since_epoch + us_sub_sec) / 1us}; + } + value operator()(const timestamptz_type&) { + using namespace std::chrono_literals; + auto str = parse_string_view(data_, "timestamptz_value"); + std::vector split = absl::StrSplit(str, '.'); + if (split.size() != 2) { + throw std::invalid_argument(fmt::format( + "Expected fractional part for timestamptz_value, got '{}'", str)); + } + auto t_str = split[0]; + auto rest = split[1]; + split = absl::StrSplit(rest, '+'); + if (split.size() != 2) { + throw std::invalid_argument(fmt::format( + "Expected offset part for timestamptz_value, got '{}'", str)); + } + auto u_str = split[0]; + auto us_sub_sec = sv_to_us(u_str); + auto since_epoch = sv_to_time_point("%FT%T", t_str).time_since_epoch(); + return timestamptz_value{(since_epoch + us_sub_sec) / 1us}; + } + value operator()(const string_type&) { + auto str = parse_string_view(data_, "string_value"); + return string_value{iobuf::from(str)}; + } + value operator()(const fixed_type& t) { + auto str = parse_string_view(data_, "fixed_value"); + if (t.length * 2 != str.size()) { + throw std::invalid_argument(fmt::format( + "Expected {} hex digits for {}: got {}", + t.length * 2, + t, + str.size())); + } + return fixed_value{hex_str_to_iobuf(str)}; + } + value operator()(const uuid_type&) { + auto str = parse_string_view(data_, "uuid_value"); + try { + return uuid_value{uuid_t::from_string(str)}; + } catch (const boost::wrapexcept& e) { + throw std::invalid_argument( + fmt::format("Failed to parse uuid: {}", e.what())); + } + } + value operator()(const binary_type&) { + auto str = parse_string_view(data_, "binary_value"); + return binary_value{hex_str_to_iobuf(str)}; + } + value operator()(const decimal_type& t) { + // TODO(oren): need to support negative scale? see datatypes.h + auto str = parse_string_view(data_, "decimal_value"); + + std::vector split = absl::StrSplit(str, '.'); + if (split.size() > 2) { + throw std::invalid_argument( + fmt::format("Too many decimal points for {}: '{}'", t, str)); + } + + auto int_part = split[0]; + bool is_neg = !int_part.empty() && int_part.front() == '-'; + if (is_neg) { + int_part = int_part.substr(1); + } + + auto nz_it = std::ranges::find_if_not( + int_part, [](char c) { return c == '0'; }); + int_part = std::string_view{nz_it, int_part.end()}; + + auto frac_part = split.size() == 2 ? split[1] : std::string_view{}; + + if (auto json_scale = frac_part.size(); json_scale > t.scale) { + // TODO(oren): should we round up rather than truncate? + frac_part = frac_part.substr(0, t.scale); + } + + auto json_precision = int_part.size() + frac_part.size(); + + if (json_precision > t.precision) { + throw std::invalid_argument(fmt::format( + "Expected at most {}-byte precision for {}, got {}", + t.precision, + t, + json_precision)); + } + + absl::int128 integral{0}; + if (!int_part.empty() && !absl::SimpleAtoi(int_part, &integral)) { + throw std::invalid_argument( + fmt::format("Failed to parse int128 from '{}'", int_part)); + } + + absl::int128 frac{0}; + if (!frac_part.empty() && !absl::SimpleAtoi(frac_part, &frac)) { + throw std::invalid_argument( + fmt::format("Failed to parse int128 from '{}'", frac_part)); + } + + std::ranges::for_each( + boost::irange(0u, t.scale), + [&integral](const auto) { integral *= 10; }); + + return decimal_value{(integral + frac) * (is_neg ? -1 : 1)}; + } + +private: + const json::Value& data_; +}; + +struct value_parsing_visitor { + explicit value_parsing_visitor(const json::Value& d) + : data_(d) {} + + value operator()(const primitive_type& t) { + return std::visit(primitive_value_parsing_visitor{data_}, t); + } + value operator()(const list_type& t) { + if (!data_.IsArray()) { + throw std::invalid_argument("Expected a JSON array for list_value"); + } + auto v = std::make_unique(); + const auto& arr = data_.GetArray(); + std::ranges::transform( + arr, std::back_inserter(v->elements), [&t](const auto& elt) { + return value_from_json( + elt, t.element_field->type, t.element_field->required); + }); + return v; + } + value operator()(const struct_type& t) { + if (!data_.IsObject()) { + throw std::invalid_argument( + "Expected a JSON object for struct_value"); + } + auto obj = data_.GetObject(); + if (t.fields.size() != obj.MemberCount()) { + throw std::invalid_argument(fmt::format( + "Expected JSON object with {} members, got {}", + t.fields.size(), + obj.MemberCount())); + } + + auto v = std::make_unique(); + v->fields.reserve(t.fields.size()); + std::ranges::transform( + boost::irange(0ul, t.fields.size()), + std::back_inserter(v->fields), + [&t, &obj](const auto i) { + const auto& t_field = t.fields[i]; + const auto& o_member = *(obj.MemberBegin() + i); + if (t_field == nullptr) { + throw std::invalid_argument( + fmt::format("Null field in struct type {}", t)); + } + return value_from_json( + o_member.value, t_field->type, t_field->required); + }); + + return v; + } + value operator()(const map_type& t) { + const auto& keys_arr = parse_required_array(data_, "keys"); + const auto& values_arr = parse_required_array(data_, "values"); + if (keys_arr.Size() != values_arr.Size()) { + throw std::invalid_argument(fmt::format( + "Expected complete key-value mapping, got {} keys and {} " + "values", + keys_arr.Size(), + values_arr.Size())); + } + + auto v = std::make_unique(); + v->kvs.reserve(keys_arr.Size()); + std::ranges::transform( + boost::irange(0u, keys_arr.Size()), + std::back_inserter(v->kvs), + [&t, &keys_arr, &values_arr](const auto i) -> kv_value { + return { + value_from_json( + keys_arr[i], t.key_field->type, field_required::yes) + .value(), + value_from_json( + values_arr[i], t.value_field->type, t.value_field->required)}; + }); + + return v; + } + +private: + const json::Value& data_; +}; + +void value_to_json( + iceberg::json_writer&, + const iceberg::primitive_value&, + const iceberg::primitive_type&); +void value_to_json( + iceberg::json_writer&, + const iceberg::struct_value&, + const iceberg::struct_type&); +void value_to_json( + iceberg::json_writer&, const iceberg::list_value&, const iceberg::list_type&); +void value_to_json( + iceberg::json_writer&, const iceberg::map_value&, const iceberg::map_type&); + +struct rjson_visitor { + explicit rjson_visitor(iceberg::json_writer& w) + : w(w) {} + void + operator()(const iceberg::boolean_value& v, const iceberg::boolean_type&) { + w.Bool(v.val); + } + void operator()(const iceberg::int_value& v, const iceberg::int_type&) { + w.Int(v.val); + } + void operator()(const iceberg::long_value& v, const iceberg::long_type&) { + w.Int64(v.val); + } + void operator()(const iceberg::float_value& v, const iceberg::float_type&) { + w.Double(v.val); + } + void + operator()(const iceberg::double_value& v, const iceberg::double_type&) { + w.Double(v.val); + } + + void operator()(const iceberg::date_value& v, const iceberg::date_type&) { + const std::chrono::system_clock::time_point tp{ + std::chrono::days(v.val)}; + const std::chrono::year_month_day ymd{ + std::chrono::floor(tp)}; + w.String(fmt::to_string(ymd)); + } + + void operator()(const iceberg::time_value& v, const iceberg::time_type&) { + using namespace std::chrono_literals; + const std::chrono::microseconds us{v.val}; + const auto s = std::chrono::floor(us); + const auto rest = (us - s) / 1us; + + const std::chrono::system_clock::time_point tp{s}; + const auto tt = std::chrono::system_clock::to_time_t(tp); + const auto tm = *std::gmtime(&tt); + w.String(fmt::format("{}.{:06}", std::put_time(&tm, "%T"), rest)); + } + + void operator()( + const iceberg::timestamp_value& v, const iceberg::timestamp_type&) { + using namespace std::chrono_literals; + const std::chrono::microseconds us{v.val}; + const auto s = std::chrono::floor(us); + const auto rest = (us - s) / 1us; + + const std::chrono::system_clock::time_point tp{s}; + const auto tt = std::chrono::system_clock::to_time_t(tp); + const auto tm = *std::gmtime(&tt); + w.String(fmt::format("{}.{:06}", std::put_time(&tm, "%FT%T"), rest)); + } + void operator()( + const iceberg::timestamptz_value& v, const iceberg::timestamptz_type&) { + // Stores ISO - 8601 standard timestamp with microsecond precision; + // must include a zone offset and it must be '+00:00'. + // That is, timestamp with time zone is always stored as UTC. + using namespace std::chrono_literals; + const std::chrono::microseconds us{v.val}; + const auto s = std::chrono::floor(us); + const auto rest = (us - s) / 1us; + + const std::chrono::system_clock::time_point tp{s}; + const auto tt = std::chrono::system_clock::to_time_t(tp); + const auto tm = *std::gmtime(&tt); + w.String( + fmt::format("{}.{:06}+00:00", std::put_time(&tm, "%FT%T"), rest)); + } + + void + operator()(const iceberg::string_value& v, const iceberg::string_type&) { + w.String(v.val); + } + void operator()(const iceberg::uuid_value& v, const iceberg::uuid_type&) { + w.String(fmt::to_string(v.val)); + } + void + operator()(const iceberg::fixed_value& v, const iceberg::fixed_type& t) { + if (auto sz = v.val.size_bytes(); sz > t.length) { + throw std::invalid_argument(fmt::format( + "Expected fixed_value of type {} but got {}B", t, sz)); + } + w.String(to_hex(iobuf_to_bytes(v.val))); + } + void + operator()(const iceberg::binary_value& v, const iceberg::binary_type&) { + w.String(to_hex(iobuf_to_bytes(v.val))); + } + void operator()( + const iceberg::decimal_value& v, const iceberg::decimal_type& t) { + // TODO(oren): need negative scale? see datatypes.h + auto [p, s] = t; + // NOTE: Interestingly, max precision of 38 decimal digits is not enough + // to support int128::max, but we can truncate if needed + vassert(p <= 38 && s <= p, "Malformed decimal_type {}", t); + + absl::int128 val = v.val; + bool is_neg = val < 0; + if (is_neg) { + val *= -1; + } + + // left pad w/ zeros, taking into account sign of int representation. + // this makes precision arithmetic a bit easier. we will drop these + // later + auto raw = fmt::format("{:0>{}}", val, 38); + + // left truncate based on decimal_type::precision + auto truncated = raw | std::views::reverse | std::views::take(p) + | std::views::reverse; + + auto integral_part = truncated | std::views::take(p - s) + | std::views::drop_while( + [](const char c) { return c == '0'; }); + auto fractional_part = truncated | std::views::drop(p - s); + w.String(fmt::format( + "{}{}.{}", + is_neg ? "-" : "", + std::string_view{&integral_part.front(), integral_part.size()}, + std::string_view{&fractional_part.front(), fractional_part.size()})); + } + + void operator()( + const iceberg::primitive_value& v, const iceberg::primitive_type& t) { + value_to_json(w, v, t); + } + void + operator()(const iceberg::struct_value& v, const iceberg::struct_type& t) { + value_to_json(w, v, t); + } + void operator()(const iceberg::list_value& v, const iceberg::list_type& t) { + value_to_json(w, v, t); + } + void operator()(const iceberg::map_value& v, const iceberg::map_type& t) { + value_to_json(w, v, t); + } + + void operator()( + const std::unique_ptr& pv, + const iceberg::struct_type& t) { + if (pv == nullptr) { + w.Null(); + } else { + value_to_json(w, *pv, t); + } + } + + void operator()( + const std::unique_ptr& pv, + const iceberg::list_type& t) { + if (pv == nullptr) { + w.Null(); + } else { + value_to_json(w, *pv, t); + } + } + + void operator()( + const std::unique_ptr& pv, + const iceberg::map_type& t) { + if (pv == nullptr) { + w.Null(); + } else { + value_to_json(w, *pv, t); + } + } + + template + void operator()(const V& v, const T& t) { + throw std::invalid_argument(fmt::format( + "JSON serde type mismatch for value {}, got type {}", v, t)); + } + +private: + iceberg::json_writer& w; +}; + +void value_to_json( + iceberg::json_writer& w, + const iceberg::primitive_value& v, + const iceberg::primitive_type& t) { + std::visit(rjson_visitor{w}, v, t); +} +void value_to_json( + iceberg::json_writer& w, + const iceberg::struct_value& s, + const iceberg::struct_type& t) { + if (s.fields.size() != t.fields.size()) { + throw std::invalid_argument(fmt::format( + "Wrong number of fields for struct_value {} of type {}", s, t)); + } + w.StartObject(); + + std::ranges::for_each( + boost::irange(0ul, s.fields.size()), [&w, &s, &t](const auto i) { + const auto& v_field = s.fields[i]; + const auto& t_field = t.fields[i]; + if (t_field == nullptr) { + throw std::invalid_argument(fmt::format( + "Found null nested field in struct type for value {}", s)); + } + w.Key(fmt::to_string(t_field->id)); + if (v_field.has_value()) { + value_to_json(w, v_field.value(), t_field->type); + } else if (!t_field->required) { + w.Null(); + } else { + throw std::invalid_argument(fmt::format( + "Found null value for required field {}", t_field->name)); + } + }); + + w.EndObject(); +} +void value_to_json( + iceberg::json_writer& w, + const iceberg::list_value& l, + const iceberg::list_type& t) { + if (t.element_field == nullptr) { + throw std::invalid_argument(fmt::format("Malformed list_type: {}", t)); + } + w.StartArray(); + for (const auto& e : l.elements) { + if (e.has_value()) { + value_to_json(w, e.value(), t.element_field->type); + } else if (!t.element_field->required) { + w.Null(); + } else { + throw std::invalid_argument( + "Found null value for required list element"); + } + } + w.EndArray(); +} +void value_to_json( + iceberg::json_writer& w, + const iceberg::map_value& m, + const iceberg::map_type& t) { + if (t.key_field == nullptr || t.value_field == nullptr) { + throw std::invalid_argument(fmt::format("Malformed map_type: {}", t)); + } + + w.StartObject(); + w.Key("keys"); + w.StartArray(); + for (const auto& kv : m.kvs) { + value_to_json(w, kv.key, t.key_field->type); + } + w.EndArray(); + + w.Key("values"); + w.StartArray(); + for (const auto& kv : m.kvs) { + if (kv.val.has_value()) { + value_to_json(w, kv.val.value(), t.value_field->type); + } else if (!t.value_field->required) { + w.Null(); + } else { + throw std::invalid_argument(fmt::format( + "Found null value for required map value for key '{}'", kv.key)); + } + } + w.EndArray(); + + w.EndObject(); +} +} // namespace + +namespace iceberg { + +std::optional value_from_json( + const json::Value& data, const field_type& type, field_required required) { + if (data.IsNull()) { + if (required) { + throw std::invalid_argument( + fmt::format("Unexpected JSON null for required {}", type)); + } + return std::nullopt; + } + return std::visit(value_parsing_visitor{data}, type); +} + +void value_to_json( + iceberg::json_writer& w, + const iceberg::value& v, + const iceberg::field_type& t) { + std::visit(rjson_visitor{w}, v, t); +} + +} // namespace iceberg diff --git a/src/v/iceberg/values_json.h b/src/v/iceberg/values_json.h new file mode 100644 index 0000000000000..bce5215b169e1 --- /dev/null +++ b/src/v/iceberg/values_json.h @@ -0,0 +1,55 @@ +// Copyright 2025 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 +#pragma once + +#include "iceberg/datatypes.h" +#include "iceberg/json_writer.h" +#include "iceberg/values.h" +#include "json/document.h" + +/** + * values_json.h - Implements JSON single-value serialization for Iceberg values + * As specified in: + * https://iceberg.apache.org/spec/#json-single-value-serialization + * + * This is needed processing default values for nested field in Iceberg schemas, + * to/from their representation in table metadata. + * + */ + +namespace iceberg { + +/** + * value_from_json - Take a parsed json::Value (rapidjson) and produce an + * Iceberg value subject to the provided field_type. + * + * @param json::Value - The source JSON + * @param field_type - The expected iceberg type + * @param field_required - If the JSON is null, either return nullopt + * or raise an exception, depending on requiredness + * @return The resulting iceberg::value. Throw invalid_argument on semantic + * errors during conversion (e.g. type mismatch) + */ +std::optional +value_from_json(const json::Value&, const field_type&, field_required); + +/** + * value_to_json - Write the provided iceberg::value to JSON subject to the + * provided field_type. + * + * @param json_writer - JSON writer sink + * @param value - The value to serialize + * @param field_type - The ostensible type of the value + * + * Throws invalid_argument on semantic errors during serialization (e.g. type + * mismatch) + */ +void value_to_json( + iceberg::json_writer&, const iceberg::value&, const iceberg::field_type&); +} // namespace iceberg