A collection of common utilities and reusable classes for Kafka Streams applications.
KC4Streams (which stands for Kafka Commons for Streams) is a simple Java library that provides utility classes and standard implementations for most of the Kafka Streams pluggable interfaces.
Add the following dependency to your project:
- Maven
<dependency>
    <groupId>io.streamthoughts</groupId>
    <artifactId>kc4streams</artifactId>
    <version>${kc4streams.version}</version>
</dependency>
- Gradle
implementation group: 'io.streamthoughts', name: 'kc4streams', version: '1.0.0'
Kafka Streams allows you to register handler classes to specify how exceptions should be handled.
Here are the three interfaces that you can implement and configure:
- `ProductionExceptionHandler`: Specifies how an exception thrown when attempting to produce a result to Kafka should be handled.
- `DeserializationExceptionHandler`: Specifies how an exception thrown when attempting to deserialize an input record should be handled.
- `StreamsUncaughtExceptionHandler`: Specifies how an exception thrown while processing a record should be handled.
For example, here is how you can set a custom deserialization exception handler:
clientProps.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class.getName());
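For reference, an exception handler is simply a class implementing one of the interfaces above. Below is a minimal, hypothetical sketch (not part of KC4Streams) of a custom DeserializationExceptionHandler that logs the failure and skips the corrupted record, using the classic Kafka Streams handler API:
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
import org.apache.kafka.streams.processor.ProcessorContext;
import java.util.Map;

public class MyLogAndSkipExceptionHandler implements DeserializationExceptionHandler {

    @Override
    public DeserializationHandlerResponse handle(final ProcessorContext context,
                                                 final ConsumerRecord<byte[], byte[]> record,
                                                 final Exception exception) {
        // Log the failure with enough context to find the corrupted record later.
        System.err.printf("Failed to deserialize record from %s-%d at offset %d: %s%n",
                record.topic(), record.partition(), record.offset(), exception.getMessage());
        // Skip the corrupted record and keep processing.
        return DeserializationHandlerResponse.CONTINUE;
    }

    @Override
    public void configure(final Map<String, ?> configs) {
        // No configuration needed for this sketch.
    }
}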
By default, Kafka Streams provides only a few built-in implementations of those interfaces, which are usually not sufficient for production usage.
In addition to the built-in exception handlers that Kafka Streams provides, KC4Streams implements custom handlers that let you send failed records to special Kafka topics acting as Dead Letter Queues (DLQs).
Here is how you can configure them:
// Handling production exceptions
clientProps.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, DLQProductionExceptionHandler.class.getName());
// Handling deserialization exceptions
clientProps.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, DLQDeserializationExceptionHandler.class.getName());
// Handling uncaught stream exceptions
var client = new KafkaStreams(buildTopology(), new StreamsConfig(clientProps));
client.setUncaughtExceptionHandler(new DLQStreamUncaughtExceptionHandler(clientProps));
All the exception handlers can be configured with the following common properties:

| Property | Description | Type | Default |
|----------|-------------|------|---------|
| | Specifies the fully-qualified name of the class to be used for extracting the name of the dead-letter topic. | | |
| | The default response that must be returned by the handler [FAIL\|CONTINUE]. | | |
| | Specifies the comma-separated list of FQCNs of the exceptions on which the handler must fail. | | |
| | Specifies the comma-separated list of FQCNs of the exceptions on which the handler must continue. | | |
| | Specifies the value of a custom record-header to be added to the corrupted record sent to the dead-letter topic. | | |
By default, DLQ handlers send records in error to a dedicated topic named `<[source|sink]-topic-name>.<dlq-suffix>.<application-id>` (e.g., `input-topic-1.error.my-streaming-application`).
| Property | Description | Type | Default |
|----------|-------------|------|---------|
| | Specifies the suffix to be used for naming the DLQ (optional). | | |
| | Specifies the name of the DLQ to be used (optional). | | |
| | Specifies whether the Kafka Streams application-id should be used for naming the DLQ. | | |
In addition, you can implement a custom `DLQTopicNameExtractor` class, as follows:
class CustomDLQTopicNameExtractor implements DLQTopicNameExtractor {
    public String extract(final byte[] key, final byte[] value, final FailedRecordContext recordContext) {
        return recordContext.topic() + "-DLQ";
    }
}
Then, you can configure that custom `DLQTopicNameExtractor` as follows:
clientProps.put(DLQExceptionHandlerConfig.DLQ_DEFAULT_TOPIC_NAME_EXTRACTOR_CONFIG, CustomDLQTopicNameExtractor.class.getName());
The `DLQProductionExceptionHandler` configuration can be overridden with the following properties.
- Configuration

| Property | Description | Type | Default |
|----------|-------------|------|---------|
| | Specifies the fully-qualified name of the class to be used for extracting the name of the dead-letter topic. | | |
| | The default response that must be returned by the handler [FAIL\|CONTINUE]. | | |
| | Specifies the comma-separated list of FQCNs of the exceptions on which the handler must fail. | | |
| | Specifies the comma-separated list of FQCNs of the exceptions on which the handler must continue. | | |
| | Specifies the value of a custom record-header to be added to the corrupted record sent to the dead-letter topic. | | |
The `DLQDeserializationExceptionHandler` configuration can be overridden with the following properties.
- Configuration

| Property | Description | Type | Default |
|----------|-------------|------|---------|
| | Specifies the fully-qualified name of the class to be used for extracting the name of the dead-letter topic. | | |
| | The default response that must be returned by the handler [FAIL\|CONTINUE]. | | |
| | Specifies the comma-separated list of FQCNs of the exceptions on which the handler must fail. | | |
| | Specifies the comma-separated list of FQCNs of the exceptions on which the handler must continue. | | |
| | Specifies the value of a custom record-header to be added to the corrupted record sent to the dead-letter topic. | | |
The `DLQStreamUncaughtExceptionHandler` configuration can be overridden with the following properties.
- Configuration

| Property | Description | Type | Default |
|----------|-------------|------|---------|
| | Specifies the fully-qualified name of the class to be used for extracting the name of the dead-letter topic. | | |
| | The default response that must be returned by the handler [FAIL\|CONTINUE]. | | |
| | Specifies the comma-separated list of FQCNs of the exceptions on which the handler must fail. | | |
| | Specifies the comma-separated list of FQCNs of the exceptions on which the handler must continue. | | |
| | Specifies the value of a custom record-header to be added to the corrupted record sent to the dead-letter topic. | | |
All the exception handlers discussed above internally use a singleton instance of the `DLQRecordCollector` class to send records to dedicated DLQs with contextual information about the errors.
The `DLQRecordCollector` accepts the following config properties for specifying, for example, whether DLQ topics should be automatically created.
- Configuration

| Property | Description | Type | Default |
|----------|-------------|------|---------|
| | Specifies the Producer's config properties to override. | - | - |
| | Specifies the AdminClient's config properties to override. | - | - |
| | Specifies whether missing DLQ topics should be automatically created. | | true |
| | Specifies the number of partitions to be used for DLQ topics. | | -1 |
| | Specifies the replication factor to be used for DLQ topics. | | -1 |
- Usage
// Create KafkaStreams client configuration
Map<String, Object> streamsConfigs = new HashMap<>();
// Initialize the DLQRecordCollector singleton.
DLQRecordCollector.getOrCreate(streamsConfigs);
// Create a Kafka Stream Topology
StreamsBuilder streamsBuilder = new StreamsBuilder();
KStream<String, String> stream = streamsBuilder.stream(INPUT_TOPIC);
stream.mapValues((key, value) -> {
Long output = null;
try {
output = Long.parseLong(value);
} catch (Exception e) {
// Sends the corrupted-record to a DLQ
DLQRecordCollector.get().send(
INPUT_TOPIC + "-DLQ",
key,
value,
Serdes.String().serializer(),
Serdes.String().serializer(),
Failed.withProcessingError((String) streamsConfigs.get(StreamsConfig.APPLICATION_ID_CONFIG), e)
);
}
return output;
});
Each message sent to a DLQ is enriched with headers containing information about the reason for the message’s rejection.
Here’s the list of headers:
| Header | Description |
|--------|-------------|
| | The topic of the record in error. |
| | The partition of the record in error. |
| | The offset of the record in error (empty for production errors). |
| | The epoch-timestamp of the error. |
| | The stage of the error. |
| | The exception message. |
| | The exception class name. |
| | The exception stacktrace. |
| | The stream application id. |
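Since the exact header keys may vary between KC4Streams versions, an easy way to inspect them is to dump every header of a record consumed from the DLQ. A minimal sketch (the consumer loop and the DLQ topic are assumed to exist elsewhere):
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import java.nio.charset.StandardCharsets;

// Print all headers (including the error-context headers added by KC4Streams) of a DLQ record.
static void dumpHeaders(final ConsumerRecord<byte[], byte[]> record) {
    for (final Header header : record.headers()) {
        System.out.printf("%s=%s%n", header.key(), new String(header.value(), StandardCharsets.UTF_8));
    }
}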
Another solution for dealing with deserialization exceptions is to return a sentinel value (e.g., `null`, `"N/A"`, `-1`) when a corrupted record (a.k.a. poison pill) is handled by a Kafka `Deserializer`.
The `SafeDeserializer` can be used to wrap an existing `Deserializer` to catch any `Exception` that may be thrown when attempting to deserialize a record and return a configured (or default) value.
- Creating a SafeDeserializer
SafeDeserializer<GenericRecord> deserializer = new SafeDeserializer<>(
    new GenericAvroSerde().deserializer(), // the delegating deserializer
    (GenericRecord) null                   // the sentinel object to return when an exception is caught
);
- Configuring a SafeDeserializer
SafeDeserializer<Double> deserializer = new SafeDeserializer<>(
Serdes.Double().deserializer(), // the delegating deserializer
Double.class // the value type
);
Map<String, Object> configs = new HashMap<>();
configs.put(SafeDeserializerConfig.SAFE_DESERIALIZER_DEFAULT_VALUE_CONFIG, 0.0);
deserializer.configure(configs, false);
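Continuing the snippet above, the wrapper is expected to swallow the deserialization error and return the configured default instead of throwing (the topic name and payload below are made up for illustration):
// Bytes that the wrapped Double deserializer cannot handle.
byte[] corrupted = "not-a-double".getBytes(java.nio.charset.StandardCharsets.UTF_8);
// Returns 0.0 (the configured default value) instead of throwing a SerializationException.
Double result = deserializer.deserialize("some-topic", corrupted);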
In addition, you can use the `SafeSerdes` utility class, which allows wrapping an existing `Serde` or `Deserializer`.
Behind the scenes, `SafeSerdes` uses the `SafeDeserializer` for wrapping the existing `Deserializer`.
Serde<Double> doubleSerde = SafeSerdes.Double();
// or
SafeSerdes.serdeFrom(Serdes.Double(), 0.0);
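For example, a wrapped Serde can be plugged directly into a topology so that corrupted values are replaced by the sentinel instead of failing the stream task. A minimal sketch (the input topic name and value type are assumptions):
StreamsBuilder builder = new StreamsBuilder();
KStream<String, Double> prices = builder.stream(
    "prices", // hypothetical input topic
    Consumed.with(Serdes.String(), SafeSerdes.serdeFrom(Serdes.Double(), 0.0))
);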
Kafka Streams relies on RocksDB, an embedded key-value store, to provide persistent storage. Depending on the throughput of your application, you may want to tune the internal RocksDB instances.
Kafka Streams allows you to customize the RocksDB settings for a given store by implementing the interface `org.apache.kafka.streams.state.RocksDBConfigSetter`.
The custom implementation must then be configured using:
streamsConfig.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, CustomRocksDBConfig.class);
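For illustration, such a CustomRocksDBConfig implementation (a sketch, not provided by KC4Streams) could look like the following; it tunes a couple of options per store via the standard RocksDBConfigSetter callbacks:
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.Options;
import java.util.Map;

public class CustomRocksDBConfig implements RocksDBConfigSetter {

    @Override
    public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
        // Reuse the existing table format config and increase the data block size to 16 KB.
        final BlockBasedTableConfig tableConfig = (BlockBasedTableConfig) options.tableFormatConfig();
        tableConfig.setBlockSize(16 * 1024L);
        options.setTableFormatConfig(tableConfig);
        // Allow up to three memtables per store before flushing to SST files.
        options.setMaxWriteBufferNumber(3);
    }

    @Override
    public void close(final String storeName, final Options options) {
        // Nothing to release here; close any Cache or Filter objects created in setConfig.
    }
}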
KC4Streams provides a built-in `io.streamthoughts.kc4streams.rocksdb.StreamsRocksDBConfigSetter` that allows not only overriding some default RocksDB options, but also enabling log statistics for performance debugging and managing memory shared across RocksDB instances.
- Configuration
| Property | Description | Type | Default |
|----------|-------------|------|---------|
| | Enables RocksDB statistics. | | - |
| | Specifies the RocksDB statistics dump period, in seconds. | | - |
| | Specifies the RocksDB log directory. | string | |
| | Specifies the RocksDB log level (see org.rocksdb.InfoLogLevel). | | - |
| | Specifies the RocksDB maximum log file size. | | - |
| | Specifies the maximum number of memtables built up in memory before they are flushed to SST files. | | |
| | Specifies the size of a single memtable. | | - |
| | Enables automatic memory management across all RocksDB instances. | | |
| | Specifies the ratio of total cache memory reserved for the write buffer manager. This property is only used when automatic memory management is enabled. | | |
| | Specifies the ratio of cache memory reserved for high-priority blocks (e.g., index, filter, and compression blocks). | | |
| | Creates a block cache with a strict capacity limit (i.e., inserts to the cache fail when the cache is full). This property is only used when automatic memory management is enabled. | | |
| | Specifies the total size to be used for caching uncompressed data blocks. | | |
| | Specifies the compaction style. | | - |
| | Specifies the compression type. | | - |
| | Specifies the maximum number of open files that can be used per RocksDB instance. | | - |
| | Specifies the maximum number of concurrent background jobs (both flushes and compactions combined). | | - |
- Example
var streamsConfig = new HashMap<String, Object>();
streamsConfig.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, StreamsRocksDBConfigSetter.class);
streamsConfig.put(RocksDBConfig.ROCKSDB_MEMORY_MANAGED_CONFIG, true);
streamsConfig.put(RocksDBConfig.ROCKSDB_STATS_ENABLE_CONFIG, true);
streamsConfig.put(RocksDBConfig.ROCKSDB_LOG_DIR_CONFIG, "/tmp/rocksdb-logs");
Note: Please read the official documentation for more information: RocksDB Tuning Guide.
We’re an active open source software community. We welcome and value contributions from everyone. Any feedback, bug reports and PRs are greatly appreciated!
To talk with our community about development-related topics:
- Open an issue on GitHub for questions, improvement suggestions, or anything related to the use of KC4Streams.
We use GitHub to track all code-related issues: https://github.com/streamthoughts/kc4streams/issues.
Copyright 2022 StreamThoughts.
Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.