Skip to content

Commit

Permalink
Performance Tests
Browse files Browse the repository at this point in the history
  • Loading branch information
SravanThotakura05 committed Jan 17, 2024
1 parent 0aa38bc commit cd0c544
Show file tree
Hide file tree
Showing 8 changed files with 514 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

import io.quarkiverse.solace.base.SolaceContainer;
import io.quarkiverse.solace.base.WeldTestBase;
import io.quarkiverse.solace.incoming.SolaceInboundMessage;
import io.quarkiverse.solace.incoming.SolaceIncomingChannel;
import io.quarkiverse.solace.logging.SolaceTestAppender;
import io.smallrye.mutiny.Multi;
Expand All @@ -50,7 +51,7 @@ void consumer() {
.with("mp.messaging.incoming.in.consumer.queue.name", queue)
.with("mp.messaging.incoming.in.consumer.queue.add-additional-subscriptions", "true")
.with("mp.messaging.incoming.in.consumer.queue.missing-resource-creation-strategy", "create-on-start")
.with("mp.messaging.incoming.in.consumer.queue.subscriptions", topic);
.with("mp.messaging.incoming.in.consumer.queue.subscriptions", "quarkus/integration/test/replay/messages");

// Run app that consumes messages
MyConsumer app = runApplication(config, MyConsumer.class);
Expand All @@ -59,7 +60,7 @@ void consumer() {
PersistentMessagePublisher publisher = messagingService.createPersistentMessagePublisherBuilder()
.build()
.start();
Topic tp = Topic.of(topic);
Topic tp = Topic.of("quarkus/integration/test/replay/messages");
publisher.publish("1", tp);
publisher.publish("2", tp);
publisher.publish("3", tp);
Expand All @@ -79,7 +80,7 @@ void consumerReplay() {
.with("mp.messaging.incoming.in.consumer.queue.type", "durable-exclusive")
.with("mp.messaging.incoming.in.consumer.queue.add-additional-subscriptions", "true")
.with("mp.messaging.incoming.in.consumer.queue.missing-resource-creation-strategy", "create-on-start")
.with("mp.messaging.incoming.in.consumer.queue.subscriptions", topic)
.with("mp.messaging.incoming.in.consumer.queue.subscriptions", "quarkus/integration/test/replay/messages")
.with("mp.messaging.incoming.in.consumer.queue.replay.strategy", "all-messages");

// Run app that consumes messages
Expand Down Expand Up @@ -294,8 +295,9 @@ static class MyConsumer {
private final List<String> received = new CopyOnWriteArrayList<>();

@Incoming("in")
void in(InboundMessage msg) {
received.add(msg.getPayloadAsString());
CompletionStage<Void> in(SolaceInboundMessage<byte[]> msg) {
received.add(msg.getMessage().getPayloadAsString());
return msg.ack();
}

public List<String> getReceived() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public class SolaceProcessorTest extends WeldTestBase {

@Test
void consumer() {
String processedTopic = topic + "-processed";
String processedTopic = topic + "/processed";
MapBasedConfig config = new MapBasedConfig()
.with("mp.messaging.incoming.in.connector", "quarkus-solace")
.with("mp.messaging.incoming.in.consumer.queue.add-additional-subscriptions", "true")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ public class SolaceBaseTest {
public void initTopic(TestInfo testInfo) {
String cn = testInfo.getTestClass().map(Class::getSimpleName).orElse(UUID.randomUUID().toString());
String mn = testInfo.getTestMethod().map(Method::getName).orElse(UUID.randomUUID().toString());
// topic = cn + "/" + mn + "/" + UUID.randomUUID().getMostSignificantBits();
topic = "quarkus/integration/test/default/topic";
topic = "quarkus/integration/test/default/" + cn + "/" + mn + "/" + UUID.randomUUID().getMostSignificantBits();
}

@BeforeEach
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,10 @@ public void startSolaceBroker() {
solace = createSolaceContainer()
.withCredentials("user", "pass")
.withExposedPorts(SolaceContainer.Service.SMF.getPort())
.withPublishTopic("quarkus/integration/test/default/topic", SolaceContainer.Service.SMF)
.withPublishTopic("quarkus/integration/test/provisioned/queue/topic", SolaceContainer.Service.SMF)
.withPublishTopic("quarkus/integration/test/provisioned/queue/error/topic", SolaceContainer.Service.SMF)
.withPublishTopic("quarkus/integration/test/dynamic/topic/1", SolaceContainer.Service.SMF)
.withPublishTopic("quarkus/integration/test/dynamic/topic/2", SolaceContainer.Service.SMF)
.withPublishTopic("quarkus/integration/test/dynamic/topic/3", SolaceContainer.Service.SMF)
.withPublishTopic("quarkus/integration/test/dynamic/topic/4", SolaceContainer.Service.SMF)
.withPublishTopic("quarkus/integration/test/dynamic/topic/5", SolaceContainer.Service.SMF)
.withPublishTopic("quarkus/integration/test/default/topic-processed", SolaceContainer.Service.SMF);
.withPublishTopic("quarkus/integration/test/replay/messages", SolaceContainer.Service.SMF)
.withPublishTopic("quarkus/integration/test/default/>", SolaceContainer.Service.SMF)
.withPublishTopic("quarkus/integration/test/provisioned/queue/>", SolaceContainer.Service.SMF)
.withPublishTopic("quarkus/integration/test/dynamic/>", SolaceContainer.Service.SMF);
solace.start();
LOGGER.info("Solace broker started: " + solace.getOrigin(SolaceContainer.Service.SMF));
await().until(() -> solace.isRunning());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import io.smallrye.reactive.messaging.health.HealthReport;
import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig;

public class SolaceProducerHealthCheck extends WeldTestBase {
public class SolacePublisherHealthCheck extends WeldTestBase {
@Test
void publisherHealthCheck() {
MapBasedConfig config = new MapBasedConfig()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
package io.quarkiverse.solace.perf;

import static org.awaitility.Awaitility.await;

import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

import com.solace.messaging.publisher.PersistentMessagePublisher;
import com.solace.messaging.receiver.PersistentMessageReceiver;
import com.solace.messaging.resources.Queue;
import com.solace.messaging.resources.Topic;
import com.solace.messaging.resources.TopicSubscription;

import io.quarkiverse.solace.base.WeldTestBase;
import io.quarkiverse.solace.incoming.SolaceInboundMessage;
import io.smallrye.mutiny.Multi;
import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig;

@Disabled
public class EndToEndPerformanceTest extends WeldTestBase {

private static final int COUNT = 100000;

private static final int TIMEOUT_IN_SECONDS = 400;

@Test
public void endToEndPerformanceTesttWithBackPressureWaitAndWaitForPublishReceipt() {
String processedTopic = topic + "/processed";
MapBasedConfig config = new MapBasedConfig()
.with("mp.messaging.incoming.in.connector", "quarkus-solace")
.with("mp.messaging.incoming.in.consumer.queue.name", queue)
.with("mp.messaging.incoming.in.consumer.queue.add-additional-subscriptions", "true")
.with("mp.messaging.incoming.in.consumer.queue.missing-resource-creation-strategy", "create-on-start")
.with("mp.messaging.incoming.in.consumer.queue.subscriptions", topic)
.with("mp.messaging.outgoing.out.connector", "quarkus-solace")
.with("mp.messaging.outgoing.out.producer.topic", processedTopic);

// Run app that consumes messages
runApplication(config, MyProcessor.class);

List<String> received = new CopyOnWriteArrayList<>();

// Start listening processed messages
PersistentMessageReceiver receiver = messagingService.createPersistentMessageReceiverBuilder()
.withMessageAutoAcknowledgement()
.withSubscriptions(TopicSubscription.of(processedTopic))
.build(Queue.nonDurableExclusiveQueue());
receiver.receiveAsync(inboundMessage -> received.add(inboundMessage.getPayloadAsString()));
receiver.start();

// Produce messages
PersistentMessagePublisher publisher = messagingService.createPersistentMessagePublisherBuilder()
.build()
.start();
Topic tp = Topic.of(topic);
for (int i = 0; i < COUNT; i++) {
publisher.publish(String.valueOf(i + 1), tp);
}

await()
.atMost(Duration.ofSeconds(TIMEOUT_IN_SECONDS))
.until(() -> received.size() == COUNT);
}

@Test
public void endToEndPerformanceTesttWithBackPressureWaitAndNoWaitForPublishReceipt() {
String processedTopic = topic + "/processed";
MapBasedConfig config = new MapBasedConfig()
.with("mp.messaging.incoming.in.connector", "quarkus-solace")
.with("mp.messaging.incoming.in.consumer.queue.name", queue)
.with("mp.messaging.incoming.in.consumer.queue.add-additional-subscriptions", "true")
.with("mp.messaging.incoming.in.consumer.queue.missing-resource-creation-strategy", "create-on-start")
.with("mp.messaging.incoming.in.consumer.queue.subscriptions", topic)
.with("mp.messaging.outgoing.out.connector", "quarkus-solace")
.with("mp.messaging.outgoing.out.producer.topic", processedTopic)
.with("mp.messaging.outgoing.out.producer.waitForPublishReceipt", false);

// Run app that consumes messages
runApplication(config, MyProcessor.class);

List<String> received = new CopyOnWriteArrayList<>();

// Start listening processed messages
PersistentMessageReceiver receiver = messagingService.createPersistentMessageReceiverBuilder()
.withMessageAutoAcknowledgement()
.withSubscriptions(TopicSubscription.of(processedTopic))
.build(Queue.nonDurableExclusiveQueue());
receiver.receiveAsync(inboundMessage -> received.add(inboundMessage.getPayloadAsString()));
receiver.start();

// Produce messages
PersistentMessagePublisher publisher = messagingService.createPersistentMessagePublisherBuilder()
.build()
.start();
Topic tp = Topic.of(topic);
for (int i = 0; i < COUNT; i++) {
publisher.publish(String.valueOf(i + 1), tp);
}

await()
.atMost(Duration.ofSeconds(TIMEOUT_IN_SECONDS))
.until(() -> received.size() == COUNT);
}

//
@Test
public void endToEndPerformanceTesttWithBackPressureElasticAndWaitForPublishReceipt() {
String processedTopic = topic + "/processed";
MapBasedConfig config = new MapBasedConfig()
.with("mp.messaging.incoming.in.connector", "quarkus-solace")
.with("mp.messaging.incoming.in.consumer.queue.name", queue)
.with("mp.messaging.incoming.in.consumer.queue.add-additional-subscriptions", "true")
.with("mp.messaging.incoming.in.consumer.queue.missing-resource-creation-strategy", "create-on-start")
.with("mp.messaging.incoming.in.consumer.queue.subscriptions", topic)
.with("mp.messaging.outgoing.out.connector", "quarkus-solace")
.with("mp.messaging.outgoing.out.producer.topic", processedTopic)
.with("mp.messaging.outgoing.out.producer.back-pressure.strategy", "elastic");

// Run app that consumes messages
runApplication(config, MyProcessor.class);

List<String> received = new CopyOnWriteArrayList<>();

// Start listening processed messages
PersistentMessageReceiver receiver = messagingService.createPersistentMessageReceiverBuilder()
.withMessageAutoAcknowledgement()
.withSubscriptions(TopicSubscription.of(processedTopic))
.build(Queue.nonDurableExclusiveQueue());
receiver.receiveAsync(inboundMessage -> received.add(inboundMessage.getPayloadAsString()));
receiver.start();

// Produce messages
PersistentMessagePublisher publisher = messagingService.createPersistentMessagePublisherBuilder()
.build()
.start();
Topic tp = Topic.of(topic);
for (int i = 0; i < COUNT; i++) {
publisher.publish(String.valueOf(i + 1), tp);
}

await()
.atMost(Duration.ofSeconds(TIMEOUT_IN_SECONDS))
.until(() -> received.size() == COUNT);
}

//
@Test
public void endToEndPerformanceTesttWithBackPressureElasticAndNoWaitForPublishReceipt() {
String processedTopic = topic + "/processed";
MapBasedConfig config = new MapBasedConfig()
.with("mp.messaging.incoming.in.connector", "quarkus-solace")
.with("mp.messaging.incoming.in.consumer.queue.name", queue)
.with("mp.messaging.incoming.in.consumer.queue.add-additional-subscriptions", "true")
.with("mp.messaging.incoming.in.consumer.queue.missing-resource-creation-strategy", "create-on-start")
.with("mp.messaging.incoming.in.consumer.queue.subscriptions", topic)
.with("mp.messaging.outgoing.out.connector", "quarkus-solace")
.with("mp.messaging.outgoing.out.producer.topic", processedTopic)
.with("mp.messaging.outgoing.out.producer.back-pressure.strategy", "elastic")
.with("mp.messaging.outgoing.out.producer.waitForPublishReceipt", false);

// Run app that consumes messages
runApplication(config, MyProcessor.class);

List<String> received = new CopyOnWriteArrayList<>();

// Start listening processed messages
PersistentMessageReceiver receiver = messagingService.createPersistentMessageReceiverBuilder()
.withMessageAutoAcknowledgement()
.withSubscriptions(TopicSubscription.of(processedTopic))
.build(Queue.nonDurableExclusiveQueue());
receiver.receiveAsync(inboundMessage -> received.add(inboundMessage.getPayloadAsString()));
receiver.start();

// Produce messages
PersistentMessagePublisher publisher = messagingService.createPersistentMessagePublisherBuilder()
.build()
.start();
Topic tp = Topic.of(topic);
for (int i = 0; i < COUNT; i++) {
publisher.publish(String.valueOf(i + 1), tp);
}

await()
.atMost(Duration.ofSeconds(TIMEOUT_IN_SECONDS))
.until(() -> received.size() == COUNT);
}

@ApplicationScoped
static class MyProcessor {
@Incoming("in")
@Outgoing("out")
Multi<Message<String>> in(SolaceInboundMessage<byte[]> msg) {
// return messagingService.messageBuilder().build(payload);
return Multi.createFrom().items(msg.getMessage().getPayloadAsString())
.map(p -> Message.of(p).withAck(() -> {
msg.ack();
return CompletableFuture.completedFuture(null);
}));
}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package io.quarkiverse.solace.perf;

import static org.awaitility.Awaitility.await;

import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.LongAdder;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.junit.jupiter.api.Test;

import com.solace.messaging.publisher.PersistentMessagePublisher;
import com.solace.messaging.resources.Topic;

import io.quarkiverse.solace.base.WeldTestBase;
import io.quarkiverse.solace.incoming.SolaceInboundMessage;
import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig;

public class SolaceConsumerPerformanceTest extends WeldTestBase {
private static final int COUNT = 100000;
private static final int TIMEOUT_IN_SECONDS = 400;

@Test
public void solaceConsumerPerformanceTest() {
// Produce messages
PersistentMessagePublisher publisher = messagingService.createPersistentMessagePublisherBuilder()
.build()
.start();

MapBasedConfig config = new MapBasedConfig()
.with("mp.messaging.incoming.in.connector", "quarkus-solace")
.with("mp.messaging.incoming.in.consumer.queue.name", queue)
.with("mp.messaging.incoming.in.consumer.queue.add-additional-subscriptions", "true")
.with("mp.messaging.incoming.in.consumer.queue.missing-resource-creation-strategy", "create-on-start")
.with("mp.messaging.incoming.in.consumer.queue.subscriptions", topic);
// .with("mp.messaging.incoming.in.client.graceful-shutdown", false);

// Run app that consumes messages
MyConsumer app = runApplication(config, MyConsumer.class);

await().until(() -> isStarted() && isReady());

Topic tp = Topic.of(topic);
for (int i = 0; i < COUNT; i++) {
publisher.publish(String.valueOf(i + 1), tp);
}

await()
.atMost(Duration.ofSeconds(TIMEOUT_IN_SECONDS))
.until(() -> app.getCount() == COUNT);
long start = app.getStart();
long end = System.currentTimeMillis();

System.out.println("Total time : " + (end - start) + " ms");

}

@ApplicationScoped
static class MyConsumer {
private final List<String> received = new CopyOnWriteArrayList<>();
LongAdder count = new LongAdder();
long start;

@Incoming("in")
public CompletionStage<Void> in(SolaceInboundMessage<byte[]> msg) {
if (count.longValue() == 0L) {
start = System.currentTimeMillis();
}
count.increment();
return msg.ack();
}

public List<String> getReceived() {
return received;
}

public long getStart() {
return start;
}

public long getCount() {
return count.longValue();
}
}
}
Loading

0 comments on commit cd0c544

Please sign in to comment.