diff --git a/docs/modules/ROOT/pages/includes/quarkus-solace-extension-incoming.adoc b/docs/modules/ROOT/pages/includes/quarkus-solace-extension-incoming.adoc index 498a628..e5489f0 100644 --- a/docs/modules/ROOT/pages/includes/quarkus-solace-extension-incoming.adoc +++ b/docs/modules/ROOT/pages/includes/quarkus-solace-extension-incoming.adoc @@ -23,7 +23,7 @@ The queue name of receiver. // Environment variable: `+++QUARKUS_SOLACE_METRICS_ENABLED+++` // endif::add-copy-button-to-env-var[] --|string -| +| required icon:exclamation-circle[title=Configuration property is required] a| [[quarkus-solace_quarkus.consumer.queue.type]]`link:#quarkus-solace_quarkus.consumer.queue.type[consumer.queue.type]` @@ -161,23 +161,6 @@ The receiver replay replication group message id. --|`string` | - -a| [[quarkus-solace_quarkus.consumer.queue.polled-wait-time-in-millis]]`link:#quarkus-solace_quarkus.consumer.queue.polled-wait-time-in-millis[consumer.queue.polled-wait-time-in-millis]` - - -[.description] --- -Maximum wait time in milliseconds for consumers to receive a message from configured queue. - -// ifdef::add-copy-button-to-env-var[] -// Environment variable: env_var_with_copy_button:+++QUARKUS_SOLACE+++[] -// endif::add-copy-button-to-env-var[] -// ifndef::add-copy-button-to-env-var[] -// Environment variable: `+++QUARKUS_SOLACE+++` -// endif::add-copy-button-to-env-var[] ---|`integer` -| 100 - a| [[quarkus-solace_quarkus.consumer.queue.failure-strategy]]`link:#quarkus-solace_quarkus.consumer.queue.failure-strategy[consumer.queue.failure-strategy]` @@ -200,7 +183,7 @@ Specify the failure strategy to apply when a message consumed from Solace broker // Environment variable: `+++QUARKUS_SOLACE+++` // endif::add-copy-button-to-env-var[] --|`string` -| discard +| ignore a| [[quarkus-solace_quarkus.consumer.queue.error.topic]]`link:#quarkus-solace_quarkus.consumer.queue.error.topic[consumer.queue.error.topic]` diff --git a/docs/modules/ROOT/pages/includes/quarkus-solace-extension-outgoing.adoc b/docs/modules/ROOT/pages/includes/quarkus-solace-extension-outgoing.adoc index 415c199..aa01412 100644 --- a/docs/modules/ROOT/pages/includes/quarkus-solace-extension-outgoing.adoc +++ b/docs/modules/ROOT/pages/includes/quarkus-solace-extension-outgoing.adoc @@ -23,7 +23,7 @@ The topic to publish messages, by default the channel name. // Environment variable: `+++QUARKUS_SOLACE_METRICS_ENABLED+++` // endif::add-copy-button-to-env-var[] --|string -| +| required icon:exclamation-circle[title=Configuration property is required] a| [[quarkus-solace_quarkus.producer.max-inflight-messages]]`link:#quarkus-solace_quarkus.producer.max-inflight-messages[producer.max-inflight-messages]` @@ -111,7 +111,7 @@ Supported strategies `reject`, `elastic`, `wait`. Refer to `https://docs.solace. // Environment variable: `+++QUARKUS_SOLACE_DEVSERVICES_SERVICE_NAME+++` // endif::add-copy-button-to-env-var[] --|string -|`reject` +|`wait` a| [[quarkus-solace_quarkus.producer.back-pressure.buffer-capacity]]`link:#quarkus-solace_quarkus.producer.back-pressure.buffer-capacity[producer.back-pressure.buffer-capacity]` diff --git a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/SolaceConnector.java b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/SolaceConnector.java index 131a5cf..7b598b3 100644 --- a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/SolaceConnector.java +++ b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/SolaceConnector.java @@ -50,7 +50,6 @@ @ConnectorAttribute(name = "consumer.queue.replay.strategy", type = "string", direction = INCOMING, description = "The receiver replay strategy") @ConnectorAttribute(name = "consumer.queue.replay.timebased-start-time", type = "string", direction = INCOMING, description = "The receiver replay timebased start time") @ConnectorAttribute(name = "consumer.queue.replay.replication-group-message-id", type = "string", direction = INCOMING, description = "The receiver replay replication group message id") -@ConnectorAttribute(name = "consumer.queue.polled-wait-time-in-millis", type = "int", direction = INCOMING, description = "Maximum wait time in milliseconds for consumers to receive a message from configured queue", defaultValue = "100") // TODO implement consumer concurrency //@ConnectorAttribute(name = "consumer.queue.concurrency", type = "int", direction = INCOMING, description = "The number of concurrent consumers", defaultValue = "1") @ConnectorAttribute(name = "consumer.queue.failure-strategy", type = "string", direction = INCOMING, description = "Specify the failure strategy to apply when a message consumed from Solace broker is nacked. Accepted values are `ignore` (default), `fail`, `discard`, `error_topic`.", defaultValue = "ignore") @@ -65,7 +64,7 @@ @ConnectorAttribute(name = "producer.waitForPublishReceipt", type = "boolean", direction = OUTGOING, description = "Whether the client waits to receive the publish receipt from Solace broker before acknowledging the message", defaultValue = "true") @ConnectorAttribute(name = "producer.delivery.ack.timeout", type = "int", direction = OUTGOING, description = "Timeout to receive the publish receipt from broker.") @ConnectorAttribute(name = "producer.delivery.ack.window.size", type = "int", direction = OUTGOING, description = "Publish Window will determine the maximum number of messages the application can send before the Solace API must receive an acknowledgment from the Solace.") -@ConnectorAttribute(name = "producer.back-pressure.strategy", type = "string", direction = OUTGOING, description = "It is possible for the client application to publish messages more quickly than the API can send them to the broker due to network congestion or connectivity issues. This delay can cause the internal buffer to accumulate messages until it reaches its capacity, preventing the API from storing any more messages.", defaultValue = "reject") +@ConnectorAttribute(name = "producer.back-pressure.strategy", type = "string", direction = OUTGOING, description = "It is possible for the client application to publish messages more quickly than the API can send them to the broker due to network congestion or connectivity issues. This delay can cause the internal buffer to accumulate messages until it reaches its capacity, preventing the API from storing any more messages.", defaultValue = "wait") @ConnectorAttribute(name = "producer.back-pressure.buffer-capacity", type = "int", direction = OUTGOING, description = "Outgoing messages backpressure buffer capacity", defaultValue = "1024") public class SolaceConnector implements InboundConnector, OutboundConnector, HealthReporter { diff --git a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/SolaceInboundMessage.java b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/SolaceInboundMessage.java index 2f2de78..151dfec 100644 --- a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/SolaceInboundMessage.java +++ b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/SolaceInboundMessage.java @@ -9,7 +9,6 @@ import com.solace.messaging.receiver.InboundMessage; import io.netty.handler.codec.http.HttpHeaderValues; -import io.quarkiverse.solace.SolaceConnectorIncomingConfiguration; import io.quarkiverse.solace.fault.SolaceFailureHandler; import io.quarkiverse.solace.i18n.SolaceLogging; import io.smallrye.reactive.messaging.providers.MetadataInjectableMessage; @@ -21,19 +20,17 @@ public class SolaceInboundMessage implements ContextAwareMessage, Metadata private final InboundMessage msg; private final SolaceAckHandler ackHandler; private final SolaceFailureHandler nackHandler; - private final SolaceConnectorIncomingConfiguration ic; private final T payload; private final IncomingMessagesUnsignedCounterBarrier unacknowledgedMessageTracker; private Metadata metadata; public SolaceInboundMessage(InboundMessage message, SolaceAckHandler ackHandler, SolaceFailureHandler nackHandler, - SolaceConnectorIncomingConfiguration ic, IncomingMessagesUnsignedCounterBarrier unacknowledgedMessageTracker) { + IncomingMessagesUnsignedCounterBarrier unacknowledgedMessageTracker) { this.msg = message; this.unacknowledgedMessageTracker = unacknowledgedMessageTracker; this.payload = (T) convertPayload(); this.ackHandler = ackHandler; this.nackHandler = nackHandler; - this.ic = ic; this.metadata = captureContextMetadata(new SolaceInboundMetadata(message)); } diff --git a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/SolaceIncomingChannel.java b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/SolaceIncomingChannel.java index c9e9edb..9ebe065 100644 --- a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/SolaceIncomingChannel.java +++ b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/SolaceIncomingChannel.java @@ -98,15 +98,14 @@ public SolaceIncomingChannel(Vertx vertx, SolaceConnectorIncomingConfiguration i this.ackHandler = new SolaceAckHandler(receiver); this.failureHandler = createFailureHandler(ic, solace); - Integer timeout = getTimeout(ic.getConsumerQueuePolledWaitTimeInMillis()); // TODO Here use a subscription receiver.receiveAsync with an internal queue this.pollerThread = Executors.newSingleThreadExecutor(); this.stream = Multi.createBy().repeating() - .uni(() -> Uni.createFrom().item(timeout == null ? receiver.receiveMessage() : receiver.receiveMessage(timeout)) + .uni(() -> Uni.createFrom().item(receiver::receiveMessage) .runSubscriptionOn(pollerThread)) .until(__ -> closed.get()) .emitOn(context::runOnContext) - .map(consumed -> new SolaceInboundMessage<>(consumed, ackHandler, failureHandler, ic, + .map(consumed -> new SolaceInboundMessage<>(consumed, ackHandler, failureHandler, unacknowledgedMessageTracker)) .plug(m -> lazyStart ? m.onSubscription().call(() -> Uni.createFrom().completionStage(receiver.startAsync())) : m) @@ -142,29 +141,6 @@ private SolaceFailureHandler createFailureHandler(SolaceConnectorIncomingConfigu } - private Integer getTimeout(Integer timeoutInMillis) { - Integer realTimeout; - final Long expiry = timeoutInMillis != null - ? timeoutInMillis + System.currentTimeMillis() - : null; - if (expiry != null) { - try { - realTimeout = Math.toIntExact(expiry - System.currentTimeMillis()); - if (realTimeout < 0) { - realTimeout = 0; - } - } catch (ArithmeticException e) { - // Always true: expiry - System.currentTimeMillis() < timeoutInMillis - // So just set it to 0 (no-wait) if we underflow - realTimeout = 0; - } - } else { - realTimeout = null; - } - - return realTimeout; - } - private static Queue getQueue(SolaceConnectorIncomingConfiguration ic) { String queueType = ic.getConsumerQueueType(); switch (queueType) { diff --git a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/outgoing/SolaceOutgoingChannel.java b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/outgoing/SolaceOutgoingChannel.java index 8909afd..ae8f794 100644 --- a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/outgoing/SolaceOutgoingChannel.java +++ b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/outgoing/SolaceOutgoingChannel.java @@ -49,11 +49,11 @@ public SolaceOutgoingChannel(Vertx vertx, SolaceConnectorOutgoingConfiguration o case "elastic": builder.onBackPressureElastic(); break; - case "wait": - builder.onBackPressureWait(oc.getProducerBackPressureBufferCapacity()); + case "reject": + builder.onBackPressureReject(oc.getProducerBackPressureBufferCapacity()); break; default: - builder.onBackPressureReject(oc.getProducerBackPressureBufferCapacity()); + builder.onBackPressureWait(oc.getProducerBackPressureBufferCapacity()); break; } this.waitTimeout = oc.getClientShutdownWaitTimeout(); diff --git a/runtime/src/main/resources/META-INF/quarkus-extension.yaml b/runtime/src/main/resources/META-INF/quarkus-extension.yaml index b72b827..61b8e95 100644 --- a/runtime/src/main/resources/META-INF/quarkus-extension.yaml +++ b/runtime/src/main/resources/META-INF/quarkus-extension.yaml @@ -1,8 +1,9 @@ name: Solace -#description: Do something useful. +description: Solace Quarkus Extension for integrating with Solace PubSub+ message brokers. The extension provides the ability to publish or consume events from event mesh. metadata: # keywords: # - solace +# - pubsubplus event broker # guide: https://quarkiverse.github.io/quarkiverse-docs/solace/dev/ # To create and publish this guide, see https://github.com/quarkiverse/quarkiverse/wiki#documenting-your-extension # categories: # - "miscellaneous" diff --git a/samples/hello-connector-solace/src/main/java/io/quarkiverse/solace/samples/HelloConsumer.java b/samples/hello-connector-solace/src/main/java/io/quarkiverse/solace/samples/HelloConsumer.java index c312634..63fb943 100644 --- a/samples/hello-connector-solace/src/main/java/io/quarkiverse/solace/samples/HelloConsumer.java +++ b/samples/hello-connector-solace/src/main/java/io/quarkiverse/solace/samples/HelloConsumer.java @@ -51,7 +51,8 @@ Message consumeAndPublish(SolaceInboundMessage p) { Message consumeAndPublishToDynamicTopic(SolaceInboundMessage p) { Log.infof("Received message: %s", new String(p.getMessage().getPayloadAsBytes(), StandardCharsets.UTF_8)); SolaceOutboundMetadata outboundMetadata = SolaceOutboundMetadata.builder() - .setApplicationMessageId("test").setDynamicDestination("hello/foobar/" + p.getMessage().getSenderId()) // make sure senderId is available on incoming message + .setApplicationMessageId("test") + .setDynamicDestination("solace/quarkus/producer/" + p.getMessage().getCorrelationId()) // make sure correlationID is available on incoming message .createPubSubOutboundMetadata(); Message outboundMessage = Message.of(p.getPayload(), Metadata.of(outboundMetadata), () -> { CompletableFuture completableFuture = new CompletableFuture();