diff --git a/InsertSourcePartitionOrOffsetValue.md b/InsertSourcePartitionOrOffsetValue.md
new file mode 100644
index 0000000..fcf72bc
--- /dev/null
+++ b/InsertSourcePartitionOrOffsetValue.md
@@ -0,0 +1,88 @@
+# InsertSourcePartitionOrOffsetValue
+
+## Description
+
+The `InsertSourcePartitionOrOffsetValue` transformation in Kafka Connect inserts headers into SourceRecords based on their source partition or source offset values. This is useful for adding metadata to your data records before they are sent to destinations like AWS S3, Azure Datalake, or GCP Storage.
+
+## Note
+
+This SMT only works with source connectors.
+
+## Configuration
+
+To use this transformation, configure it in your Kafka Connect connector properties.
+
+### Configuration Properties
+
+| Configuration Property | Description                                                    | Optionality | Default Value  |
+|------------------------|----------------------------------------------------------------|-------------|----------------|
+| `offset.fields`        | Comma-separated list of fields to retrieve from the offset     | Optional    | Empty list     |
+| `offset.prefix`        | Optional prefix for offset header keys                         | Optional    | `"offset."`    |
+| `partition.fields`     | Comma-separated list of fields to retrieve from the partition  | Optional    | Empty list     |
+| `partition.prefix`     | Optional prefix for partition header keys                      | Optional    | `"partition."` |
+
+- **Default Value**: the value used when the property is not explicitly set in the configuration.
+
+These properties let you choose which fields from the offset and partition of a SourceRecord are added as headers, along with optional prefixes for the header keys. Adjust them to match your use case and data requirements.
+
+### Example Configuration
+
+```properties
+transforms=InsertSourcePartitionOrOffsetValue
+transforms.InsertSourcePartitionOrOffsetValue.type=io.lenses.connect.smt.header.InsertSourcePartitionOrOffsetValue
+transforms.InsertSourcePartitionOrOffsetValue.offset.fields=path,line,ts
+transforms.InsertSourcePartitionOrOffsetValue.partition.fields=container,prefix
+```
+
+### Explanation of Configuration
+
+- `transforms`: Lists the transformations to apply to the records.
+- `transforms.InsertSourcePartitionOrOffsetValue.type`: Specifies the class implementing the transformation (`InsertSourcePartitionOrOffsetValue` in this case).
+- `transforms.InsertSourcePartitionOrOffsetValue.offset.fields`: Defines the fields from the source offset to insert as headers in the SourceRecord. Replace `path,line,ts` with the actual field names you want to extract from the offset.
+- `transforms.InsertSourcePartitionOrOffsetValue.partition.fields`: Defines the fields from the source partition to insert as headers in the SourceRecord. Replace `container,prefix` with the actual field names you want to extract from the partition.
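+
+For illustration, assuming a hypothetical source offset of `{path=/files/input.csv, line=42, ts=1718193600000}` and a hypothetical source partition of `{container=my-container, prefix=data/}`, the example configuration above would add the following headers (with the default key prefixes applied):
+
+```properties
+offset.path=/files/input.csv
+offset.line=42
+offset.ts=1718193600000
+partition.container=my-container
+partition.prefix=data/
+```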
+
+## Example Usage with Cloud Connectors
+
+### AWS S3, Azure Datalake or GCP Storage
+
+When using this transformation with AWS S3, Azure Datalake or GCP Storage, you can configure your Kafka Connect connector as follows:
+
+```properties
+transforms=InsertSourcePartitionOrOffsetValue
+transforms.InsertSourcePartitionOrOffsetValue.type=io.lenses.connect.smt.header.InsertSourcePartitionOrOffsetValue
+transforms.InsertSourcePartitionOrOffsetValue.offset.fields=path,line,ts
+transforms.InsertSourcePartitionOrOffsetValue.partition.fields=container,prefix
+```
+
+Replace `path,line,ts` and `container,prefix` with the actual field names you are interested in extracting from the partition or offset. To customise the header key prefixes, use the prefix feature described below.
+
+By using the `InsertSourcePartitionOrOffsetValue` transformation, you can enrich your data records with additional metadata headers based on partition or offset values before they are delivered to your cloud storage destinations.
+
+### Using the Prefix Feature in InsertSourcePartitionOrOffsetValue Transformation
+
+The prefix feature in `InsertSourcePartitionOrOffsetValue` allows you to prepend a consistent identifier to each header key added based on partition or offset values from SourceRecords.
+
+#### Configuration
+
+Configure the transformation in your Kafka Connect connector properties:
+
+```properties
+transforms=InsertSourcePartitionOrOffsetValue
+transforms.InsertSourcePartitionOrOffsetValue.type=io.lenses.connect.smt.header.InsertSourcePartitionOrOffsetValue
+transforms.InsertSourcePartitionOrOffsetValue.offset.fields=path,line,ts
+transforms.InsertSourcePartitionOrOffsetValue.offset.prefix=offset.
+transforms.InsertSourcePartitionOrOffsetValue.partition.fields=container,prefix
+transforms.InsertSourcePartitionOrOffsetValue.partition.prefix=partition.
+```
+
+- `offset.prefix`: Specifies the prefix for headers derived from offset values. Default is `"offset."`.
+- `partition.prefix`: Specifies the prefix for headers derived from partition values. Default is `"partition."`.
+
+#### Example Usage
+
+By setting `offset.prefix=offset.` and `partition.prefix=partition.`, headers added based on offset and partition fields will have their keys prefixed accordingly in the SourceRecord headers.
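+
+For example, overriding the offset prefix with a custom value (a hypothetical one) would produce header keys `src.offset.path`, `src.offset.line` and `src.offset.ts` instead of the defaults:
+
+```properties
+transforms.InsertSourcePartitionOrOffsetValue.offset.prefix=src.offset.
+```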
+
+This configuration ensures clarity and organization when inserting metadata headers into your Kafka records, distinguishing them based on their source (offset or partition). Adjust the prefixes (`offset.prefix` and `partition.prefix`) to suit your naming conventions or requirements.
\ No newline at end of file
diff --git a/README.md b/README.md
index 27fe326..600848f 100644
--- a/README.md
+++ b/README.md
@@ -42,14 +42,33 @@ To check the code style run:
 mvn checkstyle:check
 ```
 
+## Formatter
+
 To format the code run:
 
 ```bash
 mvn com.coveo:fmt-maven-plugin:format
 ```
 
+## License Headers
+
 To add license header, run:
 
 ```bash
 mvn license:format
-```
\ No newline at end of file
+```
+
+## Dependency Check
+
+To run the dependency check:
+
+1. **Getting an NVD API Key:**
+   - Visit the [National Vulnerability Database (NVD) website](https://nvd.nist.gov) and sign up or log in to obtain your API key.
+
+2. **Setting the NVD API Key in an Environment Variable:**
+   - **Mac (zsh):** Add `export NVD_API_KEY=your_api_key_here` to `~/.zshrc`.
+   - **Windows:** Use `setx NVD_API_KEY "your_api_key_here"` in Command Prompt (Admin).
+   - **Linux (bash):** Add `export NVD_API_KEY=your_api_key_here` to `~/.bashrc`.
+
+3. **Running `mvn verify` for the Dependency Check Plugin:**
+   - Run `mvn verify` to execute the scan, as shown below. This writes a `dependency-check-report.html` to the `target` directory with the details of the scan results.
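+
+Putting the steps together (the API key value is a placeholder):
+
+```bash
+export NVD_API_KEY=your_api_key_here
+mvn verify
+# the report is written to target/dependency-check-report.html
+```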
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 8b994ce..6b463d2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -30,10 +30,10 @@
   <packaging>jar</packaging>
 
-    <maven.compiler.source>8</maven.compiler.source>
-    <maven.compiler.target>8</maven.compiler.target>
+    <maven.compiler.source>11</maven.compiler.source>
+    <maven.compiler.target>11</maven.compiler.target>
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-    <java.version>1.8</java.version>
+    <java.version>11</java.version>
     <checkstyle.version>3.7.0</checkstyle.version>
     <checkstyle.header.file>${project.basedir}/checkstyle/java.header</checkstyle.header.file>
@@ -165,6 +165,34 @@
+      <plugin>
+        <groupId>org.owasp</groupId>
+        <artifactId>dependency-check-maven</artifactId>
+        <version>9.2.0</version>
+        <configuration>
+          <failBuildOnCVSS>5</failBuildOnCVSS>
+          <nvdApiKeyEnvironmentVariable>NVD_API_KEY</nvdApiKeyEnvironmentVariable>
+          <failOnError>false</failOnError>
+          <scanSet>
+            <fileSet>
+              <directory>${basedir}/target</directory>
+              <includes>
+                <include>kafka-connect-smt-*.jar</include>
+              </includes>
+            </fileSet>
+          </scanSet>
+        </configuration>
+        <executions>
+          <execution>
+            <id>check-dependencies</id>
+            <phase>verify</phase>
+            <goals>
+              <goal>check</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
diff --git a/src/main/java/io/lenses/connect/smt/header/InsertSourcePartitionOrOffsetValue.java b/src/main/java/io/lenses/connect/smt/header/InsertSourcePartitionOrOffsetValue.java
new file mode 100644
index 0000000..0249fc0
--- /dev/null
+++ b/src/main/java/io/lenses/connect/smt/header/InsertSourcePartitionOrOffsetValue.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at: http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
+ * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
+ * for the specific language governing permissions and limitations under the License.
+ */
+package io.lenses.connect.smt.header;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.apache.kafka.connect.transforms.Transformation;
+
+/**
+ * A Kafka Connect transformation that inserts headers based on partition or offset values from
+ * SourceRecords.
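+ *
+ * <p>Example connector configuration (an illustrative sketch; the field names are placeholders):
+ *
+ * <pre>
+ * transforms=InsertSourcePartitionOrOffsetValue
+ * transforms.InsertSourcePartitionOrOffsetValue.type=io.lenses.connect.smt.header.InsertSourcePartitionOrOffsetValue
+ * transforms.InsertSourcePartitionOrOffsetValue.offset.fields=path,line,ts
+ * transforms.InsertSourcePartitionOrOffsetValue.partition.fields=container,prefix
+ * </pre>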
+ */
+public class InsertSourcePartitionOrOffsetValue implements Transformation<SourceRecord> {
+
+  /** Default prefix for offset headers. */
+  public static final String DEFAULT_PREFIX_OFFSET = "offset.";
+
+  /** Default prefix for partition headers. */
+  public static final String DEFAULT_PREFIX_PARTITION = "partition.";
+
+  Configuration offsetConfig;
+  Configuration partitionConfig;
+
+  /** Internal class to hold configuration details for fields and prefixes. */
+  static class Configuration {
+
+    private final List<String> fields;
+    private final String prefix;
+
+    /**
+     * Constructs a Configuration instance.
+     *
+     * @param fields List of fields to retrieve.
+     * @param prefix Prefix to prepend to each field.
+     */
+    public Configuration(final List<String> fields, final String prefix) {
+      this.fields = fields;
+      this.prefix = prefix;
+    }
+
+    /**
+     * Retrieves the list of fields.
+     *
+     * @return List of fields.
+     */
+    public List<String> getFields() {
+      return fields;
+    }
+
+    /**
+     * Retrieves the prefix.
+     *
+     * @return Prefix.
+     */
+    public String getPrefix() {
+      return prefix;
+    }
+  }
+
+  private static final String KEY_PARTITION_FIELDS = "partition.fields";
+  private static final String KEY_PARTITION_PREFIX = "partition.prefix";
+
+  private static final String KEY_OFFSET_FIELDS = "offset.fields";
+  private static final String KEY_OFFSET_PREFIX = "offset.prefix";
+
+  @Override
+  public SourceRecord apply(SourceRecord sourceRecord) {
+    addHeadersFromConfig(offsetConfig, sourceRecord, sourceRecord.sourceOffset());
+    addHeadersFromConfig(partitionConfig, sourceRecord, sourceRecord.sourcePartition());
+    return sourceRecord;
+  }
+
+  /** Adds one string header per configured field, keyed by the config's prefix + field name. */
+  private void addHeadersFromConfig(
+      Configuration config, SourceRecord sourceRecord, Map<String, ?> partitionOrOffsetMap) {
+    config
+        .getFields()
+        .forEach(
+            f ->
+                sourceRecord
+                    .headers()
+                    .addString(config.getPrefix() + f, (String) partitionOrOffsetMap.get(f)));
+  }
+
+  @Override
+  public ConfigDef config() {
+    return new ConfigDef()
+        .define(
+            KEY_OFFSET_FIELDS,
+            ConfigDef.Type.LIST,
+            Collections.emptyList(),
+            ConfigDef.Importance.HIGH,
+            "Comma-separated list of fields to retrieve from the offset")
+        .define(
+            KEY_OFFSET_PREFIX,
+            ConfigDef.Type.STRING,
+            DEFAULT_PREFIX_OFFSET,
+            ConfigDef.Importance.LOW,
+            "Optional prefix for offset keys")
+        .define(
+            KEY_PARTITION_FIELDS,
+            ConfigDef.Type.LIST,
+            Collections.emptyList(),
+            ConfigDef.Importance.HIGH,
+            "Comma-separated list of fields to retrieve from the partition")
+        .define(
+            KEY_PARTITION_PREFIX,
+            ConfigDef.Type.STRING,
+            DEFAULT_PREFIX_PARTITION,
+            ConfigDef.Importance.LOW,
+            "Optional prefix for partition keys");
+  }
+
+  @Override
+  public void close() {
+    // nothing to close
+  }
+
+  @Override
+  public void configure(Map<String, ?> map) {
+    offsetConfig =
+        new Configuration(
+            getFields(map, KEY_OFFSET_FIELDS),
+            getPrefix(map, KEY_OFFSET_PREFIX, DEFAULT_PREFIX_OFFSET));
+    partitionConfig =
+        new Configuration(
+            getFields(map, KEY_PARTITION_FIELDS),
+            getPrefix(map, KEY_PARTITION_PREFIX, DEFAULT_PREFIX_PARTITION));
+  }
+
+  private static String getPrefix(Map<String, ?> map, String key, String defaultPrefix) {
+    return Optional.ofNullable((String) map.get(key)).orElse(defaultPrefix);
+  }
+
+  private List<String> getFields(Map<String, ?> map, String fieldsKey) {
+    return Optional.ofNullable(map.get(fieldsKey)).stream()
+        .map(p -> extractList(fieldsKey, p))
+        .flatMap(p -> p.stream().map(Object::toString))
+        .collect(Collectors.toList());
+  }
+
+  private static List<?> extractList(String fieldsKey, Object p) {
+    if (p instanceof List) {
+      return ((List<?>) p);
+    } else if (p instanceof String) {
+      var split = ((String) p).split(",");
+      return Arrays.asList(split);
+    } else {
+      throw new IllegalStateException(
+          fieldsKey + " should be a List or a String but was a " + p.getClass().getName());
+    }
+  }
+}
diff --git a/src/test/java/io/lenses/connect/smt/header/InsertSourcePartitionOrOffsetValueTest.java b/src/test/java/io/lenses/connect/smt/header/InsertSourcePartitionOrOffsetValueTest.java
new file mode 100644
index 0000000..588e3e9
--- /dev/null
+++ b/src/test/java/io/lenses/connect/smt/header/InsertSourcePartitionOrOffsetValueTest.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at: http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
+ * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
+ * for the specific language governing permissions and limitations under the License.
+ */
+package io.lenses.connect.smt.header;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.header.ConnectHeaders;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+class InsertSourcePartitionOrOffsetValueTest {
+
+  private InsertSourcePartitionOrOffsetValue transform;
+
+  @BeforeEach
+  public void setUp() {
+    transform = new InsertSourcePartitionOrOffsetValue();
+  }
+
+  @Test
+  void testApplyAddsOffsetHeaders() {
+    Map<String, String> sourceOffset =
+        Map.of(
+            "field1", "value1",
+            "field2", "value2");
+
+    SourceRecord record =
+        new SourceRecord(
+            null,
+            sourceOffset,
+            "test-topic",
+            0,
+            Schema.STRING_SCHEMA,
+            "key",
+            Schema.STRING_SCHEMA,
+            "value",
+            0L,
+            new ConnectHeaders());
+
+    Map<String, Object> config =
+        Map.of(
+            "offset.fields",
+            List.of("field1", "field2"),
+            "offset.prefix",
+            "offset.",
+            "partition.fields",
+            Collections.emptyList(),
+            "partition.prefix",
+            "");
+
+    transform.configure(config);
+
+    SourceRecord transformedRecord = transform.apply(record);
+
+    assertEquals("value1", transformedRecord.headers().lastWithName("offset.field1").value());
+    assertEquals("value2", transformedRecord.headers().lastWithName("offset.field2").value());
+  }
+
+  @Test
+  void testApplyAddsPartitionHeaders() {
+    Map<String, String> sourcePartition =
+        Map.of(
+            "partField1", "partValue1",
+            "partField2", "partValue2");
+
+    SourceRecord record =
+        new SourceRecord(
+            sourcePartition,
+            null,
+            "test-topic",
+            0,
+            Schema.STRING_SCHEMA,
+            "key",
+            Schema.STRING_SCHEMA,
+            "value",
+            0L,
+            new ConnectHeaders());
+
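+    // The "partition.." prefix below (note the double dot) exercises a custom,
+    // non-default prefix value rather than the default "partition.".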
+    Map<String, Object> config =
+        Map.of(
+            "partition.fields",
+            List.of("partField1", "partField2"),
+            "partition.prefix",
+            "partition..",
+            "offset.fields",
+            Collections.emptyList(),
+            "offset.prefix",
+            "");
+
+    transform.configure(config);
+
+    SourceRecord transformedRecord = transform.apply(record);
+
+    assertEquals(
+        "partValue1", transformedRecord.headers().lastWithName("partition..partField1").value());
+    assertEquals(
+        "partValue2", transformedRecord.headers().lastWithName("partition..partField2").value());
+  }
+
+  @Test
+  void testConfigure() {
+    Map<String, Object> config =
+        Map.of(
+            "offset.fields",
+            List.of("field1", "field2"),
+            "offset.prefix",
+            "offset.",
+            "partition.fields",
+            List.of("partField1", "partField2"),
+            "partition.prefix",
+            "partition..");
+
+    transform.configure(config);
+
+    assertEquals(List.of("field1", "field2"), transform.offsetConfig.getFields());
+    assertEquals("offset.", transform.offsetConfig.getPrefix());
+    assertEquals(List.of("partField1", "partField2"), transform.partitionConfig.getFields());
+    assertEquals("partition..", transform.partitionConfig.getPrefix());
+  }
+
+  @Test
+  void testClose() {
+    try (InsertSourcePartitionOrOffsetValue transform = new InsertSourcePartitionOrOffsetValue()) {
+      // Since the close method does nothing, we simply ensure it runs without errors
+      assertDoesNotThrow(transform::close);
+    }
+  }
+}