Skip to content

Commit

Permalink
Update
Browse files Browse the repository at this point in the history
  • Loading branch information
philipp94831 committed Jun 25, 2024
1 parent 3cf0503 commit 566bbb8
Show file tree
Hide file tree
Showing 6 changed files with 20 additions and 8 deletions.
2 changes: 1 addition & 1 deletion streams-bootstrap-cli/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ dependencies {
implementation(group = "org.apache.logging.log4j", name = "log4j-slf4j2-impl", version = log4jVersion)

val junitVersion: String by project
testRuntimeOnly(group = "org.junit.jupiter", name = "junit-jupiter-engine", version = junitVersion)
testImplementation(group = "org.junit.jupiter", name = "junit-jupiter-api", version = junitVersion)
testImplementation(group = "org.junit.jupiter", name = "junit-jupiter-params", version = junitVersion)
testRuntimeOnly(group = "org.junit.jupiter", name = "junit-jupiter-engine", version = junitVersion)
val assertJVersion: String by project
testImplementation(group = "org.assertj", name = "assertj-core", version = assertJVersion)
val mockitoVersion: String by project
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,9 @@ private interface Stoppable {
void stop();
}

/**
* Provides access to a {@link CleanUpRunner} and closes the associated {@link ExecutableApp}
*/
@RequiredArgsConstructor
public class CleanableApp implements AutoCloseable, Stoppable {
private final @NonNull ExecutableApp<?, ?, ?> app;
Expand All @@ -339,12 +342,18 @@ public void close() {
KafkaApplication.this.activeApps.remove(this);
}

/**
* Close the app
*/
@Override
public void stop() {
this.app.close();
}
}

/**
* Provides access to a {@link Runner} and closes the associated {@link ExecutableApp}
*/
@RequiredArgsConstructor
public class RunnableApp implements AutoCloseable, Stoppable {
private final @NonNull ExecutableApp<?, ?, ?> app;
Expand All @@ -357,6 +366,9 @@ public void close() {
KafkaApplication.this.activeApps.remove(this);
}

/**
* Close the runner and app
*/
@Override
public void stop() {
this.runner.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ void shouldRunApp() throws InterruptedException {
public ProducerRunnable buildRunnable(final ProducerBuilder builder) {
return () -> {
try (final Producer<String, TestRecord> producer = builder.createProducer()) {
final TestRecord record = TestRecord.newBuilder().setContent("bar").build();
producer.send(new ProducerRecord<>(builder.getTopics().getOutputTopic(), "foo", record));
final TestRecord testRecord = TestRecord.newBuilder().setContent("bar").build();
producer.send(new ProducerRecord<>(builder.getTopics().getOutputTopic(), "foo", testRecord));
}
};
}
Expand Down
2 changes: 1 addition & 1 deletion streams-bootstrap-core/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ dependencies {
implementation(group = "org.jooq", name = "jool", version = "0.9.14")

val junitVersion: String by project
testRuntimeOnly(group = "org.junit.jupiter", name = "junit-jupiter-engine", version = junitVersion)
testImplementation(group = "org.junit.jupiter", name = "junit-jupiter-api", version = junitVersion)
testImplementation(group = "org.junit.jupiter", name = "junit-jupiter-params", version = junitVersion)
testImplementation(group = "org.junit-pioneer", name = "junit-pioneer", version = "2.2.0")
testRuntimeOnly(group = "org.junit.jupiter", name = "junit-jupiter-engine", version = junitVersion)
val assertJVersion: String by project
testImplementation(group = "org.assertj", name = "assertj-core", version = assertJVersion)
val mockitoVersion: String by project
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public class StreamsExecutionOptions {
* Hook that is called after calling {@link KafkaStreams#start()}
*/
@Builder.Default
private final @NonNull Consumer<RunningStreams> onStart = (runningStreams) -> {};
private final @NonNull Consumer<RunningStreams> onStart = runningStreams -> {};
/**
* Configures {@link KafkaStreams#setStateListener(StateListener)}
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,18 +49,18 @@ private static ConfiguredStreamsApp<MirrorWithNonDefaultSerde> createApp() {
void shouldMirror() {
final Serde<TestRecord> keySerde = this.getKeySerde();
final Serde<TestRecord> valueSerde = this.getValueSerde();
final TestRecord record = TestRecord.newBuilder()
final TestRecord testRecord = TestRecord.newBuilder()
.setContent("bar")
.build();
this.testTopology.input()
.withKeySerde(keySerde)
.withValueSerde(valueSerde)
.add(record, record);
.add(testRecord, testRecord);

this.testTopology.streamOutput()
.withKeySerde(keySerde)
.withValueSerde(valueSerde)
.expectNextRecord().hasKey(record).hasValue(record)
.expectNextRecord().hasKey(testRecord).hasValue(testRecord)
.expectNoMoreRecord();
}

Expand Down

0 comments on commit 566bbb8

Please sign in to comment.