diff --git a/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/i18n/SolaceLogging.java b/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/i18n/SolaceLogging.java index e769662..fed69ab 100644 --- a/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/i18n/SolaceLogging.java +++ b/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/i18n/SolaceLogging.java @@ -11,7 +11,7 @@ @MessageLogger(projectCode = "SRMSG", length = 5) public interface SolaceLogging extends BasicLogger { - SolaceLogging log = Logger.getMessageLogger(SolaceLogging.class, "com.solace.quarkus"); + SolaceLogging log = Logger.getMessageLogger(SolaceLogging.class, "com.solace.quarkus.messaging"); @Once @LogMessage(level = Logger.Level.INFO) diff --git a/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/incoming/SolaceInboundMetadata.java b/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/incoming/SolaceInboundMetadata.java index 6c48c35..ab2eb50 100644 --- a/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/incoming/SolaceInboundMetadata.java +++ b/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/incoming/SolaceInboundMetadata.java @@ -4,6 +4,7 @@ import java.util.Map; import com.solace.messaging.PubSubPlusClientException; +import com.solace.messaging.config.SolaceConstants; import com.solace.messaging.receiver.InboundMessage; import com.solace.messaging.util.Converter; import com.solace.messaging.util.InteroperabilitySupport; @@ -129,4 +130,8 @@ public Map getProperties() { return msg.getProperties(); } + public String getPartitionKey() { + return msg.getProperties().get(SolaceConstants.MessageUserPropertyConstants.QUEUE_PARTITION_KEY); + } + } diff --git a/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/incoming/SolaceIncomingChannel.java b/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/incoming/SolaceIncomingChannel.java index 899c2ff..9b5b9b7 100644 --- a/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/incoming/SolaceIncomingChannel.java +++ b/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/incoming/SolaceIncomingChannel.java @@ -22,7 +22,6 @@ import com.solace.messaging.config.MissingResourcesCreationConfiguration.MissingResourcesCreationStrategy; import com.solace.messaging.config.ReceiverActivationPassivationConfiguration; import com.solace.messaging.config.ReplayStrategy; -import com.solace.messaging.receiver.DirectMessageReceiver; import com.solace.messaging.receiver.InboundMessage; import com.solace.messaging.receiver.PersistentMessageReceiver; import com.solace.messaging.resources.Queue; @@ -63,7 +62,6 @@ public SolaceIncomingChannel(Vertx vertx, SolaceConnectorIncomingConfiguration i this.context = Context.newInstance(((VertxInternal) vertx.getDelegate()).createEventLoopContext()); this.gracefulShutdown = ic.getClientGracefulShutdown(); this.gracefulShutdownWaitTimeout = ic.getClientGracefulShutdownWaitTimeout(); - DirectMessageReceiver r = solace.createDirectMessageReceiverBuilder().build(); Outcome[] outcomes = new Outcome[] { Outcome.ACCEPTED }; if (ic.getConsumerQueueSupportsNacks()) { outcomes = new Outcome[] { Outcome.ACCEPTED, Outcome.FAILED, Outcome.REJECTED }; @@ -193,13 +191,13 @@ public Flow.Publisher> getStream() { public void waitForUnAcknowledgedMessages() { try { receiver.pause(); - SolaceLogging.log.info("Waiting for incoming channel messages to be acknowledged"); + SolaceLogging.log.infof("Waiting for incoming channel %s messages to be acknowledged", channel); if (!unacknowledgedMessageTracker.awaitEmpty(this.gracefulShutdownWaitTimeout, TimeUnit.MILLISECONDS)) { - SolaceLogging.log.info(String.format("Timed out while waiting for the" + - " remaining messages to be acknowledged.")); + SolaceLogging.log.infof("Timed out while waiting for the" + + " remaining messages to be acknowledged on channel %s.", channel); } } catch (InterruptedException e) { - SolaceLogging.log.info(String.format("Interrupted while waiting for messages to get acknowledged")); + SolaceLogging.log.infof("Interrupted while waiting for messages on channel %s to get acknowledged", channel); throw new RuntimeException(e); } } @@ -246,6 +244,7 @@ public void isAlive(HealthReport.HealthReportBuilder builder) { @Override public void onStateChange(ReceiverState receiverState, ReceiverState receiverState1, long l) { - + SolaceLogging.log.infof("Consumer state changed from %s to %s on channel %s", receiverState.name(), + receiverState1.name(), channel); } } diff --git a/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/outgoing/SolaceOutboundMetadata.java b/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/outgoing/SolaceOutboundMetadata.java index bfcb224..bd23ef0 100644 --- a/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/outgoing/SolaceOutboundMetadata.java +++ b/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/outgoing/SolaceOutboundMetadata.java @@ -15,6 +15,8 @@ public class SolaceOutboundMetadata { private final Integer classOfService; private final String dynamicDestination; + private final String partitionKey; + public static PubSubOutboundMetadataBuilder builder() { return new PubSubOutboundMetadataBuilder(); } @@ -27,7 +29,7 @@ public SolaceOutboundMetadata(Map httpContentHeaders, String applicationMessageType, Long timeToLive, String applicationMessageId, - Integer classOfService, String dynamicDestination) { + Integer classOfService, String dynamicDestination, String partitionKey) { this.httpContentHeaders = httpContentHeaders; this.expiration = expiration; this.priority = priority; @@ -38,6 +40,7 @@ public SolaceOutboundMetadata(Map httpContentHeaders, this.applicationMessageId = applicationMessageId; this.classOfService = classOfService; this.dynamicDestination = dynamicDestination; + this.partitionKey = partitionKey; } public Map getHttpContentHeaders() { @@ -80,6 +83,10 @@ public String getDynamicDestination() { return dynamicDestination; } + public String getPartitionKey() { + return partitionKey; + } + public static class PubSubOutboundMetadataBuilder { private Map httpContentHeaders; private Long expiration; @@ -92,6 +99,8 @@ public static class PubSubOutboundMetadataBuilder { private Integer classOfService; private String dynamicDestination; + private String partitionKey; + public PubSubOutboundMetadataBuilder setHttpContentHeaders(Map httpContentHeader) { this.httpContentHeaders = httpContentHeaders; return this; @@ -142,9 +151,14 @@ public PubSubOutboundMetadataBuilder setDynamicDestination(String dynamicDestina return this; } + public PubSubOutboundMetadataBuilder setPartitionKey(String partitionKey) { + this.partitionKey = partitionKey; + return this; + } + public SolaceOutboundMetadata createPubSubOutboundMetadata() { return new SolaceOutboundMetadata(httpContentHeaders, expiration, priority, senderId, properties, - applicationMessageType, timeToLive, applicationMessageId, classOfService, dynamicDestination); + applicationMessageType, timeToLive, applicationMessageId, classOfService, dynamicDestination, partitionKey); } } } diff --git a/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/outgoing/SolaceOutgoingChannel.java b/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/outgoing/SolaceOutgoingChannel.java index 36de3cc..c39582a 100644 --- a/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/outgoing/SolaceOutgoingChannel.java +++ b/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/outgoing/SolaceOutgoingChannel.java @@ -13,6 +13,7 @@ import com.solace.messaging.MessagingService; import com.solace.messaging.PersistentMessagePublisherBuilder; import com.solace.messaging.PubSubPlusClientException; +import com.solace.messaging.config.SolaceConstants; import com.solace.messaging.publisher.OutboundMessage; import com.solace.messaging.publisher.OutboundMessageBuilder; import com.solace.messaging.publisher.PersistentMessagePublisher; @@ -145,6 +146,10 @@ private Uni publishMessage(PersistentMessagePublisher publisher, if (metadata.getClassOfService() != null) { msgBuilder.withClassOfService(metadata.getClassOfService()); } + if (metadata.getPartitionKey() != null) { + msgBuilder.withProperty(SolaceConstants.MessageUserPropertyConstants.QUEUE_PARTITION_KEY, + metadata.getPartitionKey()); + } if (metadata.getDynamicDestination() != null) { topic.set(Topic.of(metadata.getDynamicDestination())); @@ -196,13 +201,13 @@ public Flow.Subscriber> getSubscriber() { public void waitForPublishedMessages() { try { - SolaceLogging.log.info("Waiting for outgoing messages to be published"); + SolaceLogging.log.infof("Waiting for outgoing channel %s messages to be published", channel); if (!publishedMessagesTracker.awaitEmpty(this.gracefulShutdownWaitTimeout, TimeUnit.MILLISECONDS)) { - SolaceLogging.log.info(String.format("Timed out while waiting for the" + - " remaining messages to get publish acknowledgment.")); + SolaceLogging.log.infof("Timed out while waiting for the" + + " remaining messages to be acknowledged on channel %s.", channel); } } catch (InterruptedException e) { - SolaceLogging.log.info(String.format("Interrupted while waiting for messages to get acknowledged")); + SolaceLogging.log.infof("Interrupted while waiting for messages on channel %s to get acknowledged", channel); throw new RuntimeException(e); } } diff --git a/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/SolaceConsumerTest.java b/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/SolaceConsumerTest.java index 947d3e9..ca679c0 100644 --- a/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/SolaceConsumerTest.java +++ b/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/SolaceConsumerTest.java @@ -4,8 +4,7 @@ import static org.awaitility.Awaitility.await; import static org.junit.jupiter.api.Assertions.assertThrows; -import java.util.List; -import java.util.Properties; +import java.util.*; import java.util.concurrent.*; import jakarta.enterprise.context.ApplicationScoped; @@ -18,6 +17,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestMethodOrder; +import com.solace.messaging.config.SolaceConstants; import com.solace.messaging.config.SolaceProperties; import com.solace.messaging.publisher.OutboundMessage; import com.solace.messaging.publisher.OutboundMessageBuilder; @@ -36,7 +36,7 @@ @TestMethodOrder(MethodOrderer.OrderAnnotation.class) public class SolaceConsumerTest extends WeldTestBase { - private org.apache.log4j.Logger rootLogger = org.apache.log4j.Logger.getLogger("com.solace.quarkus"); + private org.apache.log4j.Logger rootLogger = org.apache.log4j.Logger.getLogger("com.solace.quarkus.messaging"); private SolaceTestAppender solaceTestAppender = new SolaceTestAppender(); private SolaceConsumerTest() { @@ -192,6 +192,75 @@ void consumerFailedProcessingMoveToDMQ() { @Test @Order(6) + void partitionedQueue() { + MapBasedConfig config = new MapBasedConfig() + .with("mp.messaging.incoming.consumer-1.connector", "quarkus-solace") + .with("mp.messaging.incoming.consumer-1.consumer.queue.name", + SolaceContainer.INTEGRATION_TEST_PARTITION_QUEUE_NAME) + .with("mp.messaging.incoming.consumer-1.consumer.queue.type", "durable-non-exclusive") + .with("mp.messaging.incoming.consumer-2.connector", "quarkus-solace") + .with("mp.messaging.incoming.consumer-2.consumer.queue.name", + SolaceContainer.INTEGRATION_TEST_PARTITION_QUEUE_NAME) + .with("mp.messaging.incoming.consumer-2.consumer.queue.type", "durable-non-exclusive") + .with("mp.messaging.incoming.consumer-3.connector", "quarkus-solace") + .with("mp.messaging.incoming.consumer-3.consumer.queue.name", + SolaceContainer.INTEGRATION_TEST_PARTITION_QUEUE_NAME) + .with("mp.messaging.incoming.consumer-3.consumer.queue.type", "durable-non-exclusive") + .with("mp.messaging.incoming.consumer-4.connector", "quarkus-solace") + .with("mp.messaging.incoming.consumer-4.consumer.queue.name", + SolaceContainer.INTEGRATION_TEST_PARTITION_QUEUE_NAME) + .with("mp.messaging.incoming.consumer-4.consumer.queue.type", "durable-non-exclusive"); + + // Run app that consumes messages + MyPartitionedQueueConsumer app = runApplication(config, MyPartitionedQueueConsumer.class); + + CopyOnWriteArrayList partitionKeys = new CopyOnWriteArrayList<>() { + { + add("Group-1"); + add("Group-2"); + add("Group-3"); + add("Group-4"); + } + }; + Map partitionMessages = new HashMap<>() { + { + put(partitionKeys.get(0), 0); + put(partitionKeys.get(1), 0); + put(partitionKeys.get(2), 0); + put(partitionKeys.get(3), 0); + } + }; + + Random random = new Random(); + // Produce messages + PersistentMessagePublisher publisher = messagingService.createPersistentMessagePublisherBuilder() + .build() + .start(); + Topic tp = Topic.of(SolaceContainer.INTEGRATION_TEST_PARTITION_QUEUE_SUBSCRIPTION); + for (int i = 0; i < 1000; i++) { + int partitionIndex = random.nextInt(4); + String partitionKey = partitionKeys.get(partitionIndex); + int count = partitionMessages.get(partitionKey); + partitionMessages.put(partitionKey, (count + 1)); + OutboundMessageBuilder messageBuilder = messagingService.messageBuilder(); + messageBuilder.withProperty(SolaceConstants.MessageUserPropertyConstants.QUEUE_PARTITION_KEY, partitionKey); + OutboundMessage outboundMessage = messageBuilder.build(Integer.toString(i)); + publisher.publish(outboundMessage, tp); + } + + // Assert on published and consumed messages + await().untilAsserted(() -> assertThat(app.getPartitionMessages().get(partitionKeys.get(0))) + .isEqualTo(partitionMessages.get(partitionKeys.get(0)))); + await().untilAsserted(() -> assertThat(app.getPartitionMessages().get(partitionKeys.get(1))) + .isEqualTo(partitionMessages.get(partitionKeys.get(1)))); + await().untilAsserted(() -> assertThat(app.getPartitionMessages().get(partitionKeys.get(2))) + .isEqualTo(partitionMessages.get(partitionKeys.get(2)))); + await().untilAsserted(() -> assertThat(app.getPartitionMessages().get(partitionKeys.get(3))) + .isEqualTo(partitionMessages.get(partitionKeys.get(3)))); + } + + @Test + @Order(7) void consumerPublishToErrorTopicPermissionException() { MapBasedConfig config = new MapBasedConfig() .with("mp.messaging.incoming.in.connector", "quarkus-solace") @@ -224,7 +293,7 @@ void consumerPublishToErrorTopicPermissionException() { } @Test - @Order(7) + @Order(8) void consumerGracefulCloseTest() { MapBasedConfig config = new MapBasedConfig() .with("channel-name", "in") @@ -269,7 +338,7 @@ void consumerGracefulCloseTest() { } @Test - @Order(8) + @Order(9) void consumerCreateMissingResourceAddSubscriptionPermissionException() { MapBasedConfig config = new MapBasedConfig() .with("mp.messaging.incoming.in.connector", "quarkus-solace") @@ -353,4 +422,51 @@ public List getReceivedFailedMessages() { return receivedFailedMessages; } } + + @ApplicationScoped + static class MyPartitionedQueueConsumer { + Map partitionMessages = new HashMap<>() { + { + put("Group-1", 0); + put("Group-2", 0); + put("Group-3", 0); + put("Group-4", 0); + } + }; + + @Incoming("consumer-1") + CompletionStage consumer1(SolaceInboundMessage msg) { + updatePartitionMessages(msg); + return msg.ack(); + } + + @Incoming("consumer-2") + CompletionStage consumer2(SolaceInboundMessage msg) { + updatePartitionMessages(msg); + return msg.ack(); + } + + @Incoming("consumer-3") + CompletionStage consumer3(SolaceInboundMessage msg) { + updatePartitionMessages(msg); + return msg.ack(); + } + + @Incoming("consumer-4") + CompletionStage consumer4(SolaceInboundMessage msg) { + updatePartitionMessages(msg); + return msg.ack(); + } + + private void updatePartitionMessages(SolaceInboundMessage msg) { + String partitionKey = msg.getMessage() + .getProperty(SolaceConstants.MessageUserPropertyConstants.QUEUE_PARTITION_KEY); + int count = partitionMessages.get(partitionKey); + partitionMessages.put(partitionKey, (count + 1)); + } + + public Map getPartitionMessages() { + return partitionMessages; + } + } } diff --git a/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/base/SolaceBrokerExtension.java b/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/base/SolaceBrokerExtension.java index 7b7d3be..2dab5f6 100644 --- a/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/base/SolaceBrokerExtension.java +++ b/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/base/SolaceBrokerExtension.java @@ -47,7 +47,7 @@ public void startSolaceBroker() { .withExposedPorts(SolaceContainer.Service.SMF.getPort()) .withPublishTopic("quarkus/integration/test/replay/messages", SolaceContainer.Service.SMF) .withPublishTopic("quarkus/integration/test/default/>", SolaceContainer.Service.SMF) - .withPublishTopic("quarkus/integration/test/provisioned/queue/>", SolaceContainer.Service.SMF) + .withPublishTopic("quarkus/integration/test/provisioned/>", SolaceContainer.Service.SMF) .withPublishTopic("quarkus/integration/test/dynamic/>", SolaceContainer.Service.SMF); solace.start(); LOGGER.info("Solace broker started: " + solace.getOrigin(SolaceContainer.Service.SMF)); diff --git a/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/base/SolaceContainer.java b/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/base/SolaceContainer.java index 2211e3d..8beae7e 100644 --- a/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/base/SolaceContainer.java +++ b/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/base/SolaceContainer.java @@ -24,6 +24,9 @@ public class SolaceContainer extends GenericContainer { public static final String INTEGRATION_TEST_ERROR_QUEUE_NAME = "integration-test-error-queue"; public static final String INTEGRATION_TEST_ERROR_QUEUE_SUBSCRIPTION = "quarkus/integration/test/provisioned/queue/error/topic"; + public static final String INTEGRATION_TEST_PARTITION_QUEUE_NAME = "integration-test-partition-queue"; + public static final String INTEGRATION_TEST_PARTITION_QUEUE_SUBSCRIPTION = "quarkus/integration/test/provisioned/partition/queue/topic"; + private static final DockerImageName DEFAULT_IMAGE_NAME = DockerImageName.parse("solace/solace-pubsub-standard"); private static final String DEFAULT_VPN = "default"; @@ -137,6 +140,20 @@ private Transferable createConfigurationScript() { updateConfigScript(scriptBuilder, "exit"); updateConfigScript(scriptBuilder, "exit"); + // Partitioned Queue + updateConfigScript(scriptBuilder, "message-spool message-vpn default"); + updateConfigScript(scriptBuilder, "create queue " + INTEGRATION_TEST_PARTITION_QUEUE_NAME); + updateConfigScript(scriptBuilder, "access-type non-exclusive"); + updateConfigScript(scriptBuilder, "subscription topic " + INTEGRATION_TEST_PARTITION_QUEUE_SUBSCRIPTION); + updateConfigScript(scriptBuilder, "max-spool-usage 300"); + updateConfigScript(scriptBuilder, "permission all consume"); + updateConfigScript(scriptBuilder, "partition"); + updateConfigScript(scriptBuilder, "count 4"); + updateConfigScript(scriptBuilder, "exit"); + updateConfigScript(scriptBuilder, "no shutdown"); + updateConfigScript(scriptBuilder, "exit"); + updateConfigScript(scriptBuilder, "exit"); + // Create VPN if not default if (!vpn.equals(DEFAULT_VPN)) { updateConfigScript(scriptBuilder, "create message-vpn " + vpn);