Skip to content

Commit

Permalink
Non-flattening MongoDb Debezium SMT (#204)
Browse files Browse the repository at this point in the history
* matf-non-flattening-mongodb-debezium-smt

- adds debezium mongo SMT for converting BSON before/after into typed Struct before/after
  • Loading branch information
tabmatfournier authored Apr 23, 2024
1 parent 423b4a8 commit 21d741e
Show file tree
Hide file tree
Showing 17 changed files with 2,144 additions and 1 deletion.
3 changes: 3 additions & 0 deletions .baseline/checkstyle/checkstyle-suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,7 @@

<!-- Referencing guava classes should be allowed in classes within bundled-guava module -->
<suppress files="org.apache.iceberg.GuavaClasses" id="BanUnrelocatedGuavaClasses"/>

<!-- Ignore borrowed classes from debezium -->
<suppress files="[\\/]io.debezium.connector[\\/]" checks="."/>
</suppressions>
5 changes: 4 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,10 @@ subprojects {

spotless {
java {
target "src/**/*.java"
target project.fileTree(project.rootDir) {
include 'src/**/*.java'
exclude 'src/*/io/debezium/**/*.java'
}
googleJavaFormat("1.7")
removeUnusedImports()
licenseHeaderFile "$rootDir/header.txt"
Expand Down
3 changes: 3 additions & 0 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
assertj-ver = "3.24.2"
avro-ver = "1.11.3"
awaitility-ver = "4.2.0"
bson-ver = "4.11.0"
hadoop-ver = "3.3.6"
hive-ver = "2.3.9"
http-client-ver = "5.2.1"
Expand All @@ -13,8 +14,10 @@ slf4j-ver = "1.7.36"
testcontainers-ver = "1.17.6"



[libraries]
avro = { module = "org.apache.avro:avro", version.ref = "avro-ver" }
bson = { module = "org.mongodb:bson", version.ref = "bson-ver"}
hadoop-client = { module = "org.apache.hadoop:hadoop-client", version.ref = "hadoop-ver" }
hadoop-common = { module = "org.apache.hadoop:hadoop-common", version.ref = "hadoop-ver" }
hive-metastore = { module = "org.apache.hive:hive-metastore", version.ref = "hive-ver" }
Expand Down
23 changes: 23 additions & 0 deletions kafka-connect-transforms/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -141,3 +141,26 @@ If `nested` is on:

If `nested` is off:
`_kafka_metdata_topic`, `_kafka_metadata_partition`, `_kafka_metadata_offset`, `_kafka_metadata_timestamp`

# MongoDebeziumTransform
_(Experimental)_

The `MongoDebeziumTransform` SMT transforms a Mongo Debezium formatted message with `before`/`after` BSON
strings into `before`/`after` typed Structs that the `DebeziumTransform` SMT expects.

It does not (yet) support renaming columns if mongodb column is not supported by your underlying
catalog type.

## Configuration

| Property | Description |
|---------------------|--------------------------------------------------|
| array_handling_mode | `array` or `document` to set array handling mode |

Value array (the default) will encode arrays as the array datatype. It is user’s responsibility to ensure that
all elements for a given array instance are of the same type. This option is a restricting one but offers
easy processing of arrays by downstream clients.

Value document will convert the array into a struct of structs in the similar way as done by BSON serialization.
The main struct contains fields named _0, _1, _2 etc. where the name represents the index of the element in the array.
Every element is then passed as the value for the given field.
1 change: 1 addition & 0 deletions kafka-connect-transforms/build.gradle
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
dependencies {
implementation libs.iceberg.guava
implementation libs.bson
implementation libs.slf4j
compileOnly libs.bundles.kafka.connect

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/
package io.debezium.connector.mongodb.transforms;

public enum ArrayEncoding {
ARRAY("array"),
DOCUMENT("document");

private final String value;

ArrayEncoding(String value) {
this.value = value;
}

public String getValue() {
return value;
}

/**
* Determine if the supplied value is one of the predefined options.
*
* @param value the configuration property value; may not be null
* @return the matching option, or null if no match is found
*/
public static ArrayEncoding parse(String value) {
if (value == null) {
return null;
}
value = value.trim();
for (ArrayEncoding option : ArrayEncoding.values()) {
if (option.getValue().equalsIgnoreCase(value)) {
return option;
}
}
return null;
}

/**
* Determine if the supplied value is one of the predefined options.
*
* @param value the configuration property value; may not be null
* @param defaultValue the default value; may be null
* @return the matching option, or null if no match is found and the non-null default is invalid
*/
public static ArrayEncoding parse(String value, String defaultValue) {
ArrayEncoding mode = parse(value);
if (mode == null && defaultValue != null) {
mode = parse(defaultValue);
}
return mode;
}
}
Loading

0 comments on commit 21d741e

Please sign in to comment.