diff --git a/connector/src/main/scala/com/celonis/kafka/connect/ems/config/EmsSinkConfigConstants.scala b/connector/src/main/scala/com/celonis/kafka/connect/ems/config/EmsSinkConfigConstants.scala
index ffc1824..773d040 100644
--- a/connector/src/main/scala/com/celonis/kafka/connect/ems/config/EmsSinkConfigConstants.scala
+++ b/connector/src/main/scala/com/celonis/kafka/connect/ems/config/EmsSinkConfigConstants.scala
@@ -158,42 +158,47 @@ object EmsSinkConfigConstants {
   val CLOSE_EVERY_CONNECTION_DOC           = "Connection pool - Explicitly close connections"
   val CLOSE_EVERY_CONNECTION_DEFAULT_VALUE = false
 
-  val FLATTENER_ENABLE_KEY = s"${CONNECTOR_PREFIX}.flattener.enable"
+  val FLATTENER_ENABLE_KEY = s"$CONNECTOR_PREFIX.flattener.enable"
   val FLATTENER_ENABLE_DOC =
     s"Enable flattening of nested records. This is likely to be needed if the source data contains nested objects or collections."
   val FLATTENER_ENABLE_DEFAULT = true
 
-  val FLATTENER_DISCARD_COLLECTIONS_KEY = s"${CONNECTOR_PREFIX}.flattener.collections.discard"
+  val FLATTENER_DISCARD_COLLECTIONS_KEY = s"$CONNECTOR_PREFIX.flattener.collections.discard"
   val FLATTENER_DISCARD_COLLECTIONS_DOC =
     "Discard array and map fields at any level of depth. Note that the default handling of collections by the flattener function is to JSON-encode them as nullable STRING fields."
   val FLATTENER_DISCARD_COLLECTIONS_DEFAULT = false
 
-  val FLATTENER_JSONBLOB_CHUNKS_KEY = s"${CONNECTOR_PREFIX}.flattener.jsonblob.chunks"
+  val FLATTENER_JSONBLOB_CHUNKS_KEY = s"$CONNECTOR_PREFIX.flattener.jsonblob.chunks"
   val FLATTENER_JSONBLOB_CHUNKS_DOC =
     "Encodes the record into a JSON blob broken down into N VARCHAR fields (e.g. `payload_chunk1`, `payload_chunk2`, `...`, `payload_chunkN`)."
   val FLATTENER_JSONBLOB_CHUNKS_DEFAULT = null
 
-  val DECIMAL_CONVERSION_KEY = s"${CONNECTOR_PREFIX}.convert.decimals.to.double"
-  val DECIMAL_CONVERSION_KEY_DOC =
+  val DECIMAL_CONVERSION_KEY = s"$CONNECTOR_PREFIX.convert.decimals.to.double"
+  val DECIMAL_CONVERSION_DOC =
     s"Convert decimal values into doubles. Valid only for formats with schema (AVRO, Protobuf, JsonSchema)"
-  val DECIMAL_CONVERSION_KEY_DEFAULT = false
+  val DECIMAL_CONVERSION_DEFAULT = false
 
-  val NULL_PK_KEY = s"${CONNECTOR_PREFIX}.allow.null.pk"
+  val TRANSFORM_FIELDS_LOWERCASE_KEY = s"$CONNECTOR_PREFIX.convert.lowercase.fields"
+  val TRANSFORM_FIELDS_LOWERCASE_DOC =
+    s"Convert all fields to lowercase"
+  val TRANSFORM_FIELDS_LOWERCASE_DEFAULT = false
+
+  val NULL_PK_KEY = s"$CONNECTOR_PREFIX.allow.null.pk"
   val NULL_PK_KEY_DOC =
     s"Allow parsing messages with null values in the columns listed as primary keys. If disabled connector will fail after receiving such a message. NOTE: enabling that will cause data inconsistency issue on the EMS side."
   val NULL_PK_KEY_DEFAULT = false
 
-  val EMBED_KAFKA_EMBEDDED_METADATA_KEY = s"${CONNECTOR_PREFIX}.embed.kafka.metadata"
+  val EMBED_KAFKA_EMBEDDED_METADATA_KEY = s"$CONNECTOR_PREFIX.embed.kafka.metadata"
   val EMBED_KAFKA_EMBEDDED_METADATA_DOC =
     "Embed Kafka metadata such as partition, offset and timestamp as additional record fields."
   val EMBED_KAFKA_EMBEDDED_METADATA_DEFAULT = true
 
-  val USE_IN_MEMORY_FS_KEY = s"${CONNECTOR_PREFIX}.inmemfs.enable"
+  val USE_IN_MEMORY_FS_KEY = s"$CONNECTOR_PREFIX.inmemfs.enable"
   val USE_IN_MEMORY_FS_DOC =
     "Rather than writing to the host file system, buffer parquet data files in memory"
   val USE_IN_MEMORY_FS_DEFAULT = false
 
-  val SINK_PUT_TIMEOUT_KEY = s"${CONNECTOR_PREFIX}.sink.put.timeout.ms"
+  val SINK_PUT_TIMEOUT_KEY = s"$CONNECTOR_PREFIX.sink.put.timeout.ms"
   val SINK_PUT_TIMEOUT_DOC =
     "The maximum time (in milliseconds) for the connector task to complete the upload of a single Parquet file before being flagged as failed. Note: this value should always be lower than max.poll.interval.ms"
   val SINK_PUT_TIMEOUT_DEFAULT = 288000L // 4.8 minutes
diff --git a/connector/src/main/scala/com/celonis/kafka/connect/ems/config/EmsSinkConfigDef.scala b/connector/src/main/scala/com/celonis/kafka/connect/ems/config/EmsSinkConfigDef.scala
index 6651252..763a037 100644
--- a/connector/src/main/scala/com/celonis/kafka/connect/ems/config/EmsSinkConfigDef.scala
+++ b/connector/src/main/scala/com/celonis/kafka/connect/ems/config/EmsSinkConfigDef.scala
@@ -363,9 +363,16 @@ object EmsSinkConfigDef {
     .define(
       DECIMAL_CONVERSION_KEY,
       Type.BOOLEAN,
-      DECIMAL_CONVERSION_KEY_DEFAULT,
+      DECIMAL_CONVERSION_DEFAULT,
       Importance.MEDIUM,
-      DECIMAL_CONVERSION_KEY_DOC,
+      DECIMAL_CONVERSION_DOC,
+    )
+    .define(
+      TRANSFORM_FIELDS_LOWERCASE_KEY,
+      Type.BOOLEAN,
+      TRANSFORM_FIELDS_LOWERCASE_DEFAULT,
+      Importance.MEDIUM,
+      TRANSFORM_FIELDS_LOWERCASE_DOC,
     )
     .define(
       NULL_PK_KEY,
diff --git a/connector/src/main/scala/com/celonis/kafka/connect/transform/PreConversionConfig.scala b/connector/src/main/scala/com/celonis/kafka/connect/transform/PreConversionConfig.scala
index d72fe08..352adee 100644
--- a/connector/src/main/scala/com/celonis/kafka/connect/transform/PreConversionConfig.scala
+++ b/connector/src/main/scala/com/celonis/kafka/connect/transform/PreConversionConfig.scala
@@ -17,14 +17,17 @@
 package com.celonis.kafka.connect.transform
 
 import com.celonis.kafka.connect.ems.config.EmsSinkConfigConstants.DECIMAL_CONVERSION_KEY
-import com.celonis.kafka.connect.ems.config.EmsSinkConfigConstants.DECIMAL_CONVERSION_KEY_DEFAULT
+import com.celonis.kafka.connect.ems.config.EmsSinkConfigConstants.DECIMAL_CONVERSION_DEFAULT
+import com.celonis.kafka.connect.ems.config.EmsSinkConfigConstants.TRANSFORM_FIELDS_LOWERCASE_KEY
+import com.celonis.kafka.connect.ems.config.EmsSinkConfigConstants.TRANSFORM_FIELDS_LOWERCASE_DEFAULT
 import com.celonis.kafka.connect.ems.config.PropertiesHelper.getBoolean
 
-final case class PreConversionConfig(convertDecimalsToFloat: Boolean)
+final case class PreConversionConfig(convertDecimalsToFloat: Boolean, convertFieldsToLowercase: Boolean)
 
 object PreConversionConfig {
   def extract(props: Map[String, _]): PreConversionConfig =
     PreConversionConfig(
-      getBoolean(props, DECIMAL_CONVERSION_KEY).getOrElse(DECIMAL_CONVERSION_KEY_DEFAULT),
+      getBoolean(props, DECIMAL_CONVERSION_KEY).getOrElse(DECIMAL_CONVERSION_DEFAULT),
+      getBoolean(props, TRANSFORM_FIELDS_LOWERCASE_KEY).getOrElse(TRANSFORM_FIELDS_LOWERCASE_DEFAULT),
    )
 }
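
[Reviewer note, not part of the patch] A minimal sketch of how the new flag surfaces once the properties are parsed. The literal key assumes CONNECTOR_PREFIX resolves to "connect.ems" (check EmsSinkConfigConstants for the actual prefix), and it assumes PropertiesHelper.getBoolean accepts the string-encoded booleans found in Connect property maps:

    // Hypothetical connector properties: only the lowercase flag is set, so the
    // decimal slot falls back to DECIMAL_CONVERSION_DEFAULT (false).
    val props: Map[String, Any] = Map("connect.ems.convert.lowercase.fields" -> "true")
    PreConversionConfig.extract(props)
    // => PreConversionConfig(convertDecimalsToFloat = false, convertFieldsToLowercase = true)
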
diff --git a/connector/src/main/scala/com/celonis/kafka/connect/transform/conversion/ConnectConversion.scala b/connector/src/main/scala/com/celonis/kafka/connect/transform/conversion/ConnectConversion.scala
index 00c5ac0..6e6ba02 100644
--- a/connector/src/main/scala/com/celonis/kafka/connect/transform/conversion/ConnectConversion.scala
+++ b/connector/src/main/scala/com/celonis/kafka/connect/transform/conversion/ConnectConversion.scala
@@ -33,8 +33,11 @@ trait ConnectConversion {
 
 object ConnectConversion {
   def fromConfig(config: PreConversionConfig): ConnectConversion =
-    if (config.convertDecimalsToFloat) new RecursiveConversion(DecimalToFloatConversion)
-    else noOpConversion
+    if (config.convertFieldsToLowercase || config.convertDecimalsToFloat) {
+      val inner = if (config.convertDecimalsToFloat) DecimalToFloatConversion else noOpConversion
+      val fieldNameConversion: String => String = if (config.convertFieldsToLowercase) _.toLowerCase else identity
+      new RecursiveConversion(inner, fieldNameConversion)
+    } else noOpConversion
 
   val noOpConversion: ConnectConversion = new ConnectConversion {
     override def convertSchema(originalSchema: Schema): Schema = originalSchema
diff --git a/connector/src/main/scala/com/celonis/kafka/connect/transform/conversion/RecursiveConversion.scala b/connector/src/main/scala/com/celonis/kafka/connect/transform/conversion/RecursiveConversion.scala
index e527130..602cc40 100644
--- a/connector/src/main/scala/com/celonis/kafka/connect/transform/conversion/RecursiveConversion.scala
+++ b/connector/src/main/scala/com/celonis/kafka/connect/transform/conversion/RecursiveConversion.scala
@@ -24,13 +24,14 @@ import scala.jdk.CollectionConverters._
 
 /** Traverse containers data objects (structs, maps and lists) and apply the inner conversion to the leaves
   */
-final class RecursiveConversion(innerConversion: ConnectConversion) extends ConnectConversion {
+final class RecursiveConversion(innerConversion: ConnectConversion, fieldNameConversion: String => String)
+    extends ConnectConversion {
 
   override def convertSchema(originalSchema: Schema): Schema =
     originalSchema.`type`() match {
       case Schema.Type.STRUCT =>
         originalSchema.fields().asScala.foldLeft(SchemaBuilder.struct()) { case (builder, field) =>
-          builder.field(field.name(), convertSchema(field.schema()))
+          builder.field(fieldNameConversion(field.name()), convertSchema(field.schema()))
         }.optionalIf(originalSchema.isOptional).build()
       case Schema.Type.ARRAY =>
         SchemaBuilder.array(convertSchema(originalSchema.valueSchema())).optionalIf(originalSchema.isOptional).build()
@@ -46,10 +47,12 @@ final class RecursiveConversion(innerConversion: ConnectConversion) extends Conn
     connectValue match {
       case connectValue: Struct =>
         val newStruct = new Struct(targetSchema)
-        targetSchema.fields().asScala.foreach { field =>
+        originalSchema.fields().asScala.foreach { field =>
+          val newFieldName   = fieldNameConversion(field.name())
+          val newFieldSchema = targetSchema.field(newFieldName).schema()
           newStruct.put(
-            field.name(),
-            convertValue(connectValue.get(field), originalSchema.field(field.name()).schema(), field.schema()),
+            newFieldName,
+            convertValue(connectValue.get(field), field.schema(), newFieldSchema),
           )
         }
         newStruct
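
[Reviewer note, not part of the patch] End to end, the two flags compose as follows; this sketch mirrors the fromConfig branches above, and the behaviour is pinned down by the ConnectConversionTest changes below:

    import org.apache.kafka.connect.data.{Decimal, Schema, SchemaBuilder}

    val conversion = ConnectConversion.fromConfig(
      PreConversionConfig(convertDecimalsToFloat = true, convertFieldsToLowercase = true),
    )
    // Field names are lowercased recursively and Decimal logical types become FLOAT64:
    val schema = SchemaBuilder.struct().field("AMount", Decimal.schema(2)).build()
    conversion.convertSchema(schema)
    // => equivalent to SchemaBuilder.struct().field("amount", Schema.FLOAT64_SCHEMA).build()
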
diff --git a/connector/src/test/scala/com/celonis/kafka/connect/ems/config/EmsSinkConfigTest.scala b/connector/src/test/scala/com/celonis/kafka/connect/ems/config/EmsSinkConfigTest.scala
index 5ccb57b..9382f21 100644
--- a/connector/src/test/scala/com/celonis/kafka/connect/ems/config/EmsSinkConfigTest.scala
+++ b/connector/src/test/scala/com/celonis/kafka/connect/ems/config/EmsSinkConfigTest.scala
@@ -61,7 +61,7 @@ class EmsSinkConfigTest extends AnyFunSuite with Matchers {
     http = UnproxiedHttpClientConfig(defaultPoolingConfig),
     explode = ExplodeConfig.None,
     orderField = OrderFieldConfig(EmbeddedKafkaMetadataFieldInserter.CelonisOrderFieldName.some),
-    preConversionConfig = PreConversionConfig(convertDecimalsToFloat = false),
+    preConversionConfig = PreConversionConfig(convertDecimalsToFloat = false, convertFieldsToLowercase = false),
     flattenerConfig = None,
     embedKafkaMetadata = true,
     useInMemoryFileSystem = false,
@@ -79,19 +79,40 @@ class EmsSinkConfigTest extends AnyFunSuite with Matchers {
 
   test(s"parse PreConversionConfig") {
     val expectedWithDefault =
-      anEmsSinkConfig.copy(preConversionConfig = PreConversionConfig(convertDecimalsToFloat = false))
-    val properties = propertiesFromConfig(expectedWithDefault).removed(DECIMAL_CONVERSION_KEY)
+      anEmsSinkConfig.copy(preConversionConfig =
+        PreConversionConfig(convertDecimalsToFloat = false, convertFieldsToLowercase = false),
+      )
+    val properties =
+      propertiesFromConfig(expectedWithDefault).removed(DECIMAL_CONVERSION_KEY).removed(TRANSFORM_FIELDS_LOWERCASE_KEY)
     parseProperties(properties) shouldBe Right(expectedWithDefault)
 
-    val expectedWithConversion =
-      anEmsSinkConfig.copy(preConversionConfig = PreConversionConfig(convertDecimalsToFloat = true))
-    val propertiesWithConversion = propertiesFromConfig(expectedWithConversion)
-    parseProperties(propertiesWithConversion) shouldBe Right(expectedWithConversion)
-
-    val expectedWithoutConversion =
-      anEmsSinkConfig.copy(preConversionConfig = PreConversionConfig(convertDecimalsToFloat = false))
-    val propertiesWithoutConversion = propertiesFromConfig(expectedWithoutConversion)
-    parseProperties(propertiesWithoutConversion) shouldBe Right(expectedWithoutConversion)
+    val expectedWithDecimalConversion =
+      anEmsSinkConfig.copy(preConversionConfig =
+        PreConversionConfig(convertDecimalsToFloat = true, convertFieldsToLowercase = false),
+      )
+    val propertiesWithDecimalConversion = propertiesFromConfig(expectedWithDecimalConversion)
+    parseProperties(propertiesWithDecimalConversion) shouldBe Right(expectedWithDecimalConversion)
+
+    val expectedWithFieldNamesConversion =
+      anEmsSinkConfig.copy(preConversionConfig =
+        PreConversionConfig(convertDecimalsToFloat = false, convertFieldsToLowercase = true),
+      )
+    val propertiesWithFieldNamesConversion = propertiesFromConfig(expectedWithFieldNamesConversion)
+    parseProperties(propertiesWithFieldNamesConversion) shouldBe Right(expectedWithFieldNamesConversion)
+
+    val expectedWithAllConversions =
+      anEmsSinkConfig.copy(preConversionConfig =
+        PreConversionConfig(convertDecimalsToFloat = true, convertFieldsToLowercase = true),
+      )
+    val propertiesWithAllConversions = propertiesFromConfig(expectedWithAllConversions)
+    parseProperties(propertiesWithAllConversions) shouldBe Right(expectedWithAllConversions)
+
+    val expectedWithoutConversions =
+      anEmsSinkConfig.copy(preConversionConfig =
+        PreConversionConfig(convertDecimalsToFloat = false, convertFieldsToLowercase = false),
+      )
+    val propertiesWithoutConversion = propertiesFromConfig(expectedWithoutConversions)
+    parseProperties(propertiesWithoutConversion) shouldBe Right(expectedWithoutConversions)
   }
 
   test(s"returns an error if AUTHORIZATION_KEY is missing") {
@@ -240,23 +261,24 @@ class EmsSinkConfigTest extends AnyFunSuite with Matchers {
   }
 
   private def propertiesFromConfig(config: EmsSinkConfig): Map[String, _] = Map(
-    "name"                      -> config.sinkName,
-    ENDPOINT_KEY                -> config.url.toString,
-    TARGET_TABLE_KEY            -> config.target,
-    AUTHORIZATION_KEY           -> config.authorization.header,
-    ERROR_POLICY_KEY            -> config.errorPolicyConfig.policyType.toString,
-    COMMIT_SIZE_KEY             -> config.commitPolicy.fileSize,
-    COMMIT_INTERVAL_KEY         -> config.commitPolicy.interval,
-    COMMIT_RECORDS_KEY          -> config.commitPolicy.records,
-    ERROR_RETRY_INTERVAL        -> config.errorPolicyConfig.retryConfig.interval,
-    ERROR_POLICY_RETRIES_KEY    -> config.errorPolicyConfig.retryConfig.retries,
-    TMP_DIRECTORY_KEY           -> config.workingDir.toString,
-    PRIMARY_KEYS_KEY            -> config.primaryKeys.mkString(","),
-    CONNECTION_ID_KEY           -> config.connectionId.get,
-    ORDER_FIELD_NAME_KEY        -> config.orderField.name.orNull,
-    FALLBACK_VARCHAR_LENGTH_KEY -> config.fallbackVarCharLengths.orNull,
-    DECIMAL_CONVERSION_KEY      -> config.preConversionConfig.convertDecimalsToFloat,
-    FLATTENER_ENABLE_KEY        -> config.flattenerConfig.isDefined,
+    "name"                         -> config.sinkName,
+    ENDPOINT_KEY                   -> config.url.toString,
+    TARGET_TABLE_KEY               -> config.target,
+    AUTHORIZATION_KEY              -> config.authorization.header,
+    ERROR_POLICY_KEY               -> config.errorPolicyConfig.policyType.toString,
+    COMMIT_SIZE_KEY                -> config.commitPolicy.fileSize,
+    COMMIT_INTERVAL_KEY            -> config.commitPolicy.interval,
+    COMMIT_RECORDS_KEY             -> config.commitPolicy.records,
+    ERROR_RETRY_INTERVAL           -> config.errorPolicyConfig.retryConfig.interval,
+    ERROR_POLICY_RETRIES_KEY       -> config.errorPolicyConfig.retryConfig.retries,
+    TMP_DIRECTORY_KEY              -> config.workingDir.toString,
+    PRIMARY_KEYS_KEY               -> config.primaryKeys.mkString(","),
+    CONNECTION_ID_KEY              -> config.connectionId.get,
+    ORDER_FIELD_NAME_KEY           -> config.orderField.name.orNull,
+    FALLBACK_VARCHAR_LENGTH_KEY    -> config.fallbackVarCharLengths.orNull,
+    DECIMAL_CONVERSION_KEY         -> config.preConversionConfig.convertDecimalsToFloat,
+    TRANSFORM_FIELDS_LOWERCASE_KEY -> config.preConversionConfig.convertFieldsToLowercase,
+    FLATTENER_ENABLE_KEY           -> config.flattenerConfig.isDefined,
     FLATTENER_DISCARD_COLLECTIONS_KEY -> config.flattenerConfig.map(_.discardCollections).getOrElse(
       FLATTENER_DISCARD_COLLECTIONS_DEFAULT,
     ),
diff --git a/connector/src/test/scala/com/celonis/kafka/connect/ems/sink/EmsSinkTaskObfuscationTest.scala b/connector/src/test/scala/com/celonis/kafka/connect/ems/sink/EmsSinkTaskObfuscationTest.scala
index 60e05d5..469c189 100644
--- a/connector/src/test/scala/com/celonis/kafka/connect/ems/sink/EmsSinkTaskObfuscationTest.scala
+++ b/connector/src/test/scala/com/celonis/kafka/connect/ems/sink/EmsSinkTaskObfuscationTest.scala
@@ -68,7 +68,7 @@ class EmsSinkTaskObfuscationTest extends AnyFunSuite with Matchers with WorkingD
     UnproxiedHttpClientConfig(defaultPoolingConfig),
     ExplodeConfig.None,
     OrderFieldConfig(Some(EmbeddedKafkaMetadataFieldInserter.CelonisOrderFieldName)),
-    PreConversionConfig(convertDecimalsToFloat = false),
+    PreConversionConfig(false, false),
     None,
     embedKafkaMetadata = false,
     useInMemoryFileSystem = false,
diff --git a/connector/src/test/scala/com/celonis/kafka/connect/transform/RecordTransformerTest.scala b/connector/src/test/scala/com/celonis/kafka/connect/transform/RecordTransformerTest.scala
index df51462..5195c5a 100644
--- a/connector/src/test/scala/com/celonis/kafka/connect/transform/RecordTransformerTest.scala
+++ b/connector/src/test/scala/com/celonis/kafka/connect/transform/RecordTransformerTest.scala
@@ -45,16 +45,7 @@ class RecordTransformerTest extends AnyFunSuite with Matchers {
       // other fields omitted
     ).asJava
 
-    val transformer =
-      RecordTransformer.fromConfig(
-        "mySink",
-        PreConversionConfig(false),
-        Some(FlattenerConfig(discardCollections = true, jsonBlobChunks = None)),
-        Nil,
-        None,
-        allowNullsAsPks = false,
-        FieldInserter.embeddedKafkaMetadata(doInsert = true, None),
-      )
+    val transformer = buildTransformer(flatten = true, discardCollections = true)
 
     val record1 = sinkRecord(value1)
     transformer.transform(record1).unsafeRunSync()
@@ -77,16 +68,7 @@ class RecordTransformerTest extends AnyFunSuite with Matchers {
       "some_field" -> 22, // field type has changed!
     ).asJava
 
-    val transformer =
-      RecordTransformer.fromConfig(
-        "mySink",
-        PreConversionConfig(false),
-        Some(FlattenerConfig(discardCollections = true, jsonBlobChunks = None)),
-        Nil,
-        None,
-        allowNullsAsPks = false,
-        FieldInserter.embeddedKafkaMetadata(doInsert = true, None),
-      )
+    val transformer = buildTransformer(flatten = true, discardCollections = true)
 
     val record1        = sinkRecord(value1)
     val genericRecord1 = transformer.transform(record1).unsafeRunSync()
@@ -107,7 +89,7 @@ class RecordTransformerTest extends AnyFunSuite with Matchers {
     ).asJava
 
     val record        = sinkRecord(value)
-    val genericRecord = chunkTransform(record, 2, 20)
+    val genericRecord = transform(record, flatten = true, jsonBlobChunks = Some(JsonBlobChunks(2, 20)))
 
     genericRecord.get("payload_chunk1") shouldBe "{\"heterogeneous_arra"
     genericRecord.get("payload_chunk2") shouldBe "y\":[\"a\",1,true]}"
@@ -120,7 +102,7 @@ class RecordTransformerTest extends AnyFunSuite with Matchers {
     ).asJava
 
     val record        = sinkRecord(value)
-    val genericRecord = chunkTransform(record, 2, 20)
+    val genericRecord = transform(record, flatten = true, jsonBlobChunks = Some(JsonBlobChunks(2, 20)))
 
     genericRecord.get("payload_chunk1") shouldBe "{\"123454567890123454"
     genericRecord.get("payload_chunk2") shouldBe "56789\":\"x\",\"\":\"y\"}"
@@ -130,7 +112,7 @@ class RecordTransformerTest extends AnyFunSuite with Matchers {
     val aBigDecimal = java.math.BigDecimal.valueOf(0.12345)
     val nestedSchema = SchemaBuilder.struct()
       .field("nested_decimal", Decimal.schema(5))
-      .field("nested_float32", SchemaBuilder.float32().schema()).build()
+      .field("nested_float32", Schema.FLOAT32_SCHEMA).build()
 
     val schema = SchemaBuilder.struct()
       .field("nested", nestedSchema)
@@ -149,7 +131,7 @@ class RecordTransformerTest extends AnyFunSuite with Matchers {
     struct.put("another_optional_decimal", null)
 
     val record        = sinkRecord(struct, schema)
-    val genericRecord = decimalConversionWithNoFlattening(record)
+    val genericRecord = transform(record, convertDecimals = true)
 
     genericRecord.get("nested").asInstanceOf[Record].get("nested_decimal") shouldBe aBigDecimal.doubleValue()
     genericRecord.get("nested").asInstanceOf[Record].get("nested_float32") shouldBe 1.45f
@@ -158,12 +140,34 @@ class RecordTransformerTest extends AnyFunSuite with Matchers {
     genericRecord.get("another_optional_decimal") shouldBe null
   }
 
+  test("With field to lowercase conversion enabled, field names are lowercased") {
+    val nestedSchema = SchemaBuilder.struct()
+      .field("Inner", Schema.STRING_SCHEMA)
+
+    val schema = SchemaBuilder.struct()
+      .field("nEsted", nestedSchema)
+      .field("STRING", Schema.STRING_SCHEMA)
+
+    val nestedStruct = new Struct(nestedSchema)
+    nestedStruct.put("Inner", "abc")
+
+    val struct = new Struct(schema)
+    struct.put("nEsted", nestedStruct)
+    struct.put("STRING", "def")
+
+    val record        = sinkRecord(struct, schema)
+    val genericRecord = transform(record, lowercaseFields = true)
+
+    genericRecord.get("nested").asInstanceOf[Record].get("inner") shouldBe "abc"
+    genericRecord.get("string") shouldBe "def"
+  }
+
   test("With Chunking disabled, heterogeneous arrays do not prevent flattening") {
     val value = Map(
       "heterogeneous_array" -> List[Any]("a", 1, true).asJava,
     ).asJava
 
     val record = sinkRecord(value)
-    flattenTransform(record)
+    transform(record, flatten = true)
     ()
   }
 
@@ -176,42 +180,37 @@ class RecordTransformerTest extends AnyFunSuite with Matchers {
     ).asJava
 
     val record = sinkRecord(value)
-    flattenTransform(record)
+    transform(record, flatten = true)
     ()
   }
 
-  private def chunkTransform(record: SinkRecord, maxChunks: Int, chunkSize: Int): GenericRecord = {
-    val flattenerConfig = Some(FlattenerConfig(discardCollections = false, Some(JsonBlobChunks(maxChunks, chunkSize))))
-    val transformer =
-      RecordTransformer.fromConfig("mySink",
-                                   PreConversionConfig(false),
-                                   flattenerConfig,
-                                   Nil,
-                                   None,
-                                   false,
-                                   FieldInserter.noop,
-      )
-    transformer.transform(record).unsafeRunSync()
-  }
-
-  private def flattenTransform(record: SinkRecord, discardCollections: Boolean = false): GenericRecord = {
-    val flattenerConfig = Some(FlattenerConfig(discardCollections = discardCollections, None))
-    val transformer =
-      RecordTransformer.fromConfig("mySink",
-                                   PreConversionConfig(false),
-                                   flattenerConfig,
-                                   Nil,
-                                   None,
-                                   false,
-                                   FieldInserter.noop,
-      )
-    transformer.transform(record).unsafeRunSync()
-  }
-
-  private def decimalConversionWithNoFlattening(record: SinkRecord): GenericRecord = {
-    val transformer =
-      RecordTransformer.fromConfig("mySink", PreConversionConfig(true), None, Nil, None, false, FieldInserter.noop)
-    transformer.transform(record).unsafeRunSync()
+  private def transform(
+    record:             SinkRecord,
+    flatten:            Boolean = false,
+    discardCollections: Boolean = false,
+    convertDecimals:    Boolean = false,
+    lowercaseFields:    Boolean = false,
+    jsonBlobChunks:     Option[JsonBlobChunks] = None): GenericRecord =
+    buildTransformer(flatten, discardCollections, convertDecimals, lowercaseFields, jsonBlobChunks).transform(
+      record,
+    ).unsafeRunSync()
+
+  private def buildTransformer(
+    flatten:            Boolean,
+    discardCollections: Boolean,
+    convertDecimals:    Boolean = false,
+    lowercaseFields:    Boolean = false,
+    jsonBlobChunks:     Option[JsonBlobChunks] = None): RecordTransformer = {
+    val flattenerConfig =
+      Some(FlattenerConfig(discardCollections = discardCollections, jsonBlobChunks)).filter(_ => flatten)
+    RecordTransformer.fromConfig("mySink",
+                                 PreConversionConfig(convertDecimals, lowercaseFields),
+                                 flattenerConfig,
+                                 Nil,
+                                 None,
+                                 false,
+                                 FieldInserter.noop,
+    )
   }
 
   private def sinkRecord(value: Any, schema: Schema = null): SinkRecord =
diff --git a/connector/src/test/scala/com/celonis/kafka/connect/transform/conversion/ConnectConversionTest.scala b/connector/src/test/scala/com/celonis/kafka/connect/transform/conversion/ConnectConversionTest.scala
index dd8a92e..3bcabda 100644
--- a/connector/src/test/scala/com/celonis/kafka/connect/transform/conversion/ConnectConversionTest.scala
+++ b/connector/src/test/scala/com/celonis/kafka/connect/transform/conversion/ConnectConversionTest.scala
@@ -19,19 +19,47 @@ package com.celonis.kafka.connect.transform.conversion
 import com.celonis.kafka.connect.transform.PreConversionConfig
 import org.apache.kafka.connect.data.Decimal
 import org.apache.kafka.connect.data.Schema
+import org.apache.kafka.connect.data.SchemaBuilder
+import org.apache.kafka.connect.data.Timestamp
 import org.scalatest.funsuite.AnyFunSuite
 import org.scalatest.matchers.should.Matchers
 
 class ConnectConversionTest extends AnyFunSuite with Matchers {
-  test("Build conversion from config") {
-    val conversion = ConnectConversion.fromConfig(PreConversionConfig(convertDecimalsToFloat = true))
+  test("Build conversion for converting decimals") {
+    val conversion =
+      ConnectConversion.fromConfig(PreConversionConfig(convertDecimalsToFloat = true, convertFieldsToLowercase = false))
     conversion shouldBe a[RecursiveConversion]
     conversion.convertSchema(Decimal.schema(5)) shouldBe Schema.FLOAT64_SCHEMA
     conversion.convert(java.math.BigDecimal.ONE, Some(Decimal.schema(5))) shouldBe (1d, Some(Schema.FLOAT64_SCHEMA))
   }
 
-  test("Build noop conversion if convertDecimalsToFloat") {
-    val conversion = ConnectConversion.fromConfig(PreConversionConfig(convertDecimalsToFloat = false))
+  test("Build conversion for converting field names to lowercase") {
+    val conversion =
+      ConnectConversion.fromConfig(PreConversionConfig(convertDecimalsToFloat = false, convertFieldsToLowercase = true))
+    conversion shouldBe a[RecursiveConversion]
+
+    val schema         = SchemaBuilder.struct().field("UPPER", Timestamp.SCHEMA).build()
+    val expectedSchema = SchemaBuilder.struct().field("upper", Timestamp.SCHEMA).build()
+
+    conversion.convertSchema(schema) shouldBe expectedSchema
+  }
+
+  test("Build conversion for converting field names to lowercase and decimals to floats") {
+    val conversion =
+      ConnectConversion.fromConfig(PreConversionConfig(convertDecimalsToFloat = true, convertFieldsToLowercase = true))
+    conversion shouldBe a[RecursiveConversion]
+
+    val schema         = SchemaBuilder.struct().field("DEcimal", Decimal.schema(5)).build()
+    val expectedSchema = SchemaBuilder.struct().field("decimal", Schema.FLOAT64_SCHEMA).build()
+
+    conversion.convertSchema(schema) shouldBe expectedSchema
+  }
+
+  test("Build noop conversion if all conversions are disabled") {
+    val conversion = ConnectConversion.fromConfig(PreConversionConfig(
+      convertDecimalsToFloat   = false,
+      convertFieldsToLowercase = false,
+    ))
     conversion shouldBe ConnectConversion.noOpConversion
     conversion.convertSchema(Decimal.schema(5)) shouldBe Decimal.schema(5)
     conversion.convert(java.math.BigDecimal.ONE, Some(Decimal.schema(5))) shouldBe (java.math.BigDecimal.ONE, Some(
diff --git a/connector/src/test/scala/com/celonis/kafka/connect/transform/conversion/RecursiveConversionTest.scala b/connector/src/test/scala/com/celonis/kafka/connect/transform/conversion/RecursiveConversionTest.scala
index e53c193..41e7e49 100644
--- a/connector/src/test/scala/com/celonis/kafka/connect/transform/conversion/RecursiveConversionTest.scala
+++ b/connector/src/test/scala/com/celonis/kafka/connect/transform/conversion/RecursiveConversionTest.scala
@@ -97,7 +97,40 @@ class RecursiveConversionTest extends AnyFunSuite with Matchers {
     convertedValue shouldBe expectedValue
   }
 
-  lazy val recursiveConversion = new RecursiveConversion(stringConversion)
+  test("it applies fieldName conversion") {
+    val recursiveConversion = new RecursiveConversion(stringConversion, _.toUpperCase)
+    val nestedSchema        = SchemaBuilder.struct().field("a_timestamp", Timestamp.SCHEMA).build()
+    val nestedStruct        = new Struct(nestedSchema)
+    nestedStruct.put("a_timestamp", aDate)
+
+    val originalSchema = SchemaBuilder.struct()
+      .field("an_int", Schema.INT32_SCHEMA)
+      .field("nested", nestedSchema)
+      .build()
+
+    val originalValue = new Struct(originalSchema)
+    originalValue.put("an_int", 123)
+    originalValue.put("nested", nestedStruct)
+
+    val expectedNestedSchema = SchemaBuilder.struct().field("A_TIMESTAMP", Schema.STRING_SCHEMA).build()
+    val expectedSchema = SchemaBuilder.struct()
+      .field("AN_INT", Schema.STRING_SCHEMA)
+      .field("NESTED", expectedNestedSchema)
+      .build()
+
+    val expectedNestedValue = new Struct(expectedNestedSchema)
+    expectedNestedValue.put("A_TIMESTAMP", aDate.toString)
+
+    val expectedValue = new Struct(expectedSchema)
+    expectedValue.put("AN_INT", "123")
+    expectedValue.put("NESTED", expectedNestedValue)
+
+    val (convertedValue, Some(convertedSchema)) = recursiveConversion.convert(originalValue, Some(originalSchema))
+    convertedSchema shouldBe expectedSchema
+    convertedValue shouldBe expectedValue
+  }
+
+  lazy val recursiveConversion = new RecursiveConversion(stringConversion, identity)
 
   lazy val stringConversion = new ConnectConversion {
     override def convertSchema(originalSchema: Schema): Schema = Schema.STRING_SCHEMA