From 975e7f7fc2ce900826fdaade485ccb9fc6c6a15e Mon Sep 17 00:00:00 2001 From: Philipp Schirmer Date: Thu, 4 Apr 2024 12:37:56 +0200 Subject: [PATCH] Create v3 --- .../kafka/integration/RunProducerAppTest.java | 51 ++++++++++--------- 1 file changed, 26 insertions(+), 25 deletions(-) diff --git a/streams-bootstrap-picocli/src/test/java/com/bakdata/kafka/integration/RunProducerAppTest.java b/streams-bootstrap-picocli/src/test/java/com/bakdata/kafka/integration/RunProducerAppTest.java index 70357e57..babba1b8 100644 --- a/streams-bootstrap-picocli/src/test/java/com/bakdata/kafka/integration/RunProducerAppTest.java +++ b/streams-bootstrap-picocli/src/test/java/com/bakdata/kafka/integration/RunProducerAppTest.java @@ -69,7 +69,7 @@ void tearDown() { void shouldRunApp() throws InterruptedException { final String output = "output"; this.kafkaCluster.createTopic(TopicConfig.withName(output).useDefaults()); - final KafkaProducerApplication app = new SimpleKafkaProducerApplication(() -> new ProducerApp() { + try (final KafkaProducerApplication app = new SimpleKafkaProducerApplication(() -> new ProducerApp() { @Override public void run(final ProducerBuilder builder) { try (final Producer producer = builder.createProducer()) { @@ -84,29 +84,30 @@ public Map createKafkaProperties() { kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return kafkaProperties; } - }); - app.setBrokers(this.kafkaCluster.getBrokerList()); - app.setSchemaRegistryUrl("mock://"); - app.setOutputTopic(output); - app.setKafkaConfig(Map.of( - ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000" - )); - app.run(); - delay(TIMEOUT_SECONDS, TimeUnit.SECONDS); - assertThat(this.kafkaCluster.read(ReadKeyValues.from(output, String.class, TestRecord.class) - .with(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class) - .with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, SpecificAvroDeserializer.class) - .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "mock://") - .build())) - .hasSize(1) - .anySatisfy(kv -> { - assertThat(kv.getKey()).isEqualTo("foo"); - assertThat(kv.getValue().getContent()).isEqualTo("bar"); - }); - app.clean(); - delay(TIMEOUT_SECONDS, TimeUnit.SECONDS); - assertThat(this.kafkaCluster.exists(app.getOutputTopic())) - .as("Output topic is deleted") - .isFalse(); + })) { + app.setBrokers(this.kafkaCluster.getBrokerList()); + app.setSchemaRegistryUrl("mock://"); + app.setOutputTopic(output); + app.setKafkaConfig(Map.of( + ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000" + )); + app.run(); + delay(TIMEOUT_SECONDS, TimeUnit.SECONDS); + assertThat(this.kafkaCluster.read(ReadKeyValues.from(output, String.class, TestRecord.class) + .with(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class) + .with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, SpecificAvroDeserializer.class) + .with(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "mock://") + .build())) + .hasSize(1) + .anySatisfy(kv -> { + assertThat(kv.getKey()).isEqualTo("foo"); + assertThat(kv.getValue().getContent()).isEqualTo("bar"); + }); + app.clean(); + delay(TIMEOUT_SECONDS, TimeUnit.SECONDS); + assertThat(this.kafkaCluster.exists(app.getOutputTopic())) + .as("Output topic is deleted") + .isFalse(); + } } }