From 303435aa794d8df1728f83ca5179e896b17ca4ff Mon Sep 17 00:00:00 2001 From: Mathew Fournier <160646114+tabmatfournier@users.noreply.github.com> Date: Thu, 4 Apr 2024 12:05:51 -0600 Subject: [PATCH] SMT for json parsing (#214) * smt-nested-json-as-map - parse json objects into Maps rather than Structs prior to handing to the iceberg connector, for users with unstructured json data. --- kafka-connect-transforms/README.md | 74 ++++ .../transforms/JsonToMapException.java | 29 ++ .../transforms/JsonToMapTransform.java | 155 ++++++++ .../connect/transforms/JsonToMapUtils.java | 307 ++++++++++++++++ .../iceberg/connect/transforms/FileLoads.java | 33 ++ .../transforms/JsonToMapTransformTest.java | 168 +++++++++ .../transforms/JsonToMapUtilsTest.java | 337 ++++++++++++++++++ .../src/test/resources/jsonmap.json | 27 ++ 8 files changed, 1130 insertions(+) create mode 100644 kafka-connect-transforms/src/main/java/io/tabular/iceberg/connect/transforms/JsonToMapException.java create mode 100644 kafka-connect-transforms/src/main/java/io/tabular/iceberg/connect/transforms/JsonToMapTransform.java create mode 100644 kafka-connect-transforms/src/main/java/io/tabular/iceberg/connect/transforms/JsonToMapUtils.java create mode 100644 kafka-connect-transforms/src/test/java/io/tabular/iceberg/connect/transforms/FileLoads.java create mode 100644 kafka-connect-transforms/src/test/java/io/tabular/iceberg/connect/transforms/JsonToMapTransformTest.java create mode 100644 kafka-connect-transforms/src/test/java/io/tabular/iceberg/connect/transforms/JsonToMapUtilsTest.java create mode 100644 kafka-connect-transforms/src/test/resources/jsonmap.json diff --git a/kafka-connect-transforms/README.md b/kafka-connect-transforms/README.md index e8d22e45..cc37211b 100644 --- a/kafka-connect-transforms/README.md +++ b/kafka-connect-transforms/README.md @@ -47,3 +47,77 @@ It will promote the `before` or `after` element fields to top level and add the | Property | Description | |---------------------|-----------------------------------------------------------------------------------| | cdc.target.pattern | Pattern to use for setting the CDC target field value, default is `{db}.{table}` | + +# JsonToMapTransform +_(Experimental)_ + +The `JsonToMapTransform` SMT parses Strings as Json object payloads to infer schemas. The iceberg-kafka-connect +connector for schema-less data (e.g. the Map produced by the Kafka supplied JsonConverter) is to convert Maps into Iceberg +Structs. This is fine when the JSON is well-structured, but when you have JSON objects with dynamically +changing keys, it will lead to an explosion of columns in the Iceberg table due to schema evolutions. + +This SMT is useful in situations where the JSON is not well-structured, in order to get data into Iceberg where +it can be further processed by query engines into a more manageable form. It will convert nested objects to +Maps and include Map type in the Schema. The connector will respect the Schema and create Iceberg tables with Iceberg +Map (String) columns for the JSON objects. + +Note: + +- You must use the `stringConverter` as the `value.converter` setting for your connector, not `jsonConverter` + - It expects JSON objects (`{...}`) in those strings. +- Message keys, tombstones, and headers are not transformed and are passed along as-is by the SMT + +## Configuration + +| Property | Description (default value) | +|----------------------|------------------------------------------| +| json.root | (false) Boolean value to start at root | + +The `transforms.IDENTIFIER_HERE.json.root` is meant for the most inconsistent data. It will construct a Struct with a single field +called `payload` with a Schema of `Map`. + +If `transforms.IDENTIFIER_HERE.json.root` is false (the default), it will construct a Struct with inferred schemas for primitive and +array fields. Nested objects become fields of type `Map`. + +Keys with empty arrays and empty objects are filtered out from the final schema. Arrays will be typed unless the +json arrays have mixed types in which case they are converted to arrays of strings. + +Example json: + +```json +{ + "key": 1, + "array": [1,"two",3], + "empty_obj": {}, + "nested_obj": {"some_key": ["one", "two"]} +} +``` + +Will become the following if `json.root` is true: + +``` +SinkRecord.schema: + "payload" : (Optional) Map + +Sinkrecord.value (Struct): + "payload" : Map( + "key" : "1", + "array" : "[1,"two",3]" + "empty_obj": "{}" + "nested_obj": "{"some_key":["one","two"]}}" + ) +``` + +Will become the following if `json.root` is false + +``` +SinkRecord.schema: + "key": (Optional) Int32, + "array": (Optional) Array, + "nested_object": (Optional) Map + +SinkRecord.value (Struct): + "key" 1, + "array" ["1", "two", "3"] + "nested_object" Map ("some_key" : "["one", "two"]") +``` diff --git a/kafka-connect-transforms/src/main/java/io/tabular/iceberg/connect/transforms/JsonToMapException.java b/kafka-connect-transforms/src/main/java/io/tabular/iceberg/connect/transforms/JsonToMapException.java new file mode 100644 index 00000000..e009a502 --- /dev/null +++ b/kafka-connect-transforms/src/main/java/io/tabular/iceberg/connect/transforms/JsonToMapException.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package io.tabular.iceberg.connect.transforms; + +class JsonToMapException extends RuntimeException { + JsonToMapException(String errorMessage) { + super(errorMessage); + } + + JsonToMapException(String errorMessage, Throwable err) { + super(errorMessage, err); + } +} diff --git a/kafka-connect-transforms/src/main/java/io/tabular/iceberg/connect/transforms/JsonToMapTransform.java b/kafka-connect-transforms/src/main/java/io/tabular/iceberg/connect/transforms/JsonToMapTransform.java new file mode 100644 index 00000000..d51a69d8 --- /dev/null +++ b/kafka-connect-transforms/src/main/java/io/tabular/iceberg/connect/transforms/JsonToMapTransform.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package io.tabular.iceberg.connect.transforms; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.ObjectReader; +import com.fasterxml.jackson.databind.node.ObjectNode; +import java.util.Map; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.connector.ConnectRecord; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.sink.SinkRecord; +import org.apache.kafka.connect.transforms.Transformation; +import org.apache.kafka.connect.transforms.util.SimpleConfig; + +public class JsonToMapTransform> implements Transformation { + + public static final String JSON_LEVEL = "json.root"; + + private static final ObjectReader mapper = new ObjectMapper().reader(); + + private boolean startAtRoot = false; + + public static final ConfigDef CONFIG_DEF = + new ConfigDef() + .define( + JSON_LEVEL, + ConfigDef.Type.BOOLEAN, + false, + ConfigDef.Importance.MEDIUM, + "Boolean value to start at root. False is one level in from the root"); + + private static final String ALL_JSON_SCHEMA_FIELD = "payload"; + private static final Schema JSON_MAP_SCHEMA = + SchemaBuilder.struct() + .field( + ALL_JSON_SCHEMA_FIELD, + SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA).optional().build()) + .build(); + + @Override + public R apply(R record) { + if (record.value() == null) { + return record; + } else { + return process(record); + } + } + + private R process(R record) { + if (!(record.value() instanceof String)) { + throw new JsonToMapException("record value is not a string, use StringConverter"); + } + + String json = (String) record.value(); + JsonNode obj; + + try { + obj = mapper.readTree(json); + } catch (Exception e) { + throw new JsonToMapException( + String.format( + "record.value is not valid json for record.value: %s", collectRecordDetails(record)), + e); + } + + if (!(obj instanceof ObjectNode)) { + throw new JsonToMapException( + String.format( + "Expected json object for record.value after parsing: %s", + collectRecordDetails(record))); + } + + if (startAtRoot) { + return singleField(record, (ObjectNode) obj); + } + return structRecord(record, (ObjectNode) obj); + } + + private R singleField(R record, ObjectNode obj) { + Struct struct = + new Struct(JSON_MAP_SCHEMA) + .put(ALL_JSON_SCHEMA_FIELD, JsonToMapUtils.populateMap(obj, Maps.newHashMap())); + return record.newRecord( + record.topic(), + record.kafkaPartition(), + record.keySchema(), + record.key(), + JSON_MAP_SCHEMA, + struct, + record.timestamp(), + record.headers()); + } + + private R structRecord(R record, ObjectNode contents) { + SchemaBuilder builder = SchemaBuilder.struct(); + contents.fields().forEachRemaining(entry -> JsonToMapUtils.addField(entry, builder)); + Schema schema = builder.build(); + Struct value = JsonToMapUtils.addToStruct(contents, schema, new Struct(schema)); + return record.newRecord( + record.topic(), + record.kafkaPartition(), + record.keySchema(), + record.key(), + schema, + value, + record.timestamp(), + record.headers()); + } + + private String collectRecordDetails(R record) { + if (record instanceof SinkRecord) { + SinkRecord sinkRecord = (SinkRecord) record; + return String.format( + "topic %s partition %s offset %s", + sinkRecord.topic(), sinkRecord.kafkaPartition(), sinkRecord.kafkaOffset()); + } else { + return String.format("topic %s partition %S", record.topic(), record.kafkaPartition()); + } + } + + @Override + public ConfigDef config() { + return CONFIG_DEF; + } + + @Override + public void close() {} + + @Override + public void configure(Map configs) { + SimpleConfig config = new SimpleConfig(CONFIG_DEF, configs); + startAtRoot = config.getBoolean(JSON_LEVEL); + } +} diff --git a/kafka-connect-transforms/src/main/java/io/tabular/iceberg/connect/transforms/JsonToMapUtils.java b/kafka-connect-transforms/src/main/java/io/tabular/iceberg/connect/transforms/JsonToMapUtils.java new file mode 100644 index 00000000..859c2621 --- /dev/null +++ b/kafka-connect-transforms/src/main/java/io/tabular/iceberg/connect/transforms/JsonToMapUtils.java @@ -0,0 +1,307 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package io.tabular.iceberg.connect.transforms; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.BigIntegerNode; +import com.fasterxml.jackson.databind.node.BinaryNode; +import com.fasterxml.jackson.databind.node.BooleanNode; +import com.fasterxml.jackson.databind.node.DecimalNode; +import com.fasterxml.jackson.databind.node.DoubleNode; +import com.fasterxml.jackson.databind.node.FloatNode; +import com.fasterxml.jackson.databind.node.IntNode; +import com.fasterxml.jackson.databind.node.LongNode; +import com.fasterxml.jackson.databind.node.MissingNode; +import com.fasterxml.jackson.databind.node.NullNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.fasterxml.jackson.databind.node.TextNode; +import java.math.BigDecimal; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; + +class JsonToMapUtils { + + private static final String DECIMAL_LOGICAL_NAME = "org.apache.kafka.connect.data.Decimal"; + private static final String SCALE_FIELD = "scale"; + + public static final Schema ARRAY_OPTIONAL_STRING = + SchemaBuilder.array(Schema.OPTIONAL_STRING_SCHEMA).optional().build(); + public static final Schema ARRAY_MAP_OPTIONAL_STRING = + SchemaBuilder.array( + SchemaBuilder.map(Schema.OPTIONAL_STRING_SCHEMA, Schema.OPTIONAL_STRING_SCHEMA) + .optional() + .build()) + .optional() + .build(); + + public static final Schema ARRAY_ARRAY_OPTIONAL_STRING = + SchemaBuilder.array(SchemaBuilder.array(Schema.OPTIONAL_STRING_SCHEMA).optional().build()) + .optional() + .build(); + + private static SchemaBuilder decimalBuilder(int scale) { + return SchemaBuilder.bytes() + .name(DECIMAL_LOGICAL_NAME) + .parameter(SCALE_FIELD, Integer.toString(scale)) + .version(1); + } + + public static Schema decimalSchema(int scale) { + return decimalBuilder(scale).optional().build(); + } + + private JsonToMapUtils() { + throw new UnsupportedOperationException("This is a utility class and cannot be instantiated"); + } + + private static final Map, Schema> JSON_NODE_TO_SCHEMA = + getJsonNodeToSchema(); + + private static Map, Schema> getJsonNodeToSchema() { + final Map, Schema> map = Maps.newHashMap(); + map.put(BinaryNode.class, Schema.OPTIONAL_BYTES_SCHEMA); + map.put(BooleanNode.class, Schema.OPTIONAL_BOOLEAN_SCHEMA); + map.put(TextNode.class, Schema.OPTIONAL_STRING_SCHEMA); + map.put(IntNode.class, Schema.OPTIONAL_INT32_SCHEMA); + map.put(LongNode.class, Schema.OPTIONAL_INT64_SCHEMA); + map.put(FloatNode.class, Schema.OPTIONAL_FLOAT32_SCHEMA); + map.put(DoubleNode.class, Schema.OPTIONAL_FLOAT64_SCHEMA); + map.put(ArrayNode.class, Schema.OPTIONAL_STRING_SCHEMA); + map.put( + ObjectNode.class, + SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA).optional().build()); + map.put(BigIntegerNode.class, decimalSchema(0)); + map.put(DecimalNode.class, Schema.OPTIONAL_STRING_SCHEMA); + return ImmutableMap.copyOf(map); + } + + public static void addField(Map.Entry kv, SchemaBuilder builder) { + String key = kv.getKey(); + Schema schema = schemaFromNode(kv.getValue()); + if (schema != null) { + builder.field(key, schema); + } + } + + public static Schema schemaFromNode(JsonNode node) { + if (!node.isNull() && !node.isMissingNode()) { + if (node.isArray()) { + return arraySchema((ArrayNode) node); + } else if (node.isObject()) { + if (node.elements().hasNext()) { + return JSON_NODE_TO_SCHEMA.get(node.getClass()); + } + } else { + return JSON_NODE_TO_SCHEMA.get(node.getClass()); + } + } + return null; + } + + @SuppressWarnings("checkstyle:CyclomaticComplexity") + private static Schema arraySchema(ArrayNode array) { + final Schema result; + + if (array.isEmpty()) { + result = null; + } else { + final Class arrayType = arrayNodeType(array); + + if (arrayType == null) { + // inconsistent types + result = ARRAY_OPTIONAL_STRING; + } else { + if (arrayType == NullNode.class || arrayType == MissingNode.class) { + // if types of the array are inconsistent, convert to a string + result = ARRAY_OPTIONAL_STRING; + } else { + if (arrayType == ObjectNode.class) { + result = ARRAY_MAP_OPTIONAL_STRING; + } else if (arrayType == ArrayNode.class) { + // nested array case + // need to protect against arrays of empty arrays, arrays of empty objects, arrays + // inconsistent types, etc. + + Set nestedSchemas = Sets.newHashSet(); + array + .elements() + .forEachRemaining(node -> nestedSchemas.add(JsonToMapUtils.schemaFromNode(node))); + + if (nestedSchemas.size() > 1) { + // inconsistent types for nested arrays + result = SchemaBuilder.array(ARRAY_OPTIONAL_STRING).optional().build(); + } else { + // nestedSchemas.size() == 1 in this case (we already checked array is not empty) + // i.e. consistent types for nested arrays + Schema nestedArraySchema = nestedSchemas.iterator().next(); + if (nestedArraySchema == null) { + result = null; + } else { + result = SchemaBuilder.array(nestedArraySchema).optional().build(); + } + } + } else { + // we are a consistent primitive + result = SchemaBuilder.array(JSON_NODE_TO_SCHEMA.get(arrayType)).optional().build(); + } + } + } + } + + return result; + } + + /* Kafka Connect arrays must all be the same type */ + public static Class arrayNodeType(ArrayNode array) { + Set> elementTypes = Sets.newHashSet(); + array.elements().forEachRemaining(node -> elementTypes.add(node.getClass())); + + Class result; + if (elementTypes.isEmpty()) { + // empty ArrayNode, cannot determine element type + result = null; + } else if (elementTypes.size() == 1) { + // consistent element type + result = elementTypes.iterator().next(); + } else { + // inconsistent element types + result = null; + } + + return result; + } + + public static Struct addToStruct(ObjectNode node, Schema schema, Struct struct) { + schema + .fields() + .forEach( + field -> { + JsonNode element = node.get(field.name()); + Schema.Type targetType = field.schema().type(); + if (targetType == Schema.Type.ARRAY) { + struct.put( + field.name(), + populateArray( + element, field.schema().valueSchema(), field.name(), Lists.newArrayList())); + } else if (targetType == Schema.Type.MAP) { + struct.put(field.name(), populateMap(element, Maps.newHashMap())); + } else { + struct.put(field.name(), extractValue(element, targetType, field.name())); + } + }); + return struct; + } + + public static Object extractValue(JsonNode node, Schema.Type type, String fieldName) { + Object obj; + switch (type) { + case STRING: + obj = nodeToText(node); + break; + case BOOLEAN: + obj = node.booleanValue(); + break; + case INT32: + obj = node.intValue(); + break; + case INT64: + obj = node.longValue(); + break; + case FLOAT32: + obj = node.floatValue(); + break; + case FLOAT64: + obj = node.doubleValue(); + break; + case MAP: + ObjectNode mapNode = (ObjectNode) node; + Map map = Maps.newHashMap(); + populateMap(mapNode, map); + obj = map; + break; + case BYTES: + obj = extractBytes(node, fieldName); + break; + default: + throw new JsonToMapException( + String.format("Unexpected type %s for field %s", type, fieldName)); + } + return obj; + } + + private static Object extractBytes(JsonNode node, String fieldName) { + Object obj; + try { + if (node.isBigInteger()) { + obj = new BigDecimal(node.bigIntegerValue()); + } else if (node.isBigDecimal()) { + obj = node.decimalValue(); + } else { + obj = node.binaryValue(); + } + } catch (Exception e) { + throw new JsonToMapException( + String.format("parsing binary value threw exception for %s", fieldName), e); + } + return obj; + } + + private static List populateArray( + JsonNode node, Schema schema, String fieldName, List acc) { + if (schema.type() == Schema.Type.ARRAY) { + node.elements() + .forEachRemaining( + arrayNode -> { + List nestedList = Lists.newArrayList(); + acc.add(populateArray(arrayNode, schema.valueSchema(), fieldName, nestedList)); + }); + } else { + node.elements() + .forEachRemaining( + arrayEntry -> acc.add(extractValue(arrayEntry, schema.type(), fieldName))); + } + return acc; + } + + public static Map populateMap(JsonNode node, Map map) { + for (Iterator> it = node.fields(); it.hasNext(); ) { + Map.Entry element = it.next(); + map.put(element.getKey(), nodeToText(element.getValue())); + } + return map; + } + + private static String nodeToText(JsonNode node) { + if (node.isTextual()) { + return node.textValue(); + } else { + return node.toString(); + } + } +} diff --git a/kafka-connect-transforms/src/test/java/io/tabular/iceberg/connect/transforms/FileLoads.java b/kafka-connect-transforms/src/test/java/io/tabular/iceberg/connect/transforms/FileLoads.java new file mode 100644 index 00000000..1b094add --- /dev/null +++ b/kafka-connect-transforms/src/test/java/io/tabular/iceberg/connect/transforms/FileLoads.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package io.tabular.iceberg.connect.transforms; + +import java.io.IOException; +import java.net.URISyntaxException; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Paths; + +public abstract class FileLoads { + protected static final String getFile(String fileName) throws IOException, URISyntaxException { + URL jsonResource = FileLoads.class.getClassLoader().getResource(fileName); + return new String(Files.readAllBytes(Paths.get(jsonResource.toURI())), StandardCharsets.UTF_8); + } +} diff --git a/kafka-connect-transforms/src/test/java/io/tabular/iceberg/connect/transforms/JsonToMapTransformTest.java b/kafka-connect-transforms/src/test/java/io/tabular/iceberg/connect/transforms/JsonToMapTransformTest.java new file mode 100644 index 00000000..39e306ff --- /dev/null +++ b/kafka-connect-transforms/src/test/java/io/tabular/iceberg/connect/transforms/JsonToMapTransformTest.java @@ -0,0 +1,168 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package io.tabular.iceberg.connect.transforms; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import java.util.Map; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.sink.SinkRecord; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +public class JsonToMapTransformTest extends FileLoads { + + private String loadJson() { + try { + return getFile("jsonmap.json"); + } catch (Exception e) { + throw new RuntimeException("failed to load jsonmap.json in test", e); + } + } + + private final String topic = "topic"; + private final int partition = 0; + private final Long offset = 100L; + private final Long timestamp = 1000L; + private final String keyValue = "key_value:"; + private final Schema keySchema = SchemaBuilder.STRING_SCHEMA; + + @Test + @DisplayName("should return null records as-is") + public void nullRecords() { + try (JsonToMapTransform smt = new JsonToMapTransform<>()) { + SinkRecord record = + new SinkRecord( + topic, + partition, + null, + null, + null, + null, + offset, + timestamp, + TimestampType.CREATE_TIME); + SinkRecord result = smt.apply(record); + assertThat(result).isSameAs(record); + } + } + + @Test + @DisplayName("should throw exception if the value is not a json object") + public void shouldThrowExceptionNonJsonObjects() { + try (JsonToMapTransform smt = new JsonToMapTransform<>()) { + SinkRecord record = + new SinkRecord( + topic, + partition, + keySchema, + keyValue, + null, + "not_a_json_object", + offset, + timestamp, + TimestampType.CREATE_TIME); + assertThrows(JsonToMapException.class, () -> smt.apply(record)); + } + } + + @Test + @DisplayName("should throw exception if not valid json") + public void shouldThrowExceptionInvalidJson() { + try (JsonToMapTransform smt = new JsonToMapTransform<>()) { + SinkRecord record = + new SinkRecord( + topic, + partition, + keySchema, + keyValue, + null, + "{\"key\": 1,\"\"\"***", + offset, + timestamp, + TimestampType.CREATE_TIME); + assertThrows(JsonToMapException.class, () -> smt.apply(record)); + } + } + + @Test + @DisplayName( + "should contain a single value of Map if configured to convert root node") + public void singleValueOnRootNode() { + try (JsonToMapTransform smt = new JsonToMapTransform<>()) { + smt.configure(ImmutableMap.of(JsonToMapTransform.JSON_LEVEL, "true")); + SinkRecord record = + new SinkRecord( + topic, + partition, + keySchema, + keyValue, + null, + "{\"key\":1,\"a\":\"a\"}", + offset, + timestamp, + TimestampType.CREATE_TIME); + SinkRecord result = smt.apply(record); + Schema expectedSchema = + SchemaBuilder.struct() + .field( + "payload", + SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA).optional().build()) + .build(); + assertInstanceOf(Struct.class, result.value()); + Struct resultStruct = (Struct) result.value(); + + Map expectedValue = Maps.newHashMap(); + expectedValue.put("key", "1"); + expectedValue.put("a", "a"); + assertThat(resultStruct.get("payload")).isEqualTo(expectedValue); + assertThat(result.valueSchema()).isEqualTo(expectedSchema); + } + } + + @Test + @DisplayName("should contain a struct on the value if configured to convert after root node") + public void structOnRootNode() { + try (JsonToMapTransform smt = new JsonToMapTransform<>()) { + SinkRecord record = + new SinkRecord( + topic, + partition, + keySchema, + keyValue, + null, + loadJson(), + offset, + timestamp, + TimestampType.CREATE_TIME); + SinkRecord result = smt.apply(record); + assertInstanceOf(Struct.class, result.value()); + Struct resultStruct = (Struct) result.value(); + assertThat(resultStruct.schema().fields().size()).isEqualTo(19); + assertThat(resultStruct.get("string")).isEqualTo("string"); + } + } +} diff --git a/kafka-connect-transforms/src/test/java/io/tabular/iceberg/connect/transforms/JsonToMapUtilsTest.java b/kafka-connect-transforms/src/test/java/io/tabular/iceberg/connect/transforms/JsonToMapUtilsTest.java new file mode 100644 index 00000000..caebfc94 --- /dev/null +++ b/kafka-connect-transforms/src/test/java/io/tabular/iceberg/connect/transforms/JsonToMapUtilsTest.java @@ -0,0 +1,337 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package io.tabular.iceberg.connect.transforms; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.BigIntegerNode; +import com.fasterxml.jackson.databind.node.DoubleNode; +import com.fasterxml.jackson.databind.node.IntNode; +import com.fasterxml.jackson.databind.node.MissingNode; +import com.fasterxml.jackson.databind.node.NullNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.fasterxml.jackson.databind.node.TextNode; +import java.math.BigDecimal; +import java.math.BigInteger; +import java.util.AbstractMap; +import java.util.Base64; +import java.util.Map; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Maps; +import org.apache.kafka.connect.data.Field; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; + +class JsonToMapUtilsTest extends FileLoads { + private final ObjectMapper mapper = new ObjectMapper(); + + private final ObjectNode objNode = loadJson(mapper); + + private ObjectNode loadJson(ObjectMapper objectMapper) { + try { + return (ObjectNode) objectMapper.readTree(getFile("jsonmap.json")); + } catch (Exception e) { + throw new RuntimeException("failed to load jsonmap.json in test", e); + } + } + + @Test + @DisplayName("addField should add schemas to builder unless schema is null") + public void addToSchema() { + SchemaBuilder builder = SchemaBuilder.struct(); + Map.Entry good = + new AbstractMap.SimpleEntry("good", TextNode.valueOf("text")); + Map.Entry missing = + new AbstractMap.SimpleEntry("missing", NullNode.getInstance()); + JsonToMapUtils.addField(good, builder); + JsonToMapUtils.addField(missing, builder); + Schema result = builder.build(); + assertThat(result.fields()) + .isEqualTo(Lists.newArrayList(new Field("good", 0, Schema.OPTIONAL_STRING_SCHEMA))); + } + + @Test + @DisplayName("extractSimpleValue extracts type from Node based on Schema") + public void primitiveBasedOnSchemaHappyPath() { + JsonNode stringNode = objNode.get("string"); + JsonNode booleanNode = objNode.get("boolean"); + JsonNode intNode = objNode.get("int"); + JsonNode longNode = objNode.get("long"); + JsonNode floatIsDoubleNode = objNode.get("float_is_double"); + JsonNode doubleNode = objNode.get("double"); + JsonNode bytesNode = objNode.get("bytes"); + + assertThat(JsonToMapUtils.extractValue(stringNode, Schema.Type.STRING, "")).isEqualTo("string"); + assertThat(JsonToMapUtils.extractValue(booleanNode, Schema.Type.BOOLEAN, "")).isEqualTo(true); + assertThat(JsonToMapUtils.extractValue(intNode, Schema.Type.INT32, "")).isEqualTo(42); + assertThat(JsonToMapUtils.extractValue(longNode, Schema.Type.INT64, "")).isEqualTo(3147483647L); + assertThat(JsonToMapUtils.extractValue(floatIsDoubleNode, Schema.Type.FLOAT64, "")) + .isEqualTo(3.0); + assertThat(JsonToMapUtils.extractValue(doubleNode, Schema.Type.FLOAT64, "")).isEqualTo(0.3); + byte[] byteResult = (byte[]) JsonToMapUtils.extractValue(bytesNode, Schema.Type.BYTES, ""); + assertArrayEquals(byteResult, Base64.getDecoder().decode("SGVsbG8=")); + } + + @Test + @DisplayName("extractValue converts complex nodes to strings if schema is string") + public void exactStringsFromComplexNodes() { + JsonNode arrayObjects = objNode.get("array_objects"); + assertInstanceOf(ArrayNode.class, arrayObjects); + + JsonNode nestedObjNode = objNode.get("nested_obj"); + assertInstanceOf(ObjectNode.class, nestedObjNode); + + JsonNode arrayDifferentTypes = objNode.get("array_different_types"); + assertInstanceOf(ArrayNode.class, arrayDifferentTypes); + + JsonNode bigInt = objNode.get("bigInt"); + assertInstanceOf(BigIntegerNode.class, bigInt); + + assertThat(JsonToMapUtils.extractValue(arrayObjects, Schema.Type.STRING, "")) + .isEqualTo("[{\"key\":1}]"); + assertThat(JsonToMapUtils.extractValue(arrayDifferentTypes, Schema.Type.STRING, "")) + .isEqualTo("[\"one\",1]"); + assertThat(JsonToMapUtils.extractValue(bigInt, Schema.Type.STRING, "")) + .isEqualTo("354736184430273859332531123456"); + } + + @Test + @DisplayName("extractSimpleValue throws for non-primitive schema types") + public void primitiveBasedOnSchemaThrows() { + assertThrows( + RuntimeException.class, + () -> JsonToMapUtils.extractValue(objNode.get("string"), Schema.Type.STRUCT, "")); + } + + @Test + @DisplayName("arrayNodeType returns a type if all elements of the array are the same type") + public void determineArrayNodeType() { + ArrayNode arrayInt = (ArrayNode) objNode.get("array_int"); + ArrayNode arrayArrayInt = (ArrayNode) objNode.get("array_array_int"); + assertThat(IntNode.class).isEqualTo(JsonToMapUtils.arrayNodeType(arrayInt)); + assertThat(ArrayNode.class).isEqualTo(JsonToMapUtils.arrayNodeType(arrayArrayInt)); + } + + @Test + @DisplayName("arrayNodeType returns null if elements of the array are different types") + public void determineArrayNodeTypeNotSameTypes() { + ArrayNode mixedArray = (ArrayNode) objNode.get("array_different_types"); + assertThat(JsonToMapUtils.arrayNodeType(mixedArray)).isNull(); + } + + @Test + @DisplayName("schemaFromNode returns null for NullNode and MissingNode types") + public void schemaFromNodeNullOnNullNodes() { + JsonNode nullNode = NullNode.getInstance(); + JsonNode missingNode = MissingNode.getInstance(); + assertThat(JsonToMapUtils.schemaFromNode(nullNode)).isNull(); + assertThat(JsonToMapUtils.schemaFromNode(missingNode)).isNull(); + } + + @Test + @DisplayName("schemaFromNode returns null for empty ObjectNodes") + public void schemaFromNodeNullEmptyObjectNodes() { + JsonNode node = objNode.get("empty_obj"); + assertInstanceOf(ObjectNode.class, node); + assertThat(JsonToMapUtils.schemaFromNode(node)).isNull(); + } + + @Test + @DisplayName( + "schemaFromNode returns bytes with logical name decimal with scale 0 for BigInteger nodes") + public void schemaFromNodeStringForBigInteger() { + JsonNode node = objNode.get("bigInt"); + assertInstanceOf(BigIntegerNode.class, node); + assertThat(JsonToMapUtils.schemaFromNode(node)).isEqualTo(JsonToMapUtils.decimalSchema(0)); + } + + @Test + @DisplayName("schemaFromNode returns primitive Schemas for primitive nodes") + public void schemaFromNodePrimitiveSchemasFromPrimitiveNodes() { + JsonNode intNode = objNode.get("int"); + JsonNode doubleNode = objNode.get("double"); + assertInstanceOf(IntNode.class, intNode); + assertInstanceOf(DoubleNode.class, doubleNode); + assertThat(JsonToMapUtils.schemaFromNode(intNode)).isEqualTo(Schema.OPTIONAL_INT32_SCHEMA); + assertThat(JsonToMapUtils.schemaFromNode(doubleNode)).isEqualTo(Schema.OPTIONAL_FLOAT64_SCHEMA); + } + + @Test + @DisplayName("schemaFromNode returns Map for ObjectNodes") + public void schmefromNodeObjectNodesAsMaps() { + JsonNode node = objNode.get("nested_obj"); + assertInstanceOf(ObjectNode.class, node); + assertThat(JsonToMapUtils.schemaFromNode(node)) + .isEqualTo( + SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA).optional().build()); + } + + @Test + @DisplayName("schemaFromNode returns Array String schema for ArrayNodes with ObjectNode elements") + public void schemaFromNodeArrayStringFromArrayObjects() { + JsonNode arrayObjects = objNode.get("array_objects"); + assertInstanceOf(ArrayNode.class, arrayObjects); + assertThat(JsonToMapUtils.schemaFromNode(arrayObjects)) + .isEqualTo(JsonToMapUtils.ARRAY_MAP_OPTIONAL_STRING); + } + + @Test + @DisplayName("schemaFromNode returns Array String schema for ArrayNodes with inconsistent types") + public void schemaFromNodeArrayStringFromInconsistentArrayNodes() { + JsonNode inconsistent = objNode.get("array_different_types"); + assertInstanceOf(ArrayNode.class, inconsistent); + assertThat(JsonToMapUtils.schemaFromNode(inconsistent)) + .isEqualTo(SchemaBuilder.array(Schema.OPTIONAL_STRING_SCHEMA).optional().build()); + } + + @Test + @DisplayName( + "schemaFromNode returns Array[Array[String]] for ArrayNodes of ArrayNodes with inconsistent types") + public void schemaFromNodeArraysArrays() { + JsonNode node = objNode.get("array_array_inconsistent"); + assertInstanceOf(ArrayNode.class, node); + + Schema expected = + SchemaBuilder.array(SchemaBuilder.array(Schema.OPTIONAL_STRING_SCHEMA).optional().build()) + .optional() + .build(); + Schema result = JsonToMapUtils.schemaFromNode(node); + assertThat(result).isEqualTo(expected); + } + + @Test + @DisplayName( + "schemaFromNode returns Array[Array[Map]] for ArrayNodes of ArrayNodes of objects") + public void schemaFromNodeArrayArrayObjects() { + JsonNode node = objNode.get("array_array_objects"); + assertInstanceOf(ArrayNode.class, node); + Schema expected = + SchemaBuilder.array(JsonToMapUtils.ARRAY_MAP_OPTIONAL_STRING).optional().build(); + Schema result = JsonToMapUtils.schemaFromNode(node); + assertThat(result).isEqualTo(expected); + } + + @Test + @DisplayName("schemaFromNode returns Array[Array[Int]] for ArrayNodes of ArrayNodes of IntNode") + public void schemaFromNodeArrayArrayOfArrayArrayInt() { + JsonNode node = objNode.get("array_array_int"); + assertInstanceOf(ArrayNode.class, node); + Schema expected = + SchemaBuilder.array(SchemaBuilder.array(Schema.OPTIONAL_INT32_SCHEMA).optional().build()) + .optional() + .build(); + Schema result = JsonToMapUtils.schemaFromNode(node); + assertThat(result).isEqualTo(expected); + } + + @Test + @DisplayName("schemaFromNode returns null for empty ArrayNodes") + public void schemaFromNodeNullFromEmptyArray() { + JsonNode node = objNode.get("empty_arr"); + assertInstanceOf(ArrayNode.class, node); + assertThat(JsonToMapUtils.schemaFromNode(node)).isNull(); + } + + @Test + @DisplayName("schemaFromNode returns null for empty Array of Array nodes") + public void schemaFromNodeEmptyArrayOfEmptyArrays() { + JsonNode node = objNode.get("empty_arr_arr"); + assertInstanceOf(ArrayNode.class, node); + assertThat(JsonToMapUtils.schemaFromNode(node)).isNull(); + } + + @Test + @DisplayName( + "schemaFromNode returns optional map of optional for array of empty object") + public void schemaFromNodeNullArrayEmptyObject() { + JsonNode node = objNode.get("array_empty_object"); + assertInstanceOf(ArrayNode.class, node); + assertThat(JsonToMapUtils.schemaFromNode(node)) + .isEqualTo(JsonToMapUtils.ARRAY_MAP_OPTIONAL_STRING); + } + + @Test + @DisplayName( + "schemaFromNode returns Array[Map] for array of objects if one object is empty") + public void schemaFromNodeMixedObjectsOneEmpty() { + JsonNode node = objNode.get("nested_object_contains_empty"); + assertInstanceOf(ArrayNode.class, node); + assertThat(JsonToMapUtils.schemaFromNode(node)) + .isEqualTo(JsonToMapUtils.ARRAY_MAP_OPTIONAL_STRING); + } + + @Test + public void addToStruct() { + SchemaBuilder builder = SchemaBuilder.struct(); + objNode.fields().forEachRemaining(entry -> JsonToMapUtils.addField(entry, builder)); + Schema schema = builder.build(); + + Struct result = new Struct(schema); + JsonToMapUtils.addToStruct(objNode, schema, result); + + assertThat(result.get("string")).isEqualTo("string"); + assertThat(result.get("int")).isEqualTo(42); + assertThat(result.get("long")).isEqualTo(3147483647L); + assertThat(result.get("boolean")).isEqualTo(true); + assertThat(result.get("float_is_double")).isEqualTo(3.0); + assertThat(result.get("double")).isEqualTo(0.3); + // we don't actually convert to bytes when parsing the json + // so just check it is a string + assertThat(result.get("bytes")).isEqualTo("SGVsbG8="); + BigDecimal bigIntExpected = new BigDecimal(new BigInteger("354736184430273859332531123456")); + assertThat(result.get("bigInt")).isEqualTo(bigIntExpected); + assertThat(result.get("nested_object_contains_empty")) + .isEqualTo(Lists.newArrayList(Maps.newHashMap(), ImmutableMap.of("one", "1"))); + assertThat(result.get("array_int")).isEqualTo(Lists.newArrayList(1, 1)); + assertThat(result.get("array_array_int")) + .isEqualTo(Lists.newArrayList(Lists.newArrayList(1, 1), Lists.newArrayList(2, 2))); + assertThat(result.get("array_objects")) + .isEqualTo(Lists.newArrayList(ImmutableMap.of("key", "1"))); + assertThat(result.get("array_array_objects")) + .isEqualTo( + Lists.newArrayList( + Lists.newArrayList(ImmutableMap.of("key", "1")), + Lists.newArrayList(ImmutableMap.of("key", "2", "other", "{\"ugly\":[1,2]}")))); + assertThat(result.get("array_array_inconsistent")) + .isEqualTo(Lists.newArrayList(Lists.newArrayList("1"), Lists.newArrayList("2.0"))); + assertThat(result.get("array_different_types")).isEqualTo(Lists.newArrayList("one", "1")); + + Map expectedNestedObject = Maps.newHashMap(); + expectedNestedObject.put("key", "{\"nested_key\":1}"); + assertThat(result.get("nested_obj")).isEqualTo(expectedNestedObject); + + assertThat(result.get("empty_string")).isEqualTo(""); + + // assert empty fields don't show up on the struct + assertThrows(RuntimeException.class, () -> result.get("null")); + assertThrows(RuntimeException.class, () -> result.get("empty_obj")); + assertThrows(RuntimeException.class, () -> result.get("empty_arr")); + assertThrows(RuntimeException.class, () -> result.get("empty_arr_arr")); + } +} diff --git a/kafka-connect-transforms/src/test/resources/jsonmap.json b/kafka-connect-transforms/src/test/resources/jsonmap.json new file mode 100644 index 00000000..c37e3a1a --- /dev/null +++ b/kafka-connect-transforms/src/test/resources/jsonmap.json @@ -0,0 +1,27 @@ +{ + "string": "string", + "int": 42, + "long": 3147483647, + "boolean": true, + "float_is_double": 3.0, + "double": 0.3, + "null": null, + "empty_obj": {}, + "empty_arr": [], + "empty_arr_arr": [[], []], + "empty_string": "", + "array_empty_object": [{}, {}], + "nested_object_contains_empty": [{}, {"one": 1}], + "array_int": [1, 1], + "array_array_int": [[1,1], [2,2]], + "array_objects": [{"key": 1}], + "array_array_objects":[[{"key": 1}], [{"key": 2, "other": {"ugly": [1,2]}}]], + "array_array_inconsistent": [[1], [2.0]], + "array_different_types": ["one", 1], + "nested_obj": { + "key": {"nested_key": 1} + }, + "bytes": "SGVsbG8=", + "bigInt": 354736184430273859332531123456, + "bigDecimal": 13.3333333333333333333333333344444444455555555 +} \ No newline at end of file