Skip to content

Commit

Permalink
Updated documentation and removed unused constructor args
Browse files Browse the repository at this point in the history
  • Loading branch information
SravanThotakura05 committed Dec 20, 2023
1 parent cd74c09 commit e41fb74
Show file tree
Hide file tree
Showing 9 changed files with 55 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,15 @@ a| [[quarkus-solace_quarkus.consumer.queue.failure-strategy]]`link:#quarkus-sola

[.description]
--
Specify the failure strategy to apply when a message consumed from Solace broker is nacked. Accepted values are `ignore`, `fail`, `discard` (default).
Specify the failure strategy to apply when a message consumed from Solace broker is nacked. Accepted values are `ignore` (default), `fail`, `discard`, `error_topic`.

`ignore` - Mark the message as IGNORED, will continue processing with next message.

`fail` - Mark the message as FAILED, broker will redeliver the message. Nacks are supported on event brokers 10.2.1 and later, so enable this strategy based on broker version.

`discard` - Mark the message as REJECTED, broker will discard the message. The message will be moved to DMQ if DMQ is configured for queue and DMQ Eligible is set on message otherwise message will be lost. Nacks are supported on event brokers 10.2.1 and later, so enable this strategy based on broker version.

`error_topic` - Will publish the message to configured error topic, on success the message will be acknowledged in the queue.

// ifdef::add-copy-button-to-env-var[]
// Environment variable: env_var_with_copy_button:+++QUARKUS_SOLACE+++[]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
@ConnectorAttribute(name = "consumer.queue.polled-wait-time-in-millis", type = "int", direction = INCOMING, description = "Maximum wait time in milliseconds for consumers to receive a message from configured queue", defaultValue = "100")
// TODO implement consumer concurrency
//@ConnectorAttribute(name = "consumer.queue.concurrency", type = "int", direction = INCOMING, description = "The number of concurrent consumers", defaultValue = "1")
@ConnectorAttribute(name = "consumer.queue.failure-strategy", type = "string", direction = INCOMING, description = "Specify the failure strategy to apply when a message consumed from Solace broker is nacked. Accepted values are `ignore`, `fail`, `discard` (default)", defaultValue = "discard")
@ConnectorAttribute(name = "consumer.queue.failure-strategy", type = "string", direction = INCOMING, description = "Specify the failure strategy to apply when a message consumed from Solace broker is nacked. Accepted values are `ignore` (default), `fail`, `discard`, `error_topic`.", defaultValue = "ignore")
@ConnectorAttribute(name = "consumer.queue.error.topic", type = "string", direction = INCOMING, description = "The error topic where message should be published in case of error")
@ConnectorAttribute(name = "consumer.queue.error.message.dmq-eligible", type = "boolean", direction = INCOMING, description = "Whether error message is eligible to move to dead message queue", defaultValue = "false")
@ConnectorAttribute(name = "consumer.queue.error.message.ttl", type = "long", direction = INCOMING, description = "TTL for Error message before moving to dead message queue.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

import org.eclipse.microprofile.reactive.messaging.Metadata;

import com.solace.messaging.MessagingService;
import com.solace.messaging.config.MessageAcknowledgementConfiguration;
import com.solace.messaging.receiver.AcknowledgementSupport;

Expand All @@ -16,12 +15,10 @@
public class SolaceDiscard implements SolaceFailureHandler {
private final String channel;
private final AcknowledgementSupport ackSupport;
private final MessagingService solace;

public SolaceDiscard(String channel, AcknowledgementSupport ackSupport, MessagingService solace) {
public SolaceDiscard(String channel, AcknowledgementSupport ackSupport) {
this.channel = channel;
this.ackSupport = ackSupport;
this.solace = solace;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ public class SolaceErrorTopic implements SolaceFailureHandler {
private String errorTopic;
private boolean dmqEligible;
private Long timeToLive;
private boolean supportsNacks;

public SolaceErrorTopic(String channel, AcknowledgementSupport ackSupport, MessagingService solace) {
this.channel = channel;
Expand All @@ -49,10 +48,6 @@ public void setTimeToLive(Long timeToLive) {
this.timeToLive = timeToLive;
}

public void setSupportsNacks(boolean supportsNacks) {
this.supportsNacks = supportsNacks;
}

@Override
public CompletionStage<Void> handle(SolaceInboundMessage<?> msg, Throwable reason, Metadata metadata) {
PersistentMessagePublisher.PublishReceipt publishReceipt = solaceErrorTopicPublisherHandler
Expand All @@ -70,15 +65,9 @@ public CompletionStage<Void> handle(SolaceInboundMessage<?> msg, Throwable reaso
.invoke(() -> ackSupport.settle(msg.getMessage(), MessageAcknowledgementConfiguration.Outcome.ACCEPTED))
.runSubscriptionOn(msg::runOnMessageContext)
.subscribeAsCompletionStage();
} else {
if (supportsNacks) {
return Uni.createFrom().voidItem()
.invoke(() -> ackSupport.settle(msg.getMessage(), MessageAcknowledgementConfiguration.Outcome.FAILED))
.runSubscriptionOn(msg::runOnMessageContext)
.subscribeAsCompletionStage();
}
}

return Uni.createFrom().voidItem().subscribeAsCompletionStage(); // TODO :: Restart receiver to redeliver message - needed when nacks are not supported.
return Uni.createFrom().<Void> failure(reason)
.emitOn(msg::runOnMessageContext).subscribeAsCompletionStage();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

import org.eclipse.microprofile.reactive.messaging.Metadata;

import com.solace.messaging.MessagingService;
import com.solace.messaging.config.MessageAcknowledgementConfiguration;
import com.solace.messaging.receiver.AcknowledgementSupport;

Expand All @@ -17,12 +16,9 @@ public class SolaceFail implements SolaceFailureHandler {
private final String channel;
private final AcknowledgementSupport ackSupport;

private final MessagingService solace;

public SolaceFail(String channel, AcknowledgementSupport ackSupport, MessagingService solace) {
public SolaceFail(String channel, AcknowledgementSupport ackSupport) {
this.channel = channel;
this.ackSupport = ackSupport;
this.solace = solace;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,22 @@
public interface SolaceFailureHandler {

enum Strategy {
/**
* Mark the message as IGNORED, will continue processing with next message.
*/
IGNORE,
/**
* Mark the message as FAILED, broker will redeliver the message.
*/
FAIL,
/**
* Mark the message as REJECTED, broker will discard the message. The message will be moved to DMQ if DMQ is configured
* for queue and DMQ Eligible is set on message.
*/
DISCARD,
/**
* Will publish the message to configured error topic, on success the message will be acknowledged in the queue.
*/
ERROR_TOPIC;

public static Strategy from(String s) {
Expand All @@ -34,33 +47,5 @@ public static Strategy from(String s) {
}
}

// private final String channel;
// private final AcknowledgementSupport ackSupport;
//
// private final MessagingService solace;

// public SolaceFailureHandler(String channel, AcknowledgementSupport ackSupport, MessagingService solace) {
// this.channel = channel;
// this.ackSupport = ackSupport;
// this.solace = solace;
// }

CompletionStage<Void> handle(SolaceInboundMessage<?> msg, Throwable reason, Metadata metadata);
// {
// MessageAcknowledgementConfiguration.Outcome outcome;
// if (metadata != null) {
// outcome = metadata.get(SettleMetadata.class)
// .map(SettleMetadata::getOutcome)
// .orElseGet(() -> messageOutCome /* TODO get outcome from reason */);
// } else {
// outcome = messageOutCome != null ? messageOutCome
// : MessageAcknowledgementConfiguration.Outcome.FAILED;
// }
//
// SolaceLogging.log.messageSettled(channel, outcome.toString().toLowerCase(), reason.getMessage());
// return Uni.createFrom().voidItem()
// .invoke(() -> ackSupport.settle(msg.getMessage(), outcome))
// .runSubscriptionOn(msg::runOnMessageContext)
// .subscribeAsCompletionStage();
// }
}
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,9 @@ private SolaceFailureHandler createFailureHandler(SolaceConnectorIncomingConfigu
case IGNORE:
return new SolaceIgnoreFailure(ic.getChannel());
case FAIL:
return new SolaceFail(ic.getChannel(), receiver, solace);
return new SolaceFail(ic.getChannel(), receiver);
case DISCARD:
return new SolaceDiscard(ic.getChannel(), receiver, solace);
return new SolaceDiscard(ic.getChannel(), receiver);
case ERROR_TOPIC:
SolaceErrorTopic solaceErrorTopic = new SolaceErrorTopic(ic.getChannel(), receiver, solace);
if (ic.getConsumerQueueErrorTopic().isEmpty()) {
Expand All @@ -135,7 +135,6 @@ private SolaceFailureHandler createFailureHandler(SolaceConnectorIncomingConfigu
solaceErrorTopic.setDmqEligible(ic.getConsumerQueueErrorMessageDmqEligible().booleanValue());
solaceErrorTopic.setTimeToLive(ic.getConsumerQueueErrorMessageTtl().orElse(null));
solaceErrorTopic.setMaxDeliveryAttempts(ic.getConsumerQueueErrorMessageMaxDeliveryAttempts());
solaceErrorTopic.setSupportsNacks(ic.getConsumerQueueSupportsNacks());
return solaceErrorTopic;
default:
throw ex.illegalArgumentInvalidFailureStrategy(strategy);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,26 +45,26 @@ Message<?> consumeAndPublish(SolaceInboundMessage<?> p) {
*
* @param p
*/
// @Incoming("dynamic-destination-in")
// @Outgoing("dynamic-destination-out")
// @Acknowledgment(Acknowledgment.Strategy.MANUAL)
// Message<?> consumeAndPublishToDynamicTopic(SolaceInboundMessage<?> p) {
// Log.infof("Received message: %s", new String(p.getMessage().getPayloadAsBytes(), StandardCharsets.UTF_8));
// SolaceOutboundMetadata outboundMetadata = SolaceOutboundMetadata.builder()
// .setApplicationMessageId("test").setDynamicDestination("hello/foobar/" + p.getMessage().getSenderId()) // make sure senderId is available on incoming message
// .createPubSubOutboundMetadata();
// Message<?> outboundMessage = Message.of(p.getPayload(), Metadata.of(outboundMetadata), () -> {
// CompletableFuture completableFuture = new CompletableFuture();
// p.ack();
// completableFuture.complete(null);
// return completableFuture;
// }, (throwable) -> {
// CompletableFuture completableFuture = new CompletableFuture();
// p.nack(throwable, p.getMetadata());
// completableFuture.complete(null);
// return completableFuture;
// });
// return outboundMessage;
// }
@Incoming("dynamic-destination-in")
@Outgoing("dynamic-destination-out")
@Acknowledgment(Acknowledgment.Strategy.MANUAL)
Message<?> consumeAndPublishToDynamicTopic(SolaceInboundMessage<?> p) {
Log.infof("Received message: %s", new String(p.getMessage().getPayloadAsBytes(), StandardCharsets.UTF_8));
SolaceOutboundMetadata outboundMetadata = SolaceOutboundMetadata.builder()
.setApplicationMessageId("test").setDynamicDestination("hello/foobar/" + p.getMessage().getSenderId()) // make sure senderId is available on incoming message
.createPubSubOutboundMetadata();
Message<?> outboundMessage = Message.of(p.getPayload(), Metadata.of(outboundMetadata), () -> {
CompletableFuture completableFuture = new CompletableFuture();
p.ack();
completableFuture.complete(null);
return completableFuture;
}, (throwable) -> {
CompletableFuture completableFuture = new CompletableFuture();
p.nack(throwable, p.getMetadata());
completableFuture.complete(null);
return completableFuture;
});
return outboundMessage;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,19 @@ quarkus.solace.authentication.basic.password=

mp.messaging.outgoing.hello-out.connector=quarkus-solace
mp.messaging.outgoing.hello-out.producer.topic=
#mp.messaging.outgoing.hello-out.producer.back-pressure.strategy=wait
#mp.messaging.outgoing.hello-out.producer.back-pressure.buffer-capacity=1
#mp.messaging.outgoing.hello-out.producer.waitForPublishReceipt=false

mp.messaging.incoming.hello-in.connector=quarkus-solace
mp.messaging.incoming.hello-in.consumer.queue.enable-nacks=true
mp.messaging.incoming.hello-in.consumer.queue.supports-nacks=true
mp.messaging.incoming.hello-in.consumer.queue.name=
mp.messaging.incoming.hello-in.consumer.queue.type=durable-exclusive
mp.messaging.incoming.hello-in.consumer.queue.discard-messages-on-failure=false
mp.messaging.incoming.hello-in.consumer.queue.publish-to-error-topic-on-failure=true
mp.messaging.incoming.hello-in.consumer.queue.error.topic=solace/quarkus/error
#mp.messaging.incoming.hello-in.consumer.queue.failure-strategy=error_topic
#mp.messaging.incoming.hello-in.consumer.queue.error.topic=

mp.messaging.incoming.dynamic-destination-in.connector=quarkus-solace
mp.messaging.incoming.dynamic-destination-in.consumer.queue.enable-nacks=true
mp.messaging.incoming.dynamic-destination-in.consumer.queue.supports-nacks=true
mp.messaging.incoming.dynamic-destination-in.consumer.queue.name=
mp.messaging.incoming.dynamic-destination-in.consumer.queue.type=durable-exclusive
mp.messaging.incoming.dynamic-destination-in.consumer.queue.discard-messages-on-failure=false
mp.messaging.incoming.dynamic-destination-in.consumer.queue.publish-to-error-topic-on-failure=true
mp.messaging.incoming.hello-in.consumer.queue.failure-strategy=ignore
mp.messaging.incoming.dynamic-destination-in.consumer.queue.error.topic=solace/quarkus/error

mp.messaging.outgoing.dynamic-destination-out.connector=quarkus-solace
Expand Down

0 comments on commit e41fb74

Please sign in to comment.