Added a JSON payload formatter (#22)
* Initial working version of JsonPayloadFormatter and unit tests

* make JsonPayloadFormatter Configurable

* Add key/value schema visibility control

* Use logging for formatter test output; configure with src/test/resources/simplelogger.properties

* Fix schema visibility

* Java import clean-up

* Test lambda function dumps event as json

* Add JsonPayloadFormatter description to README

* Restored connect node

* Use enum for visibility; updates from review comments

* Use overloaded methods

* Add support for batch in JsonPayloadFormatter

* Clean up Invocation payloads section

* Add integer, long, boolean key tests; remove some constants in tests as it actually made the code less understandable

* Cleaned up example avro schema

* Remove testing artifact names from example

* v1.0.0
SgtPepperLHCB authored Sep 9, 2019
1 parent 0c9caca commit 0ece5c1
Showing 15 changed files with 1,526 additions and 91 deletions.
103 changes: 95 additions & 8 deletions README.md
@@ -15,7 +15,7 @@ _The `kafka-connect-lambda` connector has been tested with `connect-api:2.1.0` a

# Configuring

In addition to the standard [Kafka Connect connector configuration](https://kafka.apache.org/documentation/#connect_configuring) properties, the `kafka-connect-lambda` properties available are:

| Property | Required | Default value | Description |
|:---------|:---------|:--------|:------------|
@@ -32,7 +32,21 @@ In addition to the standard [Kafk
| `retry.backoff.millis` | No | `500` | Time to wait between invocation retries |
| `retries.max` | No | `5` | Maximum number of invocation retries |
| `topics` | Yes | | Comma-delimited Kafka topic names to sink |
| `payload.formatter.class` | No | `com.nordstrom.kafka.connect.formatters.PlainPayloadFormatter` | Specifies the formatter to use. |
| `payload.formatter.key.schema.visibility` | No | `min` | Determines whether the key schema (if present) is included in the payload. Applies only to the `JsonPayloadFormatter` |
| `payload.formatter.value.schema.visibility` | No | `min` | Determines whether the value schema (if present) is included in the payload. Applies only to the `JsonPayloadFormatter` |

## Formatters

The connector includes two `payload.formatter.class` implementations:

* `com.nordstrom.kafka.connect.formatters.PlainPayloadFormatter`
* `com.nordstrom.kafka.connect.formatters.JsonPayloadFormatter`

Including the full schema information in the invocation payload may result in very large messages. Use the `payload.formatter.key.schema.visibility` and `payload.formatter.value.schema.visibility` properties to control how much of the schema, if present, is included in the invocation payload: `none`, `min`, or `all` (default `min`). These settings apply only to the `JsonPayloadFormatter`; the `PlainPayloadFormatter` always includes the `min` schema information.

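For example, a connector could drop the key schema entirely while embedding the full value schema; a sketch (property values are illustrative, not recommendations):

```properties
payload.formatter.class=com.nordstrom.kafka.connect.formatters.JsonPayloadFormatter
payload.formatter.key.schema.visibility=none
payload.formatter.value.schema.visibility=all
```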

## Configuration Examples
An example configuration represented as JSON data for use with the [Kafka Connect REST interface](https://docs.confluent.io/current/connect/references/restapi.html):

```json
…
```

@@ -65,18 +79,91 @@ By supplying `com.nordstrom.kafka.connect.auth.AWSAssumeRoleCredentialsProvider`

The default invocation payload is a JSON representation of a [SinkRecord](https://kafka.apache.org/21/javadoc/org/apache/kafka/connect/sink/SinkRecord.html) object, which contains the Kafka message in the `value` field. When `aws.lambda.batch.enabled` is `true`, the invocation payload is an array of these records.
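When `aws.lambda.batch.enabled` is `true`, the payload is an array of the same record objects, along the lines of this sketch (fields abbreviated; offsets illustrative):

```json
[
  { "key": "my_key", "value": "…", "topic": "example-stream", "partition": 1, "offset": 0 },
  { "key": "my_key", "value": "…", "topic": "example-stream", "partition": 1, "offset": 1 }
]
```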

## Avro schema

This simple schema record describes our "hello, world" message.


```json
{
  "type": "record",
  "name": "Hello",
  "doc": "An example Avro-encoded `Hello` message.",
  "namespace": "com.nordstrom.kafka.example",
  "fields": [
    {
      "name": "language",
      "type": {
        "type": "enum",
        "name": "language",
        "symbols": [ "ENGLISH", "FRENCH", "ITALIAN", "SPANISH" ]
      }
    },
    {
      "name": "greeting",
      "type": "string"
    }
  ]
}
```

### PlainPayloadFormatter

This example uses the following (partial) connector configuration, which relies on the default `payload.formatter.class=com.nordstrom.kafka.connect.formatters.PlainPayloadFormatter`:

```properties
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
aws.lambda.batch.enabled=false
```

Expected output:


```json
{
  "key": "my_key",
  "keySchemaName": null,
  "value": "Struct{language=ENGLISH,greeting=hello, world}",
  "valueSchemaName": "com.nordstrom.kafka.example.Hello",
  "topic": "example-stream",
  "partition": 1,
  "offset": 0,
  "timestamp": 1567723257583,
  "timestampTypeName": "CreateTime"
}
```

### JsonPayloadFormatter

This example uses the following (partial) connector configuration with key and value schema visibility as `min` (the default):

```properties
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
aws.lambda.batch.enabled=false
payload.formatter.class=com.nordstrom.kafka.connect.formatters.JsonPayloadFormatter
```

Expected output:

```json
{
  "key": "my_key",
  "keySchemaName": null,
  "keySchemaVersion": null,
  "value": {
    "language": "ENGLISH",
    "greeting": "hello, world"
  },
  "valueSchemaName": "com.nordstrom.kafka.example.Hello",
  "valueSchemaVersion": "1",
  "topic": "example-stream",
  "partition": 1,
  "offset": 0,
  "timestamp": 1567723257583,
  "timestampTypeName": "CreateTime"
}
```
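On the Lambda side, the decoded record is plain JSON. A minimal Python handler sketch that pulls the `greeting` field out of such a payload (the handler name and field access are illustrative only, matching the `min`-visibility example above; this is not part of the connector):

```python
def handler(event, context=None):
    """Handle a JsonPayloadFormatter invocation payload.

    `event` is a single record object, or a list of them when
    `aws.lambda.batch.enabled` is `true`.
    """
    records = event if isinstance(event, list) else [event]
    # Each record carries the deserialized message in its `value` field.
    return [r["value"]["greeting"] for r in records]

record = {
    "key": "my_key",
    "value": {"language": "ENGLISH", "greeting": "hello, world"},
    "topic": "example-stream",
}

print(handler(record))  # → ['hello, world']
```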
@@ -102,7 +189,7 @@ To make sure our Lambda works, invoke it directly and view the result payload in

```
aws lambda invoke --function-name example-function --payload '{"value": "my example"}' result.txt
```

The function simply sends the `payload` back to you in `result.txt` as serialized JSON.

Use the `describe-stacks` command to fetch the CloudFormation output value for `ExampleFunctionArn`, which we'll need later when setting up our connector configuration:

@@ -116,7 +203,7 @@ aws cloudformation describe-stacks --stack-name example-lambda-stack --query "St

```
mvn clean package
```

Once built, a `kafka-connect-lambda` uber-jar is in the `target/plugin` directory.

## Run the connector using Docker Compose

3 changes: 2 additions & 1 deletion config/cloudformation.yml
@@ -10,8 +10,9 @@ Resources:
      Role: !GetAtt 'ExampleFunctionRole.Arn'
      Code:
        ZipFile: |
          import json
          def handler(event, context):
              print(json.dumps(event))
              return event
  ExampleFunctionRole:
6 changes: 4 additions & 2 deletions config/worker.properties
@@ -3,9 +3,11 @@ bootstrap.servers=localhost:9092
plugin.path=target/plugin
offset.storage.file.filename=/tmp/connect.offsets

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8080
key.converter.schemas.enable=false
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8080
value.converter.schemas.enable=false

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
1 change: 1 addition & 0 deletions docker-compose.yml
@@ -46,6 +46,7 @@ services:
    depends_on: [broker]
    logging: { driver: none }

  # NB: run connect locally in stand-alone mode to debug
  connect:
    image: confluentinc/cp-kafka-connect:5.1.3
    ports:
44 changes: 41 additions & 3 deletions pom.xml
@@ -5,7 +5,7 @@

<groupId>com.nordstrom.kafka.connect.lambda</groupId>
<artifactId>kafka-connect-lambda</artifactId>
<version>1.1.0</version>

<name>kafka-connect-lambda</name>
<description>A Kafka Connect Connector for kafka-connect-lambda</description>
@@ -15,18 +15,48 @@
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>

<slf4j.version>1.7.25</slf4j.version>

<kafka-connect.version>2.1.0</kafka-connect.version>
<!-- NB: must be consistent with version in kafka-connect -->
<jackson.version>2.9.6</jackson.version>
<aws-java-sdk.version>1.11.592</aws-java-sdk.version>
<junit.version>4.12</junit.version>
<mockito-core.version>2.28.2</mockito-core.version>
<google.guava.version>19.0</google.guava.version>
</properties>

<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-api</artifactId>
<version>${kafka-connect.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-json</artifactId>
<version>${kafka-connect.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-cbor</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
@@ -56,6 +86,12 @@
<version>${google.guava.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>${slf4j.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

<build>
@@ -143,6 +179,8 @@
<exclude>org.apache.httpcomponents:*</exclude>
<exclude>org.apache.kafka:*</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>com.fasterxml.jackson.core:*</exclude>
<exclude>javax.ws.rs:*</exclude>
</excludes>
</artifactSet>
</configuration>
@@ -0,0 +1,129 @@
package com.nordstrom.kafka.connect.formatters;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.json.JsonConverterConfig;
import org.apache.kafka.connect.json.JsonDeserializer;
import org.apache.kafka.connect.sink.SinkRecord;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import static java.util.Collections.emptyMap;

public class JsonPayloadFormatter implements PayloadFormatter, Configurable {
  enum SchemaVisibility {
    ALL,
    MIN,
    NONE
  }

  private final ObjectWriter recordWriter = new ObjectMapper().writerFor(Payload.class);
  private final ObjectWriter recordsWriter = new ObjectMapper().writerFor(Payload[].class);
  private final JsonConverter converter = new JsonConverter();
  private final JsonConverter converterSansSchema = new JsonConverter();
  private final JsonDeserializer deserializer = new JsonDeserializer();
  private SchemaVisibility keySchemaVisibility = SchemaVisibility.MIN;
  private SchemaVisibility valueSchemaVisibility = SchemaVisibility.MIN;

  public JsonPayloadFormatter() {
    converter.configure(emptyMap(), false);

    Map<String, String> configs = new HashMap<>();
    configs.put(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, "false");
    converterSansSchema.configure(configs, false);

    deserializer.configure(emptyMap(), false);
  }

  @Override
  public void configure(Map<String, ?> configs) {
    keySchemaVisibility = configureSchemaVisibility(configs, "formatter.key.schema.visibility");
    valueSchemaVisibility = configureSchemaVisibility(configs, "formatter.value.schema.visibility");
  }

  private SchemaVisibility configureSchemaVisibility(final Map<String, ?> configs, final String key) {
    SchemaVisibility viz = SchemaVisibility.MIN;
    final Object visibility = configs.get(key);
    if (visibility != null) {
      switch (visibility.toString()) {
        case "all":
          viz = SchemaVisibility.ALL;
          break;
        case "min":
          viz = SchemaVisibility.MIN;
          break;
        case "none":
          viz = SchemaVisibility.NONE;
          break;
      }
    }

    return viz;
  }

  public String format(final SinkRecord record) {
    try {
      return recordWriter.writeValueAsString(recordToPayload(record));
    } catch (JsonProcessingException e) {
      throw new PayloadFormattingException(e);
    }
  }

  public String format(final Collection<SinkRecord> records) {
    final Payload[] payloads = records
        .stream()
        .map(this::recordToPayload)
        .toArray(Payload[]::new);

    try {
      return recordsWriter.writeValueAsString(payloads);
    } catch (final JsonProcessingException e) {
      throw new PayloadFormattingException(e);
    }
  }

  private Payload<Object, Object> recordToPayload(final SinkRecord record) {
    Object deserializedKey;
    Object deserializedValue;
    if (record.keySchema() == null) {
      deserializedKey = record.key();
    } else {
      deserializedKey = deserialize(keySchemaVisibility, record.topic(), record.keySchema(), record.key());
    }
    if (record.valueSchema() == null) {
      deserializedValue = record.value();
    } else {
      deserializedValue = deserialize(valueSchemaVisibility, record.topic(), record.valueSchema(), record.value());
    }

    Payload<Object, Object> payload = new Payload<>(record);
    payload.setKey(deserializedKey);
    payload.setValue(deserializedValue);
    if (keySchemaVisibility == SchemaVisibility.NONE) {
      payload.setKeySchemaName(null);
      payload.setKeySchemaVersion(null);
    }
    if (valueSchemaVisibility == SchemaVisibility.NONE) {
      payload.setValueSchemaName(null);
      payload.setValueSchemaVersion(null);
    }

    return payload;
  }

  private JsonNode deserialize(final SchemaVisibility schemaVisibility, final String topic, final Schema schema, final Object value) {
    if (schemaVisibility == SchemaVisibility.ALL) {
      return deserializer.deserialize(topic, converter.fromConnectData(topic, schema, value));
    } else {
      return deserializer.deserialize(topic, converterSansSchema.fromConnectData(topic, schema, value));
    }
  }
}