Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use custom streams implementation to simplify error handling, writing to topics and serde configuration #265

Draft
wants to merge 10 commits into
base: master
Choose a base branch
from

Conversation

philipp94831
Copy link
Member

@philipp94831 philipp94831 commented Jan 6, 2025

New ImproveKStream interface adds new methods to simplify topology creation. It inherits from KStream and therefore no changes to existing code are required.

Writing to topics

Before

final KStream<String, String> input = builder.streamInput();
input.to(builder.getTopics().getOutputTopic());

After

final ImprovedKStream<String, String> input = builder.streamInput();
input.toOutputTopic();

Configuring serdes

Before

final Preconfigured<Serde<SpecificRecord>> configuredSerde = Preconfigured.create(new SpecificAvroSerde<>());
final Serde<SpecificRecord> serde = builder.createConfigurator().configureForValues(configuredSerde);
final KStream<String, SpecificRecord> input = builder.streamInput(Consumed.with(null, serde));
input.to(builder.getTopics().getOutputTopic(), Produced.valueSerde(serde));

After

final Preconfigured<Serde<SpecificRecord>> serde = Preconfigured.create(new SpecificAvroSerde<>());
final ImprovedKStream<String, SpecificRecord> input = builder.streamInput(ConfiguredConsumed.valueSerde(serde));
input.toOutputTopic(ConfiguredProduced.valueSerde(serde));

Error handling

Before

final KStream<String, String> input = builder.streamInput();
final KStream<String, ProcessedValue<String, Integer>> processedInput =
        input.mapValues(ErrorCapturingValueMapper.captureErrors(value -> Integer.parseInt(value)));
final KStream<String, Integer> intStream = processedInput.flatMapValues(ProcessedValue::getValues);
intStream.to(builder.getTopics().getOutputTopic());
final KStream<String, DeadLetter> deadLetters = processedInput.flatMapValues(ProcessedValue::getErrors)
        .processValues(AvroDeadLetterConverter.asProcessor("Error parsing integers"));
deadLetters.to(builder.getTopics().getErrorTopic());

After

final ImprovedKStream<String, String> input = builder.streamInput();
final KErrorStream<String, String, String, Integer> processedInput =
        input.mapValuesCapturingErrors(value -> Integer.parseInt(value));
final ImprovedKStream<String, Integer> intStream = processedInput.values();
intStream.toOutputTopic();
final ImprovedKStream<String, DeadLetter> deadLetters = processedInput.errors()
        .processValues(AvroDeadLetterConverter.asProcessor("Error parsing integers"));
deadLetters.toErrorTopic();

@philipp94831 philipp94831 self-assigned this Jan 6, 2025
@philipp94831 philipp94831 changed the title Simplify writing to output topics Use custom streams implementation to simplify error handling, writing to topics and serde configuration Jan 27, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant