Merge pull request #36 from reifyhealth/upstream-sync-2023-05-17
sink additions [CVEs, lakeformation support, debezium metadata support, debezium time type coercion]
gliter authored May 25, 2023
2 parents 2c86490 + 7b1a5e7 commit 43e77e3
Showing 12 changed files with 443 additions and 48 deletions.
9 changes: 8 additions & 1 deletion CHANGELOG.md
@@ -1,7 +1,15 @@
# Changelog

- Updates AWS lakeformation transitive dependency providing lakeformation support in s3 iceberg tables.

## [Unreleased]

- Updates dependencies to resolve several critical jackson-databind CVEs.
- Updates the AWS `lakeformation` transitive dependency, providing Lake Formation support for S3 Iceberg tables.
- Added Iceberg coercion support for Avro `Array<Struct>` types, enabling support for Debezium `data_collections` metadata.
- Added support for coercing five Debezium temporal types to their Iceberg equivalents: Date, MicroTimestamp, ZonedTimestamp, MicroTime, and ZonedTime.
- Rich temporal types are toggled on by the new boolean configuration property `rich-temporal-types`.

## [0.3.1] - 2023-04-06

- Add `iceberg.format-version` config setting to indicate which Iceberg table format version is used.
@@ -18,7 +26,6 @@

- Added support for `double` primitive type fields.
- Allow coercion of Iceberg table identifiers to `snake_case` by setting the `table.snake-case` boolean configuration property.

## [0.2.2] - 2023-02-17

- Allow changing Iceberg-table-specific settings using `iceberg.table-default.*` connector configuration properties.
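For quick reference, the temporal coercions listed under [Unreleased] map Debezium's semantic type names to Iceberg primitive types. The sketch below mirrors the `icebergFieldType` branches added later in this commit when `rich-temporal-types` is enabled; the class name and the printing are illustrative only, not part of the change.

```java
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;

import java.util.LinkedHashMap;
import java.util.Map;

// Sketch only: the Debezium-name -> Iceberg-type pairs below mirror the
// icebergFieldType() branches added in this commit when rich-temporal-types=true.
public class RichTemporalTypeMapping {
    public static void main(String[] args) {
        Map<String, Type.PrimitiveType> mapping = new LinkedHashMap<>();
        mapping.put("io.debezium.time.Date", Types.DateType.get());                        // int32  -> date
        mapping.put("io.debezium.time.MicroTimestamp", Types.TimestampType.withoutZone()); // int64  -> timestamp
        mapping.put("io.debezium.time.MicroTime", Types.TimeType.get());                   // int64  -> time
        mapping.put("io.debezium.time.ZonedTimestamp", Types.TimestampType.withZone());    // string -> timestamptz
        mapping.put("io.debezium.time.ZonedTime", Types.TimeType.get());                   // string -> time (UTC)
        mapping.forEach((debeziumType, icebergType) ->
                System.out.printf("%-40s -> %s%n", debeziumType, icebergType));
    }
}
```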
18 changes: 15 additions & 3 deletions pom.xml
@@ -15,10 +15,10 @@

<!-- versions -->
<kafka.connect.version>3.2.2</kafka.connect.version>
<iceberg.version>1.0.0</iceberg.version>
<iceberg.version>1.2.1</iceberg.version>
<debezium.version>1.9.7.Final</debezium.version>
<hadoop.version>3.3.3</hadoop.version>
<awssdk.version>2.17.295</awssdk.version>
<hadoop.version>3.3.5</hadoop.version>
<awssdk.version>2.20.51</awssdk.version>
<hive.version>3.1.3</hive.version>
<!-- spark 3.2.2_2.13 includes jackson 2.12.3 which requires Jackson Databind version >= 2.12.0 and < 2.13.0 -->
<!-- jackson is also provided by kafka connect-runtime -> pinning to the same version -->
@@ -71,6 +71,12 @@
<scope>provided</scope>
</dependency>

<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>lakeformation</artifactId>
<version>2.20.45</version>
</dependency>

<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-spark-runtime-3.2_2.13</artifactId>
@@ -83,6 +89,12 @@
<version>${debezium.version}</version>
</dependency>

<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-hadoop-bundle</artifactId>
<version>1.13.0</version>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
IcebergChangeEvent.java
@@ -15,14 +15,13 @@
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.types.Type.TypeID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.*;
import java.util.*;

/**
@@ -35,15 +34,17 @@ public class IcebergChangeEvent {
private final JsonNode value;
private final JsonNode key;
private final JsonSchema jsonSchema;
private final IcebergSinkConfiguration configuration;

public IcebergChangeEvent(String destination,
JsonNode value,
JsonNode key,
JsonNode valueSchema,
JsonNode keySchema) {
JsonNode keySchema, IcebergSinkConfiguration configuration) {
this.destination = destination;
this.value = value;
this.key = key;
this.configuration = configuration;
this.jsonSchema = new JsonSchema(valueSchema, keySchema);
}

@@ -100,14 +101,30 @@ private GenericRecord asIcebergRecord(Types.StructType tableFields, JsonNode dat
return record;
}

private Type.PrimitiveType icebergFieldType(String fieldType) {
private Type.PrimitiveType icebergFieldType(String fieldType, String fieldTypeName) {
switch (fieldType) {
case "int8":
case "int16":
case "int32": // int 4 bytes
return Types.IntegerType.get();
if (configuration.isRichTemporalTypes() &&
fieldTypeName.equals("io.debezium.time.Date")) {
return Types.DateType.get();
}
else {
return Types.IntegerType.get();
}
case "int64": // long 8 bytes
return Types.LongType.get();
if (configuration.isRichTemporalTypes() &&
fieldTypeName.equals("io.debezium.time.MicroTimestamp")) {
return Types.TimestampType.withoutZone();
}
else if (configuration.isRichTemporalTypes() &&
fieldTypeName.equals("io.debezium.time.MicroTime")) {
return Types.TimeType.get();
}
else {
return Types.LongType.get();
}
case "float8":
case "float16":
case "float32": // float is represented in 32 bits,
@@ -119,7 +136,17 @@ private Type.PrimitiveType icebergFieldType(String fieldType) {
case "boolean":
return Types.BooleanType.get();
case "string":
return Types.StringType.get();
if (configuration.isRichTemporalTypes() &&
fieldTypeName.equals("io.debezium.time.ZonedTimestamp")) {
return Types.TimestampType.withZone();
}
else if (configuration.isRichTemporalTypes() &&
fieldTypeName.equals("io.debezium.time.ZonedTime")) {
return Types.TimeType.get();
}
else {
return Types.StringType.get();
}
case "bytes":
return Types.BinaryType.get();
case "timestamptz":
@@ -133,7 +160,9 @@

private Object jsonValToIcebergVal(Types.NestedField field,
JsonNode node) {
LOGGER.debug("Processing Field:{} Type:{}", field.name(), field.type());
String fieldTypeName = field.doc();
LOGGER.debug("Processing Field:{} Type:{} Doc:{}",
field.name(), field.type(), fieldTypeName);
final Object val;
switch (field.type().typeId()) {
case INTEGER: // int 4 bytes
@@ -151,8 +180,42 @@ private Object jsonValToIcebergVal(Types.NestedField field,
case BOOLEAN:
val = node.isNull() ? null : node.asBoolean();
break;
case DATE:
val = node.isNull() ? null
: LocalDate.ofEpochDay(node.asInt());
break;
case TIMESTAMP:
if (node.isTextual()) {
val = OffsetDateTime.parse(node.asText());
}
else if (node.isNumber()) {
Instant instant = Instant.ofEpochSecond(0L, node.asLong() * 1000);
val = LocalDateTime.ofInstant(instant, ZoneOffset.UTC);
}
else if (node.isNull()){
val = null;
}
else {
throw new RuntimeException("Unrecognized JSON node type for Iceberg type TIMESTAMP: " + node.getNodeType());
}
break;
case TIME:
if (node.isLong()) {
val = LocalTime.ofNanoOfDay(node.asLong() * 1000);
}
else if (node.isTextual()) {
// Debezium converts ZonedTime values to UTC on capture, so no information is lost by converting them
// to LocalTimes here. Iceberg doesn't support a ZonedTime equivalent anyway.
val = OffsetTime.parse(node.asText()).toLocalTime();
}
else if (node.isNull()){
val = null;
}
else {
throw new RuntimeException("Unrecognized JSON node type for Iceberg type TIME: " + node.getNodeType());
}
break;
case STRING:
// if the node is not a value node (method isValueNode returns false), convert it to string.
val = node.isValueNode() ? node.asText(null) : node.toString();
break;
case BINARY:
@@ -164,7 +227,19 @@ private Object jsonValToIcebergVal(Types.NestedField field,
}
break;
case LIST:
val = MAPPER.convertValue(node, ArrayList.class);
// for now we support two LIST type cases
Types.ListType listType = (Types.ListType) field.type();
if (listType.elementType().typeId() == TypeID.STRUCT) {
List<GenericRecord> structList = new ArrayList<>();
Iterator<JsonNode> it = node.iterator();
while (it.hasNext()) {
structList.add(asIcebergRecord(listType.elementType().asStructType(), it.next()));
}
val = structList;
}
else {
val = MAPPER.convertValue(node, ArrayList.class);
}
break;
case MAP:
val = MAPPER.convertValue(node, Map.class);
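The LIST-of-STRUCT branch above is what lets a Debezium `data_collections` array be stored as an Iceberg list of structs. Below is a hedged sketch of the resulting shape, built directly against the Iceberg API; the field ids, the sample values, and the `data_collection`/`event_count` field names (typical of Debezium transaction metadata) are assumptions and do not come from this diff.

```java
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.types.Types;

import java.util.List;

// Sketch only: models the Iceberg shape a Debezium "data_collections"
// Array<Struct> field would be coerced into by the LIST-of-STRUCT branch above.
// Field ids, names, and values are illustrative assumptions.
public class DataCollectionsShapeExample {
    public static void main(String[] args) {
        Types.StructType element = Types.StructType.of(
                Types.NestedField.optional(101, "data_collection", Types.StringType.get()),
                Types.NestedField.optional(102, "event_count", Types.LongType.get()));
        Types.ListType dataCollections = Types.ListType.ofOptional(100, element);

        GenericRecord row = GenericRecord.create(element);
        row.setField("data_collection", "inventory.customers");
        row.setField("event_count", 2L);

        List<GenericRecord> value = List.of(row);
        System.out.println(dataCollections + " -> " + value);
    }
}
```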
Expand Down Expand Up @@ -285,21 +360,42 @@ private List<Types.NestedField> icebergSchema(JsonNode eventSchema, String schem
columnId++;
String fieldName = jsonSchemaFieldNode.get("field").textValue();
String fieldType = jsonSchemaFieldNode.get("type").textValue();
LOGGER.debug("Processing Field: [{}] {}.{}::{}", columnId, schemaName, fieldName, fieldType);
String fieldTypeName = "";
JsonNode fieldTypeNameNode = jsonSchemaFieldNode.get("name");
if (fieldTypeNameNode != null && !fieldTypeNameNode.isMissingNode()) {
fieldTypeName = fieldTypeNameNode.textValue();
}

LOGGER.debug("Processing Field: [{}] {}.{}::{} ({})",
columnId, schemaName, fieldName, fieldType, fieldTypeName);
switch (fieldType) {
case "array":
JsonNode items = jsonSchemaFieldNode.get("items");
if (items != null && items.has("type")) {
String listItemType = items.get("type").textValue();

if (listItemType.equals("struct") || listItemType.equals("array") || listItemType.equals("map")) {
throw new RuntimeException("Complex nested array types are not supported," +

if (listItemType.equals("array") || listItemType.equals("map")) {
throw new RuntimeException("'array' and 'map' nested array types are not supported," +
" array[" + listItemType + "], field " + fieldName);
}

Type.PrimitiveType item = icebergFieldType(listItemType);
schemaColumns.add(Types.NestedField.optional(
columnId, fieldName, Types.ListType.ofOptional(++columnId, item)));
else {
if (listItemType.equals("struct")) {
List<Types.NestedField> subSchema = icebergSchema(items, fieldName, columnId+2);
schemaColumns.add(Types.NestedField.optional(columnId,
fieldName,
Types.ListType.ofOptional(columnId+1,
Types.StructType.of(subSchema)),
""));
columnId += subSchema.size() + 2;
}
else {
// primitive coercions are not supported for list types, pass '""' for fieldTypeName
Type.PrimitiveType item = icebergFieldType(listItemType, "");
schemaColumns.add(Types.NestedField.optional(
columnId, fieldName, Types.ListType.ofOptional(++columnId, item), ""));
}
}
} else {
throw new RuntimeException("Unexpected Array type for field " + fieldName);
}
@@ -309,8 +405,11 @@ private List<Types.NestedField> icebergSchema(JsonNode eventSchema, String schem
//break;
case "struct":
// create it as struct, nested type
// passing "" for NestedField `doc` attribute,
// as `doc` annotated coercions are not supported for members of struct types
List<Types.NestedField> subSchema = icebergSchema(jsonSchemaFieldNode, fieldName, columnId);
schemaColumns.add(Types.NestedField.optional(columnId, fieldName, Types.StructType.of(subSchema)));
schemaColumns.add(Types.NestedField.optional(columnId, fieldName,
Types.StructType.of(subSchema), ""));
columnId += subSchema.size();
break;
default: //primitive types
@@ -320,14 +419,19 @@ private List<Types.NestedField> icebergSchema(JsonNode eventSchema, String schem
// we also dont need to add a partition field, since it already exists.
addPartitionField = false;
}
schemaColumns.add(Types.NestedField.optional(columnId, fieldName, icebergFieldType(fieldType)));
// passing fieldTypeName for NestedField `doc` attribute,
// annotation based value coercions can be made utilizing the NestedField `doc` initializer/method
schemaColumns.add(Types.NestedField.optional(columnId, fieldName,
icebergFieldType(fieldType, fieldTypeName),
fieldTypeName));
break;
}
}

if (addPartitionField) {
columnId++;
schemaColumns.add(Types.NestedField.optional(columnId, partitionColumn, Types.TimestampType.withZone()));
schemaColumns.add(Types.NestedField.optional(columnId, partitionColumn,
Types.TimestampType.withZone(), ""));
}
return schemaColumns;
}
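Taken together, the DATE/TIMESTAMP/TIME branches added to `jsonValToIcebergVal` reduce to a handful of `java.time` conversions. The standalone sketch below replays the same arithmetic; only the conversion calls come from the code above, while the sample inputs are invented for illustration.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.OffsetDateTime;
import java.time.OffsetTime;
import java.time.ZoneOffset;

// Sketch only: replays the java.time conversions used by the new DATE/TIMESTAMP/TIME
// branches in jsonValToIcebergVal. The sample inputs are invented for illustration.
public class DebeziumTemporalCoercionExample {
    public static void main(String[] args) {
        // io.debezium.time.Date: int32 days since epoch -> Iceberg date
        LocalDate date = LocalDate.ofEpochDay(19500);

        // io.debezium.time.MicroTimestamp: int64 microseconds since epoch -> timestamp (no zone)
        long micros = 1_684_900_000_000_000L;
        LocalDateTime timestamp =
                LocalDateTime.ofInstant(Instant.ofEpochSecond(0L, micros * 1000), ZoneOffset.UTC);

        // io.debezium.time.ZonedTimestamp: ISO-8601 string -> timestamptz
        OffsetDateTime zonedTimestamp = OffsetDateTime.parse("2023-05-17T10:15:30.123456Z");

        // io.debezium.time.MicroTime: int64 microseconds since midnight -> time
        LocalTime time = LocalTime.ofNanoOfDay(45_000_000_000L * 1000);

        // io.debezium.time.ZonedTime: ISO-8601 string (already UTC on capture) -> time
        LocalTime zonedTime = OffsetTime.parse("10:15:30.123456Z").toLocalTime();

        System.out.printf("%s | %s | %s | %s | %s%n",
                date, timestamp, zonedTimestamp, time, zonedTime);
    }
}
```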
IcebergSinkConfiguration.java
@@ -21,6 +21,7 @@ public class IcebergSinkConfiguration {
public static final String TABLE_PREFIX = "table.prefix";
public static final String TABLE_AUTO_CREATE = "table.auto-create";
public static final String TABLE_SNAKE_CASE = "table.snake-case";
public static final String RICH_TEMPORAL_TYPES = "rich-temporal-types";
public static final String ICEBERG_PREFIX = "iceberg.";
public static final String ICEBERG_TABLE_PREFIX = "iceberg.table-default";
public static final String CATALOG_NAME = ICEBERG_PREFIX + "name";
@@ -53,6 +54,9 @@ public class IcebergSinkConfiguration {
"Prefix added to all table names")
.define(TABLE_SNAKE_CASE, BOOLEAN, false, MEDIUM,
"Coerce table names to snake_case")
.define(RICH_TEMPORAL_TYPES, BOOLEAN, false, MEDIUM,
"Coerce Debezium Date, MicroTimestamp, ZonedTimestamp, MicroTime, and ZonedTime values " +
"from JSON primitives to their corresponding Iceberg rich types")
.define(CATALOG_NAME, STRING, "default", MEDIUM,
"Iceberg catalog name")
.define(CATALOG_IMPL, STRING, null, MEDIUM,
@@ -114,6 +118,10 @@ public boolean isTableSnakeCase() {
return parsedConfig.getBoolean(TABLE_SNAKE_CASE);
}

public boolean isRichTemporalTypes() {
return parsedConfig.getBoolean(RICH_TEMPORAL_TYPES);
}

public String getCatalogName() {
return parsedConfig.getString(CATALOG_NAME);
}
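`rich-temporal-types` defaults to `false`, so existing pipelines keep the plain int/long/string behaviour until the flag is switched on. Below is a minimal sketch of enabling it through the connector properties map; only the property names and the accessor come from this commit, and whether the constructor accepts a map this sparse depends on the rest of the `ConfigDef`, which is not shown in this diff.

```java
import com.getindata.kafka.connect.iceberg.sink.IcebergSinkConfiguration;

import java.util.HashMap;
import java.util.Map;

// Sketch only: turns on the new coercion via the property defined above.
// Whether IcebergSinkConfiguration accepts a map this sparse depends on the
// full ConfigDef (not shown in this diff); treat the extra keys as placeholders.
public class EnableRichTemporalTypesExample {
    public static void main(String[] args) {
        Map<String, String> properties = new HashMap<>();
        properties.put("rich-temporal-types", "true"); // defined with BOOLEAN, default false
        properties.put("table.auto-create", "true");   // existing key from this class
        properties.put("iceberg.name", "default");     // CATALOG_NAME = "iceberg." + "name"

        IcebergSinkConfiguration configuration = new IcebergSinkConfiguration(properties);
        System.out.println("rich temporal types: " + configuration.isRichTemporalTypes());
    }
}
```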
@@ -28,7 +28,7 @@ public void start(Map<String, String> properties) {
IcebergSinkConfiguration configuration = new IcebergSinkConfiguration(properties);
Catalog icebergCatalog = IcebergCatalogFactory.create(configuration);
IcebergTableOperator icebergTableOperator = IcebergTableOperatorFactory.create(configuration);
SinkRecordToIcebergChangeEventConverter converter = SinkRecordToIcebergChangeEventConverterFactory.create();
SinkRecordToIcebergChangeEventConverter converter = SinkRecordToIcebergChangeEventConverterFactory.create(configuration);
consumer = new IcebergChangeConsumer(configuration, icebergCatalog, icebergTableOperator, converter);
}

SinkRecordToIcebergChangeEventConverter.java
Expand Up @@ -3,6 +3,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.getindata.kafka.connect.iceberg.sink.IcebergChangeEvent;
import com.getindata.kafka.connect.iceberg.sink.IcebergSinkConfiguration;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.json.JsonConverter;
@@ -19,17 +20,20 @@ public class SinkRecordToIcebergChangeEventConverter {
private final JsonConverter valueJsonConverter;
private final Deserializer<JsonNode> keyDeserializer;
private final Deserializer<JsonNode> valueDeserializer;
private final IcebergSinkConfiguration configuration;

public SinkRecordToIcebergChangeEventConverter(Transformation<SinkRecord> extractNewRecordStateTransformation,
JsonConverter keyJsonConverter,
JsonConverter valueJsonConverter,
Deserializer<JsonNode> keyDeserializer,
Deserializer<JsonNode> valueDeserializer) {
Deserializer<JsonNode> valueDeserializer,
IcebergSinkConfiguration configuration) {
this.extractNewRecordStateTransformation = extractNewRecordStateTransformation;
this.keyJsonConverter = keyJsonConverter;
this.valueJsonConverter = valueJsonConverter;
this.keyDeserializer = keyDeserializer;
this.valueDeserializer = valueDeserializer;
this.configuration = configuration;
}

public IcebergChangeEvent convert(SinkRecord record) {
@@ -41,7 +45,7 @@ public IcebergChangeEvent convert(SinkRecord record) {
JsonNode value = getValue(unwrappedRecord.topic(), valueDeserializer, valueBytes);
JsonNode valueSchema = getSchema(valueBytes);

return new IcebergChangeEvent(unwrappedRecord.topic(), value, key, valueSchema, keySchema);
return new IcebergChangeEvent(unwrappedRecord.topic(), value, key, valueSchema, keySchema, configuration);
}

private JsonNode getValue(String topic, Deserializer<JsonNode> deserializer, byte[] bytes) {