Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use custom streams implementation to simplify error handling, writing to topics and serde configuration #265

Open
wants to merge 75 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
Show all changes
75 commits
Select commit Hold shift + click to select a range
ab53ded
Simplify writing to output topics
philipp94831 Jan 6, 2025
59591d5
Simplify writing to output topics
philipp94831 Jan 6, 2025
322eb93
Add error handling
philipp94831 Jan 27, 2025
8f1b481
Merge remote-tracking branch 'origin/master' into feature/improved-ks…
philipp94831 Jan 27, 2025
d321b8c
Add error handling
philipp94831 Jan 27, 2025
5377422
Add error handling
philipp94831 Jan 27, 2025
a5f272e
Add error handling
philipp94831 Jan 27, 2025
0895122
Add error handling
philipp94831 Jan 27, 2025
103a20e
Add error handling
philipp94831 Jan 27, 2025
5780aef
Add error handling
philipp94831 Jan 27, 2025
bac7fe4
Merge remote-tracking branch 'origin/master' into feature/improved-ks…
philipp94831 Feb 13, 2025
4fe684f
Add JavaDoc
philipp94831 Feb 13, 2025
c0cdd70
Add docs
philipp94831 Feb 14, 2025
bb0ca14
Add docs
philipp94831 Feb 14, 2025
b89c2ac
Add docs
philipp94831 Feb 14, 2025
eb012c9
Add tests
philipp94831 Feb 14, 2025
2452aec
Add tests
philipp94831 Feb 14, 2025
dcdbe90
Add tests
philipp94831 Feb 14, 2025
bec9805
Add tests
philipp94831 Feb 14, 2025
21b013c
Add ConfiguredStreamJoined
philipp94831 Feb 14, 2025
0f401eb
Add tests
philipp94831 Feb 14, 2025
e22d8b2
Add tests
philipp94831 Feb 14, 2025
e529a2a
Add ConfiguredJoined
philipp94831 Feb 14, 2025
000360d
Add docs
philipp94831 Feb 14, 2025
efc1a69
Add docs
philipp94831 Feb 14, 2025
a3e2417
Add tests
philipp94831 Feb 14, 2025
0a55285
Add tests
philipp94831 Feb 14, 2025
8d7c664
Add docs
philipp94831 Feb 14, 2025
e2629d9
Rename
philipp94831 Feb 14, 2025
b6903cc
Rename
philipp94831 Feb 14, 2025
0088a57
Rename
philipp94831 Feb 14, 2025
f1d9198
Add docs
philipp94831 Feb 14, 2025
210c6d7
Rename
philipp94831 Feb 14, 2025
d53721a
Add docs
philipp94831 Feb 14, 2025
e687c3e
Add tests
philipp94831 Feb 14, 2025
10adc0c
Add tests
philipp94831 Feb 14, 2025
baefa30
Add tests
philipp94831 Feb 14, 2025
86935e4
Add methods to TopologyBuilder
philipp94831 Feb 14, 2025
2e0f96d
Extend Branched
philipp94831 Feb 14, 2025
b5b9f6d
Extend Branched
philipp94831 Feb 14, 2025
207c6b5
Extend Branched
philipp94831 Feb 14, 2025
104354d
Extend Branched
philipp94831 Feb 14, 2025
e696490
Rename and test
philipp94831 Feb 17, 2025
6bbbcc2
Update tests
philipp94831 Feb 17, 2025
3201fce
Add tests
philipp94831 Feb 17, 2025
8aaeac4
Add tests
philipp94831 Feb 17, 2025
67e2390
Add tests
philipp94831 Feb 17, 2025
23c382c
Add tests
philipp94831 Feb 17, 2025
0e4e9df
Add tests
philipp94831 Feb 17, 2025
d71cf6a
Add tests
philipp94831 Feb 17, 2025
6355003
Add tests
philipp94831 Feb 17, 2025
144f474
Add tests
philipp94831 Feb 17, 2025
34a9634
Add tests
philipp94831 Feb 17, 2025
e0bd7d9
Add tests
philipp94831 Feb 17, 2025
9ddefc8
Add tests
philipp94831 Feb 17, 2025
24b2b36
Add tests
philipp94831 Feb 17, 2025
8a7ab88
Add tests
philipp94831 Feb 17, 2025
62adf8b
Add tests
philipp94831 Feb 17, 2025
d66d250
Add tests
philipp94831 Feb 17, 2025
1251b54
Add tests
philipp94831 Feb 19, 2025
a1849d2
Add tests
philipp94831 Feb 19, 2025
9fc61bc
Add tests
philipp94831 Feb 19, 2025
5761ed6
Add tests
philipp94831 Feb 19, 2025
0b6dae5
Add tests
philipp94831 Feb 19, 2025
3d9ba36
Add tests
philipp94831 Feb 19, 2025
7a368a7
Add tests
philipp94831 Feb 19, 2025
4b45b59
Add tests
philipp94831 Feb 19, 2025
37f3344
Add tests
philipp94831 Feb 19, 2025
e8e71ae
Clean up
philipp94831 Feb 19, 2025
682fc4b
Update
philipp94831 Feb 19, 2025
848bec0
Remove refresh dependencies
philipp94831 Feb 19, 2025
dc85aa3
Clean up
philipp94831 Feb 19, 2025
817f2d6
Update dependencies
philipp94831 Feb 20, 2025
2ee879a
Merge remote-tracking branch 'origin/master' into feature/improved-ks…
philipp94831 Feb 20, 2025
63ce55a
Add topics to TestTopology
philipp94831 Feb 20, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/build-and-publish.yaml
Original file line number Diff line number Diff line change
@@ -4,6 +4,7 @@ on:
push:
tags: ["**"]
branches: ["**"]
pull_request:

jobs:
build-and-publish:
6 changes: 3 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
@@ -57,14 +57,14 @@ Create a subclass of `KafkaStreamsApplication` and implement the abstract method
and `getUniqueAppId()`. You can define the topology of your application in `buildTopology()`.

