Skip to content

Commit

Permalink
changed default back-pressure strategy to wait, updated documentation…
Browse files Browse the repository at this point in the history
… and remove poll-wait-timeout
  • Loading branch information
SravanThotakura05 committed Dec 20, 2023
1 parent e41fb74 commit 16b74de
Show file tree
Hide file tree
Showing 8 changed files with 15 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ The queue name of receiver.
// Environment variable: `+++QUARKUS_SOLACE_METRICS_ENABLED+++`
// endif::add-copy-button-to-env-var[]
--|string
|
| required icon:exclamation-circle[title=Configuration property is required]


a| [[quarkus-solace_quarkus.consumer.queue.type]]`link:#quarkus-solace_quarkus.consumer.queue.type[consumer.queue.type]`
Expand Down Expand Up @@ -161,23 +161,6 @@ The receiver replay replication group message id.
--|`string`
|


a| [[quarkus-solace_quarkus.consumer.queue.polled-wait-time-in-millis]]`link:#quarkus-solace_quarkus.consumer.queue.polled-wait-time-in-millis[consumer.queue.polled-wait-time-in-millis]`


[.description]
--
Maximum wait time in milliseconds for consumers to receive a message from configured queue.

// ifdef::add-copy-button-to-env-var[]
// Environment variable: env_var_with_copy_button:+++QUARKUS_SOLACE+++[]
// endif::add-copy-button-to-env-var[]
// ifndef::add-copy-button-to-env-var[]
// Environment variable: `+++QUARKUS_SOLACE+++`
// endif::add-copy-button-to-env-var[]
--|`integer`
| 100

a| [[quarkus-solace_quarkus.consumer.queue.failure-strategy]]`link:#quarkus-solace_quarkus.consumer.queue.failure-strategy[consumer.queue.failure-strategy]`


Expand All @@ -200,7 +183,7 @@ Specify the failure strategy to apply when a message consumed from Solace broker
// Environment variable: `+++QUARKUS_SOLACE+++`
// endif::add-copy-button-to-env-var[]
--|`string`
| discard
| ignore

a| [[quarkus-solace_quarkus.consumer.queue.error.topic]]`link:#quarkus-solace_quarkus.consumer.queue.error.topic[consumer.queue.error.topic]`

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ The topic to publish messages, by default the channel name.
// Environment variable: `+++QUARKUS_SOLACE_METRICS_ENABLED+++`
// endif::add-copy-button-to-env-var[]
--|string
|
| required icon:exclamation-circle[title=Configuration property is required]


