From e41fb74c1114556e2b430e360157f2b0259fdf5d Mon Sep 17 00:00:00 2001 From: SravanThotakura05 <83568543+SravanThotakura05@users.noreply.github.com> Date: Wed, 20 Dec 2023 10:14:03 +0530 Subject: [PATCH] Updated documentation and removed unused constructor args --- .../quarkus-solace-extension-incoming.adoc | 10 ++++- .../quarkiverse/solace/SolaceConnector.java | 2 +- .../solace/fault/SolaceDiscard.java | 5 +-- .../solace/fault/SolaceErrorTopic.java | 15 +------ .../quarkiverse/solace/fault/SolaceFail.java | 6 +-- .../solace/fault/SolaceFailureHandler.java | 41 ++++++------------ .../incoming/SolaceIncomingChannel.java | 5 +-- .../solace/samples/HelloConsumer.java | 42 +++++++++---------- .../src/main/resources/application.properties | 15 +++---- 9 files changed, 55 insertions(+), 86 deletions(-) diff --git a/docs/modules/ROOT/pages/includes/quarkus-solace-extension-incoming.adoc b/docs/modules/ROOT/pages/includes/quarkus-solace-extension-incoming.adoc index 79e4590..498a628 100644 --- a/docs/modules/ROOT/pages/includes/quarkus-solace-extension-incoming.adoc +++ b/docs/modules/ROOT/pages/includes/quarkus-solace-extension-incoming.adoc @@ -183,7 +183,15 @@ a| [[quarkus-solace_quarkus.consumer.queue.failure-strategy]]`link:#quarkus-sola [.description] -- -Specify the failure strategy to apply when a message consumed from Solace broker is nacked. Accepted values are `ignore`, `fail`, `discard` (default). +Specify the failure strategy to apply when a message consumed from Solace broker is nacked. Accepted values are `ignore` (default), `fail`, `discard`, `error_topic`. + +`ignore` - Mark the message as IGNORED, will continue processing with next message. + +`fail` - Mark the message as FAILED, broker will redeliver the message. Nacks are supported on event brokers 10.2.1 and later, so enable this strategy based on broker version. + +`discard` - Mark the message as REJECTED, broker will discard the message. The message will be moved to DMQ if DMQ is configured for queue and DMQ Eligible is set on message otherwise message will be lost. Nacks are supported on event brokers 10.2.1 and later, so enable this strategy based on broker version. + +`error_topic` - Will publish the message to configured error topic, on success the message will be acknowledged in the queue. // ifdef::add-copy-button-to-env-var[] // Environment variable: env_var_with_copy_button:+++QUARKUS_SOLACE+++[] diff --git a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/SolaceConnector.java b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/SolaceConnector.java index db90b89..131a5cf 100644 --- a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/SolaceConnector.java +++ b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/SolaceConnector.java @@ -53,7 +53,7 @@ @ConnectorAttribute(name = "consumer.queue.polled-wait-time-in-millis", type = "int", direction = INCOMING, description = "Maximum wait time in milliseconds for consumers to receive a message from configured queue", defaultValue = "100") // TODO implement consumer concurrency //@ConnectorAttribute(name = "consumer.queue.concurrency", type = "int", direction = INCOMING, description = "The number of concurrent consumers", defaultValue = "1") -@ConnectorAttribute(name = "consumer.queue.failure-strategy", type = "string", direction = INCOMING, description = "Specify the failure strategy to apply when a message consumed from Solace broker is nacked. Accepted values are `ignore`, `fail`, `discard` (default)", defaultValue = "discard") +@ConnectorAttribute(name = "consumer.queue.failure-strategy", type = "string", direction = INCOMING, description = "Specify the failure strategy to apply when a message consumed from Solace broker is nacked. Accepted values are `ignore` (default), `fail`, `discard`, `error_topic`.", defaultValue = "ignore") @ConnectorAttribute(name = "consumer.queue.error.topic", type = "string", direction = INCOMING, description = "The error topic where message should be published in case of error") @ConnectorAttribute(name = "consumer.queue.error.message.dmq-eligible", type = "boolean", direction = INCOMING, description = "Whether error message is eligible to move to dead message queue", defaultValue = "false") @ConnectorAttribute(name = "consumer.queue.error.message.ttl", type = "long", direction = INCOMING, description = "TTL for Error message before moving to dead message queue.") diff --git a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/fault/SolaceDiscard.java b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/fault/SolaceDiscard.java index 5009978..2d22f2d 100644 --- a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/fault/SolaceDiscard.java +++ b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/fault/SolaceDiscard.java @@ -4,7 +4,6 @@ import org.eclipse.microprofile.reactive.messaging.Metadata; -import com.solace.messaging.MessagingService; import com.solace.messaging.config.MessageAcknowledgementConfiguration; import com.solace.messaging.receiver.AcknowledgementSupport; @@ -16,12 +15,10 @@ public class SolaceDiscard implements SolaceFailureHandler { private final String channel; private final AcknowledgementSupport ackSupport; - private final MessagingService solace; - public SolaceDiscard(String channel, AcknowledgementSupport ackSupport, MessagingService solace) { + public SolaceDiscard(String channel, AcknowledgementSupport ackSupport) { this.channel = channel; this.ackSupport = ackSupport; - this.solace = solace; } @Override diff --git a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/fault/SolaceErrorTopic.java b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/fault/SolaceErrorTopic.java index c06d925..11f2e7b 100644 --- a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/fault/SolaceErrorTopic.java +++ b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/fault/SolaceErrorTopic.java @@ -24,7 +24,6 @@ public class SolaceErrorTopic implements SolaceFailureHandler { private String errorTopic; private boolean dmqEligible; private Long timeToLive; - private boolean supportsNacks; public SolaceErrorTopic(String channel, AcknowledgementSupport ackSupport, MessagingService solace) { this.channel = channel; @@ -49,10 +48,6 @@ public void setTimeToLive(Long timeToLive) { this.timeToLive = timeToLive; } - public void setSupportsNacks(boolean supportsNacks) { - this.supportsNacks = supportsNacks; - } - @Override public CompletionStage handle(SolaceInboundMessage msg, Throwable reason, Metadata metadata) { PersistentMessagePublisher.PublishReceipt publishReceipt = solaceErrorTopicPublisherHandler @@ -70,15 +65,9 @@ public CompletionStage handle(SolaceInboundMessage msg, Throwable reaso .invoke(() -> ackSupport.settle(msg.getMessage(), MessageAcknowledgementConfiguration.Outcome.ACCEPTED)) .runSubscriptionOn(msg::runOnMessageContext) .subscribeAsCompletionStage(); - } else { - if (supportsNacks) { - return Uni.createFrom().voidItem() - .invoke(() -> ackSupport.settle(msg.getMessage(), MessageAcknowledgementConfiguration.Outcome.FAILED)) - .runSubscriptionOn(msg::runOnMessageContext) - .subscribeAsCompletionStage(); - } } - return Uni.createFrom().voidItem().subscribeAsCompletionStage(); // TODO :: Restart receiver to redeliver message - needed when nacks are not supported. + return Uni.createFrom(). failure(reason) + .emitOn(msg::runOnMessageContext).subscribeAsCompletionStage(); } } diff --git a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/fault/SolaceFail.java b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/fault/SolaceFail.java index 96d40d7..cd6888e 100644 --- a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/fault/SolaceFail.java +++ b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/fault/SolaceFail.java @@ -4,7 +4,6 @@ import org.eclipse.microprofile.reactive.messaging.Metadata; -import com.solace.messaging.MessagingService; import com.solace.messaging.config.MessageAcknowledgementConfiguration; import com.solace.messaging.receiver.AcknowledgementSupport; @@ -17,12 +16,9 @@ public class SolaceFail implements SolaceFailureHandler { private final String channel; private final AcknowledgementSupport ackSupport; - private final MessagingService solace; - - public SolaceFail(String channel, AcknowledgementSupport ackSupport, MessagingService solace) { + public SolaceFail(String channel, AcknowledgementSupport ackSupport) { this.channel = channel; this.ackSupport = ackSupport; - this.solace = solace; } @Override diff --git a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/fault/SolaceFailureHandler.java b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/fault/SolaceFailureHandler.java index 7635427..15eff4f 100644 --- a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/fault/SolaceFailureHandler.java +++ b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/fault/SolaceFailureHandler.java @@ -11,9 +11,22 @@ public interface SolaceFailureHandler { enum Strategy { + /** + * Mark the message as IGNORED, will continue processing with next message. + */ IGNORE, + /** + * Mark the message as FAILED, broker will redeliver the message. + */ FAIL, + /** + * Mark the message as REJECTED, broker will discard the message. The message will be moved to DMQ if DMQ is configured + * for queue and DMQ Eligible is set on message. + */ DISCARD, + /** + * Will publish the message to configured error topic, on success the message will be acknowledged in the queue. + */ ERROR_TOPIC; public static Strategy from(String s) { @@ -34,33 +47,5 @@ public static Strategy from(String s) { } } - // private final String channel; - // private final AcknowledgementSupport ackSupport; - // - // private final MessagingService solace; - - // public SolaceFailureHandler(String channel, AcknowledgementSupport ackSupport, MessagingService solace) { - // this.channel = channel; - // this.ackSupport = ackSupport; - // this.solace = solace; - // } - CompletionStage handle(SolaceInboundMessage msg, Throwable reason, Metadata metadata); - // { - // MessageAcknowledgementConfiguration.Outcome outcome; - // if (metadata != null) { - // outcome = metadata.get(SettleMetadata.class) - // .map(SettleMetadata::getOutcome) - // .orElseGet(() -> messageOutCome /* TODO get outcome from reason */); - // } else { - // outcome = messageOutCome != null ? messageOutCome - // : MessageAcknowledgementConfiguration.Outcome.FAILED; - // } - // - // SolaceLogging.log.messageSettled(channel, outcome.toString().toLowerCase(), reason.getMessage()); - // return Uni.createFrom().voidItem() - // .invoke(() -> ackSupport.settle(msg.getMessage(), outcome)) - // .runSubscriptionOn(msg::runOnMessageContext) - // .subscribeAsCompletionStage(); - // } } diff --git a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/SolaceIncomingChannel.java b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/SolaceIncomingChannel.java index f98ce88..c9e9edb 100644 --- a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/SolaceIncomingChannel.java +++ b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/SolaceIncomingChannel.java @@ -123,9 +123,9 @@ private SolaceFailureHandler createFailureHandler(SolaceConnectorIncomingConfigu case IGNORE: return new SolaceIgnoreFailure(ic.getChannel()); case FAIL: - return new SolaceFail(ic.getChannel(), receiver, solace); + return new SolaceFail(ic.getChannel(), receiver); case DISCARD: - return new SolaceDiscard(ic.getChannel(), receiver, solace); + return new SolaceDiscard(ic.getChannel(), receiver); case ERROR_TOPIC: SolaceErrorTopic solaceErrorTopic = new SolaceErrorTopic(ic.getChannel(), receiver, solace); if (ic.getConsumerQueueErrorTopic().isEmpty()) { @@ -135,7 +135,6 @@ private SolaceFailureHandler createFailureHandler(SolaceConnectorIncomingConfigu solaceErrorTopic.setDmqEligible(ic.getConsumerQueueErrorMessageDmqEligible().booleanValue()); solaceErrorTopic.setTimeToLive(ic.getConsumerQueueErrorMessageTtl().orElse(null)); solaceErrorTopic.setMaxDeliveryAttempts(ic.getConsumerQueueErrorMessageMaxDeliveryAttempts()); - solaceErrorTopic.setSupportsNacks(ic.getConsumerQueueSupportsNacks()); return solaceErrorTopic; default: throw ex.illegalArgumentInvalidFailureStrategy(strategy); diff --git a/samples/hello-connector-solace/src/main/java/io/quarkiverse/solace/samples/HelloConsumer.java b/samples/hello-connector-solace/src/main/java/io/quarkiverse/solace/samples/HelloConsumer.java index 0711a15..c312634 100644 --- a/samples/hello-connector-solace/src/main/java/io/quarkiverse/solace/samples/HelloConsumer.java +++ b/samples/hello-connector-solace/src/main/java/io/quarkiverse/solace/samples/HelloConsumer.java @@ -45,26 +45,26 @@ Message consumeAndPublish(SolaceInboundMessage p) { * * @param p */ - // @Incoming("dynamic-destination-in") - // @Outgoing("dynamic-destination-out") - // @Acknowledgment(Acknowledgment.Strategy.MANUAL) - // Message consumeAndPublishToDynamicTopic(SolaceInboundMessage p) { - // Log.infof("Received message: %s", new String(p.getMessage().getPayloadAsBytes(), StandardCharsets.UTF_8)); - // SolaceOutboundMetadata outboundMetadata = SolaceOutboundMetadata.builder() - // .setApplicationMessageId("test").setDynamicDestination("hello/foobar/" + p.getMessage().getSenderId()) // make sure senderId is available on incoming message - // .createPubSubOutboundMetadata(); - // Message outboundMessage = Message.of(p.getPayload(), Metadata.of(outboundMetadata), () -> { - // CompletableFuture completableFuture = new CompletableFuture(); - // p.ack(); - // completableFuture.complete(null); - // return completableFuture; - // }, (throwable) -> { - // CompletableFuture completableFuture = new CompletableFuture(); - // p.nack(throwable, p.getMetadata()); - // completableFuture.complete(null); - // return completableFuture; - // }); - // return outboundMessage; - // } + @Incoming("dynamic-destination-in") + @Outgoing("dynamic-destination-out") + @Acknowledgment(Acknowledgment.Strategy.MANUAL) + Message consumeAndPublishToDynamicTopic(SolaceInboundMessage p) { + Log.infof("Received message: %s", new String(p.getMessage().getPayloadAsBytes(), StandardCharsets.UTF_8)); + SolaceOutboundMetadata outboundMetadata = SolaceOutboundMetadata.builder() + .setApplicationMessageId("test").setDynamicDestination("hello/foobar/" + p.getMessage().getSenderId()) // make sure senderId is available on incoming message + .createPubSubOutboundMetadata(); + Message outboundMessage = Message.of(p.getPayload(), Metadata.of(outboundMetadata), () -> { + CompletableFuture completableFuture = new CompletableFuture(); + p.ack(); + completableFuture.complete(null); + return completableFuture; + }, (throwable) -> { + CompletableFuture completableFuture = new CompletableFuture(); + p.nack(throwable, p.getMetadata()); + completableFuture.complete(null); + return completableFuture; + }); + return outboundMessage; + } } diff --git a/samples/hello-connector-solace/src/main/resources/application.properties b/samples/hello-connector-solace/src/main/resources/application.properties index 7f1fcf2..72cebfd 100644 --- a/samples/hello-connector-solace/src/main/resources/application.properties +++ b/samples/hello-connector-solace/src/main/resources/application.properties @@ -5,24 +5,19 @@ quarkus.solace.authentication.basic.password= mp.messaging.outgoing.hello-out.connector=quarkus-solace mp.messaging.outgoing.hello-out.producer.topic= -#mp.messaging.outgoing.hello-out.producer.back-pressure.strategy=wait -#mp.messaging.outgoing.hello-out.producer.back-pressure.buffer-capacity=1 -#mp.messaging.outgoing.hello-out.producer.waitForPublishReceipt=false mp.messaging.incoming.hello-in.connector=quarkus-solace -mp.messaging.incoming.hello-in.consumer.queue.enable-nacks=true +mp.messaging.incoming.hello-in.consumer.queue.supports-nacks=true mp.messaging.incoming.hello-in.consumer.queue.name= mp.messaging.incoming.hello-in.consumer.queue.type=durable-exclusive -mp.messaging.incoming.hello-in.consumer.queue.discard-messages-on-failure=false -mp.messaging.incoming.hello-in.consumer.queue.publish-to-error-topic-on-failure=true -mp.messaging.incoming.hello-in.consumer.queue.error.topic=solace/quarkus/error +#mp.messaging.incoming.hello-in.consumer.queue.failure-strategy=error_topic +#mp.messaging.incoming.hello-in.consumer.queue.error.topic= mp.messaging.incoming.dynamic-destination-in.connector=quarkus-solace -mp.messaging.incoming.dynamic-destination-in.consumer.queue.enable-nacks=true +mp.messaging.incoming.dynamic-destination-in.consumer.queue.supports-nacks=true mp.messaging.incoming.dynamic-destination-in.consumer.queue.name= mp.messaging.incoming.dynamic-destination-in.consumer.queue.type=durable-exclusive -mp.messaging.incoming.dynamic-destination-in.consumer.queue.discard-messages-on-failure=false -mp.messaging.incoming.dynamic-destination-in.consumer.queue.publish-to-error-topic-on-failure=true +mp.messaging.incoming.hello-in.consumer.queue.failure-strategy=ignore mp.messaging.incoming.dynamic-destination-in.consumer.queue.error.topic=solace/quarkus/error mp.messaging.outgoing.dynamic-destination-out.connector=quarkus-solace