Skip to content

Commit

Permalink
DP-3392: Option to lowercase field names (#64)
Browse files Browse the repository at this point in the history
* DP-3392: Accept a fieldname transformer in RecursiveConversion

* DP-3392: Implement conversion parsing

* DP-3392: Add tests in record transformer for converting field to lowercase

* DP-3392: Clean up RecordTransformerTest
  • Loading branch information
nicmart authored May 22, 2024
1 parent f020cc8 commit 81156dc
Show file tree
Hide file tree
Showing 10 changed files with 218 additions and 115 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -158,42 +158,47 @@ object EmsSinkConfigConstants {
val CLOSE_EVERY_CONNECTION_DOC = "Connection pool - Explicitly close connections"
val CLOSE_EVERY_CONNECTION_DEFAULT_VALUE = false

val FLATTENER_ENABLE_KEY = s"${CONNECTOR_PREFIX}.flattener.enable"
val FLATTENER_ENABLE_KEY = s"$CONNECTOR_PREFIX.flattener.enable"
val FLATTENER_ENABLE_DOC =
s"Enable flattening of nested records. This is likely to be needed if the source data contains nested objects or collections."
val FLATTENER_ENABLE_DEFAULT = true

val FLATTENER_DISCARD_COLLECTIONS_KEY = s"${CONNECTOR_PREFIX}.flattener.collections.discard"
val FLATTENER_DISCARD_COLLECTIONS_KEY = s"$CONNECTOR_PREFIX.flattener.collections.discard"
val FLATTENER_DISCARD_COLLECTIONS_DOC =
"Discard array and map fields at any level of depth. Note that the default handling of collections by the flattener function is to JSON-encode them as nullable STRING fields."
val FLATTENER_DISCARD_COLLECTIONS_DEFAULT = false

val FLATTENER_JSONBLOB_CHUNKS_KEY = s"${CONNECTOR_PREFIX}.flattener.jsonblob.chunks"
val FLATTENER_JSONBLOB_CHUNKS_KEY = s"$CONNECTOR_PREFIX.flattener.jsonblob.chunks"
val FLATTENER_JSONBLOB_CHUNKS_DOC =
"Encodes the record into a JSON blob broken down into N VARCHAR fields (e.g. `payload_chunk1`, `payload_chunk2`, `...`, `payload_chunkN`)."
val FLATTENER_JSONBLOB_CHUNKS_DEFAULT = null

val DECIMAL_CONVERSION_KEY = s"${CONNECTOR_PREFIX}.convert.decimals.to.double"
val DECIMAL_CONVERSION_KEY_DOC =
val DECIMAL_CONVERSION_KEY = s"$CONNECTOR_PREFIX.convert.decimals.to.double"
val DECIMAL_CONVERSION_DOC =
s"Convert decimal values into doubles. Valid only for formats with schema (AVRO, Protobuf, JsonSchema)"
val DECIMAL_CONVERSION_KEY_DEFAULT = false
val DECIMAL_CONVERSION_DEFAULT = false

val NULL_PK_KEY = s"${CONNECTOR_PREFIX}.allow.null.pk"
// Opt-in switch (disabled by default) that lowercases the *names* of record fields.
// The documentation string spells out "field names" because field values are left as they are.
val TRANSFORM_FIELDS_LOWERCASE_KEY = s"$CONNECTOR_PREFIX.convert.lowercase.fields"
val TRANSFORM_FIELDS_LOWERCASE_DOC =
  "Convert all field names (including those of nested structs) to lowercase"
val TRANSFORM_FIELDS_LOWERCASE_DEFAULT = false

val NULL_PK_KEY = s"$CONNECTOR_PREFIX.allow.null.pk"
val NULL_PK_KEY_DOC =
s"Allow parsing messages with null values in the columns listed as primary keys. If disabled connector will fail after receiving such a message. NOTE: enabling that will cause data inconsistency issue on the EMS side."
val NULL_PK_KEY_DEFAULT = false

val EMBED_KAFKA_EMBEDDED_METADATA_KEY = s"${CONNECTOR_PREFIX}.embed.kafka.metadata"
val EMBED_KAFKA_EMBEDDED_METADATA_KEY = s"$CONNECTOR_PREFIX.embed.kafka.metadata"
val EMBED_KAFKA_EMBEDDED_METADATA_DOC =
"Embed Kafka metadata such as partition, offset and timestamp as additional record fields."
val EMBED_KAFKA_EMBEDDED_METADATA_DEFAULT = true

val USE_IN_MEMORY_FS_KEY = s"${CONNECTOR_PREFIX}.inmemfs.enable"
val USE_IN_MEMORY_FS_KEY = s"$CONNECTOR_PREFIX.inmemfs.enable"
val USE_IN_MEMORY_FS_DOC =
"Rather than writing to the host file system, buffer parquet data files in memory"
val USE_IN_MEMORY_FS_DEFAULT = false

val SINK_PUT_TIMEOUT_KEY = s"${CONNECTOR_PREFIX}.sink.put.timeout.ms"
val SINK_PUT_TIMEOUT_KEY = s"$CONNECTOR_PREFIX.sink.put.timeout.ms"
val SINK_PUT_TIMEOUT_DOC =
"The maximum time (in milliseconds) for the connector task to complete the upload of a single Parquet file before being flagged as failed. Note: this value should always be lower than max.poll.interval.ms"
val SINK_PUT_TIMEOUT_DEFAULT = 288000L // 4.8 minutes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -363,9 +363,16 @@ object EmsSinkConfigDef {
.define(
DECIMAL_CONVERSION_KEY,
Type.BOOLEAN,
DECIMAL_CONVERSION_KEY_DEFAULT,
DECIMAL_CONVERSION_DEFAULT,
Importance.MEDIUM,
DECIMAL_CONVERSION_KEY_DOC,
DECIMAL_CONVERSION_DOC,
)
.define(
TRANSFORM_FIELDS_LOWERCASE_KEY,
Type.BOOLEAN,
TRANSFORM_FIELDS_LOWERCASE_DEFAULT,
Importance.MEDIUM,
TRANSFORM_FIELDS_LOWERCASE_DOC,
)
.define(
NULL_PK_KEY,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,17 @@
package com.celonis.kafka.connect.transform

import com.celonis.kafka.connect.ems.config.EmsSinkConfigConstants.DECIMAL_CONVERSION_KEY
import com.celonis.kafka.connect.ems.config.EmsSinkConfigConstants.DECIMAL_CONVERSION_KEY_DEFAULT
import com.celonis.kafka.connect.ems.config.EmsSinkConfigConstants.DECIMAL_CONVERSION_DEFAULT
import com.celonis.kafka.connect.ems.config.EmsSinkConfigConstants.TRANSFORM_FIELDS_LOWERCASE_KEY
import com.celonis.kafka.connect.ems.config.EmsSinkConfigConstants.TRANSFORM_FIELDS_LOWERCASE_DEFAULT
import com.celonis.kafka.connect.ems.config.PropertiesHelper.getBoolean

/** Flags selecting which conversions `ConnectConversion.fromConfig` builds.
  *
  * NOTE(review): this listing is a rendered diff, so the single-parameter signature below appears to be the
  * pre-change form and the two-parameter signature the post-change form - confirm against the repository.
  *
  * @param convertDecimalsToFloat   when true, `DecimalToFloatConversion` is used as the inner conversion
  * @param convertFieldsToLowercase when true, field names are passed through `toLowerCase`
  */
final case class PreConversionConfig(convertDecimalsToFloat: Boolean)
final case class PreConversionConfig(convertDecimalsToFloat: Boolean, convertFieldsToLowercase: Boolean)

object PreConversionConfig {
/** Reads the conversion flags from the connector properties.
  *
  * Each flag is looked up with `getBoolean`; when that lookup yields no value, the matching `*_DEFAULT`
  * constant from `EmsSinkConfigConstants` is used instead.
  *
  * @param props connector properties keyed by configuration name
  * @return the parsed [[PreConversionConfig]]
  */
def extract(props: Map[String, _]): PreConversionConfig =
PreConversionConfig(
getBoolean(props, DECIMAL_CONVERSION_KEY).getOrElse(DECIMAL_CONVERSION_KEY_DEFAULT),
getBoolean(props, DECIMAL_CONVERSION_KEY).getOrElse(DECIMAL_CONVERSION_DEFAULT),
getBoolean(props, TRANSFORM_FIELDS_LOWERCASE_KEY).getOrElse(TRANSFORM_FIELDS_LOWERCASE_DEFAULT),
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,11 @@ trait ConnectConversion {

object ConnectConversion {
/** Builds the [[ConnectConversion]] described by `config`.
  *
  * Returns `noOpConversion` when neither flag is set. Otherwise returns a `RecursiveConversion` whose inner
  * conversion is `DecimalToFloatConversion` (or `noOpConversion` when decimals are left alone) and whose
  * field-name function is either lowercasing or `identity`.
  *
  * NOTE(review): this listing is a rendered diff; the first `if`/`else` pair below appears to be the pre-change
  * body and the braced `if`/`else` the post-change body - confirm against the repository.
  */
def fromConfig(config: PreConversionConfig): ConnectConversion =
if (config.convertDecimalsToFloat) new RecursiveConversion(DecimalToFloatConversion)
else noOpConversion
// A RecursiveConversion is only needed when at least one of the two conversions is enabled.
if (config.convertFieldsToLowercase || config.convertDecimalsToFloat) {
val inner = if (config.convertDecimalsToFloat) DecimalToFloatConversion else noOpConversion
// NOTE(review): `toLowerCase` without a Locale argument uses the JVM default locale, so the resulting field
// names can differ between hosts (e.g. "I" becomes a dotless i under a Turkish locale). Consider
// `_.toLowerCase(java.util.Locale.ROOT)` for locale-independent names.
val fieldNameConversion: String => String = if (config.convertFieldsToLowercase) _.toLowerCase else identity
new RecursiveConversion(inner, fieldNameConversion)
} else noOpConversion

val noOpConversion: ConnectConversion = new ConnectConversion {
override def convertSchema(originalSchema: Schema): Schema = originalSchema
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,14 @@ import scala.jdk.CollectionConverters._

/** Traverses container data objects (structs, maps and lists), applying the inner conversion to the leaves and the field-name conversion to struct field names.
*/
final class RecursiveConversion(innerConversion: ConnectConversion) extends ConnectConversion {
final class RecursiveConversion(innerConversion: ConnectConversion, fieldNameConversion: String => String)
extends ConnectConversion {

override def convertSchema(originalSchema: Schema): Schema =
originalSchema.`type`() match {
case Schema.Type.STRUCT =>
originalSchema.fields().asScala.foldLeft(SchemaBuilder.struct()) { case (builder, field) =>
builder.field(field.name(), convertSchema(field.schema()))
builder.field(fieldNameConversion(field.name()), convertSchema(field.schema()))
}.optionalIf(originalSchema.isOptional).build()
case Schema.Type.ARRAY =>
SchemaBuilder.array(convertSchema(originalSchema.valueSchema())).optionalIf(originalSchema.isOptional).build()
Expand All @@ -46,10 +47,12 @@ final class RecursiveConversion(innerConversion: ConnectConversion) extends Conn
connectValue match {
case connectValue: Struct =>
val newStruct = new Struct(targetSchema)
targetSchema.fields().asScala.foreach { field =>
originalSchema.fields().asScala.foreach { field =>
val newFieldName = fieldNameConversion(field.name())
val newFieldSchema = targetSchema.field(newFieldName).schema()
newStruct.put(
field.name(),
convertValue(connectValue.get(field), originalSchema.field(field.name()).schema(), field.schema()),
newFieldName,
convertValue(connectValue.get(field), field.schema(), newFieldSchema),
)
}
newStruct
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ class EmsSinkConfigTest extends AnyFunSuite with Matchers {
http = UnproxiedHttpClientConfig(defaultPoolingConfig),
explode = ExplodeConfig.None,
orderField = OrderFieldConfig(EmbeddedKafkaMetadataFieldInserter.CelonisOrderFieldName.some),
preConversionConfig = PreConversionConfig(convertDecimalsToFloat = false),
preConversionConfig = PreConversionConfig(convertDecimalsToFloat = false, convertFieldsToLowercase = false),
flattenerConfig = None,
embedKafkaMetadata = true,
useInMemoryFileSystem = false,
Expand All @@ -79,19 +79,40 @@ class EmsSinkConfigTest extends AnyFunSuite with Matchers {

test(s"parse PreConversionConfig") {
// Verifies that PreConversionConfig is parsed from the connector properties: first with the conversion keys
// absent (defaults apply), then with each explicit combination of the two flags.
val expectedWithDefault =
anEmsSinkConfig.copy(preConversionConfig = PreConversionConfig(convertDecimalsToFloat = false))
val properties = propertiesFromConfig(expectedWithDefault).removed(DECIMAL_CONVERSION_KEY)
anEmsSinkConfig.copy(preConversionConfig =
PreConversionConfig(convertDecimalsToFloat = false, convertFieldsToLowercase = false),
)
// Both conversion keys are removed so that parsing has to fall back to the default values.
val properties =
propertiesFromConfig(expectedWithDefault).removed(DECIMAL_CONVERSION_KEY).removed(TRANSFORM_FIELDS_LOWERCASE_KEY)
parseProperties(properties) shouldBe Right(expectedWithDefault)

val expectedWithConversion =
anEmsSinkConfig.copy(preConversionConfig = PreConversionConfig(convertDecimalsToFloat = true))
val propertiesWithConversion = propertiesFromConfig(expectedWithConversion)
parseProperties(propertiesWithConversion) shouldBe Right(expectedWithConversion)

val expectedWithoutConversion =
anEmsSinkConfig.copy(preConversionConfig = PreConversionConfig(convertDecimalsToFloat = false))
val propertiesWithoutConversion = propertiesFromConfig(expectedWithoutConversion)
parseProperties(propertiesWithoutConversion) shouldBe Right(expectedWithoutConversion)
// Only the decimal conversion enabled.
val expectedWithDecimalConversion =
anEmsSinkConfig.copy(preConversionConfig =
PreConversionConfig(convertDecimalsToFloat = true, convertFieldsToLowercase = false),
)
val propertiesWithDecimalConversion = propertiesFromConfig(expectedWithDecimalConversion)
parseProperties(propertiesWithDecimalConversion) shouldBe Right(expectedWithDecimalConversion)

// Only the field-name lowercasing enabled.
val expectedWithFieldNamesConversion =
anEmsSinkConfig.copy(preConversionConfig =
PreConversionConfig(convertDecimalsToFloat = false, convertFieldsToLowercase = true),
)
val propertiesWithFieldNamesConversion = propertiesFromConfig(expectedWithFieldNamesConversion)
parseProperties(propertiesWithFieldNamesConversion) shouldBe Right(expectedWithFieldNamesConversion)

// Both conversions enabled.
val expectedWithAllConversions =
anEmsSinkConfig.copy(preConversionConfig =
PreConversionConfig(convertDecimalsToFloat = true, convertFieldsToLowercase = true),
)
val propertiesWithAllConversions = propertiesFromConfig(expectedWithAllConversions)
parseProperties(propertiesWithAllConversions) shouldBe Right(expectedWithAllConversions)

// Both conversions explicitly set to false (keys present, unlike the defaults case at the top).
val expectedWithoutConversions =
anEmsSinkConfig.copy(preConversionConfig =
PreConversionConfig(convertDecimalsToFloat = false, convertFieldsToLowercase = false),
)
val propertiesWithoutConversion = propertiesFromConfig(expectedWithoutConversions)
parseProperties(propertiesWithoutConversion) shouldBe Right(expectedWithoutConversions)
}

test(s"returns an error if AUTHORIZATION_KEY is missing") {
Expand Down Expand Up @@ -240,23 +261,24 @@ class EmsSinkConfigTest extends AnyFunSuite with Matchers {
}

private def propertiesFromConfig(config: EmsSinkConfig): Map[String, _] = Map(
"name" -> config.sinkName,
ENDPOINT_KEY -> config.url.toString,
TARGET_TABLE_KEY -> config.target,
AUTHORIZATION_KEY -> config.authorization.header,
ERROR_POLICY_KEY -> config.errorPolicyConfig.policyType.toString,
COMMIT_SIZE_KEY -> config.commitPolicy.fileSize,
COMMIT_INTERVAL_KEY -> config.commitPolicy.interval,
COMMIT_RECORDS_KEY -> config.commitPolicy.records,
ERROR_RETRY_INTERVAL -> config.errorPolicyConfig.retryConfig.interval,
ERROR_POLICY_RETRIES_KEY -> config.errorPolicyConfig.retryConfig.retries,
TMP_DIRECTORY_KEY -> config.workingDir.toString,
PRIMARY_KEYS_KEY -> config.primaryKeys.mkString(","),
CONNECTION_ID_KEY -> config.connectionId.get,
ORDER_FIELD_NAME_KEY -> config.orderField.name.orNull,
FALLBACK_VARCHAR_LENGTH_KEY -> config.fallbackVarCharLengths.orNull,
DECIMAL_CONVERSION_KEY -> config.preConversionConfig.convertDecimalsToFloat,
FLATTENER_ENABLE_KEY -> config.flattenerConfig.isDefined,
"name" -> config.sinkName,
ENDPOINT_KEY -> config.url.toString,
TARGET_TABLE_KEY -> config.target,
AUTHORIZATION_KEY -> config.authorization.header,
ERROR_POLICY_KEY -> config.errorPolicyConfig.policyType.toString,
COMMIT_SIZE_KEY -> config.commitPolicy.fileSize,
COMMIT_INTERVAL_KEY -> config.commitPolicy.interval,
COMMIT_RECORDS_KEY -> config.commitPolicy.records,
ERROR_RETRY_INTERVAL -> config.errorPolicyConfig.retryConfig.interval,
ERROR_POLICY_RETRIES_KEY -> config.errorPolicyConfig.retryConfig.retries,
TMP_DIRECTORY_KEY -> config.workingDir.toString,
PRIMARY_KEYS_KEY -> config.primaryKeys.mkString(","),
CONNECTION_ID_KEY -> config.connectionId.get,
ORDER_FIELD_NAME_KEY -> config.orderField.name.orNull,
FALLBACK_VARCHAR_LENGTH_KEY -> config.fallbackVarCharLengths.orNull,
DECIMAL_CONVERSION_KEY -> config.preConversionConfig.convertDecimalsToFloat,
TRANSFORM_FIELDS_LOWERCASE_KEY -> config.preConversionConfig.convertFieldsToLowercase,
FLATTENER_ENABLE_KEY -> config.flattenerConfig.isDefined,
FLATTENER_DISCARD_COLLECTIONS_KEY -> config.flattenerConfig.map(_.discardCollections).getOrElse(
FLATTENER_DISCARD_COLLECTIONS_DEFAULT,
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class EmsSinkTaskObfuscationTest extends AnyFunSuite with Matchers with WorkingD
UnproxiedHttpClientConfig(defaultPoolingConfig),
ExplodeConfig.None,
OrderFieldConfig(Some(EmbeddedKafkaMetadataFieldInserter.CelonisOrderFieldName)),
PreConversionConfig(convertDecimalsToFloat = false),
PreConversionConfig(false, false),
None,
embedKafkaMetadata = false,
useInMemoryFileSystem = false,
Expand Down
Loading

0 comments on commit 81156dc

Please sign in to comment.