Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Partitioned queues #42

Merged
merged 6 commits into from
Feb 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
@MessageLogger(projectCode = "SRMSG", length = 5)
public interface SolaceLogging extends BasicLogger {

SolaceLogging log = Logger.getMessageLogger(SolaceLogging.class, "com.solace.quarkus");
SolaceLogging log = Logger.getMessageLogger(SolaceLogging.class, "com.solace.quarkus.messaging");

@Once
@LogMessage(level = Logger.Level.INFO)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import java.util.Map;

import com.solace.messaging.PubSubPlusClientException;
import com.solace.messaging.config.SolaceConstants;
import com.solace.messaging.receiver.InboundMessage;
import com.solace.messaging.util.Converter;
import com.solace.messaging.util.InteroperabilitySupport;
Expand Down Expand Up @@ -129,4 +130,8 @@ public Map<String, String> getProperties() {
return msg.getProperties();
}

public String getPartitionKey() {
return msg.getProperties().get(SolaceConstants.MessageUserPropertyConstants.QUEUE_PARTITION_KEY);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.solace.messaging.config.MissingResourcesCreationConfiguration.MissingResourcesCreationStrategy;
import com.solace.messaging.config.ReceiverActivationPassivationConfiguration;
import com.solace.messaging.config.ReplayStrategy;
import com.solace.messaging.receiver.DirectMessageReceiver;
import com.solace.messaging.receiver.InboundMessage;
import com.solace.messaging.receiver.PersistentMessageReceiver;
import com.solace.messaging.resources.Queue;
Expand Down Expand Up @@ -63,7 +62,6 @@ public SolaceIncomingChannel(Vertx vertx, SolaceConnectorIncomingConfiguration i
this.context = Context.newInstance(((VertxInternal) vertx.getDelegate()).createEventLoopContext());
this.gracefulShutdown = ic.getClientGracefulShutdown();
this.gracefulShutdownWaitTimeout = ic.getClientGracefulShutdownWaitTimeout();
DirectMessageReceiver r = solace.createDirectMessageReceiverBuilder().build();
Outcome[] outcomes = new Outcome[] { Outcome.ACCEPTED };
if (ic.getConsumerQueueSupportsNacks()) {
outcomes = new Outcome[] { Outcome.ACCEPTED, Outcome.FAILED, Outcome.REJECTED };
Expand Down Expand Up @@ -193,13 +191,13 @@ public Flow.Publisher<? extends Message<?>> getStream() {
public void waitForUnAcknowledgedMessages() {
try {
receiver.pause();
SolaceLogging.log.info("Waiting for incoming channel messages to be acknowledged");
SolaceLogging.log.infof("Waiting for incoming channel %s messages to be acknowledged", channel);
if (!unacknowledgedMessageTracker.awaitEmpty(this.gracefulShutdownWaitTimeout, TimeUnit.MILLISECONDS)) {
SolaceLogging.log.info(String.format("Timed out while waiting for the" +
" remaining messages to be acknowledged."));
SolaceLogging.log.infof("Timed out while waiting for the" +
" remaining messages to be acknowledged on channel %s.", channel);
}
} catch (InterruptedException e) {
SolaceLogging.log.info(String.format("Interrupted while waiting for messages to get acknowledged"));
SolaceLogging.log.infof("Interrupted while waiting for messages on channel %s to get acknowledged", channel);
throw new RuntimeException(e);
}
}
Expand Down Expand Up @@ -246,6 +244,7 @@ public void isAlive(HealthReport.HealthReportBuilder builder) {

@Override
public void onStateChange(ReceiverState receiverState, ReceiverState receiverState1, long l) {

SolaceLogging.log.infof("Consumer state changed from %s to %s on channel %s", receiverState.name(),
receiverState1.name(), channel);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ public class SolaceOutboundMetadata {
private final Integer classOfService;
private final String dynamicDestination;

private final String partitionKey;

public static PubSubOutboundMetadataBuilder builder() {
return new PubSubOutboundMetadataBuilder();
}
Expand All @@ -27,7 +29,7 @@ public SolaceOutboundMetadata(Map<String, String> httpContentHeaders,
String applicationMessageType,
Long timeToLive,
String applicationMessageId,
Integer classOfService, String dynamicDestination) {
Integer classOfService, String dynamicDestination, String partitionKey) {
this.httpContentHeaders = httpContentHeaders;
this.expiration = expiration;
this.priority = priority;
Expand All @@ -38,6 +40,7 @@ public SolaceOutboundMetadata(Map<String, String> httpContentHeaders,
this.applicationMessageId = applicationMessageId;
this.classOfService = classOfService;
this.dynamicDestination = dynamicDestination;
this.partitionKey = partitionKey;
}

public Map<String, String> getHttpContentHeaders() {
Expand Down Expand Up @@ -80,6 +83,10 @@ public String getDynamicDestination() {
return dynamicDestination;
}

public String getPartitionKey() {
return partitionKey;
}

public static class PubSubOutboundMetadataBuilder {
private Map<String, String> httpContentHeaders;
private Long expiration;
Expand All @@ -92,6 +99,8 @@ public static class PubSubOutboundMetadataBuilder {
private Integer classOfService;
private String dynamicDestination;

private String partitionKey;

public PubSubOutboundMetadataBuilder setHttpContentHeaders(Map<String, String> httpContentHeader) {
this.httpContentHeaders = httpContentHeaders;
return this;
Expand Down Expand Up @@ -142,9 +151,14 @@ public PubSubOutboundMetadataBuilder setDynamicDestination(String dynamicDestina
return this;
}

public PubSubOutboundMetadataBuilder setPartitionKey(String partitionKey) {
this.partitionKey = partitionKey;
return this;
}

public SolaceOutboundMetadata createPubSubOutboundMetadata() {
return new SolaceOutboundMetadata(httpContentHeaders, expiration, priority, senderId, properties,
applicationMessageType, timeToLive, applicationMessageId, classOfService, dynamicDestination);
applicationMessageType, timeToLive, applicationMessageId, classOfService, dynamicDestination, partitionKey);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import com.solace.messaging.MessagingService;
import com.solace.messaging.PersistentMessagePublisherBuilder;
import com.solace.messaging.PubSubPlusClientException;
import com.solace.messaging.config.SolaceConstants;
import com.solace.messaging.publisher.OutboundMessage;
import com.solace.messaging.publisher.OutboundMessageBuilder;
import com.solace.messaging.publisher.PersistentMessagePublisher;
Expand Down Expand Up @@ -145,6 +146,10 @@ private Uni<PublishReceipt> publishMessage(PersistentMessagePublisher publisher,
if (metadata.getClassOfService() != null) {
msgBuilder.withClassOfService(metadata.getClassOfService());
}
if (metadata.getPartitionKey() != null) {
msgBuilder.withProperty(SolaceConstants.MessageUserPropertyConstants.QUEUE_PARTITION_KEY,
metadata.getPartitionKey());
}

if (metadata.getDynamicDestination() != null) {
topic.set(Topic.of(metadata.getDynamicDestination()));
Expand Down Expand Up @@ -196,13 +201,13 @@ public Flow.Subscriber<? extends Message<?>> getSubscriber() {

public void waitForPublishedMessages() {
try {
SolaceLogging.log.info("Waiting for outgoing messages to be published");
SolaceLogging.log.infof("Waiting for outgoing channel %s messages to be published", channel);
if (!publishedMessagesTracker.awaitEmpty(this.gracefulShutdownWaitTimeout, TimeUnit.MILLISECONDS)) {
SolaceLogging.log.info(String.format("Timed out while waiting for the" +
" remaining messages to get publish acknowledgment."));
SolaceLogging.log.infof("Timed out while waiting for the" +
" remaining messages to be acknowledged on channel %s.", channel);
}
} catch (InterruptedException e) {
SolaceLogging.log.info(String.format("Interrupted while waiting for messages to get acknowledged"));
SolaceLogging.log.infof("Interrupted while waiting for messages on channel %s to get acknowledged", channel);
throw new RuntimeException(e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertThrows;

import java.util.List;
import java.util.Properties;
import java.util.*;
import java.util.concurrent.*;

import jakarta.enterprise.context.ApplicationScoped;
Expand All @@ -18,6 +17,7 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestMethodOrder;

import com.solace.messaging.config.SolaceConstants;
import com.solace.messaging.config.SolaceProperties;
import com.solace.messaging.publisher.OutboundMessage;
import com.solace.messaging.publisher.OutboundMessageBuilder;
Expand All @@ -36,7 +36,7 @@

@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
public class SolaceConsumerTest extends WeldTestBase {
private org.apache.log4j.Logger rootLogger = org.apache.log4j.Logger.getLogger("com.solace.quarkus");
private org.apache.log4j.Logger rootLogger = org.apache.log4j.Logger.getLogger("com.solace.quarkus.messaging");
private SolaceTestAppender solaceTestAppender = new SolaceTestAppender();

private SolaceConsumerTest() {
Expand Down Expand Up @@ -192,6 +192,75 @@ void consumerFailedProcessingMoveToDMQ() {

@Test
@Order(6)
void partitionedQueue() {
MapBasedConfig config = new MapBasedConfig()
.with("mp.messaging.incoming.consumer-1.connector", "quarkus-solace")
.with("mp.messaging.incoming.consumer-1.consumer.queue.name",
SolaceContainer.INTEGRATION_TEST_PARTITION_QUEUE_NAME)
.with("mp.messaging.incoming.consumer-1.consumer.queue.type", "durable-non-exclusive")
.with("mp.messaging.incoming.consumer-2.connector", "quarkus-solace")
.with("mp.messaging.incoming.consumer-2.consumer.queue.name",
SolaceContainer.INTEGRATION_TEST_PARTITION_QUEUE_NAME)
.with("mp.messaging.incoming.consumer-2.consumer.queue.type", "durable-non-exclusive")
.with("mp.messaging.incoming.consumer-3.connector", "quarkus-solace")
.with("mp.messaging.incoming.consumer-3.consumer.queue.name",
SolaceContainer.INTEGRATION_TEST_PARTITION_QUEUE_NAME)
.with("mp.messaging.incoming.consumer-3.consumer.queue.type", "durable-non-exclusive")
.with("mp.messaging.incoming.consumer-4.connector", "quarkus-solace")
.with("mp.messaging.incoming.consumer-4.consumer.queue.name",
SolaceContainer.INTEGRATION_TEST_PARTITION_QUEUE_NAME)
.with("mp.messaging.incoming.consumer-4.consumer.queue.type", "durable-non-exclusive");

// Run app that consumes messages
MyPartitionedQueueConsumer app = runApplication(config, MyPartitionedQueueConsumer.class);

CopyOnWriteArrayList<String> partitionKeys = new CopyOnWriteArrayList<>() {
{
add("Group-1");
add("Group-2");
add("Group-3");
add("Group-4");
}
};
Map<String, Integer> partitionMessages = new HashMap<>() {
{
put(partitionKeys.get(0), 0);
put(partitionKeys.get(1), 0);
put(partitionKeys.get(2), 0);
put(partitionKeys.get(3), 0);
}
};

Random random = new Random();
// Produce messages
PersistentMessagePublisher publisher = messagingService.createPersistentMessagePublisherBuilder()
.build()
.start();
Topic tp = Topic.of(SolaceContainer.INTEGRATION_TEST_PARTITION_QUEUE_SUBSCRIPTION);
for (int i = 0; i < 1000; i++) {
int partitionIndex = random.nextInt(4);
String partitionKey = partitionKeys.get(partitionIndex);
int count = partitionMessages.get(partitionKey);
partitionMessages.put(partitionKey, (count + 1));
OutboundMessageBuilder messageBuilder = messagingService.messageBuilder();
messageBuilder.withProperty(SolaceConstants.MessageUserPropertyConstants.QUEUE_PARTITION_KEY, partitionKey);
OutboundMessage outboundMessage = messageBuilder.build(Integer.toString(i));
publisher.publish(outboundMessage, tp);
}

// Assert on published and consumed messages
await().untilAsserted(() -> assertThat(app.getPartitionMessages().get(partitionKeys.get(0)))
.isEqualTo(partitionMessages.get(partitionKeys.get(0))));
await().untilAsserted(() -> assertThat(app.getPartitionMessages().get(partitionKeys.get(1)))
.isEqualTo(partitionMessages.get(partitionKeys.get(1))));
await().untilAsserted(() -> assertThat(app.getPartitionMessages().get(partitionKeys.get(2)))
.isEqualTo(partitionMessages.get(partitionKeys.get(2))));
await().untilAsserted(() -> assertThat(app.getPartitionMessages().get(partitionKeys.get(3)))
.isEqualTo(partitionMessages.get(partitionKeys.get(3))));
}

@Test
@Order(7)
void consumerPublishToErrorTopicPermissionException() {
MapBasedConfig config = new MapBasedConfig()
.with("mp.messaging.incoming.in.connector", "quarkus-solace")
Expand Down Expand Up @@ -224,7 +293,7 @@ void consumerPublishToErrorTopicPermissionException() {
}

@Test
@Order(7)
@Order(8)
void consumerGracefulCloseTest() {
MapBasedConfig config = new MapBasedConfig()
.with("channel-name", "in")
Expand Down Expand Up @@ -269,7 +338,7 @@ void consumerGracefulCloseTest() {
}

@Test
@Order(8)
@Order(9)
void consumerCreateMissingResourceAddSubscriptionPermissionException() {
MapBasedConfig config = new MapBasedConfig()
.with("mp.messaging.incoming.in.connector", "quarkus-solace")
Expand Down Expand Up @@ -353,4 +422,51 @@ public List<String> getReceivedFailedMessages() {
return receivedFailedMessages;
}
}

@ApplicationScoped
static class MyPartitionedQueueConsumer {
Map<String, Integer> partitionMessages = new HashMap<>() {
{
put("Group-1", 0);
put("Group-2", 0);
put("Group-3", 0);
put("Group-4", 0);
}
};

@Incoming("consumer-1")
CompletionStage<Void> consumer1(SolaceInboundMessage<?> msg) {
updatePartitionMessages(msg);
return msg.ack();
}

@Incoming("consumer-2")
CompletionStage<Void> consumer2(SolaceInboundMessage<?> msg) {
updatePartitionMessages(msg);
return msg.ack();
}

@Incoming("consumer-3")
CompletionStage<Void> consumer3(SolaceInboundMessage<?> msg) {
updatePartitionMessages(msg);
return msg.ack();
}

@Incoming("consumer-4")
CompletionStage<Void> consumer4(SolaceInboundMessage<?> msg) {
updatePartitionMessages(msg);
return msg.ack();
}

private void updatePartitionMessages(SolaceInboundMessage<?> msg) {
String partitionKey = msg.getMessage()
.getProperty(SolaceConstants.MessageUserPropertyConstants.QUEUE_PARTITION_KEY);
int count = partitionMessages.get(partitionKey);
partitionMessages.put(partitionKey, (count + 1));
}

public Map<String, Integer> getPartitionMessages() {
return partitionMessages;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public void startSolaceBroker() {
.withExposedPorts(SolaceContainer.Service.SMF.getPort())
.withPublishTopic("quarkus/integration/test/replay/messages", SolaceContainer.Service.SMF)
.withPublishTopic("quarkus/integration/test/default/>", SolaceContainer.Service.SMF)
.withPublishTopic("quarkus/integration/test/provisioned/queue/>", SolaceContainer.Service.SMF)
.withPublishTopic("quarkus/integration/test/provisioned/>", SolaceContainer.Service.SMF)
.withPublishTopic("quarkus/integration/test/dynamic/>", SolaceContainer.Service.SMF);
solace.start();
LOGGER.info("Solace broker started: " + solace.getOrigin(SolaceContainer.Service.SMF));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ public class SolaceContainer extends GenericContainer<SolaceContainer> {
public static final String INTEGRATION_TEST_ERROR_QUEUE_NAME = "integration-test-error-queue";
public static final String INTEGRATION_TEST_ERROR_QUEUE_SUBSCRIPTION = "quarkus/integration/test/provisioned/queue/error/topic";

public static final String INTEGRATION_TEST_PARTITION_QUEUE_NAME = "integration-test-partition-queue";
public static final String INTEGRATION_TEST_PARTITION_QUEUE_SUBSCRIPTION = "quarkus/integration/test/provisioned/partition/queue/topic";

private static final DockerImageName DEFAULT_IMAGE_NAME = DockerImageName.parse("solace/solace-pubsub-standard");

private static final String DEFAULT_VPN = "default";
Expand Down Expand Up @@ -137,6 +140,20 @@ private Transferable createConfigurationScript() {
updateConfigScript(scriptBuilder, "exit");
updateConfigScript(scriptBuilder, "exit");

// Partitioned Queue
updateConfigScript(scriptBuilder, "message-spool message-vpn default");
updateConfigScript(scriptBuilder, "create queue " + INTEGRATION_TEST_PARTITION_QUEUE_NAME);
updateConfigScript(scriptBuilder, "access-type non-exclusive");
updateConfigScript(scriptBuilder, "subscription topic " + INTEGRATION_TEST_PARTITION_QUEUE_SUBSCRIPTION);
updateConfigScript(scriptBuilder, "max-spool-usage 300");
updateConfigScript(scriptBuilder, "permission all consume");
updateConfigScript(scriptBuilder, "partition");
updateConfigScript(scriptBuilder, "count 4");
updateConfigScript(scriptBuilder, "exit");
updateConfigScript(scriptBuilder, "no shutdown");
updateConfigScript(scriptBuilder, "exit");
updateConfigScript(scriptBuilder, "exit");

// Create VPN if not default
if (!vpn.equals(DEFAULT_VPN)) {
updateConfigScript(scriptBuilder, "create message-vpn " + vpn);
Expand Down
Loading