```java
import com.bakdata.kafka.KStreamX;
import com.bakdata.kafka.KafkaStreamsApplication;
import com.bakdata.kafka.SerdeConfig;
import com.bakdata.kafka.StreamsApp;
import com.bakdata.kafka.StreamsTopicConfig;
import com.bakdata.kafka.TopologyBuilder;
import java.util.Map;
import org.apache.kafka.common.serialization.Serdes.StringSerde;
import org.apache.kafka.streams.kstream.KStream;

public class MyStreamsApplication extends KafkaStreamsApplication<StreamsApp> {
public static void main(final String[] args) {
@@ -76,11 +76,11 @@ public class MyStreamsApplication extends KafkaStreamsApplication<StreamsApp> {
return new StreamsApp() {
@Override
public void buildTopology(final TopologyBuilder builder) {
final KStream<String, String> input = builder.streamInput();
final KStreamX<String, String> input = builder.streamInput();

// your topology

input.to(builder.getTopics().getOutputTopic());
input.toOutput();
}

@Override
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -4,7 +4,7 @@ org.gradle.parallel=true
kafkaVersion=3.8.1
testContainersVersion=1.20.5
confluentVersion=7.8.0
fluentKafkaVersion=3.0.0
fluentKafkaVersion=3.1.0
junitVersion=5.11.4
mockitoVersion=5.15.2
assertJVersion=3.27.2
Original file line number Diff line number Diff line change
@@ -192,7 +192,7 @@ public StreamsApp createApp() {
return new StreamsApp() {
@Override
public void buildTopology(final TopologyBuilder builder) {
builder.streamInput().to(builder.getTopics().getOutputTopic());
builder.streamInput().toOutputTopic();
}

@Override
@@ -265,7 +265,7 @@ void shouldExitWithSuccessCodeOnShutdown() {
@Override
public void buildTopology(final TopologyBuilder builder) {
builder.streamInput(Consumed.with(Serdes.ByteArray(), Serdes.ByteArray()))
.to(builder.getTopics().getOutputTopic());
.toOutputTopic();
}

@Override
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* MIT License
*
* Copyright (c) 2024 bakdata
* Copyright (c) 2025 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -28,7 +28,6 @@
import lombok.NoArgsConstructor;
import lombok.Setter;
import org.apache.kafka.common.serialization.Serdes.StringSerde;
import org.apache.kafka.streams.kstream.KStream;

@NoArgsConstructor
@Getter
@@ -49,8 +48,8 @@ public StreamsApp createApp() {
return new StreamsApp() {
@Override
public void buildTopology(final TopologyBuilder builder) {
final KStream<String, String> input = builder.streamInput();
input.to(builder.getTopics().getOutputTopic());
final KStreamX<String, String> input = builder.streamInput();
input.toOutputTopic();
}

@Override
Original file line number Diff line number Diff line change
@@ -83,7 +83,7 @@ void shouldClean() {
this.runAndAssertContent(expectedValues, "All entries are once in the input topic after the 1st run", app);

// Wait until all stream applications are completely stopped before triggering cleanup
this.awaitClosed(app.createExecutableApp());
awaitClosed(app.createExecutableApp());
app.clean();

try (final ImprovedAdminClient admin = testClient.admin()) {
@@ -119,7 +119,7 @@ void shouldReset() {
this.runAndAssertContent(expectedValues, "All entries are once in the input topic after the 1st run", app);

// Wait until all stream applications are completely stopped before triggering cleanup
this.awaitClosed(app.createExecutableApp());
awaitClosed(app.createExecutableApp());
app.reset();

try (final ImprovedAdminClient admin = testClient.admin()) {
@@ -158,7 +158,7 @@ private void runApp(final KafkaStreamsApplication<?> app) {
// run in Thread because the application blocks indefinitely
new Thread(app).start();
// Wait until stream application has consumed all data
this.awaitProcessing(app.createExecutableApp());
awaitProcessing(app.createExecutableApp());
}

private CloseFlagApp createCloseFlagApplication() {
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* MIT License
*
* Copyright (c) 2024 bakdata
* Copyright (c) 2025 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -24,20 +24,20 @@

package com.bakdata.kafka.test_applications;

import com.bakdata.kafka.KStreamX;
import com.bakdata.kafka.SerdeConfig;
import com.bakdata.kafka.StreamsApp;
import com.bakdata.kafka.StreamsTopicConfig;
import com.bakdata.kafka.TopologyBuilder;
import lombok.NoArgsConstructor;
import org.apache.kafka.common.serialization.Serdes.StringSerde;
import org.apache.kafka.streams.kstream.KStream;

@NoArgsConstructor
public class Mirror implements StreamsApp {
@Override
public void buildTopology(final TopologyBuilder builder) {
final KStream<String, String> input = builder.streamInput();
input.to(builder.getTopics().getOutputTopic());
final KStreamX<String, String> input = builder.streamInput();
input.toOutputTopic();
}

@Override
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* MIT License
*
* Copyright (c) 2024 bakdata
* Copyright (c) 2025 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -24,6 +24,8 @@

package com.bakdata.kafka.test_applications;

import com.bakdata.kafka.KStreamX;
import com.bakdata.kafka.KTableX;
import com.bakdata.kafka.SerdeConfig;
import com.bakdata.kafka.StreamsApp;
import com.bakdata.kafka.StreamsTopicConfig;
@@ -33,8 +35,6 @@
import lombok.NoArgsConstructor;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serdes.StringSerde;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;

@@ -43,15 +43,15 @@ public class WordCount implements StreamsApp {

@Override
public void buildTopology(final TopologyBuilder builder) {
final KStream<String, String> textLines = builder.streamInput();
final KStreamX<String, String> textLines = builder.streamInput();

final Pattern pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS);
final KTable<String, Long> wordCounts = textLines
final KTableX<String, Long> wordCounts = textLines
.flatMapValues(value -> Arrays.asList(pattern.split(value.toLowerCase())))
.groupBy((key, word) -> word)
.count(Materialized.as("counts"));

wordCounts.toStream().to(builder.getTopics().getOutputTopic(), Produced.valueSerde(Serdes.Long()));
wordCounts.toStream().toOutputTopic(Produced.valueSerde(Serdes.Long()));
}

@Override
1 change: 1 addition & 0 deletions streams-bootstrap-core/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -20,6 +20,7 @@ dependencies {
)
implementation(group = "org.jooq", name = "jool", version = "0.9.15")
implementation(group = "io.github.resilience4j", name = "resilience4j-retry", version = "1.7.1")
api(group = "com.bakdata.kafka", name = "error-handling-core", version = "1.6.1")

val junitVersion: String by project
testRuntimeOnly(group = "org.junit.jupiter", name = "junit-jupiter-engine", version = junitVersion)
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* MIT License
*
* Copyright (c) 2025 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package com.bakdata.kafka;

import java.util.Map;
import org.apache.kafka.streams.kstream.Branched;
import org.apache.kafka.streams.kstream.BranchedKStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Predicate;

/**
* Extends the {@link BranchedKStream} interface by adding methods to simplify Serde configuration, error handling,
* and topic access
* @param <K> type of keys
* @param <V> type of values
*/
public interface BranchedKStreamX<K, V> extends BranchedKStream<K, V> {

@Override
BranchedKStreamX<K, V> branch(Predicate<? super K, ? super V> predicate);

@Override
BranchedKStreamX<K, V> branch(Predicate<? super K, ? super V> predicate, Branched<K, V> branched);

/**
* @see #branch(Predicate, Branched)
*/
BranchedKStreamX<K, V> branch(Predicate<? super K, ? super V> predicate, BranchedX<K, V> branched);

/**
* @deprecated Use {@link #defaultBranchX()} instead.
*/
@Deprecated(since = "3.6.0")
@Override
Map<String, KStream<K, V>> defaultBranch();

/**
* @see BranchedKStream#defaultBranch()
*/
Map<String, KStreamX<K, V>> defaultBranchX();

/**
* @deprecated Use {@link #defaultBranchX(Branched)} instead.
*/
@Deprecated(since = "3.6.0")
@Override
Map<String, KStream<K, V>> defaultBranch(Branched<K, V> branched);

/**
* @see BranchedKStream#defaultBranch(Branched)
*/
Map<String, KStreamX<K, V>> defaultBranchX(Branched<K, V> branched);

/**
* @see #defaultBranch(Branched)
*/
Map<String, KStreamX<K, V>> defaultBranch(BranchedX<K, V> branched);

/**
* @deprecated Use {@link #noDefaultBranchX()} instead.
*/
@Deprecated(since = "3.6.0")
@Override
Map<String, KStream<K, V>> noDefaultBranch();

/**
* @see BranchedKStream#noDefaultBranch()
*/
Map<String, KStreamX<K, V>> noDefaultBranchX();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/*
* MIT License
*
* Copyright (c) 2025 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package com.bakdata.kafka;

import java.util.Map;
import java.util.Map.Entry;
import java.util.function.Function;
import java.util.stream.Collectors;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.streams.kstream.Branched;
import org.apache.kafka.streams.kstream.BranchedKStream;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Predicate;

@RequiredArgsConstructor
class BranchedKStreamXImpl<K, V> implements BranchedKStreamX<K, V> {

private final @NonNull BranchedKStream<K, V> wrapped;
private final @NonNull StreamsContext context;

private static <K, V, T> Map<String, T> wrap(
final Map<String, ? extends KStream<K, V>> streamMap,
final Function<? super Entry<String, ? extends KStream<K, V>>, ? extends T> wrapValue) {
return streamMap.entrySet()
.stream()
.collect(Collectors.toMap(Entry::getKey, wrapValue));
}

@Override
public BranchedKStreamX<K, V> branch(final Predicate<? super K, ? super V> predicate) {
return this.context.wrap(this.wrapped.branch(predicate));
}

@Override
public BranchedKStreamX<K, V> branch(final Predicate<? super K, ? super V> predicate,
final Branched<K, V> branched) {
return this.context.wrap(this.wrapped.branch(predicate, branched));
}

@Override
public BranchedKStreamX<K, V> branch(final Predicate<? super K, ? super V> predicate,
final BranchedX<K, V> branched) {
return this.branch(predicate, branched.configure(this.context));
}

@Override
public Map<String, KStream<K, V>> defaultBranch() {
return this.wrap(this.defaultBranchInternal());
}

@Override
public Map<String, KStreamX<K, V>> defaultBranchX() {
return this.wrapX(this.defaultBranchInternal());
}

@Override
public Map<String, KStream<K, V>> defaultBranch(final Branched<K, V> branched) {
return this.wrap(this.defaultBranchInternal(branched));
}

@Override
public Map<String, KStreamX<K, V>> defaultBranchX(final Branched<K, V> branched) {
return this.wrapX(this.defaultBranchInternal(branched));
}

@Override
public Map<String, KStreamX<K, V>> defaultBranch(final BranchedX<K, V> branched) {
return this.wrapX(this.defaultBranchInternal(branched.configure(this.context)));
}

@Override
public Map<String, KStream<K, V>> noDefaultBranch() {
return this.wrap(this.noDefaultBranchInternal());
}

@Override
public Map<String, KStreamX<K, V>> noDefaultBranchX() {
return this.wrapX(this.noDefaultBranchInternal());
}

private Map<String, KStream<K, V>> noDefaultBranchInternal() {
return this.wrapped.noDefaultBranch();
}

private Map<String, KStream<K, V>> defaultBranchInternal() {
return this.wrapped.defaultBranch();
}

private Map<String, KStream<K, V>> defaultBranchInternal(final Branched<K, V> branched) {
return this.wrapped.defaultBranch(branched);
}

private Map<String, KStream<K, V>> wrap(final Map<String, ? extends KStream<K, V>> streamMap) {
return BranchedKStreamXImpl.<K, V, KStream<K, V>>wrap(streamMap, this::wrapValue);
}

private Map<String, KStreamX<K, V>> wrapX(final Map<String, ? extends KStream<K, V>> streamMap) {
return BranchedKStreamXImpl.<K, V, KStreamX<K, V>>wrap(streamMap, this::wrapValue);
}

private KStreamX<K, V> wrapValue(final Entry<String, ? extends KStream<K, V>> entry) {
return this.context.wrap(entry.getValue());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* MIT License
*
* Copyright (c) 2025 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package com.bakdata.kafka;

import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.kafka.streams.kstream.Branched;
import org.apache.kafka.streams.kstream.KStream;

/**
* Extends the {@link org.apache.kafka.streams.kstream.Branched} interface by adding methods to simplify Serde
* configuration, error handling,
* and topic access
* @param <K> type of keys
* @param <V> type of values
*/
public final class BranchedX<K, V> extends ModifierChain<Branched<K, V>, StreamsContext, BranchedX<K, V>> {

private BranchedX(final Function<StreamsContext, Branched<K, V>> initializer) {
super(initializer);
}

/**
* @see Branched#withConsumer(Consumer)
*/
public static <K, V> BranchedX<K, V> withConsumer(final Consumer<? super KStreamX<K, V>> chain) {
return new BranchedX<>(context -> Branched.withConsumer(asConsumer(chain, context)));
}

/**
* @see Branched#withFunction(Function)
*/
public static <K, V> BranchedX<K, V> withFunction(
final Function<? super KStreamX<K, V>, ? extends KStream<K, V>> chain) {
return new BranchedX<>(context -> Branched.withFunction(asFunction(chain, context)));
}

/**
* @see Branched#as(String)
*/
public static <K, V> BranchedX<K, V> as(final String name) {
return new BranchedX<>(context -> Branched.as(name));
}

private static <K, V> Consumer<KStream<K, V>> asConsumer(final Consumer<? super KStreamX<K, V>> chain,
final StreamsContext context) {
return stream -> chain.accept(context.wrap(stream));
}

private static <K, V> Function<KStream<K, V>, KStream<K, V>> asFunction(
final Function<? super KStreamX<K, V>, ? extends KStream<K, V>> chain, final StreamsContext context) {
return stream -> chain.apply(context.wrap(stream));
}

/**
* @see Branched#withName(String)
*/
public BranchedX<K, V> withName(final String name) {
return this.modify(branched -> branched.withName(name));
}

@Override
protected BranchedX<K, V> newInstance(final Function<StreamsContext, Branched<K, V>> initializer) {
return new BranchedX<>(initializer);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* MIT License
*
* Copyright (c) 2025 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package com.bakdata.kafka;

import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.CogroupedKStream;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.SlidingWindows;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.state.KeyValueStore;

/**
* Extends the {@link CogroupedKStream} interface by adding methods to simplify Serde configuration, error handling,
* and topic access
* @param <K> type of keys
* @param <VOut> type of values
*/
public interface CogroupedKStreamX<K, VOut> extends CogroupedKStream<K, VOut> {

@Override
<VIn> CogroupedKStreamX<K, VOut> cogroup(KGroupedStream<K, VIn> groupedStream,
Aggregator<? super K, ? super VIn, VOut> aggregator);

@Override
KTableX<K, VOut> aggregate(Initializer<VOut> initializer);

@Override
KTableX<K, VOut> aggregate(Initializer<VOut> initializer, Named named);

@Override
KTableX<K, VOut> aggregate(Initializer<VOut> initializer,
Materialized<K, VOut, KeyValueStore<Bytes, byte[]>> materialized);

/**
* @see #aggregate(Initializer, Materialized)
*/
KTableX<K, VOut> aggregate(Initializer<VOut> initializer,
MaterializedX<K, VOut, KeyValueStore<Bytes, byte[]>> materialized);

@Override
KTableX<K, VOut> aggregate(Initializer<VOut> initializer, Named named,
Materialized<K, VOut, KeyValueStore<Bytes, byte[]>> materialized);

/**
* @see #aggregate(Initializer, Named, Materialized)
*/
KTableX<K, VOut> aggregate(Initializer<VOut> initializer, Named named,
MaterializedX<K, VOut, KeyValueStore<Bytes, byte[]>> materialized);

@Override
<W extends Window> TimeWindowedCogroupedKStreamX<K, VOut> windowedBy(Windows<W> windows);

@Override
TimeWindowedCogroupedKStreamX<K, VOut> windowedBy(SlidingWindows windows);

@Override
SessionWindowedCogroupedKStreamX<K, VOut> windowedBy(SessionWindows windows);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* MIT License
*
* Copyright (c) 2025 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package com.bakdata.kafka;

import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.CogroupedKStream;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.SlidingWindows;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.state.KeyValueStore;

@RequiredArgsConstructor
class CogroupedStreamXImpl<K, V> implements CogroupedKStreamX<K, V> {

private final @NonNull CogroupedKStream<K, V> wrapped;
private final @NonNull StreamsContext context;

@Override
public <VIn> CogroupedKStreamX<K, V> cogroup(final KGroupedStream<K, VIn> groupedStream,
final Aggregator<? super K, ? super VIn, V> aggregator) {
final KGroupedStream<K, VIn> other = StreamsContext.maybeUnwrap(groupedStream);
return this.context.wrap(this.wrapped.cogroup(other, aggregator));
}

@Override
public KTableX<K, V> aggregate(final Initializer<V> initializer) {
return this.context.wrap(this.wrapped.aggregate(initializer));
}

@Override
public KTableX<K, V> aggregate(final Initializer<V> initializer, final Named named) {
return this.context.wrap(this.wrapped.aggregate(initializer, named));
}

@Override
public KTableX<K, V> aggregate(final Initializer<V> initializer,
final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
return this.context.wrap(this.wrapped.aggregate(initializer, materialized));
}

@Override
public KTableX<K, V> aggregate(final Initializer<V> initializer,
final MaterializedX<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
return this.aggregate(initializer, materialized.configure(this.context.getConfigurator()));
}

@Override
public KTableX<K, V> aggregate(final Initializer<V> initializer, final Named named,
final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
return this.context.wrap(this.wrapped.aggregate(initializer, named, materialized));
}

@Override
public KTableX<K, V> aggregate(final Initializer<V> initializer, final Named named,
final MaterializedX<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
return this.aggregate(initializer, named, materialized.configure(this.context.getConfigurator()));
}

@Override
public <W extends Window> TimeWindowedCogroupedKStreamX<K, V> windowedBy(final Windows<W> windows) {
return this.context.wrap(this.wrapped.windowedBy(windows));
}

@Override
public TimeWindowedCogroupedKStreamX<K, V> windowedBy(final SlidingWindows windows) {
return this.context.wrap(this.wrapped.windowedBy(windows));
}

@Override
public SessionWindowedCogroupedKStreamX<K, V> windowedBy(final SessionWindows windows) {
return this.context.wrap(this.wrapped.windowedBy(windows));
}
}
181 changes: 181 additions & 0 deletions streams-bootstrap-core/src/main/java/com/bakdata/kafka/ConsumedX.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
/*
* MIT License
*
* Copyright (c) 2025 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package com.bakdata.kafka;

import java.util.function.Function;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.Topology.AutoOffsetReset;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.processor.TimestampExtractor;

/**
* Use {@link Preconfigured} to lazily configure {@link Serde} for {@link Consumed} using {@link Configurator}
* @param <K> type of keys
* @param <V> type of values
* @see Consumed
*/
public final class ConsumedX<K, V> extends ModifierChain<Consumed<K, V>, Configurator, ConsumedX<K, V>> {

private ConsumedX(final Function<Configurator, Consumed<K, V>> initializer) {
super(initializer);
}

/**
* Create an instance of {@code ConsumedX} with provided key serde
* @param keySerde Serde to use for keys
* @return a new instance of {@code ConsumedX}
* @param <K> type of keys
* @param <V> type of values
*/
public static <K, V> ConsumedX<K, V> keySerde(final Preconfigured<? extends Serde<K>> keySerde) {
return with(keySerde, Preconfigured.defaultSerde());
}

/**
* Create an instance of {@code ConsumedX} with provided key serde
* @param keySerde Serde to use for keys
* @return a new instance of {@code ConsumedX}
* @param <K> type of keys
* @param <V> type of values
*/
public static <K, V> ConsumedX<K, V> keySerde(final Serde<K> keySerde) {
return keySerde(Preconfigured.create(keySerde));
}

/**
* Create an instance of {@code ConsumedX} with provided value serde
* @param valueSerde Serde to use for values
* @return a new instance of {@code ConsumedX}
* @param <K> type of keys
* @param <V> type of values
*/
public static <K, V> ConsumedX<K, V> valueSerde(final Preconfigured<? extends Serde<V>> valueSerde) {
return with(Preconfigured.defaultSerde(), valueSerde);
}

/**
* Create an instance of {@code ConsumedX} with provided value serde
* @param valueSerde Serde to use for values
* @return a new instance of {@code ConsumedX}
* @param <K> type of keys
* @param <V> type of values
*/
public static <K, V> ConsumedX<K, V> valueSerde(final Serde<V> valueSerde) {
return valueSerde(Preconfigured.create(valueSerde));
}

/**
* @see Consumed#with(Serde, Serde)
*/
public static <K, V> ConsumedX<K, V> with(final Preconfigured<? extends Serde<K>> keySerde,
final Preconfigured<? extends Serde<V>> valueSerde) {
return new ConsumedX<>(configurator -> Consumed.with(configurator.configureForKeys(keySerde),
configurator.configureForValues(valueSerde)));
}

/**
* @see Consumed#with(Serde, Serde)
*/
public static <K, V> ConsumedX<K, V> with(final Serde<K> keySerde, final Serde<V> valueSerde) {
return with(Preconfigured.create(keySerde), Preconfigured.create(valueSerde));
}

/**
* @see Consumed#with(TimestampExtractor)
*/
public static <K, V> ConsumedX<K, V> with(final TimestampExtractor timestampExtractor) {
return new ConsumedX<>(configurator -> Consumed.with(timestampExtractor));
}

/**
* @see Consumed#with(AutoOffsetReset)
*/
public static <K, V> ConsumedX<K, V> with(final AutoOffsetReset resetPolicy) {
return new ConsumedX<>(configurator -> Consumed.with(resetPolicy));
}

/**
* @see Consumed#as(String)
*/
public static <K, V> ConsumedX<K, V> as(final String processorName) {
return new ConsumedX<>(configurator -> Consumed.as(processorName));
}

/**
* @see Consumed#withKeySerde(Serde)
*/
public ConsumedX<K, V> withKeySerde(final Preconfigured<? extends Serde<K>> keySerde) {
return this.modify((consumed, configurator) -> consumed.withKeySerde(configurator.configureForKeys(keySerde)));
}

/**
* @see Consumed#withKeySerde(Serde)
*/
public ConsumedX<K, V> withKeySerde(final Serde<K> keySerde) {
return this.withKeySerde(Preconfigured.create(keySerde));
}

/**
* @see Consumed#withValueSerde(Serde)
*/
public ConsumedX<K, V> withValueSerde(final Preconfigured<? extends Serde<V>> valueSerde) {
return this.modify(
(consumed, configurator) -> consumed.withValueSerde(configurator.configureForValues(valueSerde)));
}

/**
* @see Consumed#withValueSerde(Serde)
*/
public ConsumedX<K, V> withValueSerde(final Serde<V> valueSerde) {
return this.withValueSerde(Preconfigured.create(valueSerde));
}

/**
* @see Consumed#withOffsetResetPolicy(AutoOffsetReset)
*/
public ConsumedX<K, V> withOffsetResetPolicy(final AutoOffsetReset offsetResetPolicy) {
return this.modify(consumed -> consumed.withOffsetResetPolicy(offsetResetPolicy));
}

/**
* @see Consumed#withTimestampExtractor(TimestampExtractor)
*/
public ConsumedX<K, V> withTimestampExtractor(final TimestampExtractor timestampExtractor) {
return this.modify(consumed -> consumed.withTimestampExtractor(timestampExtractor));
}

/**
* @see Consumed#withName(String)
*/
public ConsumedX<K, V> withName(final String processorName) {
return this.modify(consumed -> consumed.withName(processorName));
}

@Override
protected ConsumedX<K, V> newInstance(final Function<Configurator, Consumed<K, V>> initializer) {
return new ConsumedX<>(initializer);
}

}
135 changes: 135 additions & 0 deletions streams-bootstrap-core/src/main/java/com/bakdata/kafka/GroupedX.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/*
* MIT License
*
* Copyright (c) 2025 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package com.bakdata.kafka;

import java.util.function.Function;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.kstream.Grouped;

/**
* Use {@link Preconfigured} to lazily configure {@link Serde} for {@link Grouped} using {@link Configurator}
* @param <K> type of keys
* @param <V> type of values
* @see Grouped
*/
public final class GroupedX<K, V> extends ModifierChain<Grouped<K, V>, Configurator, GroupedX<K, V>> {

private GroupedX(final Function<Configurator, Grouped<K, V>> initializer) {
super(initializer);
}

/**
* @see Grouped#keySerde(Serde)
*/
public static <K, V> GroupedX<K, V> keySerde(final Preconfigured<? extends Serde<K>> keySerde) {
return new GroupedX<>(configurator -> Grouped.keySerde(configurator.configureForKeys(keySerde)));
}

/**
* @see Grouped#keySerde(Serde)
*/
public static <K, V> GroupedX<K, V> keySerde(final Serde<K> keySerde) {
return keySerde(Preconfigured.create(keySerde));
}

/**
* @see Grouped#valueSerde(Serde)
*/
public static <K, V> GroupedX<K, V> valueSerde(final Preconfigured<? extends Serde<V>> valueSerde) {
return new GroupedX<>(configurator -> Grouped.valueSerde(configurator.configureForValues(valueSerde)));
}

/**
* @see Grouped#valueSerde(Serde)
*/
public static <K, V> GroupedX<K, V> valueSerde(final Serde<V> valueSerde) {
return valueSerde(Preconfigured.create(valueSerde));
}

/**
* @see Grouped#with(Serde, Serde)
*/
public static <K, V> GroupedX<K, V> with(final Preconfigured<? extends Serde<K>> keySerde,
final Preconfigured<? extends Serde<V>> valueSerde) {
return new GroupedX<>(configurator -> Grouped.with(configurator.configureForKeys(keySerde),
configurator.configureForValues(valueSerde)));
}

/**
* @see Grouped#with(Serde, Serde)
*/
public static <K, V> GroupedX<K, V> with(final Serde<K> keySerde, final Serde<V> valueSerde) {
return with(Preconfigured.create(keySerde), Preconfigured.create(valueSerde));
}

/**
* @see Grouped#as(String)
*/
public static <K, V> GroupedX<K, V> as(final String name) {
return new GroupedX<>(configurator -> Grouped.as(name));
}

/**
* @see Grouped#withKeySerde(Serde)
*/
public GroupedX<K, V> withKeySerde(final Preconfigured<? extends Serde<K>> keySerde) {
return this.modify((grouped, configurator) -> grouped.withKeySerde(configurator.configureForKeys(keySerde)));
}

/**
* @see Grouped#withKeySerde(Serde)
*/
public GroupedX<K, V> withKeySerde(final Serde<K> keySerde) {
return this.withKeySerde(Preconfigured.create(keySerde));
}

/**
* @see Grouped#withValueSerde(Serde)
*/
public GroupedX<K, V> withValueSerde(final Preconfigured<? extends Serde<V>> valueSerde) {
return this.modify(
(grouped, configurator) -> grouped.withValueSerde(configurator.configureForValues(valueSerde)));
}

/**
* @see Grouped#withValueSerde(Serde)
*/
public GroupedX<K, V> withValueSerde(final Serde<V> valueSerde) {
return this.withValueSerde(Preconfigured.create(valueSerde));
}

/**
* @see Grouped#withName(String)
*/
public GroupedX<K, V> withName(final String processorName) {
return this.modify(grouped -> grouped.withName(processorName));
}

@Override
protected GroupedX<K, V> newInstance(final Function<Configurator, Grouped<K, V>> initializer) {
return new GroupedX<>(initializer);
}

}
179 changes: 179 additions & 0 deletions streams-bootstrap-core/src/main/java/com/bakdata/kafka/JoinedX.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
/*
* MIT License
*
* Copyright (c) 2025 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package com.bakdata.kafka;

import java.time.Duration;
import java.util.function.Function;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.kstream.Joined;

/**
* Use {@link Preconfigured} to lazily configure {@link Serde} for {@link Joined} using {@link Configurator}
* @param <K> type of keys
* @param <V1> this value type
* @param <V2> other value type
* @see Joined
*/
public final class JoinedX<K, V1, V2> extends ModifierChain<Joined<K, V1, V2>, Configurator, JoinedX<K, V1, V2>> {

private JoinedX(final Function<Configurator, Joined<K, V1, V2>> initializer) {
super(initializer);
}

/**
* @see Joined#keySerde(Serde)
*/
public static <K, V1, V2> JoinedX<K, V1, V2> keySerde(final Preconfigured<? extends Serde<K>> keySerde) {
return new JoinedX<>(configurator -> Joined.keySerde(configurator.configureForKeys(keySerde)));
}

/**
* @see Joined#keySerde(Serde)
*/
public static <K, V1, V2> JoinedX<K, V1, V2> keySerde(final Serde<K> keySerde) {
return keySerde(Preconfigured.create(keySerde));
}

/**
* @see Joined#valueSerde(Serde)
*/
public static <K, V1, V2> JoinedX<K, V1, V2> valueSerde(final Preconfigured<? extends Serde<V1>> valueSerde) {
return new JoinedX<>(configurator -> Joined.valueSerde(configurator.configureForValues(valueSerde)));
}

/**
* @see Joined#valueSerde(Serde)
*/
public static <K, V1, V2> JoinedX<K, V1, V2> valueSerde(final Serde<V1> valueSerde) {
return valueSerde(Preconfigured.create(valueSerde));
}

/**
* @see Joined#otherValueSerde(Serde)
*/
public static <K, V1, V2> JoinedX<K, V1, V2> otherValueSerde(
final Preconfigured<? extends Serde<V2>> otherValueSerde) {
return new JoinedX<>(configurator -> Joined.otherValueSerde(configurator.configureForValues(otherValueSerde)));
}

/**
* @see Joined#otherValueSerde(Serde)
*/
public static <K, V1, V2> JoinedX<K, V1, V2> otherValueSerde(final Serde<V2> otherValueSerde) {
return otherValueSerde(Preconfigured.create(otherValueSerde));
}

/**
* @see Joined#with(Serde, Serde, Serde)
*/
public static <K, V1, V2> JoinedX<K, V1, V2> with(
final Preconfigured<? extends Serde<K>> keySerde,
final Preconfigured<? extends Serde<V1>> valueSerde,
final Preconfigured<? extends Serde<V2>> otherValueSerde) {
return new JoinedX<>(configurator -> Joined.with(configurator.configureForKeys(keySerde),
configurator.configureForValues(valueSerde), configurator.configureForValues(otherValueSerde)));
}

/**
* @see Joined#with(Serde, Serde, Serde)
*/
public static <K, V1, V2> JoinedX<K, V1, V2> with(
final Serde<K> keySerde,
final Serde<V1> valueSerde,
final Serde<V2> otherValueSerde) {
return with(Preconfigured.create(keySerde), Preconfigured.create(valueSerde),
Preconfigured.create(otherValueSerde));
}

/**
* @see Joined#as(String)
*/
public static <K, V1, V2> JoinedX<K, V1, V2> as(final String name) {
return new JoinedX<>(configurator -> Joined.as(name));
}

/**
* @see Joined#withKeySerde(Serde)
*/
public JoinedX<K, V1, V2> withKeySerde(final Preconfigured<? extends Serde<K>> keySerde) {
return this.modify((joined, configurator) -> joined.withKeySerde(configurator.configureForKeys(keySerde)));
}

/**
* @see Joined#withKeySerde(Serde)
*/
public JoinedX<K, V1, V2> withKeySerde(final Serde<K> keySerde) {
return this.withKeySerde(Preconfigured.create(keySerde));
}

/**
* @see Joined#withValueSerde(Serde)
*/
public JoinedX<K, V1, V2> withValueSerde(final Preconfigured<? extends Serde<V1>> valueSerde) {
return this.modify(
(joined, configurator) -> joined.withValueSerde(configurator.configureForValues(valueSerde)));
}

/**
* @see Joined#withValueSerde(Serde)
*/
public JoinedX<K, V1, V2> withValueSerde(final Serde<V1> valueSerde) {
return this.withValueSerde(Preconfigured.create(valueSerde));
}

/**
* @see Joined#withOtherValueSerde(Serde)
*/
public JoinedX<K, V1, V2> withOtherValueSerde(final Preconfigured<? extends Serde<V2>> otherValueSerde) {
return this.modify(
(joined, configurator) -> joined.withOtherValueSerde(configurator.configureForValues(otherValueSerde)));
}

/**
* @see Joined#withOtherValueSerde(Serde)
*/
public JoinedX<K, V1, V2> withOtherValueSerde(final Serde<V2> otherValueSerde) {
return this.withOtherValueSerde(Preconfigured.create(otherValueSerde));
}

/**
* @see Joined#withName(String)
*/
public JoinedX<K, V1, V2> withName(final String name) {
return this.modify(joined -> joined.withName(name));
}

/**
* @see Joined#withGracePeriod(Duration)
*/
public JoinedX<K, V1, V2> withGracePeriod(final Duration gracePeriod) {
return this.modify(joined -> joined.withGracePeriod(gracePeriod));
}

@Override
protected JoinedX<K, V1, V2> newInstance(final Function<Configurator, Joined<K, V1, V2>> initializer) {
return new JoinedX<>(initializer);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* MIT License
*
* Copyright (c) 2025 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package com.bakdata.kafka;

import org.apache.kafka.streams.kstream.Named;

/**
* {@link KStreamX} that contains successfully processed records and errors of a previous operation
* @param <K> type of keys in the original {@link KStreamX}
* @param <V> type of values in the original {@link KStreamX}
* @param <KR> type of keys in the processed {@link KStreamX}
* @param <VR> type of values in the processed {@link KStreamX}
*/
public interface KErrorStream<K, V, KR, VR> {

/**
* Get the stream of successfully processed values
* @return stream of processed values
*/
KStreamX<KR, VR> values();

/**
* Get the stream of successfully processed values
* @param named name of the processor
* @return stream of processed values
*/
KStreamX<KR, VR> values(Named named);

/**
* Get the stream of errors that occurred during processing
* @return stream of errors
*/
KStreamX<K, ProcessingError<V>> errors();

/**
* Get the stream of errors that occurred during processing
* @param named name of the processor
* @return stream of errors
*/
KStreamX<K, ProcessingError<V>> errors(Named named);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/*
* MIT License
*
* Copyright (c) 2025 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package com.bakdata.kafka;

import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.SlidingWindows;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.state.KeyValueStore;

/**
* Extends the {@link KGroupedStream} interface by adding methods to simplify Serde configuration, error handling,
* and topic access
* @param <K> type of keys
* @param <V> type of values
*/
public interface KGroupedStreamX<K, V> extends KGroupedStream<K, V> {

@Override
KTableX<K, Long> count();

@Override
KTableX<K, Long> count(Named named);

@Override
KTableX<K, Long> count(Materialized<K, Long, KeyValueStore<Bytes, byte[]>> materialized);

/**
* @see #count(Materialized)
*/
KTableX<K, Long> count(MaterializedX<K, Long, KeyValueStore<Bytes, byte[]>> materialized);

@Override
KTableX<K, Long> count(Named named, Materialized<K, Long, KeyValueStore<Bytes, byte[]>> materialized);

/**
* @see #count(Named, Materialized)
*/
KTableX<K, Long> count(Named named,
MaterializedX<K, Long, KeyValueStore<Bytes, byte[]>> materialized);

@Override
KTableX<K, V> reduce(Reducer<V> reducer);

@Override
KTableX<K, V> reduce(Reducer<V> reducer, Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);

/**
* @see #reduce(Reducer, Materialized)
*/
KTableX<K, V> reduce(Reducer<V> reducer,
MaterializedX<K, V, KeyValueStore<Bytes, byte[]>> materialized);

@Override
KTableX<K, V> reduce(Reducer<V> reducer, Named named,
Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);

/**
* @see #reduce(Reducer, Named, Materialized)
*/
KTableX<K, V> reduce(Reducer<V> reducer, Named named,
MaterializedX<K, V, KeyValueStore<Bytes, byte[]>> materialized);

@Override
<VR> KTableX<K, VR> aggregate(Initializer<VR> initializer, Aggregator<? super K, ? super V, VR> aggregator);

@Override
<VR> KTableX<K, VR> aggregate(Initializer<VR> initializer, Aggregator<? super K, ? super V, VR> aggregator,
Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);

/**
* @see #aggregate(Initializer, Aggregator, Materialized)
*/
<VR> KTableX<K, VR> aggregate(Initializer<VR> initializer, Aggregator<? super K, ? super V, VR> aggregator,
MaterializedX<K, VR, KeyValueStore<Bytes, byte[]>> materialized);

@Override
<VR> KTableX<K, VR> aggregate(Initializer<VR> initializer, Aggregator<? super K, ? super V, VR> aggregator,
Named named, Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);

/**
* @see #aggregate(Initializer, Aggregator, Named, Materialized)
*/
<VR> KTableX<K, VR> aggregate(Initializer<VR> initializer, Aggregator<? super K, ? super V, VR> aggregator,
Named named, MaterializedX<K, VR, KeyValueStore<Bytes, byte[]>> materialized);

@Override
<W extends Window> TimeWindowedKStreamX<K, V> windowedBy(Windows<W> windows);

@Override
TimeWindowedKStreamX<K, V> windowedBy(SlidingWindows windows);

@Override
SessionWindowedKStreamX<K, V> windowedBy(SessionWindows windows);

@Override
<VOut> CogroupedKStreamX<K, VOut> cogroup(Aggregator<? super K, ? super V, VOut> aggregator);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
/*
* MIT License
*
* Copyright (c) 2025 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package com.bakdata.kafka;

import lombok.AccessLevel;
import lombok.Getter;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.SlidingWindows;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.state.KeyValueStore;

@RequiredArgsConstructor
class KGroupedStreamXImpl<K, V> implements KGroupedStreamX<K, V> {

@Getter(AccessLevel.PACKAGE)
private final @NonNull KGroupedStream<K, V> wrapped;
private final @NonNull StreamsContext context;

@Override
public KTableX<K, Long> count() {
return this.context.wrap(this.wrapped.count());
}

@Override
public KTableX<K, Long> count(final Named named) {
return this.context.wrap(this.wrapped.count(named));
}

@Override
public KTableX<K, Long> count(final Materialized<K, Long, KeyValueStore<Bytes, byte[]>> materialized) {
return this.context.wrap(this.wrapped.count(materialized));
}

@Override
public KTableX<K, Long> count(
final MaterializedX<K, Long, KeyValueStore<Bytes, byte[]>> materialized) {
return this.count(materialized.configure(this.context.getConfigurator()));
}

@Override
public KTableX<K, Long> count(final Named named,
final Materialized<K, Long, KeyValueStore<Bytes, byte[]>> materialized) {
return this.context.wrap(this.wrapped.count(named, materialized));
}

@Override
public KTableX<K, Long> count(final Named named,
final MaterializedX<K, Long, KeyValueStore<Bytes, byte[]>> materialized) {
return this.count(named, materialized.configure(this.context.getConfigurator()));
}

@Override
public KTableX<K, V> reduce(final Reducer<V> reducer) {
return this.context.wrap(this.wrapped.reduce(reducer));
}

@Override
public KTableX<K, V> reduce(final Reducer<V> reducer,
final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
return this.context.wrap(this.wrapped.reduce(reducer, materialized));
}

@Override
public KTableX<K, V> reduce(final Reducer<V> reducer,
final MaterializedX<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
return this.reduce(reducer, materialized.configure(this.context.getConfigurator()));
}

@Override
public KTableX<K, V> reduce(final Reducer<V> reducer, final Named named,
final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
return this.context.wrap(this.wrapped.reduce(reducer, named, materialized));
}

@Override
public KTableX<K, V> reduce(final Reducer<V> reducer, final Named named,
final MaterializedX<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
return this.reduce(reducer, named, materialized.configure(this.context.getConfigurator()));
}

@Override
public <VR> KTableX<K, VR> aggregate(final Initializer<VR> initializer,
final Aggregator<? super K, ? super V, VR> aggregator) {
return this.context.wrap(this.wrapped.aggregate(initializer, aggregator));
}

@Override
public <VR> KTableX<K, VR> aggregate(final Initializer<VR> initializer,
final Aggregator<? super K, ? super V, VR> aggregator,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
return this.context.wrap(this.wrapped.aggregate(initializer, aggregator, materialized));
}

@Override
public <VR> KTableX<K, VR> aggregate(final Initializer<VR> initializer,
final Aggregator<? super K, ? super V, VR> aggregator,
final MaterializedX<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
return this.aggregate(initializer, aggregator, materialized.configure(this.context.getConfigurator()));
}

@Override
public <VR> KTableX<K, VR> aggregate(final Initializer<VR> initializer,
final Aggregator<? super K, ? super V, VR> aggregator, final Named named,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
return this.context.wrap(this.wrapped.aggregate(initializer, aggregator, named, materialized));
}

@Override
public <VR> KTableX<K, VR> aggregate(final Initializer<VR> initializer,
final Aggregator<? super K, ? super V, VR> aggregator, final Named named,
final MaterializedX<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
return this.aggregate(initializer, aggregator, named, materialized.configure(this.context.getConfigurator()));
}

@Override
public <W extends Window> TimeWindowedKStreamX<K, V> windowedBy(final Windows<W> windows) {
return this.context.wrap(this.wrapped.windowedBy(windows));
}

@Override
public TimeWindowedKStreamX<K, V> windowedBy(final SlidingWindows windows) {
return this.context.wrap(this.wrapped.windowedBy(windows));
}

@Override
public SessionWindowedKStreamX<K, V> windowedBy(final SessionWindows windows) {
return this.context.wrap(this.wrapped.windowedBy(windows));
}

@Override
public <VOut> CogroupedKStreamX<K, VOut> cogroup(final Aggregator<? super K, ? super V, VOut> aggregator) {
return this.context.wrap(this.wrapped.cogroup(aggregator));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
* MIT License
*
* Copyright (c) 2025 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package com.bakdata.kafka;

import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KGroupedTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.state.KeyValueStore;

/**
* Extends the {@link KGroupedTable} interface by adding methods to simplify Serde configuration, error handling, and
* topic access
* @param <K> type of keys
* @param <V> type of values
*/
public interface KGroupedTableX<K, V> extends KGroupedTable<K, V> {

@Override
KTableX<K, Long> count(Materialized<K, Long, KeyValueStore<Bytes, byte[]>> materialized);

/**
* @see #count(Materialized)
*/
KTableX<K, Long> count(MaterializedX<K, Long, KeyValueStore<Bytes, byte[]>> materialized);

@Override
KTableX<K, Long> count(Named named, Materialized<K, Long, KeyValueStore<Bytes, byte[]>> materialized);

/**
* @see #count(Named, Materialized)
*/
KTableX<K, Long> count(Named named,
MaterializedX<K, Long, KeyValueStore<Bytes, byte[]>> materialized);

@Override
KTableX<K, Long> count();

@Override
KTableX<K, Long> count(Named named);

@Override
KTableX<K, V> reduce(Reducer<V> adder, Reducer<V> subtractor,
Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);

/**
* @see #reduce(Reducer, Reducer, Materialized)
*/
KTableX<K, V> reduce(Reducer<V> adder, Reducer<V> subtractor,
MaterializedX<K, V, KeyValueStore<Bytes, byte[]>> materialized);

@Override
KTableX<K, V> reduce(Reducer<V> adder, Reducer<V> subtractor, Named named,
Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);

/**
* @see #reduce(Reducer, Reducer, Named, Materialized)
*/
KTableX<K, V> reduce(Reducer<V> adder, Reducer<V> subtractor, Named named,
MaterializedX<K, V, KeyValueStore<Bytes, byte[]>> materialized);

@Override
KTableX<K, V> reduce(Reducer<V> adder, Reducer<V> subtractor);

@Override
<VR> KTableX<K, VR> aggregate(Initializer<VR> initializer, Aggregator<? super K, ? super V, VR> adder,
Aggregator<? super K, ? super V, VR> subtractor,
Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);

/**
* @see #aggregate(Initializer, Aggregator, Aggregator, Materialized)
*/
<VR> KTableX<K, VR> aggregate(Initializer<VR> initializer, Aggregator<? super K, ? super V, VR> adder,
Aggregator<? super K, ? super V, VR> subtractor,
MaterializedX<K, VR, KeyValueStore<Bytes, byte[]>> materialized);

@Override
<VR> KTableX<K, VR> aggregate(Initializer<VR> initializer, Aggregator<? super K, ? super V, VR> adder,
Aggregator<? super K, ? super V, VR> subtractor, Named named,
Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);

/**
* @see #aggregate(Initializer, Aggregator, Aggregator, Named, Materialized)
*/
<VR> KTableX<K, VR> aggregate(Initializer<VR> initializer, Aggregator<? super K, ? super V, VR> adder,
Aggregator<? super K, ? super V, VR> subtractor, Named named,
MaterializedX<K, VR, KeyValueStore<Bytes, byte[]>> materialized);

@Override
<VR> KTableX<K, VR> aggregate(Initializer<VR> initializer, Aggregator<? super K, ? super V, VR> adder,
Aggregator<? super K, ? super V, VR> subtractor);

@Override
<VR> KTableX<K, VR> aggregate(Initializer<VR> initializer, Aggregator<? super K, ? super V, VR> adder,
Aggregator<? super K, ? super V, VR> subtractor, Named named);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
/*
* MIT License
*
* Copyright (c) 2025 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package com.bakdata.kafka;

import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KGroupedTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.state.KeyValueStore;

@RequiredArgsConstructor
class KGroupedTableXImpl<K, V> implements KGroupedTableX<K, V> {

private final @NonNull KGroupedTable<K, V> wrapped;
private final @NonNull StreamsContext context;

@Override
public KTableX<K, Long> count(final Materialized<K, Long, KeyValueStore<Bytes, byte[]>> materialized) {
return this.context.wrap(this.wrapped.count(materialized));
}

@Override
public KTableX<K, Long> count(
final MaterializedX<K, Long, KeyValueStore<Bytes, byte[]>> materialized) {
return this.count(materialized.configure(this.context.getConfigurator()));
}

@Override
public KTableX<K, Long> count(final Named named,
final Materialized<K, Long, KeyValueStore<Bytes, byte[]>> materialized) {
return this.context.wrap(this.wrapped.count(named, materialized));
}

@Override
public KTableX<K, Long> count(final Named named,
final MaterializedX<K, Long, KeyValueStore<Bytes, byte[]>> materialized) {
return this.count(named, materialized.configure(this.context.getConfigurator()));
}

@Override
public KTableX<K, Long> count() {
return this.context.wrap(this.wrapped.count());
}

@Override
public KTableX<K, Long> count(final Named named) {
return this.context.wrap(this.wrapped.count(named));
}

@Override
public KTableX<K, V> reduce(final Reducer<V> adder, final Reducer<V> subtractor,
final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
return this.context.wrap(this.wrapped.reduce(adder, subtractor, materialized));
}

@Override
public KTableX<K, V> reduce(final Reducer<V> adder, final Reducer<V> subtractor,
final MaterializedX<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
return this.reduce(adder, subtractor, materialized.configure(this.context.getConfigurator()));
}

@Override
public KTableX<K, V> reduce(final Reducer<V> adder, final Reducer<V> subtractor, final Named named,
final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
return this.context.wrap(this.wrapped.reduce(adder, subtractor, materialized));
}

@Override
public KTableX<K, V> reduce(final Reducer<V> adder, final Reducer<V> subtractor, final Named named,
final MaterializedX<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
return this.reduce(adder, subtractor, named, materialized.configure(this.context.getConfigurator()));
}

@Override
public KTableX<K, V> reduce(final Reducer<V> adder, final Reducer<V> subtractor) {
return this.context.wrap(this.wrapped.reduce(adder, subtractor));
}

@Override
public <VR> KTableX<K, VR> aggregate(final Initializer<VR> initializer,
final Aggregator<? super K, ? super V, VR> adder,
final Aggregator<? super K, ? super V, VR> subtractor,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
return this.context.wrap(this.wrapped.aggregate(initializer, adder, subtractor, materialized));
}

@Override
public <VR> KTableX<K, VR> aggregate(final Initializer<VR> initializer,
final Aggregator<? super K, ? super V, VR> adder,
final Aggregator<? super K, ? super V, VR> subtractor,
final MaterializedX<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
return this.aggregate(initializer, adder, subtractor, materialized.configure(this.context.getConfigurator()));
}

@Override
public <VR> KTableX<K, VR> aggregate(final Initializer<VR> initializer,
final Aggregator<? super K, ? super V, VR> adder,
final Aggregator<? super K, ? super V, VR> subtractor, final Named named,
final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
return this.context.wrap(this.wrapped.aggregate(initializer, adder, subtractor, materialized));
}

@Override
public <VR> KTableX<K, VR> aggregate(final Initializer<VR> initializer,
final Aggregator<? super K, ? super V, VR> adder,
final Aggregator<? super K, ? super V, VR> subtractor, final Named named,
final MaterializedX<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
return this.aggregate(initializer, adder, subtractor, named,
materialized.configure(this.context.getConfigurator()));
}

@Override
public <VR> KTableX<K, VR> aggregate(final Initializer<VR> initializer,
final Aggregator<? super K, ? super V, VR> adder,
final Aggregator<? super K, ? super V, VR> subtractor) {
return this.context.wrap(this.wrapped.aggregate(initializer, adder, subtractor));
}

@Override
public <VR> KTableX<K, VR> aggregate(final Initializer<VR> initializer,
final Aggregator<? super K, ? super V, VR> adder,
final Aggregator<? super K, ? super V, VR> subtractor, final Named named) {
return this.context.wrap(this.wrapped.aggregate(initializer, adder, subtractor, named));
}
}
1,071 changes: 1,071 additions & 0 deletions streams-bootstrap-core/src/main/java/com/bakdata/kafka/KStreamX.java

Large diffs are not rendered by default.

1,153 changes: 1,153 additions & 0 deletions streams-bootstrap-core/src/main/java/com/bakdata/kafka/KStreamXImpl.java

Large diffs are not rendered by default.

391 changes: 391 additions & 0 deletions streams-bootstrap-core/src/main/java/com/bakdata/kafka/KTableX.java

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* MIT License
*
* Copyright (c) 2025 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package com.bakdata.kafka;

import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.streams.kstream.Named;

@RequiredArgsConstructor
class KeyValueKErrorStream<K, V, KR, VR> implements KErrorStream<K, V, KR, VR> {
private final @NonNull KStreamX<KR, ProcessedKeyValue<K, V, VR>> stream;

@Override
public KStreamX<KR, VR> values() {
return this.stream.flatMapValues(ProcessedKeyValue::getValues);
}

@Override
public KStreamX<KR, VR> values(final Named named) {
return this.stream.flatMapValues(ProcessedKeyValue::getValues, named);
}

@Override
public KStreamX<K, ProcessingError<V>> errors() {
return this.stream.flatMap(ProcessedKeyValue::getErrors);
}

@Override
public KStreamX<K, ProcessingError<V>> errors(final Named named) {
return this.stream.flatMap(ProcessedKeyValue::getErrors, named);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,238 @@
/*
* MIT License
*
* Copyright (c) 2025 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package com.bakdata.kafka;

import java.time.Duration;
import java.util.Map;
import java.util.function.Function;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.DslStoreSuppliers;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import org.apache.kafka.streams.state.WindowStore;

/**
* Use {@link Preconfigured} to lazily configure {@link Serde} for {@link Materialized} using {@link Configurator}
* @param <K> type of keys
* @param <V> type of values
* @param <S> type of state store
* @see Materialized
*/
public final class MaterializedX<K, V, S extends StateStore> extends
ModifierChain<Materialized<K, V, S>, Configurator, MaterializedX<K, V, S>> {

private MaterializedX(final Function<Configurator, Materialized<K, V, S>> initializer) {
super(initializer);
}

/**
* Create an instance of {@code MaterializedX} with provided key serde
* @param keySerde Serde to use for keys
* @return a new instance of {@code MaterializedX}
* @param <K> type of keys
* @param <V> type of values
* @param <S> type of state store
*/
public static <K, V, S extends StateStore> MaterializedX<K, V, S> keySerde(
final Preconfigured<? extends Serde<K>> keySerde) {
return with(keySerde, Preconfigured.defaultSerde());
}

/**
* Create an instance of {@code MaterializedX} with provided key serde
* @param keySerde Serde to use for keys
* @return a new instance of {@code MaterializedX}
* @param <K> type of keys
* @param <V> type of values
* @param <S> type of state store
*/
public static <K, V, S extends StateStore> MaterializedX<K, V, S> keySerde(final Serde<K> keySerde) {
return keySerde(Preconfigured.create(keySerde));
}

/**
* Create an instance of {@code MaterializedX} with provided value serde
* @param valueSerde Serde to use for values
* @return a new instance of {@code MaterializedX}
* @param <K> type of keys
* @param <V> type of values
* @param <S> type of state store
*/
public static <K, V, S extends StateStore> MaterializedX<K, V, S> valueSerde(
final Preconfigured<? extends Serde<V>> valueSerde) {
return with(Preconfigured.defaultSerde(), valueSerde);
}

/**
* Create an instance of {@code MaterializedX} with provided value serde
* @param valueSerde Serde to use for values
* @return a new instance of {@code MaterializedX}
* @param <K> type of keys
* @param <V> type of values
* @param <S> type of state store
*/
public static <K, V, S extends StateStore> MaterializedX<K, V, S> valueSerde(final Serde<V> valueSerde) {
return valueSerde(Preconfigured.create(valueSerde));
}

/**
* @see Materialized#with(Serde, Serde)
*/
public static <K, V, S extends StateStore> MaterializedX<K, V, S> with(
final Preconfigured<? extends Serde<K>> keySerde,
final Preconfigured<? extends Serde<V>> valueSerde) {
return new MaterializedX<>(configurator -> Materialized.with(configurator.configureForKeys(keySerde),
configurator.configureForValues(valueSerde)));
}

/**
* @see Materialized#with(Serde, Serde)
*/
public static <K, V, S extends StateStore> MaterializedX<K, V, S> with(
final Serde<K> keySerde,
final Serde<V> valueSerde) {
return with(Preconfigured.create(keySerde), Preconfigured.create(valueSerde));
}

/**
* @see Materialized#as(String)
*/
public static <K, V, S extends StateStore> MaterializedX<K, V, S> as(final String storeName) {
return new MaterializedX<>(configurator -> Materialized.as(storeName));
}

/**
* @see Materialized#as(DslStoreSuppliers)
*/
public static <K, V, S extends StateStore> MaterializedX<K, V, S> as(
final DslStoreSuppliers storeSuppliers) {
return new MaterializedX<>(configurator -> Materialized.as(storeSuppliers));
}

/**
* @see Materialized#as(WindowBytesStoreSupplier)
*/
public static <K, V> MaterializedX<K, V, WindowStore<Bytes, byte[]>> as(final WindowBytesStoreSupplier supplier) {
return new MaterializedX<>(configurator -> Materialized.as(supplier));
}

/**
* @see Materialized#as(SessionBytesStoreSupplier)
*/
public static <K, V> MaterializedX<K, V, SessionStore<Bytes, byte[]>> as(final SessionBytesStoreSupplier supplier) {
return new MaterializedX<>(configurator -> Materialized.as(supplier));
}

/**
* @see Materialized#as(KeyValueBytesStoreSupplier)
*/
public static <K, V> MaterializedX<K, V, KeyValueStore<Bytes, byte[]>> as(
final KeyValueBytesStoreSupplier supplier) {
return new MaterializedX<>(configurator -> Materialized.as(supplier));
}

/**
* @see Materialized#withKeySerde(Serde)
*/
public MaterializedX<K, V, S> withKeySerde(final Preconfigured<? extends Serde<K>> keySerde) {
return this.modify(
(materialized, configurator) -> materialized.withKeySerde(configurator.configureForKeys(keySerde)));
}

/**
* @see Materialized#withKeySerde(Serde)
*/
public MaterializedX<K, V, S> withKeySerde(final Serde<K> keySerde) {
return this.withKeySerde(Preconfigured.create(keySerde));
}

/**
* @see Materialized#withValueSerde(Serde)
*/
public MaterializedX<K, V, S> withValueSerde(final Preconfigured<? extends Serde<V>> valueSerde) {
return this.modify((materialized, configurator) -> materialized.withValueSerde(
configurator.configureForValues(valueSerde)));
}

/**
* @see Materialized#withValueSerde(Serde)
*/
public MaterializedX<K, V, S> withValueSerde(final Serde<V> valueSerde) {
return this.withValueSerde(Preconfigured.create(valueSerde));
}

/**
* @see Materialized#withRetention(Duration)
*/
public MaterializedX<K, V, S> withRetention(final Duration retention) {
return this.modify(materialized -> materialized.withRetention(retention));
}

/**
* @see Materialized#withStoreType(DslStoreSuppliers)
*/
public MaterializedX<K, V, S> withStoreType(final DslStoreSuppliers storeSuppliers) {
return this.modify(materialized -> materialized.withStoreType(storeSuppliers));
}

/**
* @see Materialized#withLoggingEnabled(Map)
*/
public MaterializedX<K, V, S> withLoggingEnabled(final Map<String, String> config) {
return this.modify(materialized -> materialized.withLoggingEnabled(config));
}

/**
* @see Materialized#withLoggingDisabled()
*/
public MaterializedX<K, V, S> withLoggingDisabled() {
return this.modify(Materialized::withLoggingDisabled);
}

/**
* @see Materialized#withCachingDisabled()
*/
public MaterializedX<K, V, S> withCachingDisabled() {
return this.modify(Materialized::withCachingDisabled);
}

/**
* @see Materialized#withCachingEnabled()
*/
public MaterializedX<K, V, S> withCachingEnabled() {
return this.modify(Materialized::withCachingEnabled);
}

@Override
protected MaterializedX<K, V, S> newInstance(final Function<Configurator, Materialized<K, V, S>> initializer) {
return new MaterializedX<>(initializer);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* MIT License
*
* Copyright (c) 2025 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package com.bakdata.kafka;

import java.util.function.BiFunction;
import java.util.function.Function;
import lombok.AccessLevel;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;

@RequiredArgsConstructor(access = AccessLevel.PROTECTED)
abstract class ModifierChain<T, C, SELF> {

private final @NonNull Function<C, T> initializer;

protected abstract SELF newInstance(Function<C, T> initializer);

protected final SELF modify(final BiFunction<? super T, ? super C, ? extends T> modifier) {
return this.newInstance(context -> this.modify(context, modifier));
}

protected final SELF modify(final Function<? super T, ? extends T> modifier) {
return this.newInstance(context -> this.modify(context, modifier));
}

final T configure(final C context) {
return this.initializer.apply(context);
}

private T modify(final C context, final BiFunction<? super T, ? super C, ? extends T> modifier) {
final T configure = this.configure(context);
return modifier.apply(configure, context);
}

private T modify(final C context, final Function<? super T, ? extends T> modifier) {
final T configure = this.configure(context);
return modifier.apply(configure);
}

}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* MIT License
*
* Copyright (c) 2024 bakdata
* Copyright (c) 2025 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
@@ -28,6 +28,7 @@

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import lombok.AccessLevel;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
@@ -116,10 +117,12 @@ public static <S extends Serializer<T>, T> Preconfigured<S> create(final S seria
}

private static <S extends Serde<T>, T> ConfigurableSerde<S, T> configurable(final S serde) {
Objects.requireNonNull(serde, "Use Preconfigured#defaultSerde instead");
return new ConfigurableSerde<>(serde);
}

private static <S extends Serializer<T>, T> ConfigurableSerializer<S, T> configurable(final S serializer) {
Objects.requireNonNull(serializer, "Use Preconfigured#defaultSerializer instead");
return new ConfigurableSerializer<>(serializer);
}

149 changes: 149 additions & 0 deletions streams-bootstrap-core/src/main/java/com/bakdata/kafka/ProducedX.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
/*
* MIT License
*
* Copyright (c) 2025 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package com.bakdata.kafka;

import java.util.function.Function;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.processor.StreamPartitioner;

/**
* Use {@link Preconfigured} to lazily configure {@link Serde} for {@link Produced} using {@link Configurator}
* @param <K> type of keys
* @param <V> type of values
* @see Produced
*/
public final class ProducedX<K, V> extends ModifierChain<Produced<K, V>, Configurator, ProducedX<K, V>> {

private ProducedX(final Function<Configurator, Produced<K, V>> initializer) {
super(initializer);
}

/**
* @see Produced#keySerde(Serde)
*/
public static <K, V> ProducedX<K, V> keySerde(final Preconfigured<? extends Serde<K>> keySerde) {
return new ProducedX<>(configurator -> Produced.keySerde(configurator.configureForKeys(keySerde)));
}

/**
* @see Produced#keySerde(Serde)
*/
public static <K, V> ProducedX<K, V> keySerde(final Serde<K> keySerde) {
return keySerde(Preconfigured.create(keySerde));
}

/**
* @see Produced#valueSerde(Serde)
*/
public static <K, V> ProducedX<K, V> valueSerde(final Preconfigured<? extends Serde<V>> valueSerde) {
return new ProducedX<>(configurator -> Produced.valueSerde(configurator.configureForValues(valueSerde)));
}

/**
* @see Produced#valueSerde(Serde)
*/
public static <K, V> ProducedX<K, V> valueSerde(final Serde<V> valueSerde) {
return valueSerde(Preconfigured.create(valueSerde));
}

/**
* @see Produced#with(Serde, Serde)
*/
public static <K, V> ProducedX<K, V> with(final Preconfigured<? extends Serde<K>> keySerde,
final Preconfigured<? extends Serde<V>> valueSerde) {
return new ProducedX<>(configurator -> Produced.with(configurator.configureForKeys(keySerde),
configurator.configureForValues(valueSerde)));
}

/**
* @see Produced#with(Serde, Serde)
*/
public static <K, V> ProducedX<K, V> with(final Serde<K> keySerde, final Serde<V> valueSerde) {
return with(Preconfigured.create(keySerde), Preconfigured.create(valueSerde));
}

/**
* @see Produced#as(String)
*/
public static <K, V> ProducedX<K, V> as(final String processorName) {
return new ProducedX<>(configurator -> Produced.as(processorName));
}

/**
* @see Produced#streamPartitioner(StreamPartitioner)
*/
public static <K, V> ProducedX<K, V> streamPartitioner(final StreamPartitioner<? super K, ? super V> partitioner) {
return new ProducedX<>(configurator -> Produced.streamPartitioner(partitioner));
}

/**
* @see Produced#withKeySerde(Serde)
*/
public ProducedX<K, V> withKeySerde(final Preconfigured<? extends Serde<K>> keySerde) {
return this.modify((produced, configurator) -> produced.withKeySerde(configurator.configureForKeys(keySerde)));
}

/**
* @see Produced#withKeySerde(Serde)
*/
public ProducedX<K, V> withKeySerde(final Serde<K> keySerde) {
return this.withKeySerde(Preconfigured.create(keySerde));
}

/**
* @see Produced#withValueSerde(Serde)
*/
public ProducedX<K, V> withValueSerde(final Preconfigured<? extends Serde<V>> valueSerde) {
return this.modify(
(produced, configurator) -> produced.withValueSerde(configurator.configureForValues(valueSerde)));
}

/**
* @see Produced#withValueSerde(Serde)
*/
public ProducedX<K, V> withValueSerde(final Serde<V> valueSerde) {
return this.withValueSerde(Preconfigured.create(valueSerde));
}

/**
* @see Produced#withStreamPartitioner(StreamPartitioner)
*/
public ProducedX<K, V> withStreamPartitioner(final StreamPartitioner<? super K, ? super V> partitioner) {
return this.modify(produced -> produced.withStreamPartitioner(partitioner));
}

/**
* @see Produced#withName(String)
*/
public ProducedX<K, V> withName(final String name) {
return this.modify(produced -> produced.withName(name));
}

@Override
protected ProducedX<K, V> newInstance(final Function<Configurator, Produced<K, V>> initializer) {
return new ProducedX<>(initializer);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
/*
* MIT License
*
* Copyright (c) 2025 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package com.bakdata.kafka;

import java.util.function.Function;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.kstream.Repartitioned;
import org.apache.kafka.streams.processor.StreamPartitioner;

/**
* Use {@link Preconfigured} to lazily configure {@link Serde} for {@link Repartitioned} using {@link Configurator}
* @param <K> type of keys
* @param <V> type of values
* @see Repartitioned
*/
public final class RepartitionedX<K, V> extends ModifierChain<Repartitioned<K, V>, Configurator, RepartitionedX<K, V>> {

private RepartitionedX(final Function<Configurator, Repartitioned<K, V>> initializer) {
super(initializer);
}

/**
* Create an instance of {@code RepartitionedX} with provided key serde
* @param keySerde Serde to use for keys
* @return a new instance of {@code RepartitionedX}
* @param <K> type of keys
* @param <V> type of values
*/
public static <K, V> RepartitionedX<K, V> keySerde(final Preconfigured<? extends Serde<K>> keySerde) {
return with(keySerde, Preconfigured.defaultSerde());
}

/**
* Create an instance of {@code RepartitionedX} with provided key serde
* @param keySerde Serde to use for keys
* @return a new instance of {@code RepartitionedX}
* @param <K> type of keys
* @param <V> type of values
*/
public static <K, V> RepartitionedX<K, V> keySerde(final Serde<K> keySerde) {
return keySerde(Preconfigured.create(keySerde));
}

/**
* Create an instance of {@code RepartitionedX} with provided value serde
* @param valueSerde Serde to use for values
* @return a new instance of {@code RepartitionedX}
* @param <K> type of keys
* @param <V> type of values
*/
public static <K, V> RepartitionedX<K, V> valueSerde(final Preconfigured<? extends Serde<V>> valueSerde) {
return with(Preconfigured.defaultSerde(), valueSerde);
}

/**
* Create an instance of {@code RepartitionedX} with provided value serde
* @param valueSerde Serde to use for values
* @return a new instance of {@code RepartitionedX}
* @param <K> type of keys
* @param <V> type of values
*/
public static <K, V> RepartitionedX<K, V> valueSerde(final Serde<V> valueSerde) {
return valueSerde(Preconfigured.create(valueSerde));
}

/**
* @see Repartitioned#with(Serde, Serde)
*/
public static <K, V> RepartitionedX<K, V> with(final Preconfigured<? extends Serde<K>> keySerde,
final Preconfigured<? extends Serde<V>> valueSerde) {
return new RepartitionedX<>(configurator -> Repartitioned.with(configurator.configureForKeys(keySerde),
configurator.configureForValues(valueSerde)));
}

/**
* @see Repartitioned#with(Serde, Serde)
*/
public static <K, V> RepartitionedX<K, V> with(final Serde<K> keySerde, final Serde<V> valueSerde) {
return with(Preconfigured.create(keySerde), Preconfigured.create(valueSerde));
}

/**
* @see Repartitioned#as(String)
*/
public static <K, V> RepartitionedX<K, V> as(final String name) {
return new RepartitionedX<>(configurator -> Repartitioned.as(name));
}

/**
* @see Repartitioned#numberOfPartitions(int)
*/
public static <K, V> RepartitionedX<K, V> numberOfPartitions(final int numberOfPartitions) {
return new RepartitionedX<>(configurator -> Repartitioned.numberOfPartitions(numberOfPartitions));
}

/**
* @see Repartitioned#streamPartitioner(StreamPartitioner)
*/
public static <K, V> RepartitionedX<K, V> streamPartitioner(final StreamPartitioner<K, V> partitioner) {
return new RepartitionedX<>(configurator -> Repartitioned.streamPartitioner(partitioner));
}

/**
* @see Repartitioned#withNumberOfPartitions(int)
*/
public RepartitionedX<K, V> withNumberOfPartitions(final int numberOfPartitions) {
return this.modify(repartitioned -> repartitioned.withNumberOfPartitions(numberOfPartitions));
}

/**
* @see Repartitioned#withKeySerde(Serde)
*/
public RepartitionedX<K, V> withKeySerde(final Preconfigured<? extends Serde<K>> keySerde) {
return this.modify(
(repartitioned, configurator) -> repartitioned.withKeySerde(configurator.configureForKeys(keySerde)));
}

/**
* @see Repartitioned#withKeySerde(Serde)
*/
public RepartitionedX<K, V> withKeySerde(final Serde<K> keySerde) {
return this.withKeySerde(Preconfigured.create(keySerde));
}

/**
* @see Repartitioned#withValueSerde(Serde)
*/
public RepartitionedX<K, V> withValueSerde(final Preconfigured<? extends Serde<V>> valueSerde) {
return this.modify((repartitioned, configurator) -> repartitioned.withValueSerde(
configurator.configureForValues(valueSerde)));
}

/**
* @see Repartitioned#withValueSerde(Serde)
*/
public RepartitionedX<K, V> withValueSerde(final Serde<V> valueSerde) {
return this.withValueSerde(Preconfigured.create(valueSerde));
}

/**
* @see Repartitioned#withStreamPartitioner(StreamPartitioner)
*/
public RepartitionedX<K, V> withStreamPartitioner(final StreamPartitioner<K, V> partitioner) {
return this.modify(repartitioned -> repartitioned.withStreamPartitioner(partitioner));
}

/**
* @see Repartitioned#withName(String)
*/
public RepartitionedX<K, V> withName(final String name) {
return this.modify(repartitioned -> repartitioned.withName(name));
}

@Override
protected RepartitionedX<K, V> newInstance(final Function<Configurator, Repartitioned<K, V>> initializer) {
return new RepartitionedX<>(initializer);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* MIT License
*
* Copyright (c) 2025 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package com.bakdata.kafka;

import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Merger;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.SessionWindowedCogroupedKStream;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.state.SessionStore;

/**
* Extends the {@link SessionWindowedCogroupedKStream} interface by adding methods to simplify Serde configuration,
* error handling, and topic access
* @param <K> type of keys
* @param <VOut> type of values
*/
public interface SessionWindowedCogroupedKStreamX<K, VOut> extends SessionWindowedCogroupedKStream<K, VOut> {

@Override
KTableX<Windowed<K>, VOut> aggregate(Initializer<VOut> initializer, Merger<? super K, VOut> sessionMerger);

@Override
KTableX<Windowed<K>, VOut> aggregate(Initializer<VOut> initializer, Merger<? super K, VOut> sessionMerger,
Named named);

@Override
KTableX<Windowed<K>, VOut> aggregate(Initializer<VOut> initializer, Merger<? super K, VOut> sessionMerger,
Materialized<K, VOut, SessionStore<Bytes, byte[]>> materialized);

/**
* @see #aggregate(Initializer, Merger, Materialized)
*/
KTableX<Windowed<K>, VOut> aggregate(Initializer<VOut> initializer, Merger<? super K, VOut> sessionMerger,
MaterializedX<K, VOut, SessionStore<Bytes, byte[]>> materialized);

@Override
KTableX<Windowed<K>, VOut> aggregate(Initializer<VOut> initializer, Merger<? super K, VOut> sessionMerger,
Named named, Materialized<K, VOut, SessionStore<Bytes, byte[]>> materialized);

/**
* @see #aggregate(Initializer, Merger, Named, Materialized)
*/
KTableX<Windowed<K>, VOut> aggregate(Initializer<VOut> initializer, Merger<? super K, VOut> sessionMerger,
Named named, MaterializedX<K, VOut, SessionStore<Bytes, byte[]>> materialized);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* MIT License
*
* Copyright (c) 2025 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package com.bakdata.kafka;

import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Merger;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.SessionWindowedCogroupedKStream;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.state.SessionStore;

@RequiredArgsConstructor
class SessionWindowedCogroupedStreamXImpl<K, V> implements SessionWindowedCogroupedKStreamX<K, V> {

private final @NonNull SessionWindowedCogroupedKStream<K, V> wrapped;
private final @NonNull StreamsContext context;

@Override
public KTableX<Windowed<K>, V> aggregate(final Initializer<V> initializer,
final Merger<? super K, V> sessionMerger) {
return this.context.wrap(this.wrapped.aggregate(initializer, sessionMerger));
}

@Override
public KTableX<Windowed<K>, V> aggregate(final Initializer<V> initializer,
final Merger<? super K, V> sessionMerger,
final Named named) {
return this.context.wrap(this.wrapped.aggregate(initializer, sessionMerger, named));
}

@Override
public KTableX<Windowed<K>, V> aggregate(final Initializer<V> initializer,
final Merger<? super K, V> sessionMerger,
final Materialized<K, V, SessionStore<Bytes, byte[]>> materialized) {
return this.context.wrap(this.wrapped.aggregate(initializer, sessionMerger, materialized));
}

@Override
public KTableX<Windowed<K>, V> aggregate(final Initializer<V> initializer,
final Merger<? super K, V> sessionMerger,
final MaterializedX<K, V, SessionStore<Bytes, byte[]>> materialized) {
return this.aggregate(initializer, sessionMerger, materialized.configure(this.context.getConfigurator()));
}

@Override
public KTableX<Windowed<K>, V> aggregate(final Initializer<V> initializer,
final Merger<? super K, V> sessionMerger,
final Named named, final Materialized<K, V, SessionStore<Bytes, byte[]>> materialized) {
return this.context.wrap(this.wrapped.aggregate(initializer, sessionMerger, materialized));
}

@Override
public KTableX<Windowed<K>, V> aggregate(final Initializer<V> initializer,
final Merger<? super K, V> sessionMerger,
final Named named, final MaterializedX<K, V, SessionStore<Bytes, byte[]>> materialized) {
return this.aggregate(initializer, sessionMerger, named,
materialized.configure(this.context.getConfigurator()));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/*
* MIT License
*
* Copyright (c) 2025 bakdata
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package com.bakdata.kafka;

import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.EmitStrategy;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Merger;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.SessionWindowedKStream;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.state.SessionStore;

/**
* Extends the {@link SessionWindowedKStream} interface by adding methods to simplify Serde configuration, error
* handling, and topic access
* @param <K> type of keys
* @param <V> type of values
*/
public interface SessionWindowedKStreamX<K, V> extends SessionWindowedKStream<K, V> {

@Override
KTableX<Windowed<K>, Long> count();

@Override
KTableX<Windowed<K>, Long> count(Named named);

@Override
KTableX<Windowed<K>, Long> count(Materialized<K, Long, SessionStore<Bytes, byte[]>> materialized);

/**
* @see #count(Materialized)
*/
KTableX<Windowed<K>, Long> count(MaterializedX<K, Long, SessionStore<Bytes, byte[]>> materialized);

@Override
KTableX<Windowed<K>, Long> count(Named named,
Materialized<K, Long, SessionStore<Bytes, byte[]>> materialized);

/**
* @see #count(Named, Materialized)
*/
KTableX<Windowed<K>, Long> count(Named named,
MaterializedX<K, Long, SessionStore<Bytes, byte[]>> materialized);

@Override
<VR> KTableX<Windowed<K>, VR> aggregate(Initializer<VR> initializer,
Aggregator<? super K, ? super V, VR> aggregator,
Merger<? super K, VR> sessionMerger);

@Override
<VR> KTableX<Windowed<K>, VR> aggregate(Initializer<VR> initializer,
Aggregator<? super K, ? super V, VR> aggregator,
Merger<? super K, VR> sessionMerger, Named named);

@Override
<VR> KTableX<Windowed<K>, VR> aggregate(Initializer<VR> initializer,
Aggregator<? super K, ? super V, VR> aggregator,
Merger<? super K, VR> sessionMerger, Materialized<K, VR, SessionStore<Bytes, byte[]>> materialized);

/**
* @see #aggregate(Initializer, Aggregator, Merger, Materialized)
*/
<VR> KTableX<Windowed<K>, VR> aggregate(Initializer<VR> initializer,
Aggregator<? super K, ? super V, VR> aggregator,
Merger<? super K, VR> sessionMerger,
MaterializedX<K, VR, SessionStore<Bytes, byte[]>> materialized);

@Override
<VR> KTableX<Windowed<K>, VR> aggregate(Initializer<VR> initializer,
Aggregator<? super K, ? super V, VR> aggregator,
Merger<? super K, VR> sessionMerger, Named named,
Materialized<K, VR, SessionStore<Bytes, byte[]>> materialized);

/**
* @see #aggregate(Initializer, Aggregator, Merger, Named, Materialized)
*/
<VR> KTableX<Windowed<K>, VR> aggregate(Initializer<VR> initializer,
Aggregator<? super K, ? super V, VR> aggregator,
Merger<? super K, VR> sessionMerger, Named named,
MaterializedX<K, VR, SessionStore<Bytes, byte[]>> materialized);

@Override
KTableX<Windowed<K>, V> reduce(Reducer<V> reducer);

@Override
KTableX<Windowed<K>, V> reduce(Reducer<V> reducer, Named named);

@Override
KTableX<Windowed<K>, V> reduce(Reducer<V> reducer,
Materialized<K, V, SessionStore<Bytes, byte[]>> materialized);

/**
* @see #reduce(Reducer, Materialized)
*/
KTableX<Windowed<K>, V> reduce(Reducer<V> reducer,
MaterializedX<K, V, SessionStore<Bytes, byte[]>> materialized);

@Override
KTableX<Windowed<K>, V> reduce(Reducer<V> reducer, Named named,
Materialized<K, V, SessionStore<Bytes, byte[]>> materialized);

/**
* @see #reduce(Reducer, Named, Materialized)
*/
KTableX<Windowed<K>, V> reduce(Reducer<V> reducer, Named named,
MaterializedX<K, V, SessionStore<Bytes, byte[]>> materialized);

@Override
SessionWindowedKStreamX<K, V> emitStrategy(EmitStrategy emitStrategy);
}
Loading