From 4f0f188056b0b10f7feed9b5ac126851c911401e Mon Sep 17 00:00:00 2001 From: SravanThotakura05 <83568543+SravanThotakura05@users.noreply.github.com> Date: Mon, 18 Dec 2023 17:26:34 +0530 Subject: [PATCH] uncommented integration test with fix Message sent in previous test is being consumed during the start of this test. Set ttl to the published message so that it is not consumed again. --- .../solace/SolaceConsumerTest.java | 68 +++++++++---------- .../solace/base/SolaceContainer.java | 1 + 2 files changed, 35 insertions(+), 34 deletions(-) diff --git a/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/SolaceConsumerTest.java b/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/SolaceConsumerTest.java index a03ca70..afffee3 100644 --- a/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/SolaceConsumerTest.java +++ b/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/SolaceConsumerTest.java @@ -10,6 +10,7 @@ import jakarta.enterprise.context.ApplicationScoped; +import org.awaitility.Durations; import org.eclipse.microprofile.reactive.messaging.Incoming; import org.junit.jupiter.api.MethodOrderer; import org.junit.jupiter.api.Order; @@ -118,6 +119,7 @@ void consumerFailedProcessingPublishToErrorTopic() { .with("mp.messaging.incoming.in.consumer.queue.publish-to-error-topic-on-failure", true) .with("mp.messaging.incoming.in.consumer.queue.error.topic", SolaceContainer.INTEGRATION_TEST_ERROR_QUEUE_SUBSCRIPTION) + .with("mp.messaging.incoming.in.consumer.queue.error.message.ttl", 1000) .with("mp.messaging.incoming.error-in.connector", "quarkus-solace") .with("mp.messaging.incoming.error-in.consumer.queue.name", SolaceContainer.INTEGRATION_TEST_ERROR_QUEUE_NAME) .with("mp.messaging.incoming.error-in.consumer.queue.type", "durable-exclusive"); @@ -131,17 +133,14 @@ void consumerFailedProcessingPublishToErrorTopic() { .start(); Topic tp = Topic.of(SolaceContainer.INTEGRATION_TEST_QUEUE_SUBSCRIPTION); OutboundMessageBuilder messageBuilder = messagingService.messageBuilder(); - OutboundMessage outboundMessage = messageBuilder.build("2"); + OutboundMessage outboundMessage = messageBuilder.build("1"); publisher.publish(outboundMessage, tp); // Assert on published messages await().untilAsserted(() -> assertThat(app.getReceived().size()).isEqualTo(0)); await().untilAsserted(() -> assertThat(app.getReceivedFailedMessages().size()).isEqualTo(1)); - try { - Thread.sleep(5000); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } + await().untilAsserted(() -> assertThat(app.getReceivedFailedMessages()).contains("1")); + await().pollDelay(Durations.FIVE_SECONDS).until(() -> true); } @Test @@ -170,12 +169,13 @@ void consumerFailedProcessingMoveToDMQ() { Properties properties = new Properties(); properties.setProperty(SolaceProperties.MessageProperties.PERSISTENT_DMQ_ELIGIBLE, "true"); messageBuilder.fromProperties(properties); - OutboundMessage outboundMessage = messageBuilder.build("1"); + OutboundMessage outboundMessage = messageBuilder.build("12"); publisher.publish(outboundMessage, tp); // Assert on published messages await().untilAsserted(() -> assertThat(app.getReceived().size()).isEqualTo(0)); await().untilAsserted(() -> assertThat(app.getReceivedDMQMessages().size()).isEqualTo(1)); + await().untilAsserted(() -> assertThat(app.getReceivedDMQMessages()).contains("12")); } @Test @@ -200,33 +200,33 @@ void consumerCreateMissingResourceAddSubscriptionPermissionException() { + SolaceContainer.INTEGRATION_TEST_QUEUE_NAME + "' - Topic '" + topic)); } - // @Test - // @Order(7) - // void consumerPublishToErrorTopicPermissionException() { - // MapBasedConfig config = new MapBasedConfig() - // .with("mp.messaging.incoming.in.connector", "quarkus-solace") - // .with("mp.messaging.incoming.in.consumer.queue.name", SolaceContainer.INTEGRATION_TEST_QUEUE_NAME) - // .with("mp.messaging.incoming.in.consumer.queue.type", "durable-exclusive") - // .with("mp.messaging.incoming.in.consumer.queue.publish-to-error-topic-on-failure", true) - // .with("mp.messaging.incoming.in.consumer.queue.error.topic", - // "publish/deny") - // .with("mp.messaging.incoming.error-in.connector", "quarkus-solace") - // .with("mp.messaging.incoming.error-in.consumer.queue.name", SolaceContainer.INTEGRATION_TEST_ERROR_QUEUE_NAME) - // .with("mp.messaging.incoming.error-in.consumer.queue.type", "durable-exclusive"); - // - // // Run app that consumes messages - // MyErrorQueueConsumer app = runApplication(config, MyErrorQueueConsumer.class); - // // Produce messages - // PersistentMessagePublisher publisher = messagingService.createPersistentMessagePublisherBuilder() - // .build() - // .start(); - // Topic tp = Topic.of(SolaceContainer.INTEGRATION_TEST_QUEUE_SUBSCRIPTION); - // OutboundMessageBuilder messageBuilder = messagingService.messageBuilder(); - // OutboundMessage outboundMessage = messageBuilder.build("2"); - // publisher.publish(outboundMessage, tp); - // - // await().untilAsserted(() -> assertThat(app.getReceivedFailedMessages().size()).isEqualTo(0)); - // } + @Test + @Order(7) + void consumerPublishToErrorTopicPermissionException() { + MapBasedConfig config = new MapBasedConfig() + .with("mp.messaging.incoming.in.connector", "quarkus-solace") + .with("mp.messaging.incoming.in.consumer.queue.name", SolaceContainer.INTEGRATION_TEST_QUEUE_NAME) + .with("mp.messaging.incoming.in.consumer.queue.type", "durable-exclusive") + .with("mp.messaging.incoming.in.consumer.queue.publish-to-error-topic-on-failure", true) + .with("mp.messaging.incoming.in.consumer.queue.error.topic", + "publish/deny") + .with("mp.messaging.incoming.error-in.connector", "quarkus-solace") + .with("mp.messaging.incoming.error-in.consumer.queue.name", SolaceContainer.INTEGRATION_TEST_ERROR_QUEUE_NAME) + .with("mp.messaging.incoming.error-in.consumer.queue.type", "durable-exclusive"); + + // Run app that consumes messages + MyErrorQueueConsumer app = runApplication(config, MyErrorQueueConsumer.class); + // Produce messages + PersistentMessagePublisher publisher = messagingService.createPersistentMessagePublisherBuilder() + .build() + .start(); + Topic tp = Topic.of(SolaceContainer.INTEGRATION_TEST_QUEUE_SUBSCRIPTION); + OutboundMessageBuilder messageBuilder = messagingService.messageBuilder(); + OutboundMessage outboundMessage = messageBuilder.build("2"); + publisher.publish(outboundMessage, tp); + + await().untilAsserted(() -> assertThat(app.getReceivedFailedMessages().size()).isEqualTo(0)); + } @ApplicationScoped static class MyConsumer { diff --git a/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/base/SolaceContainer.java b/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/base/SolaceContainer.java index 6e0b6c8..88496c6 100644 --- a/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/base/SolaceContainer.java +++ b/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/base/SolaceContainer.java @@ -107,6 +107,7 @@ private Transferable createConfigurationScript() { updateConfigScript(scriptBuilder, "message-spool message-vpn default"); updateConfigScript(scriptBuilder, "create queue " + INTEGRATION_TEST_ERROR_QUEUE_NAME); updateConfigScript(scriptBuilder, "access-type exclusive"); + updateConfigScript(scriptBuilder, "respect-ttl"); updateConfigScript(scriptBuilder, "max-spool-usage 300"); updateConfigScript(scriptBuilder, "subscription topic " + INTEGRATION_TEST_ERROR_QUEUE_SUBSCRIPTION); updateConfigScript(scriptBuilder, "permission all consume");