Skip to content

Commit

Permalink
Create v3
Browse files Browse the repository at this point in the history
  • Loading branch information
philipp94831 committed Apr 4, 2024
1 parent 9b0eabf commit 975e7f7
Showing 1 changed file with 26 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ void tearDown() {
void shouldRunApp() throws InterruptedException {
final String output = "output";
this.kafkaCluster.createTopic(TopicConfig.withName(output).useDefaults());
final KafkaProducerApplication app = new SimpleKafkaProducerApplication(() -> new ProducerApp() {
try (final KafkaProducerApplication app = new SimpleKafkaProducerApplication(() -> new ProducerApp() {
@Override
public void run(final ProducerBuilder builder) {
try (final Producer<String, TestRecord> producer = builder.createProducer()) {
Expand All @@ -84,29 +84,30 @@ public Map<String, Object> createKafkaProperties() {
kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return kafkaProperties;
}
});
app.setBrokers(this.kafkaCluster.getBrokerList());
app.setSchemaRegistryUrl("mock://");
app.setOutputTopic(output);
app.setKafkaConfig(Map.of(
ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000"
));
app.run();
delay(TIMEOUT_SECONDS, TimeUnit.SECONDS);
assertThat(this.kafkaCluster.read(ReadKeyValues.from(output, String.class, TestRecord.class)
.with(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
.with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, SpecificAvroDeserializer.class)
.with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "mock://")
.build()))
.hasSize(1)
.anySatisfy(kv -> {
assertThat(kv.getKey()).isEqualTo("foo");
assertThat(kv.getValue().getContent()).isEqualTo("bar");
});
app.clean();
delay(TIMEOUT_SECONDS, TimeUnit.SECONDS);
assertThat(this.kafkaCluster.exists(app.getOutputTopic()))
.as("Output topic is deleted")
.isFalse();
})) {
app.setBrokers(this.kafkaCluster.getBrokerList());
app.setSchemaRegistryUrl("mock://");
app.setOutputTopic(output);
app.setKafkaConfig(Map.of(
ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000"
));
app.run();
delay(TIMEOUT_SECONDS, TimeUnit.SECONDS);
assertThat(this.kafkaCluster.read(ReadKeyValues.from(output, String.class, TestRecord.class)
.with(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
.with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, SpecificAvroDeserializer.class)
.with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "mock://")
.build()))
.hasSize(1)
.anySatisfy(kv -> {
assertThat(kv.getKey()).isEqualTo("foo");
assertThat(kv.getValue().getContent()).isEqualTo("bar");
});
app.clean();
delay(TIMEOUT_SECONDS, TimeUnit.SECONDS);
assertThat(this.kafkaCluster.exists(app.getOutputTopic()))
.as("Output topic is deleted")
.isFalse();
}
}
}

0 comments on commit 975e7f7

Please sign in to comment.