From cd0c5441e518e2e138c28c05e70c52b14fdf9405 Mon Sep 17 00:00:00 2001 From: SravanThotakura05 <83568543+SravanThotakura05@users.noreply.github.com> Date: Wed, 17 Jan 2024 15:26:40 +0530 Subject: [PATCH 1/2] Performance Tests --- .../solace/SolaceConsumerTest.java | 12 +- .../solace/SolaceProcessorTest.java | 2 +- .../solace/base/SolaceBaseTest.java | 3 +- .../solace/base/SolaceBrokerExtension.java | 13 +- ...k.java => SolacePublisherHealthCheck.java} | 2 +- .../solace/perf/EndToEndPerformanceTest.java | 212 ++++++++++++++++++ .../perf/SolaceConsumerPerformanceTest.java | 89 ++++++++ .../perf/SolacePublisherPerformanceTest.java | 199 ++++++++++++++++ 8 files changed, 514 insertions(+), 18 deletions(-) rename pubsub-plus-connector/src/test/java/io/quarkiverse/solace/health/{SolaceProducerHealthCheck.java => SolacePublisherHealthCheck.java} (98%) create mode 100644 pubsub-plus-connector/src/test/java/io/quarkiverse/solace/perf/EndToEndPerformanceTest.java create mode 100644 pubsub-plus-connector/src/test/java/io/quarkiverse/solace/perf/SolaceConsumerPerformanceTest.java create mode 100644 pubsub-plus-connector/src/test/java/io/quarkiverse/solace/perf/SolacePublisherPerformanceTest.java diff --git a/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/SolaceConsumerTest.java b/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/SolaceConsumerTest.java index 69f9090..43100dd 100644 --- a/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/SolaceConsumerTest.java +++ b/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/SolaceConsumerTest.java @@ -27,6 +27,7 @@ import io.quarkiverse.solace.base.SolaceContainer; import io.quarkiverse.solace.base.WeldTestBase; +import io.quarkiverse.solace.incoming.SolaceInboundMessage; import io.quarkiverse.solace.incoming.SolaceIncomingChannel; import io.quarkiverse.solace.logging.SolaceTestAppender; import io.smallrye.mutiny.Multi; @@ -50,7 +51,7 @@ void consumer() { .with("mp.messaging.incoming.in.consumer.queue.name", queue) .with("mp.messaging.incoming.in.consumer.queue.add-additional-subscriptions", "true") .with("mp.messaging.incoming.in.consumer.queue.missing-resource-creation-strategy", "create-on-start") - .with("mp.messaging.incoming.in.consumer.queue.subscriptions", topic); + .with("mp.messaging.incoming.in.consumer.queue.subscriptions", "quarkus/integration/test/replay/messages"); // Run app that consumes messages MyConsumer app = runApplication(config, MyConsumer.class); @@ -59,7 +60,7 @@ void consumer() { PersistentMessagePublisher publisher = messagingService.createPersistentMessagePublisherBuilder() .build() .start(); - Topic tp = Topic.of(topic); + Topic tp = Topic.of("quarkus/integration/test/replay/messages"); publisher.publish("1", tp); publisher.publish("2", tp); publisher.publish("3", tp); @@ -79,7 +80,7 @@ void consumerReplay() { .with("mp.messaging.incoming.in.consumer.queue.type", "durable-exclusive") .with("mp.messaging.incoming.in.consumer.queue.add-additional-subscriptions", "true") .with("mp.messaging.incoming.in.consumer.queue.missing-resource-creation-strategy", "create-on-start") - .with("mp.messaging.incoming.in.consumer.queue.subscriptions", topic) + .with("mp.messaging.incoming.in.consumer.queue.subscriptions", "quarkus/integration/test/replay/messages") .with("mp.messaging.incoming.in.consumer.queue.replay.strategy", "all-messages"); // Run app that consumes messages @@ -294,8 +295,9 @@ static class MyConsumer { private final List received = new CopyOnWriteArrayList<>(); @Incoming("in") - void in(InboundMessage msg) { - received.add(msg.getPayloadAsString()); + CompletionStage in(SolaceInboundMessage msg) { + received.add(msg.getMessage().getPayloadAsString()); + return msg.ack(); } public List getReceived() { diff --git a/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/SolaceProcessorTest.java b/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/SolaceProcessorTest.java index 007f8e4..a14fa64 100644 --- a/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/SolaceProcessorTest.java +++ b/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/SolaceProcessorTest.java @@ -27,7 +27,7 @@ public class SolaceProcessorTest extends WeldTestBase { @Test void consumer() { - String processedTopic = topic + "-processed"; + String processedTopic = topic + "/processed"; MapBasedConfig config = new MapBasedConfig() .with("mp.messaging.incoming.in.connector", "quarkus-solace") .with("mp.messaging.incoming.in.consumer.queue.add-additional-subscriptions", "true") diff --git a/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/base/SolaceBaseTest.java b/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/base/SolaceBaseTest.java index e154ed0..3ce5673 100644 --- a/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/base/SolaceBaseTest.java +++ b/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/base/SolaceBaseTest.java @@ -28,8 +28,7 @@ public class SolaceBaseTest { public void initTopic(TestInfo testInfo) { String cn = testInfo.getTestClass().map(Class::getSimpleName).orElse(UUID.randomUUID().toString()); String mn = testInfo.getTestMethod().map(Method::getName).orElse(UUID.randomUUID().toString()); - // topic = cn + "/" + mn + "/" + UUID.randomUUID().getMostSignificantBits(); - topic = "quarkus/integration/test/default/topic"; + topic = "quarkus/integration/test/default/" + cn + "/" + mn + "/" + UUID.randomUUID().getMostSignificantBits(); } @BeforeEach diff --git a/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/base/SolaceBrokerExtension.java b/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/base/SolaceBrokerExtension.java index 4e6a37b..cf9993c 100644 --- a/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/base/SolaceBrokerExtension.java +++ b/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/base/SolaceBrokerExtension.java @@ -45,15 +45,10 @@ public void startSolaceBroker() { solace = createSolaceContainer() .withCredentials("user", "pass") .withExposedPorts(SolaceContainer.Service.SMF.getPort()) - .withPublishTopic("quarkus/integration/test/default/topic", SolaceContainer.Service.SMF) - .withPublishTopic("quarkus/integration/test/provisioned/queue/topic", SolaceContainer.Service.SMF) - .withPublishTopic("quarkus/integration/test/provisioned/queue/error/topic", SolaceContainer.Service.SMF) - .withPublishTopic("quarkus/integration/test/dynamic/topic/1", SolaceContainer.Service.SMF) - .withPublishTopic("quarkus/integration/test/dynamic/topic/2", SolaceContainer.Service.SMF) - .withPublishTopic("quarkus/integration/test/dynamic/topic/3", SolaceContainer.Service.SMF) - .withPublishTopic("quarkus/integration/test/dynamic/topic/4", SolaceContainer.Service.SMF) - .withPublishTopic("quarkus/integration/test/dynamic/topic/5", SolaceContainer.Service.SMF) - .withPublishTopic("quarkus/integration/test/default/topic-processed", SolaceContainer.Service.SMF); + .withPublishTopic("quarkus/integration/test/replay/messages", SolaceContainer.Service.SMF) + .withPublishTopic("quarkus/integration/test/default/>", SolaceContainer.Service.SMF) + .withPublishTopic("quarkus/integration/test/provisioned/queue/>", SolaceContainer.Service.SMF) + .withPublishTopic("quarkus/integration/test/dynamic/>", SolaceContainer.Service.SMF); solace.start(); LOGGER.info("Solace broker started: " + solace.getOrigin(SolaceContainer.Service.SMF)); await().until(() -> solace.isRunning()); diff --git a/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/health/SolaceProducerHealthCheck.java b/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/health/SolacePublisherHealthCheck.java similarity index 98% rename from pubsub-plus-connector/src/test/java/io/quarkiverse/solace/health/SolaceProducerHealthCheck.java rename to pubsub-plus-connector/src/test/java/io/quarkiverse/solace/health/SolacePublisherHealthCheck.java index ad85137..93d9a8d 100644 --- a/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/health/SolaceProducerHealthCheck.java +++ b/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/health/SolacePublisherHealthCheck.java @@ -22,7 +22,7 @@ import io.smallrye.reactive.messaging.health.HealthReport; import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig; -public class SolaceProducerHealthCheck extends WeldTestBase { +public class SolacePublisherHealthCheck extends WeldTestBase { @Test void publisherHealthCheck() { MapBasedConfig config = new MapBasedConfig() diff --git a/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/perf/EndToEndPerformanceTest.java b/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/perf/EndToEndPerformanceTest.java new file mode 100644 index 0000000..31c91c1 --- /dev/null +++ b/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/perf/EndToEndPerformanceTest.java @@ -0,0 +1,212 @@ +package io.quarkiverse.solace.perf; + +import static org.awaitility.Awaitility.await; + +import java.time.Duration; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CopyOnWriteArrayList; + +import jakarta.enterprise.context.ApplicationScoped; + +import org.eclipse.microprofile.reactive.messaging.Incoming; +import org.eclipse.microprofile.reactive.messaging.Message; +import org.eclipse.microprofile.reactive.messaging.Outgoing; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; + +import com.solace.messaging.publisher.PersistentMessagePublisher; +import com.solace.messaging.receiver.PersistentMessageReceiver; +import com.solace.messaging.resources.Queue; +import com.solace.messaging.resources.Topic; +import com.solace.messaging.resources.TopicSubscription; + +import io.quarkiverse.solace.base.WeldTestBase; +import io.quarkiverse.solace.incoming.SolaceInboundMessage; +import io.smallrye.mutiny.Multi; +import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig; + +@Disabled +public class EndToEndPerformanceTest extends WeldTestBase { + + private static final int COUNT = 100000; + + private static final int TIMEOUT_IN_SECONDS = 400; + + @Test + public void endToEndPerformanceTesttWithBackPressureWaitAndWaitForPublishReceipt() { + String processedTopic = topic + "/processed"; + MapBasedConfig config = new MapBasedConfig() + .with("mp.messaging.incoming.in.connector", "quarkus-solace") + .with("mp.messaging.incoming.in.consumer.queue.name", queue) + .with("mp.messaging.incoming.in.consumer.queue.add-additional-subscriptions", "true") + .with("mp.messaging.incoming.in.consumer.queue.missing-resource-creation-strategy", "create-on-start") + .with("mp.messaging.incoming.in.consumer.queue.subscriptions", topic) + .with("mp.messaging.outgoing.out.connector", "quarkus-solace") + .with("mp.messaging.outgoing.out.producer.topic", processedTopic); + + // Run app that consumes messages + runApplication(config, MyProcessor.class); + + List received = new CopyOnWriteArrayList<>(); + + // Start listening processed messages + PersistentMessageReceiver receiver = messagingService.createPersistentMessageReceiverBuilder() + .withMessageAutoAcknowledgement() + .withSubscriptions(TopicSubscription.of(processedTopic)) + .build(Queue.nonDurableExclusiveQueue()); + receiver.receiveAsync(inboundMessage -> received.add(inboundMessage.getPayloadAsString())); + receiver.start(); + + // Produce messages + PersistentMessagePublisher publisher = messagingService.createPersistentMessagePublisherBuilder() + .build() + .start(); + Topic tp = Topic.of(topic); + for (int i = 0; i < COUNT; i++) { + publisher.publish(String.valueOf(i + 1), tp); + } + + await() + .atMost(Duration.ofSeconds(TIMEOUT_IN_SECONDS)) + .until(() -> received.size() == COUNT); + } + + @Test + public void endToEndPerformanceTesttWithBackPressureWaitAndNoWaitForPublishReceipt() { + String processedTopic = topic + "/processed"; + MapBasedConfig config = new MapBasedConfig() + .with("mp.messaging.incoming.in.connector", "quarkus-solace") + .with("mp.messaging.incoming.in.consumer.queue.name", queue) + .with("mp.messaging.incoming.in.consumer.queue.add-additional-subscriptions", "true") + .with("mp.messaging.incoming.in.consumer.queue.missing-resource-creation-strategy", "create-on-start") + .with("mp.messaging.incoming.in.consumer.queue.subscriptions", topic) + .with("mp.messaging.outgoing.out.connector", "quarkus-solace") + .with("mp.messaging.outgoing.out.producer.topic", processedTopic) + .with("mp.messaging.outgoing.out.producer.waitForPublishReceipt", false); + + // Run app that consumes messages + runApplication(config, MyProcessor.class); + + List received = new CopyOnWriteArrayList<>(); + + // Start listening processed messages + PersistentMessageReceiver receiver = messagingService.createPersistentMessageReceiverBuilder() + .withMessageAutoAcknowledgement() + .withSubscriptions(TopicSubscription.of(processedTopic)) + .build(Queue.nonDurableExclusiveQueue()); + receiver.receiveAsync(inboundMessage -> received.add(inboundMessage.getPayloadAsString())); + receiver.start(); + + // Produce messages + PersistentMessagePublisher publisher = messagingService.createPersistentMessagePublisherBuilder() + .build() + .start(); + Topic tp = Topic.of(topic); + for (int i = 0; i < COUNT; i++) { + publisher.publish(String.valueOf(i + 1), tp); + } + + await() + .atMost(Duration.ofSeconds(TIMEOUT_IN_SECONDS)) + .until(() -> received.size() == COUNT); + } + + // + @Test + public void endToEndPerformanceTesttWithBackPressureElasticAndWaitForPublishReceipt() { + String processedTopic = topic + "/processed"; + MapBasedConfig config = new MapBasedConfig() + .with("mp.messaging.incoming.in.connector", "quarkus-solace") + .with("mp.messaging.incoming.in.consumer.queue.name", queue) + .with("mp.messaging.incoming.in.consumer.queue.add-additional-subscriptions", "true") + .with("mp.messaging.incoming.in.consumer.queue.missing-resource-creation-strategy", "create-on-start") + .with("mp.messaging.incoming.in.consumer.queue.subscriptions", topic) + .with("mp.messaging.outgoing.out.connector", "quarkus-solace") + .with("mp.messaging.outgoing.out.producer.topic", processedTopic) + .with("mp.messaging.outgoing.out.producer.back-pressure.strategy", "elastic"); + + // Run app that consumes messages + runApplication(config, MyProcessor.class); + + List received = new CopyOnWriteArrayList<>(); + + // Start listening processed messages + PersistentMessageReceiver receiver = messagingService.createPersistentMessageReceiverBuilder() + .withMessageAutoAcknowledgement() + .withSubscriptions(TopicSubscription.of(processedTopic)) + .build(Queue.nonDurableExclusiveQueue()); + receiver.receiveAsync(inboundMessage -> received.add(inboundMessage.getPayloadAsString())); + receiver.start(); + + // Produce messages + PersistentMessagePublisher publisher = messagingService.createPersistentMessagePublisherBuilder() + .build() + .start(); + Topic tp = Topic.of(topic); + for (int i = 0; i < COUNT; i++) { + publisher.publish(String.valueOf(i + 1), tp); + } + + await() + .atMost(Duration.ofSeconds(TIMEOUT_IN_SECONDS)) + .until(() -> received.size() == COUNT); + } + + // + @Test + public void endToEndPerformanceTesttWithBackPressureElasticAndNoWaitForPublishReceipt() { + String processedTopic = topic + "/processed"; + MapBasedConfig config = new MapBasedConfig() + .with("mp.messaging.incoming.in.connector", "quarkus-solace") + .with("mp.messaging.incoming.in.consumer.queue.name", queue) + .with("mp.messaging.incoming.in.consumer.queue.add-additional-subscriptions", "true") + .with("mp.messaging.incoming.in.consumer.queue.missing-resource-creation-strategy", "create-on-start") + .with("mp.messaging.incoming.in.consumer.queue.subscriptions", topic) + .with("mp.messaging.outgoing.out.connector", "quarkus-solace") + .with("mp.messaging.outgoing.out.producer.topic", processedTopic) + .with("mp.messaging.outgoing.out.producer.back-pressure.strategy", "elastic") + .with("mp.messaging.outgoing.out.producer.waitForPublishReceipt", false); + + // Run app that consumes messages + runApplication(config, MyProcessor.class); + + List received = new CopyOnWriteArrayList<>(); + + // Start listening processed messages + PersistentMessageReceiver receiver = messagingService.createPersistentMessageReceiverBuilder() + .withMessageAutoAcknowledgement() + .withSubscriptions(TopicSubscription.of(processedTopic)) + .build(Queue.nonDurableExclusiveQueue()); + receiver.receiveAsync(inboundMessage -> received.add(inboundMessage.getPayloadAsString())); + receiver.start(); + + // Produce messages + PersistentMessagePublisher publisher = messagingService.createPersistentMessagePublisherBuilder() + .build() + .start(); + Topic tp = Topic.of(topic); + for (int i = 0; i < COUNT; i++) { + publisher.publish(String.valueOf(i + 1), tp); + } + + await() + .atMost(Duration.ofSeconds(TIMEOUT_IN_SECONDS)) + .until(() -> received.size() == COUNT); + } + + @ApplicationScoped + static class MyProcessor { + @Incoming("in") + @Outgoing("out") + Multi> in(SolaceInboundMessage msg) { + // return messagingService.messageBuilder().build(payload); + return Multi.createFrom().items(msg.getMessage().getPayloadAsString()) + .map(p -> Message.of(p).withAck(() -> { + msg.ack(); + return CompletableFuture.completedFuture(null); + })); + } + + } +} diff --git a/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/perf/SolaceConsumerPerformanceTest.java b/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/perf/SolaceConsumerPerformanceTest.java new file mode 100644 index 0000000..d5da51d --- /dev/null +++ b/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/perf/SolaceConsumerPerformanceTest.java @@ -0,0 +1,89 @@ +package io.quarkiverse.solace.perf; + +import static org.awaitility.Awaitility.await; + +import java.time.Duration; +import java.util.List; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.LongAdder; + +import jakarta.enterprise.context.ApplicationScoped; + +import org.eclipse.microprofile.reactive.messaging.Incoming; +import org.junit.jupiter.api.Test; + +import com.solace.messaging.publisher.PersistentMessagePublisher; +import com.solace.messaging.resources.Topic; + +import io.quarkiverse.solace.base.WeldTestBase; +import io.quarkiverse.solace.incoming.SolaceInboundMessage; +import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig; + +public class SolaceConsumerPerformanceTest extends WeldTestBase { + private static final int COUNT = 100000; + private static final int TIMEOUT_IN_SECONDS = 400; + + @Test + public void solaceConsumerPerformanceTest() { + // Produce messages + PersistentMessagePublisher publisher = messagingService.createPersistentMessagePublisherBuilder() + .build() + .start(); + + MapBasedConfig config = new MapBasedConfig() + .with("mp.messaging.incoming.in.connector", "quarkus-solace") + .with("mp.messaging.incoming.in.consumer.queue.name", queue) + .with("mp.messaging.incoming.in.consumer.queue.add-additional-subscriptions", "true") + .with("mp.messaging.incoming.in.consumer.queue.missing-resource-creation-strategy", "create-on-start") + .with("mp.messaging.incoming.in.consumer.queue.subscriptions", topic); + // .with("mp.messaging.incoming.in.client.graceful-shutdown", false); + + // Run app that consumes messages + MyConsumer app = runApplication(config, MyConsumer.class); + + await().until(() -> isStarted() && isReady()); + + Topic tp = Topic.of(topic); + for (int i = 0; i < COUNT; i++) { + publisher.publish(String.valueOf(i + 1), tp); + } + + await() + .atMost(Duration.ofSeconds(TIMEOUT_IN_SECONDS)) + .until(() -> app.getCount() == COUNT); + long start = app.getStart(); + long end = System.currentTimeMillis(); + + System.out.println("Total time : " + (end - start) + " ms"); + + } + + @ApplicationScoped + static class MyConsumer { + private final List received = new CopyOnWriteArrayList<>(); + LongAdder count = new LongAdder(); + long start; + + @Incoming("in") + public CompletionStage in(SolaceInboundMessage msg) { + if (count.longValue() == 0L) { + start = System.currentTimeMillis(); + } + count.increment(); + return msg.ack(); + } + + public List getReceived() { + return received; + } + + public long getStart() { + return start; + } + + public long getCount() { + return count.longValue(); + } + } +} diff --git a/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/perf/SolacePublisherPerformanceTest.java b/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/perf/SolacePublisherPerformanceTest.java new file mode 100644 index 0000000..4e35e6d --- /dev/null +++ b/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/perf/SolacePublisherPerformanceTest.java @@ -0,0 +1,199 @@ +package io.quarkiverse.solace.perf; + +import static org.awaitility.Awaitility.await; + +import java.time.Duration; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.LongAdder; + +import jakarta.enterprise.context.ApplicationScoped; + +import org.eclipse.microprofile.reactive.messaging.*; +import org.junit.jupiter.api.Test; + +import com.solace.messaging.receiver.PersistentMessageReceiver; +import com.solace.messaging.resources.Queue; +import com.solace.messaging.resources.TopicSubscription; + +import io.quarkiverse.solace.base.WeldTestBase; +import io.smallrye.mutiny.Multi; +import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig; + +public class SolacePublisherPerformanceTest extends WeldTestBase { + + private static final int COUNT = 100000; + private static final int TIMEOUT_IN_SECONDS = 400; + + @Test + void publisherPerformanceTestWithBackPressureWaitAndWaitForPublishReceipt() { + MapBasedConfig config = new MapBasedConfig() + .with("mp.messaging.outgoing.out.connector", "quarkus-solace") + .with("mp.messaging.outgoing.out.producer.topic", topic); + + List received = new CopyOnWriteArrayList<>(); + + // Start listening first + PersistentMessageReceiver receiver = messagingService.createPersistentMessageReceiverBuilder() + .withMessageAutoAcknowledgement() + .withSubscriptions(TopicSubscription.of(topic)) + .build(Queue.nonDurableExclusiveQueue()); + receiver.receiveAsync(inboundMessage -> { + received.add(inboundMessage.getPayloadAsString()); + }); + receiver.start(); + + // Run app that publish messages + MyApp app = runApplication(config, MyApp.class); + long start = System.currentTimeMillis(); + // app.run(); + await() + .atMost(Duration.ofSeconds(TIMEOUT_IN_SECONDS)) + .until(() -> app.getCount() == COUNT); + long end = System.currentTimeMillis(); + + await() + .atMost(Duration.ofSeconds(TIMEOUT_IN_SECONDS)) + .until(() -> received.size() == COUNT); + + System.out.println("Total time : " + (end - start) + " ms"); + long duration = end - start; + double speed = (COUNT * 1.0) / (duration / 1000.0); + System.out.println(speed + " messages/ms"); + } + + @Test + void publisherPerformanceTestWithBackPressureWaitAndNoWaitForPublishReceipt() { + MapBasedConfig config = new MapBasedConfig() + .with("mp.messaging.outgoing.out.connector", "quarkus-solace") + .with("mp.messaging.outgoing.out.producer.topic", topic) + .with("mp.messaging.outgoing.out.producer.waitForPublishReceipt", false); + + List received = new CopyOnWriteArrayList<>(); + + // Start listening first + PersistentMessageReceiver receiver = messagingService.createPersistentMessageReceiverBuilder() + .withMessageAutoAcknowledgement() + .withSubscriptions(TopicSubscription.of(topic)) + .build(Queue.nonDurableExclusiveQueue()); + receiver.receiveAsync(inboundMessage -> { + received.add(inboundMessage.getPayloadAsString()); + }); + receiver.start(); + + // Run app that publish messages + MyApp app = runApplication(config, MyApp.class); + long start = System.currentTimeMillis(); + // app.run(); + await() + .atMost(Duration.ofSeconds(TIMEOUT_IN_SECONDS)) + .until(() -> app.getCount() == COUNT); + long end = System.currentTimeMillis(); + + await() + .atMost(Duration.ofSeconds(TIMEOUT_IN_SECONDS)) + .until(() -> received.size() == COUNT); + + System.out.println("Total time : " + (end - start) + " ms"); + long duration = end - start; + double speed = (COUNT * 1.0) / (duration / 1000.0); + System.out.println(speed + " messages/ms"); + } + + @Test + void publisherPerformanceTestWithBackPressureElasticAndWaitForPublishReceipt() { + MapBasedConfig config = new MapBasedConfig() + .with("mp.messaging.outgoing.out.connector", "quarkus-solace") + .with("mp.messaging.outgoing.out.producer.topic", topic) + .with("mp.messaging.outgoing.out.producer.back-pressure.strategy", "elastic"); + + List received = new CopyOnWriteArrayList<>(); + + // Start listening first + PersistentMessageReceiver receiver = messagingService.createPersistentMessageReceiverBuilder() + .withMessageAutoAcknowledgement() + .withSubscriptions(TopicSubscription.of(topic)) + .build(Queue.nonDurableExclusiveQueue()); + receiver.receiveAsync(inboundMessage -> { + received.add(inboundMessage.getPayloadAsString()); + }); + receiver.start(); + + // Run app that publish messages + MyApp app = runApplication(config, MyApp.class); + long start = System.currentTimeMillis(); + // app.run(); + await() + .atMost(Duration.ofSeconds(TIMEOUT_IN_SECONDS)) + .until(() -> app.getCount() == COUNT); + long end = System.currentTimeMillis(); + + await() + .atMost(Duration.ofSeconds(TIMEOUT_IN_SECONDS)) + .until(() -> received.size() == COUNT); + + System.out.println("Total time : " + (end - start) + " ms"); + long duration = end - start; + double speed = (COUNT * 1.0) / (duration / 1000.0); + System.out.println(speed + " messages/ms"); + } + + @Test + void publisherPerformanceTestWithBackPressureElasticAndNoWaitForPublishReceipt() { + MapBasedConfig config = new MapBasedConfig() + .with("mp.messaging.outgoing.out.connector", "quarkus-solace") + .with("mp.messaging.outgoing.out.producer.topic", topic) + .with("mp.messaging.outgoing.out.producer.back-pressure.strategy", "elastic") + .with("mp.messaging.outgoing.out.producer.waitForPublishReceipt", false); + + List received = new CopyOnWriteArrayList<>(); + + // Start listening first + PersistentMessageReceiver receiver = messagingService.createPersistentMessageReceiverBuilder() + .withMessageAutoAcknowledgement() + .withSubscriptions(TopicSubscription.of(topic)) + .build(Queue.nonDurableExclusiveQueue()); + receiver.receiveAsync(inboundMessage -> { + received.add(inboundMessage.getPayloadAsString()); + }); + receiver.start(); + + // Run app that publish messages + MyApp app = runApplication(config, MyApp.class); + long start = System.currentTimeMillis(); + // app.run(); + await() + .atMost(Duration.ofSeconds(TIMEOUT_IN_SECONDS)) + .until(() -> app.getCount() == COUNT); + long end = System.currentTimeMillis(); + + await() + .atMost(Duration.ofSeconds(TIMEOUT_IN_SECONDS)) + .until(() -> received.size() == COUNT); + + System.out.println("Total time : " + (end - start) + " ms"); + long duration = end - start; + double speed = (COUNT * 1.0) / (duration / 1000.0); + System.out.println(speed + " messages/ms"); + } + + @ApplicationScoped + static class MyApp { + LongAdder count = new LongAdder(); + + @Outgoing("out") + Multi> out() { + + return Multi.createFrom().range(0, COUNT) + .map(payload -> Message.of(payload).withAck(() -> { + count.increment(); + return CompletableFuture.completedFuture(null); + })); + } + + public long getCount() { + return count.longValue(); + } + } +} From ec3369dd1960ede789d11ed13caddb8156e9b402 Mon Sep 17 00:00:00 2001 From: SravanThotakura05 <83568543+SravanThotakura05@users.noreply.github.com> Date: Fri, 19 Jan 2024 19:42:43 +0530 Subject: [PATCH 2/2] Added blocking processor test and removed duplicate entry in pom.xml --- pubsub-plus-connector/pom.xml | 5 - .../solace/perf/EndToEndPerformanceTest.java | 174 ++++++++++++++++++ 2 files changed, 174 insertions(+), 5 deletions(-) diff --git a/pubsub-plus-connector/pom.xml b/pubsub-plus-connector/pom.xml index 3d55cff..4c8d169 100644 --- a/pubsub-plus-connector/pom.xml +++ b/pubsub-plus-connector/pom.xml @@ -72,11 +72,6 @@ junit-jupiter-api test - - org.junit.jupiter - junit-jupiter-params - test - org.assertj assertj-core diff --git a/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/perf/EndToEndPerformanceTest.java b/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/perf/EndToEndPerformanceTest.java index 31c91c1..e2c2ee4 100644 --- a/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/perf/EndToEndPerformanceTest.java +++ b/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/perf/EndToEndPerformanceTest.java @@ -24,6 +24,7 @@ import io.quarkiverse.solace.base.WeldTestBase; import io.quarkiverse.solace.incoming.SolaceInboundMessage; import io.smallrye.mutiny.Multi; +import io.smallrye.reactive.messaging.annotations.Blocking; import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig; @Disabled @@ -209,4 +210,177 @@ Multi> in(SolaceInboundMessage msg) { } } + + @Test + public void endToEndBlockingProcessorPerformanceTesttWithBackPressureWaitAndWaitForPublishReceipt() { + String processedTopic = topic + "/processed"; + MapBasedConfig config = new MapBasedConfig() + .with("mp.messaging.incoming.in.connector", "quarkus-solace") + .with("mp.messaging.incoming.in.consumer.queue.name", queue) + .with("mp.messaging.incoming.in.consumer.queue.add-additional-subscriptions", "true") + .with("mp.messaging.incoming.in.consumer.queue.missing-resource-creation-strategy", "create-on-start") + .with("mp.messaging.incoming.in.consumer.queue.subscriptions", topic) + .with("mp.messaging.outgoing.out.connector", "quarkus-solace") + .with("mp.messaging.outgoing.out.producer.topic", processedTopic); + + // Run app that consumes messages + runApplication(config, MyBlockingProcessor.class); + + List received = new CopyOnWriteArrayList<>(); + + // Start listening processed messages + PersistentMessageReceiver receiver = messagingService.createPersistentMessageReceiverBuilder() + .withMessageAutoAcknowledgement() + .withSubscriptions(TopicSubscription.of(processedTopic)) + .build(Queue.nonDurableExclusiveQueue()); + receiver.receiveAsync(inboundMessage -> received.add(inboundMessage.getPayloadAsString())); + receiver.start(); + + // Produce messages + PersistentMessagePublisher publisher = messagingService.createPersistentMessagePublisherBuilder() + .build() + .start(); + Topic tp = Topic.of(topic); + for (int i = 0; i < COUNT; i++) { + publisher.publish(String.valueOf(i + 1), tp); + } + + await() + .atMost(Duration.ofSeconds(TIMEOUT_IN_SECONDS)) + .until(() -> received.size() == COUNT); + } + + @Test + public void endToEndBlockingProcessorPerformanceTesttWithBackPressureWaitAndNoWaitForPublishReceipt() { + String processedTopic = topic + "/processed"; + MapBasedConfig config = new MapBasedConfig() + .with("mp.messaging.incoming.in.connector", "quarkus-solace") + .with("mp.messaging.incoming.in.consumer.queue.name", queue) + .with("mp.messaging.incoming.in.consumer.queue.add-additional-subscriptions", "true") + .with("mp.messaging.incoming.in.consumer.queue.missing-resource-creation-strategy", "create-on-start") + .with("mp.messaging.incoming.in.consumer.queue.subscriptions", topic) + .with("mp.messaging.outgoing.out.connector", "quarkus-solace") + .with("mp.messaging.outgoing.out.producer.topic", processedTopic) + .with("mp.messaging.outgoing.out.producer.waitForPublishReceipt", false); + + // Run app that consumes messages + runApplication(config, MyBlockingProcessor.class); + + List received = new CopyOnWriteArrayList<>(); + + // Start listening processed messages + PersistentMessageReceiver receiver = messagingService.createPersistentMessageReceiverBuilder() + .withMessageAutoAcknowledgement() + .withSubscriptions(TopicSubscription.of(processedTopic)) + .build(Queue.nonDurableExclusiveQueue()); + receiver.receiveAsync(inboundMessage -> received.add(inboundMessage.getPayloadAsString())); + receiver.start(); + + // Produce messages + PersistentMessagePublisher publisher = messagingService.createPersistentMessagePublisherBuilder() + .build() + .start(); + Topic tp = Topic.of(topic); + for (int i = 0; i < COUNT; i++) { + publisher.publish(String.valueOf(i + 1), tp); + } + + await() + .atMost(Duration.ofSeconds(TIMEOUT_IN_SECONDS)) + .until(() -> received.size() == COUNT); + } + + @Test + public void endToEndBlockingProcessorPerformanceTesttWithBackPressureElasticAndWaitForPublishReceipt() { + String processedTopic = topic + "/processed"; + MapBasedConfig config = new MapBasedConfig() + .with("mp.messaging.incoming.in.connector", "quarkus-solace") + .with("mp.messaging.incoming.in.consumer.queue.name", queue) + .with("mp.messaging.incoming.in.consumer.queue.add-additional-subscriptions", "true") + .with("mp.messaging.incoming.in.consumer.queue.missing-resource-creation-strategy", "create-on-start") + .with("mp.messaging.incoming.in.consumer.queue.subscriptions", topic) + .with("mp.messaging.outgoing.out.connector", "quarkus-solace") + .with("mp.messaging.outgoing.out.producer.topic", processedTopic) + .with("mp.messaging.outgoing.out.producer.back-pressure.strategy", "elastic"); + + // Run app that consumes messages + runApplication(config, MyBlockingProcessor.class); + + List received = new CopyOnWriteArrayList<>(); + + // Start listening processed messages + PersistentMessageReceiver receiver = messagingService.createPersistentMessageReceiverBuilder() + .withMessageAutoAcknowledgement() + .withSubscriptions(TopicSubscription.of(processedTopic)) + .build(Queue.nonDurableExclusiveQueue()); + receiver.receiveAsync(inboundMessage -> received.add(inboundMessage.getPayloadAsString())); + receiver.start(); + + // Produce messages + PersistentMessagePublisher publisher = messagingService.createPersistentMessagePublisherBuilder() + .build() + .start(); + Topic tp = Topic.of(topic); + for (int i = 0; i < COUNT; i++) { + publisher.publish(String.valueOf(i + 1), tp); + } + + await() + .atMost(Duration.ofSeconds(TIMEOUT_IN_SECONDS)) + .until(() -> received.size() == COUNT); + } + + @Test + public void endToEndBlockingProcessorPerformanceTesttWithBackPressureElasticAndNoWaitForPublishReceipt() { + String processedTopic = topic + "/processed"; + MapBasedConfig config = new MapBasedConfig() + .with("mp.messaging.incoming.in.connector", "quarkus-solace") + .with("mp.messaging.incoming.in.consumer.queue.name", queue) + .with("mp.messaging.incoming.in.consumer.queue.add-additional-subscriptions", "true") + .with("mp.messaging.incoming.in.consumer.queue.missing-resource-creation-strategy", "create-on-start") + .with("mp.messaging.incoming.in.consumer.queue.subscriptions", topic) + .with("mp.messaging.outgoing.out.connector", "quarkus-solace") + .with("mp.messaging.outgoing.out.producer.topic", processedTopic) + .with("mp.messaging.outgoing.out.producer.back-pressure.strategy", "elastic") + .with("mp.messaging.outgoing.out.producer.waitForPublishReceipt", false); + + // Run app that consumes messages + runApplication(config, MyBlockingProcessor.class); + + List received = new CopyOnWriteArrayList<>(); + + // Start listening processed messages + PersistentMessageReceiver receiver = messagingService.createPersistentMessageReceiverBuilder() + .withMessageAutoAcknowledgement() + .withSubscriptions(TopicSubscription.of(processedTopic)) + .build(Queue.nonDurableExclusiveQueue()); + receiver.receiveAsync(inboundMessage -> received.add(inboundMessage.getPayloadAsString())); + receiver.start(); + + // Produce messages + PersistentMessagePublisher publisher = messagingService.createPersistentMessagePublisherBuilder() + .build() + .start(); + Topic tp = Topic.of(topic); + for (int i = 0; i < COUNT; i++) { + publisher.publish(String.valueOf(i + 1), tp); + } + + await() + .atMost(Duration.ofSeconds(TIMEOUT_IN_SECONDS)) + .until(() -> received.size() == COUNT); + } + + @ApplicationScoped + static class MyBlockingProcessor { + @Incoming("in") + @Outgoing("out") + @Blocking(ordered = false) + Message in(SolaceInboundMessage msg) { + return Message.of(msg.getMessage().getPayloadAsString()).withAck(() -> { + msg.ack(); + return CompletableFuture.completedFuture(null); + }); + } + } }