a| [[quarkus-solace_quarkus.producer.max-inflight-messages]]`link:#quarkus-solace_quarkus.producer.max-inflight-messages[producer.max-inflight-messages]`
Expand Down Expand Up @@ -111,7 +111,7 @@ Supported strategies `reject`, `elastic`, `wait`. Refer to `https://docs.solace.
// Environment variable: `+++QUARKUS_SOLACE_DEVSERVICES_SERVICE_NAME+++`
// endif::add-copy-button-to-env-var[]
--|string
|`reject`
|`wait`


a| [[quarkus-solace_quarkus.producer.back-pressure.buffer-capacity]]`link:#quarkus-solace_quarkus.producer.back-pressure.buffer-capacity[producer.back-pressure.buffer-capacity]`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
@ConnectorAttribute(name = "consumer.queue.replay.strategy", type = "string", direction = INCOMING, description = "The receiver replay strategy")
@ConnectorAttribute(name = "consumer.queue.replay.timebased-start-time", type = "string", direction = INCOMING, description = "The receiver replay timebased start time")
@ConnectorAttribute(name = "consumer.queue.replay.replication-group-message-id", type = "string", direction = INCOMING, description = "The receiver replay replication group message id")
@ConnectorAttribute(name = "consumer.queue.polled-wait-time-in-millis", type = "int", direction = INCOMING, description = "Maximum wait time in milliseconds for consumers to receive a message from configured queue", defaultValue = "100")
// TODO implement consumer concurrency
//@ConnectorAttribute(name = "consumer.queue.concurrency", type = "int", direction = INCOMING, description = "The number of concurrent consumers", defaultValue = "1")
@ConnectorAttribute(name = "consumer.queue.failure-strategy", type = "string", direction = INCOMING, description = "Specify the failure strategy to apply when a message consumed from Solace broker is nacked. Accepted values are `ignore` (default), `fail`, `discard`, `error_topic`.", defaultValue = "ignore")
Expand All @@ -65,7 +64,7 @@
@ConnectorAttribute(name = "producer.waitForPublishReceipt", type = "boolean", direction = OUTGOING, description = "Whether the client waits to receive the publish receipt from Solace broker before acknowledging the message", defaultValue = "true")
@ConnectorAttribute(name = "producer.delivery.ack.timeout", type = "int", direction = OUTGOING, description = "Timeout to receive the publish receipt from broker.")
@ConnectorAttribute(name = "producer.delivery.ack.window.size", type = "int", direction = OUTGOING, description = "Publish Window will determine the maximum number of messages the application can send before the Solace API must receive an acknowledgment from the Solace.")
@ConnectorAttribute(name = "producer.back-pressure.strategy", type = "string", direction = OUTGOING, description = "It is possible for the client application to publish messages more quickly than the API can send them to the broker due to network congestion or connectivity issues. This delay can cause the internal buffer to accumulate messages until it reaches its capacity, preventing the API from storing any more messages.", defaultValue = "reject")
@ConnectorAttribute(name = "producer.back-pressure.strategy", type = "string", direction = OUTGOING, description = "It is possible for the client application to publish messages more quickly than the API can send them to the broker due to network congestion or connectivity issues. This delay can cause the internal buffer to accumulate messages until it reaches its capacity, preventing the API from storing any more messages.", defaultValue = "wait")
@ConnectorAttribute(name = "producer.back-pressure.buffer-capacity", type = "int", direction = OUTGOING, description = "Outgoing messages backpressure buffer capacity", defaultValue = "1024")
public class SolaceConnector implements InboundConnector, OutboundConnector, HealthReporter {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import com.solace.messaging.receiver.InboundMessage;

import io.netty.handler.codec.http.HttpHeaderValues;
import io.quarkiverse.solace.SolaceConnectorIncomingConfiguration;
import io.quarkiverse.solace.fault.SolaceFailureHandler;
import io.quarkiverse.solace.i18n.SolaceLogging;
import io.smallrye.reactive.messaging.providers.MetadataInjectableMessage;
Expand All @@ -21,19 +20,17 @@ public class SolaceInboundMessage<T> implements ContextAwareMessage<T>, Metadata
private final InboundMessage msg;
private final SolaceAckHandler ackHandler;
private final SolaceFailureHandler nackHandler;
private final SolaceConnectorIncomingConfiguration ic;
private final T payload;
private final IncomingMessagesUnsignedCounterBarrier unacknowledgedMessageTracker;
private Metadata metadata;

public SolaceInboundMessage(InboundMessage message, SolaceAckHandler ackHandler, SolaceFailureHandler nackHandler,
SolaceConnectorIncomingConfiguration ic, IncomingMessagesUnsignedCounterBarrier unacknowledgedMessageTracker) {
IncomingMessagesUnsignedCounterBarrier unacknowledgedMessageTracker) {
this.msg = message;
this.unacknowledgedMessageTracker = unacknowledgedMessageTracker;
this.payload = (T) convertPayload();
this.ackHandler = ackHandler;
this.nackHandler = nackHandler;
this.ic = ic;
this.metadata = captureContextMetadata(new SolaceInboundMetadata(message));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,15 +98,14 @@ public SolaceIncomingChannel(Vertx vertx, SolaceConnectorIncomingConfiguration i
this.ackHandler = new SolaceAckHandler(receiver);
this.failureHandler = createFailureHandler(ic, solace);

Integer timeout = getTimeout(ic.getConsumerQueuePolledWaitTimeInMillis());
// TODO Here use a subscription receiver.receiveAsync with an internal queue
this.pollerThread = Executors.newSingleThreadExecutor();
this.stream = Multi.createBy().repeating()
.uni(() -> Uni.createFrom().item(timeout == null ? receiver.receiveMessage() : receiver.receiveMessage(timeout))
.uni(() -> Uni.createFrom().item(receiver::receiveMessage)
.runSubscriptionOn(pollerThread))
.until(__ -> closed.get())
.emitOn(context::runOnContext)
.map(consumed -> new SolaceInboundMessage<>(consumed, ackHandler, failureHandler, ic,
.map(consumed -> new SolaceInboundMessage<>(consumed, ackHandler, failureHandler,
unacknowledgedMessageTracker))
.plug(m -> lazyStart ? m.onSubscription().call(() -> Uni.createFrom().completionStage(receiver.startAsync()))
: m)
Expand Down Expand Up @@ -142,29 +141,6 @@ private SolaceFailureHandler createFailureHandler(SolaceConnectorIncomingConfigu

}

private Integer getTimeout(Integer timeoutInMillis) {
Integer realTimeout;
final Long expiry = timeoutInMillis != null
? timeoutInMillis + System.currentTimeMillis()
: null;
if (expiry != null) {
try {
realTimeout = Math.toIntExact(expiry - System.currentTimeMillis());
if (realTimeout < 0) {
realTimeout = 0;
}
} catch (ArithmeticException e) {
// Always true: expiry - System.currentTimeMillis() < timeoutInMillis
// So just set it to 0 (no-wait) if we underflow
realTimeout = 0;
}
} else {
realTimeout = null;
}

return realTimeout;
}

private static Queue getQueue(SolaceConnectorIncomingConfiguration ic) {
String queueType = ic.getConsumerQueueType();
switch (queueType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,11 @@ public SolaceOutgoingChannel(Vertx vertx, SolaceConnectorOutgoingConfiguration o
case "elastic":
builder.onBackPressureElastic();
break;
case "wait":
builder.onBackPressureWait(oc.getProducerBackPressureBufferCapacity());
case "reject":
builder.onBackPressureReject(oc.getProducerBackPressureBufferCapacity());
break;
default:
builder.onBackPressureReject(oc.getProducerBackPressureBufferCapacity());
builder.onBackPressureWait(oc.getProducerBackPressureBufferCapacity());
break;
}
this.waitTimeout = oc.getClientShutdownWaitTimeout();
Expand Down
3 changes: 2 additions & 1 deletion runtime/src/main/resources/META-INF/quarkus-extension.yaml
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
name: Solace
#description: Do something useful.
description: Solace Quarkus Extension for integrating with Solace PubSub+ message brokers. The extension provides the ability to publish or consume events from event mesh.
metadata:
# keywords:
# - solace
# - pubsubplus event broker
# guide: https://quarkiverse.github.io/quarkiverse-docs/solace/dev/ # To create and publish this guide, see https://github.com/quarkiverse/quarkiverse/wiki#documenting-your-extension
# categories:
# - "miscellaneous"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ Message<?> consumeAndPublish(SolaceInboundMessage<?> p) {
Message<?> consumeAndPublishToDynamicTopic(SolaceInboundMessage<?> p) {
Log.infof("Received message: %s", new String(p.getMessage().getPayloadAsBytes(), StandardCharsets.UTF_8));
SolaceOutboundMetadata outboundMetadata = SolaceOutboundMetadata.builder()
.setApplicationMessageId("test").setDynamicDestination("hello/foobar/" + p.getMessage().getSenderId()) // make sure senderId is available on incoming message
.setApplicationMessageId("test")
.setDynamicDestination("solace/quarkus/producer/" + p.getMessage().getCorrelationId()) // make sure correlationID is available on incoming message
.createPubSubOutboundMetadata();
Message<?> outboundMessage = Message.of(p.getPayload(), Metadata.of(outboundMetadata), () -> {
CompletableFuture completableFuture = new CompletableFuture();
Expand Down

0 comments on commit 16b74de

Please sign in to comment.