Skip to content

Commit

Permalink
Added solace community license and code clean up
Browse files Browse the repository at this point in the history
  • Loading branch information
SravanThotakura05 committed Dec 19, 2023
1 parent 4f0f188 commit f611673
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 20 deletions.
Binary file added LICENSE.pdf
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ public interface SolaceLogging extends BasicLogger {
void messageSettled(String channel, String outcome, String reason);

@LogMessage(level = Logger.Level.ERROR)
@Message(id = 55203, value = "Publishing error message to topic %s received from channel `%s` is unsuccessful")
void unsuccessfulToTopic(String topic, String channel);
@Message(id = 55203, value = "Publishing error message to topic %s received from channel `%s` is unsuccessful, reason: %s")
void unsuccessfulToTopic(String topic, String channel, String reason);

@LogMessage(level = Logger.Level.ERROR)
@Message(id = 55204, value = "A exception occurred when publishing to topic %s")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ public class SolaceInboundMessage<T> implements ContextAwareMessage<T>, Metadata
private final SolaceConnectorIncomingConfiguration ic;
private final T payload;
private final IncomingMessagesUnsignedCounterBarrier unacknowledgedMessageTracker;

private Metadata metadata;

public SolaceInboundMessage(InboundMessage message, SolaceAckHandler ackHandler, SolaceFailureHandler nackHandler,
Expand Down Expand Up @@ -97,32 +96,45 @@ public CompletionStage<Void> ack() {

@Override
public CompletionStage<Void> nack(Throwable reason, Metadata nackMetadata) {
if (solaceErrorTopicPublisherHandler != null) {
if (solaceErrorTopicPublisherHandler == null) {
// REJECTED - Will move message to DMQ if enabled, FAILED - Will redeliver the message.
MessageAcknowledgementConfiguration.Outcome outcome = ic.getConsumerQueueEnableNacks()
? (ic.getConsumerQueueDiscardMessagesOnFailure() ? MessageAcknowledgementConfiguration.Outcome.REJECTED
: MessageAcknowledgementConfiguration.Outcome.FAILED)
: null; // if nacks are not supported on broker, no outcome is required.
if (outcome != null) {
// decrement the tracker, as the message might get redelivered or moved to DMQ
this.unacknowledgedMessageTracker.decrement();
return nackHandler.handle(this, reason, nackMetadata, outcome);
}
} else {
PublishReceipt publishReceipt = solaceErrorTopicPublisherHandler.handle(this, ic)
.onFailure().retry().withBackOff(Duration.ofSeconds(1))
.atMost(ic.getConsumerQueueErrorMessageMaxDeliveryAttempts())
.onFailure().transform((throwable -> {
SolaceLogging.log.unsuccessfulToTopic(ic.getConsumerQueueErrorTopic().get(), ic.getChannel());
throw new RuntimeException(throwable); // TODO How to catch this exception in tests
}))
.await().atMost(Duration.ofSeconds(30));
.subscribeAsCompletionStage().exceptionally((t) -> {
SolaceLogging.log.unsuccessfulToTopic(ic.getConsumerQueueErrorTopic().get(), ic.getChannel(),
t.getMessage());
return null;
}).join();

if (publishReceipt != null) {
// decrement the tracker, as the message might get redelivered or moved to DMQ
this.unacknowledgedMessageTracker.decrement();
return nackHandler.handle(this, reason, nackMetadata, MessageAcknowledgementConfiguration.Outcome.ACCEPTED);
} else {
if (ic.getConsumerQueueEnableNacks()) {
// decrement the tracker, as the message might get redelivered or moved to DMQ
this.unacknowledgedMessageTracker.decrement();
return nackHandler.handle(this, reason, nackMetadata,
MessageAcknowledgementConfiguration.Outcome.FAILED);
}
}
}

MessageAcknowledgementConfiguration.Outcome outcome = ic.getConsumerQueueEnableNacks()
&& ic.getConsumerQueueDiscardMessagesOnFailure() && solaceErrorTopicPublisherHandler == null
? MessageAcknowledgementConfiguration.Outcome.REJECTED // will move message to DMQ is enabled on queue & message
: MessageAcknowledgementConfiguration.Outcome.FAILED; // will redeliver the message
if (outcome == MessageAcknowledgementConfiguration.Outcome.REJECTED) {
this.unacknowledgedMessageTracker.decrement();
}
return ic.getConsumerQueueEnableNacks()
? nackHandler.handle(this, reason, nackMetadata, outcome)
: Uni.createFrom().voidItem().subscribeAsCompletionStage(); // TODO Disconnect and reconnect the receiver in order to redeliver the message. Required when nacks are not supported by the broker version.
// decrement the tracker, as the message might get redelivered or moved to DMQ
this.unacknowledgedMessageTracker.decrement();
// return void stage if above check fail. This will not nack the message on broker.
return Uni.createFrom().voidItem().subscribeAsCompletionStage(); // TODO - Restart receiver to redeliver message, needed when nacks are not supported on broker.
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
@ApplicationScoped
public class HelloConsumer {
/**
* Publish a simple string from using TryMe in Solace broker and you should see the message published to topic
* Publish a simple message using TryMe in Solace broker and you should see the message published to topic
*
* @param p
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
quarkus.solace.host=
quarkus.solace.vpn=
quarkus.solace.authentication.basic.username=
quarkus.solace.authentication.basic.password=

mp.messaging.outgoing.hello-out.connector=quarkus-solace
mp.messaging.outgoing.hello-out.producer.topic=
#mp.messaging.outgoing.hello-out.producer.back-pressure.strategy=wait
#mp.messaging.outgoing.hello-out.producer.back-pressure.buffer-capacity=1
#mp.messaging.outgoing.hello-out.producer.waitForPublishReceipt=false

mp.messaging.incoming.hello-in.connector=quarkus-solace
mp.messaging.incoming.hello-in.consumer.queue.enable-nacks=true
mp.messaging.incoming.hello-in.consumer.queue.name=
mp.messaging.incoming.hello-in.consumer.queue.type=durable-exclusive
mp.messaging.incoming.hello-in.consumer.queue.discard-messages-on-failure=false
mp.messaging.incoming.hello-in.consumer.queue.publish-to-error-topic-on-failure=true
mp.messaging.incoming.hello-in.consumer.queue.error.topic=solace/quarkus/error

mp.messaging.incoming.dynamic-destination-in.connector=quarkus-solace
mp.messaging.incoming.dynamic-destination-in.consumer.queue.enable-nacks=true
mp.messaging.incoming.dynamic-destination-in.consumer.queue.name=
mp.messaging.incoming.dynamic-destination-in.consumer.queue.type=durable-exclusive
mp.messaging.incoming.dynamic-destination-in.consumer.queue.discard-messages-on-failure=false
mp.messaging.incoming.dynamic-destination-in.consumer.queue.publish-to-error-topic-on-failure=true
mp.messaging.incoming.dynamic-destination-in.consumer.queue.error.topic=solace/quarkus/error

mp.messaging.outgoing.dynamic-destination-out.connector=quarkus-solace
mp.messaging.outgoing.dynamic-destination-out.producer.topic=
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
#quarkus.solace.host=localhost:55554
#quarkus.solace.vpn=default
#quarkus.solace.authentication.basic.username=admin
#quarkus.solace.authentication.basic.password=admin

0 comments on commit f611673

Please sign in to comment.