KC4Streams: Kafka Commons for Streams

A collection of common utilities and reusable classes for Kafka Streams applications.

Description

KC4Streams (which stands for Kafka Commons for Streams) is a simple Java library that provides utility classes and standard implementations for most of the Kafka Streams pluggable interfaces.

Built With

KC4Streams is built with the following dependencies:

  • Java 17

  • Kafka Streams (>=3.1.x)

Getting Started

Add the following dependency to your project:

Maven
<dependency>
    <groupId>io.streamthoughts</groupId>
    <artifactId>kc4streams</artifactId>
    <version>${kc4streams.version}</version>
</dependency>
Gradle
implementation group: 'io.streamthoughts', name: 'kc4streams', version: '1.0.0'

Usage

Error handling

KafkaStreams allows you to register handler classes to specify how an exception should be handled.

Here are the three interfaces that you can implement and configure:

  • ProductionExceptionHandler: Specifies how to handle an exception thrown while attempting to produce a record to Kafka.

  • DeserializationExceptionHandler: Specifies how to handle an exception thrown while attempting to deserialize an input record.

  • StreamsUncaughtExceptionHandler: Specifies how to handle an uncaught exception thrown while processing a record.

For example, here is how you can set a custom deserialization exception handler:

clientProps.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogAndContinueExceptionHandler.class.getName());

Out of the box, Kafka Streams only provides a few built-in implementations of those interfaces, which are usually not sufficient for production use.

Kafka Streams and the DLQ (Dead Letter Queue)

In addition to the built-in exception handlers that Kafka Streams provides, KC4Streams implements custom handlers that let you send failed records to special Kafka topics acting as DLQs.

Here is how you can configure them:

// Handling production exceptions
clientProps.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, DeadLetterTopicProductionExceptionHandler.class.getName());

// Handling deserialization exceptions
clientProps.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, DeadLetterTopicDeserializationExceptionHandler.class.getName());

// Handling stream uncaught exceptions
var client = new KafkaStreams(buildTopology(), new StreamsConfig(clientProps));
client.setUncaughtExceptionHandler(new DeadLetterTopicStreamUncaughtExceptionHandler(clientProps));

All the exception handlers can be configured with some default properties:

| Property | Description | Type | Default |
|---|---|---|---|
| exception.handler.dlq.default.topic-extractor | Specifies the fully-qualified name of the class to be used for extracting the name of the dead-letter topic. | class | DefaultDLQTopicNameExtractor |
| exception.handler.dlq.default.reponse | The default response that must be returned by the handler (FAIL or CONTINUE). | string | - |
| exception.handler.dlq.default.return-fail-on-errors | Specifies the comma-separated list of FQCNs of the exceptions on which the handler must fail. | list | - |
| exception.handler.dlq.default.return-continue-on-errors | Specifies the comma-separated list of FQCNs of the exceptions on which the handler must continue. | list | - |
| exception.handler.dlq.default.headers.<key> | Specifies the value of a custom record header to be added to the corrupted record sent to the dead-letter topic. | string | - |

Setting the default DLQ name

By default, DLQ handlers will send records in error to a dedicated topic named:

  • <[source|sink]-topic-name>.<dlq-suffix>.<application-id> (e.g., input-topic-1.error.my-streaming-application)

| Property | Description | Type | Default |
|---|---|---|---|
| exception.handler.dlq.default.topic-suffix | Specifies the suffix to be used for naming the DLQ (optional). | string | error |
| exception.handler.dlq.default.topic-name | Specifies the name of the DLQ to be used (optional). | string | - |
| exception.handler.dlq.default.topic-per-application-id | Specifies whether the Kafka Streams application-id should be used for naming the DLQ. | boolean | true |

In addition, you can implement a custom DLQTopicNameExtractor class, as follows:

class CustomDLQTopicNameExtractor implements DeadLetterTopicNameExtractor {
    public String extract(final byte[] key, final byte[] value, final FailedRecordContext recordContext) {
        return recordContext.topic() + "-DLQ";
    }
}

Then, you can configure that custom extractor as follows:

clientProps.put(DLQExceptionHandlerConfig.DLQ_DEFAULT_TOPIC_NAME_EXTRACTOR_CONFIG, CustomDLQTopicNameExtractor.class.getName());
Handling Production Exceptions

The DLQProductionExceptionHandler configuration can be overridden with the following properties.

Configuration

| Property | Description | Type | Default |
|---|---|---|---|
| exception.handler.dlq.production.topic-extractor | Specifies the fully-qualified name of the class to be used for extracting the name of the dead-letter topic. | class | DefaultDLQTopicNameExtractor |
| exception.handler.dlq.production.reponse | The default response that must be returned by the handler (FAIL or CONTINUE). | string | - |
| exception.handler.dlq.production.return-fail-errors | Specifies the comma-separated list of FQCNs of the exceptions on which the handler must fail. | list | - |
| exception.handler.dlq.production.return-continue-errors | Specifies the comma-separated list of FQCNs of the exceptions on which the handler must continue. | list | - |
| exception.handler.dlq.production.headers.<key> | Specifies the value of a custom record header to be added to the corrupted record sent to the dead-letter topic. | string | - |

Handling Deserialization Exceptions

The DLQDeserializationExceptionHandler configuration can be overridden with the following properties.

Configuration

| Property | Description | Type | Default |
|---|---|---|---|
| exception.handler.dlq.deserialization.topic-extractor | Specifies the fully-qualified name of the class to be used for extracting the name of the dead-letter topic. | class | DefaultDLQTopicNameExtractor |
| exception.handler.dlq.deserialization.reponse | The default response that must be returned by the handler (FAIL or CONTINUE). | string | - |
| exception.handler.dlq.deserialization.return-fail-on-errors | Specifies the comma-separated list of FQCNs of the exceptions on which the handler must fail. | list | - |
| exception.handler.dlq.deserialization.return-continue-on-errors | Specifies the comma-separated list of FQCNs of the exceptions on which the handler must continue. | list | - |
| exception.handler.dlq.deserialization.headers.<key> | Specifies the value of a custom record header to be added to the corrupted record sent to the dead-letter topic. | string | - |

Handling Stream Uncaught Exceptions

The DLQStreamUncaughtExceptionHandler configuration can be overridden with the following properties.

Configuration

| Property | Description | Type | Default |
|---|---|---|---|
| exception.handler.dlq.streams.topic-extractor | Specifies the fully-qualified name of the class to be used for extracting the name of the dead-letter topic. | class | DefaultDLQTopicNameExtractor |
| exception.handler.dlq.streams.reponse | The default response that must be returned by the handler (FAIL or CONTINUE). | string | - |
| exception.handler.dlq.streams.return-fail-errors | Specifies the comma-separated list of FQCNs of the exceptions on which the handler must fail. | list | - |
| exception.handler.dlq.streams.continue.errors | Specifies the comma-separated list of FQCNs of the exceptions on which the handler must continue. | list | - |
| exception.handler.dlq.streams.headers.<key> | Specifies the value of a custom record header to be added to the corrupted record sent to the dead-letter topic. | string | - |

Handling Processing Exceptions

All the exception handlers discussed above internally use a singleton instance of the DLQRecordCollector class to send records to dedicated DLQs along with contextual information about the errors.

The DLQRecordCollector accepts the following config properties for specifying, for example, whether missing DLQ topics should be automatically created.

Configuration

| Property | Description | Type | Default |
|---|---|---|---|
| exception.handler.dlq.global.producer.<config> | Specifies the Producer's config properties to override. | - | - |
| exception.handler.dlq.global.admin.<config> | Specifies the AdminClient's config properties to override. | - | - |
| exception.handler.dlq.topics.auto-create-enabled | Specifies whether missing DLQ topics should be automatically created. | boolean | true |
| exception.handler.dlq.topics.num-partitions | Specifies the number of partitions to be used for DLQ topics. | integer | -1 |
| exception.handler.dlq.topics.replication-factors | Specifies the replication factor to be used for DLQ topics. | short | -1 |
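For instance, here is a minimal sketch of overriding those settings before initializing the collector. The property keys are the ones listed above, passed as plain strings because the corresponding config constants are not shown in this document; the values are purely illustrative:

// Illustrative values only.
Map<String, Object> collectorConfigs = new HashMap<>();
collectorConfigs.put("exception.handler.dlq.topics.auto-create-enabled", true);
collectorConfigs.put("exception.handler.dlq.topics.num-partitions", 3);
collectorConfigs.put("exception.handler.dlq.topics.replication-factors", (short) 3);
DLQRecordCollector.getOrCreate(collectorConfigs);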

Usage
// Create the KafkaStreams client configuration
Map<String, Object> streamsConfigs = new HashMap<>();
streamsConfigs.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streaming-application");

// Initialize the GlobalDeadLetterTopicCollector.
DLQRecordCollector.getOrCreate(streamsConfigs);

// Create a Kafka Stream Topology
StreamsBuilder streamsBuilder = new StreamsBuilder();
KStream<String, String> stream = streamsBuilder.stream(INPUT_TOPIC);
stream.mapValues((key, value) -> {
    Long output = null;
    try {
        output = Long.parseLong(value);
    } catch (Exception e) {
        // Sends the corrupted-record to a DLQ
        DLQRecordCollector.get().send(
                INPUT_TOPIC + "-DLQ",
                key,
                value,
                Serdes.String().serializer(),
                Serdes.String().serializer(),
                Failed.withProcessingError((String) streamsConfigs.get(StreamsConfig.APPLICATION_ID_CONFIG), e)
        );
    }
    return output;
});

Recording the failure reason using message headers

Each message sent to a DLQ is enriched with headers containing information about the reason for the message’s rejection.

Here’s the list of headers:

| Header | Description |
|---|---|
| __streams.errors.topic | The topic of the record in error. |
| __streams.errors.partition | The partition of the record in error. |
| __streams.errors.offset | The offset of the record in error (empty for production errors). |
| __streams.errors.timestamp | The epoch-timestamp of the error. |
| __streams.errors.stage | The stage of the error: DESERIALIZATION, PRODUCTION, PROCESSING, or STREAMS. |
| __streams.errors.exception.message | The exception message. |
| __streams.errors.exception.class.name | The exception class name. |
| __streams.errors.exception.stacktrace | The exception stacktrace. |
| __streams.errors.application.id | The stream application id. |
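Since these are plain Kafka record headers, they can be inspected with a regular consumer when monitoring a DLQ. Below is a minimal sketch, assuming a KafkaConsumer<byte[], byte[]> named consumer is already subscribed to the dead-letter topic (the consumer setup itself is omitted):

// Read the failure context attached to each dead-lettered record.
ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<byte[], byte[]> record : records) {
    Header stage = record.headers().lastHeader("__streams.errors.stage");
    Header error = record.headers().lastHeader("__streams.errors.exception.message");
    System.out.printf("stage=%s, cause=%s%n",
        stage == null ? "n/a" : new String(stage.value(), StandardCharsets.UTF_8),
        error == null ? "n/a" : new String(error.value(), StandardCharsets.UTF_8));
}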

SafeDeserializer & SafeSerde

Another solution for dealing with deserialization exceptions is to return a sentinel value (e.g., null, "N/A", -1) when a corrupted record (a.k.a. a poison pill) is handled by a Kafka Deserializer.

The SafeDeserializer can be used to wrap an existing Deserializer in order to catch any exception thrown while deserializing a record and to return a configured (or default) value instead.

Creating a SafeDeserializer
SafeDeserializer<GenericRecord> deserializer = new SafeDeserializer<>(
    new GenericAvroSerde().deserializer(), // the delegate deserializer
    (GenericRecord) null                   // the sentinel object to return when an exception is caught
);
Configuring a SafeDeserializer
SafeDeserializer<Double> deserializer = new SafeDeserializer<>(
    Serdes.Double().deserializer(), // the delegating deserializer
    Double.class    		        // the value type
);

Map<String, Object> configs = new HashMap<>();
configs.put(SafeDeserializerConfig.SAFE_DESERIALIZER_DEFAULT_VALUE_CONFIG, 0.0);
deserializer.configure(configs, false);
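As a quick illustration, assuming the wrapped Double deserializer throws on malformed input, the configured instance returns the sentinel value instead of propagating the exception (the topic name below is only illustrative):

// These bytes are not a valid Double encoding: instead of throwing,
// the SafeDeserializer returns the configured default value (0.0).
Double value = deserializer.deserialize("input-topic", "not-a-double".getBytes(StandardCharsets.UTF_8));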

In addition, you can use the SafeSerdes utility class, which allows wrapping an existing Serde or Deserializer.

Behind the scenes, SafeSerdes uses a SafeDeserializer to wrap the existing Deserializer.

Serde<Double> doubleSerde = SafeSerdes.Double();
// or
SafeSerdes.serdeFrom(Serdes.Double(), 0.0);
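Such a Serde can then be plugged into a topology like any other one. A minimal sketch, reusing the StreamsBuilder and input topic names from the earlier usage example (assumed to be in scope):

// Consume records whose values fall back to 0.0 when they cannot be deserialized as a Double.
KStream<String, Double> safeStream = streamsBuilder.stream(
        INPUT_TOPIC,
        Consumed.with(Serdes.String(), SafeSerdes.serdeFrom(Serdes.Double(), 0.0)));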

RocksDB

How to tune internal RocksDB state stores?

Kafka Streams relies on RocksDB, an embedded key-value store, to provide persistent storage. Depending on the throughput of your application, you may want to tune the internal RocksDB instances. Kafka Streams allows you to customize the RocksDB settings for a given store by implementing the org.apache.kafka.streams.state.RocksDBConfigSetter interface.

The custom implementation must then be configured using:

streamsConfig.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, CustomRocksDBConfig.class);
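For reference, a minimal custom implementation of that standard interface could look like the sketch below (the option values are purely illustrative, not recommendations):

public static class CustomRocksDBConfig implements RocksDBConfigSetter {

    @Override
    public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
        // Example: lower the number of memtables kept in memory for every store.
        options.setMaxWriteBufferNumber(2);
    }

    @Override
    public void close(final String storeName, final Options options) {
        // Release any RocksDB objects created in setConfig() (none in this sketch).
    }
}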

KC4Streams provides a built-in io.streamthoughts.kc4streams.rocksdb.StreamsRocksDBConfigSetter that allows overriding some default RocksDB options, enabling log statistics for performance debugging, and managing memory shared across RocksDB instances.

Configuration

| Property | Description | Type | Default |
|---|---|---|---|
| rocksdb.stats.enable | Enable RocksDB statistics. | boolean | - |
| rocksdb.stats.dump.period.sec | Specifies the RocksDB statistics dump period in seconds. | integer | - |
| rocksdb.log.dir | Specifies the RocksDB log directory. | string | - |
| rocksdb.log.level | Specifies the RocksDB log level (see org.rocksdb.InfoLogLevel). | string | - |
| rocksdb.log.max.file.size | Specifies the RocksDB maximum log file size. | integer | - |
| rocksdb.max.write.buffer.number | Specifies the maximum number of memtables built up in memory before they are flushed to SST files. | integer | - |
| rocksdb.write.buffer.size | Specifies the size of a single memtable. | long | - |
| rocksdb.memory.managed | Enable automatic memory management across all RocksDB instances. | boolean | false |
| rocksdb.memory.write.buffer.ratio | Specifies the ratio of total cache memory reserved for the write buffer manager. Only used when rocksdb.memory.managed is set to true. | double | 0.5 |
| rocksdb.memory.high.prio.pool.ratio | Specifies the ratio of cache memory reserved for high-priority blocks (e.g., index, filter, and compression blocks). | double | 0.1 |
| rocksdb.memory.strict.capacity.limit | Create a block cache with a strict capacity limit (i.e., inserts fail when the cache is full). Only used when rocksdb.memory.managed is set to true or rocksdb.block.cache.size is set. | boolean | false |
| rocksdb.block.cache.size | Specifies the total size to be used for caching uncompressed data blocks. | long | - |
| rocksdb.compaction.style | Specifies the compaction style. | string | - |
| rocksdb.compression.type | Specifies the compression type. | string | - |
| rocksdb.files.open | Specifies the maximum number of open files per RocksDB instance. | long | - |
| rocksdb.max.background.jobs | Specifies the maximum number of concurrent background jobs (flushes and compactions combined). | integer | - |

Example
var streamsConfig = new HashMap<String, Object>();
streamsConfig.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, StreamsRocksDBConfigSetter.class);
streamsConfig.put(RocksDBConfig.ROCKSDB_MEMORY_MANAGED_CONFIG, true);
streamsConfig.put(RocksDBConfig.ROCKSDB_STATS_ENABLE_CONFIG, true);
streamsConfig.put(RocksDBConfig.ROCKSDB_LOG_DIR_CONFIG, "/tmp/rocksdb-logs");
Note
Please read the official documentation for more information: RocksDB Tuning Guide

StateListener

KafkaStreams allows you to register a StateRestoreListener for listening to various states of the restoration process of a StateStore.

You can use the built-in LoggingStateRestoreListener implementation to log the progress of the restoration process.
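A minimal sketch of registering it, reusing the KafkaStreams client from the earlier examples (assuming the listener takes no constructor arguments):

// Log the start, progress, and completion of state-store restoration.
var client = new KafkaStreams(buildTopology(), new StreamsConfig(clientProps));
client.setGlobalStateRestoreListener(new LoggingStateRestoreListener());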

Contribute to KC4Streams

We’re an active open source software community. We welcome and value contributions from everyone. Any feedback, bug reports and PRs are greatly appreciated!

Talk to us

To talk with our community about development related topics:

  • Open an issue on GitHub for questions, improvement suggestions or anything related to the use of KC4Streams.

Issue Tracker

We use GitHub to track all code related issues: https://github.com/streamthoughts/kc4streams/issues.

Development

To build this project (using the Maven Wrapper):

./mvnw clean package

License

Copyright 2022 StreamThoughts.

Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.