From 9b1725b13f97a01e937e444e6c2ea33b8313d3e2 Mon Sep 17 00:00:00 2001
From: SravanThotakura05 <83568543+SravanThotakura05@users.noreply.github.com>
Date: Wed, 7 Feb 2024 17:19:11 +0530
Subject: [PATCH 01/15] Implemented Quarkus Client Side Tracing
---
.../runtime/pom.xml | 5 +
.../quarkus/messaging/SolaceConnector.java | 1 +
.../incoming/SolaceIncomingChannel.java | 44 ++-
.../outgoing/SolaceOutgoingChannel.java | 38 ++-
.../tracing/SolaceAttributeExtractor.java | 72 +++++
.../SolaceOpenTelemetryInstrumenter.java | 60 ++++
.../messaging/tracing/SolaceTrace.java | 101 +++++++
.../tracing/SolaceTraceTextMapGetter.java | 27 ++
.../tracing/SolaceTraceTextMapSetter.java | 17 ++
.../tracing/TracingPropogationTest.java | 257 ++++++++++++++++++
10 files changed, 612 insertions(+), 10 deletions(-)
create mode 100644 quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/tracing/SolaceAttributeExtractor.java
create mode 100644 quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/tracing/SolaceOpenTelemetryInstrumenter.java
create mode 100644 quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/tracing/SolaceTrace.java
create mode 100644 quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/tracing/SolaceTraceTextMapGetter.java
create mode 100644 quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/tracing/SolaceTraceTextMapSetter.java
create mode 100644 quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/tracing/TracingPropogationTest.java
diff --git a/quarkus-solace-messaging-connector/runtime/pom.xml b/quarkus-solace-messaging-connector/runtime/pom.xml
index 4ba4c00..9a286f0 100644
--- a/quarkus-solace-messaging-connector/runtime/pom.xml
+++ b/quarkus-solace-messaging-connector/runtime/pom.xml
@@ -23,6 +23,11 @@
io.smallrye.reactive
smallrye-connector-attribute-processor
+
+ io.smallrye.reactive
+ smallrye-reactive-messaging-otel
+ 4.16.0
+
com.solace
solace-messaging-client
diff --git a/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/SolaceConnector.java b/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/SolaceConnector.java
index 6badc57..289408f 100644
--- a/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/SolaceConnector.java
+++ b/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/SolaceConnector.java
@@ -37,6 +37,7 @@
//@ConnectorAttribute(name = "client.type", type = "string", direction = INCOMING_AND_OUTGOING, description = "Direct or persisted", defaultValue = "persisted")
@ConnectorAttribute(name = "client.lazy.start", type = "boolean", direction = INCOMING_AND_OUTGOING, description = "Whether the receiver or publisher is started at initialization or lazily at subscription time", defaultValue = "false")
@ConnectorAttribute(name = "client.graceful-shutdown", type = "boolean", direction = INCOMING_AND_OUTGOING, description = "Whether to shutdown client gracefully", defaultValue = "true")
+@ConnectorAttribute(name = "client.tracing-enabled", type = "boolean", direction = INCOMING_AND_OUTGOING, description = "Whether to enable tracing for incoming and outgoing messages", defaultValue = "false")
@ConnectorAttribute(name = "client.graceful-shutdown.wait-timeout", type = "long", direction = INCOMING_AND_OUTGOING, description = "Timeout in milliseconds to wait for messages to finish processing before shutdown", defaultValue = "10000")
@ConnectorAttribute(name = "consumer.queue.name", type = "string", direction = INCOMING, description = "The queue name of receiver.")
@ConnectorAttribute(name = "consumer.queue.type", type = "string", direction = INCOMING, description = "The queue type of receiver. Supported values `durable-exclusive`, `durable-non-exclusive`, `non-durable-exclusive`", defaultValue = "durable-exclusive")
diff --git a/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/incoming/SolaceIncomingChannel.java b/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/incoming/SolaceIncomingChannel.java
index 634a05a..6f61982 100644
--- a/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/incoming/SolaceIncomingChannel.java
+++ b/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/incoming/SolaceIncomingChannel.java
@@ -22,6 +22,7 @@
import com.solace.messaging.config.MissingResourcesCreationConfiguration.MissingResourcesCreationStrategy;
import com.solace.messaging.config.ReceiverActivationPassivationConfiguration;
import com.solace.messaging.config.ReplayStrategy;
+import com.solace.messaging.config.SolaceConstants;
import com.solace.messaging.receiver.InboundMessage;
import com.solace.messaging.receiver.PersistentMessageReceiver;
import com.solace.messaging.resources.Queue;
@@ -29,6 +30,8 @@
import com.solace.quarkus.messaging.SolaceConnectorIncomingConfiguration;
import com.solace.quarkus.messaging.fault.*;
import com.solace.quarkus.messaging.i18n.SolaceLogging;
+import com.solace.quarkus.messaging.tracing.SolaceOpenTelemetryInstrumenter;
+import com.solace.quarkus.messaging.tracing.SolaceTrace;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
@@ -51,6 +54,7 @@ public class SolaceIncomingChannel implements ReceiverActivationPassivationConfi
private final boolean gracefulShutdown;
private final long gracefulShutdownWaitTimeout;
private final List failures = new ArrayList<>();
+ private final SolaceOpenTelemetryInstrumenter solaceOpenTelemetryInstrumenter;
private volatile MessagingService solace;
// Assuming we won't ever exceed the limit of an unsigned long...
@@ -107,19 +111,47 @@ public SolaceIncomingChannel(Vertx vertx, SolaceConnectorIncomingConfiguration i
// TODO Here use a subscription receiver.receiveAsync with an internal queue
this.pollerThread = Executors.newSingleThreadExecutor();
- this.stream = Multi.createBy().repeating()
+
+ Multi extends Message>> incomingMulti = Multi.createBy().repeating()
.uni(() -> Uni.createFrom().item(receiver::receiveMessage)
.runSubscriptionOn(pollerThread))
.until(__ -> closed.get())
.emitOn(context::runOnContext)
.map(consumed -> new SolaceInboundMessage<>(consumed, ackHandler, failureHandler,
- unacknowledgedMessageTracker, this::reportFailure))
- .plug(m -> lazyStart
- ? m.onSubscription()
- .call(() -> Uni.createFrom().completionStage(receiver.startAsync()))
- : m)
+ unacknowledgedMessageTracker, this::reportFailure));
+
+ if (ic.getClientTracingEnabled()) {
+ solaceOpenTelemetryInstrumenter = SolaceOpenTelemetryInstrumenter.createForIncoming();
+ incomingMulti = incomingMulti.map(message -> {
+ InboundMessage consumedMessage = message.getMetadata(SolaceInboundMetadata.class).get().getMessage();
+ SolaceTrace solaceTrace = new SolaceTrace.Builder()
+ .withDestinationKind("queue")
+ .withTopic(consumedMessage.getDestinationName())
+ .withMessageID(consumedMessage.getApplicationMessageId())
+ .withCorrelationID(consumedMessage.getCorrelationId())
+ .withPartitionKey(
+ consumedMessage
+ .hasProperty(SolaceConstants.MessageUserPropertyConstants.QUEUE_PARTITION_KEY)
+ ? consumedMessage
+ .getProperty(
+ SolaceConstants.MessageUserPropertyConstants.QUEUE_PARTITION_KEY)
+ : null)
+ .withPayloadSize(Long.valueOf(consumedMessage.getPayloadAsBytes().length))
+ .withProperties(consumedMessage.getProperties())
+ .build();
+ return solaceOpenTelemetryInstrumenter.traceIncoming(message, solaceTrace, true);
+ });
+ } else {
+ solaceOpenTelemetryInstrumenter = null;
+ }
+
+ this.stream = incomingMulti.plug(m -> lazyStart
+ ? m.onSubscription()
+ .call(() -> Uni.createFrom().completionStage(receiver.startAsync()))
+ : m)
.onItem().invoke(() -> alive.set(true))
.onFailure().retry().withBackOff(Duration.ofSeconds(1)).atMost(3).onFailure().invoke(this::reportFailure);
+
if (!lazyStart) {
receiver.start();
}
diff --git a/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/outgoing/SolaceOutgoingChannel.java b/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/outgoing/SolaceOutgoingChannel.java
index 1b4d353..df8ad1f 100644
--- a/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/outgoing/SolaceOutgoingChannel.java
+++ b/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/outgoing/SolaceOutgoingChannel.java
@@ -22,6 +22,8 @@
import com.solace.messaging.resources.Topic;
import com.solace.quarkus.messaging.SolaceConnectorOutgoingConfiguration;
import com.solace.quarkus.messaging.i18n.SolaceLogging;
+import com.solace.quarkus.messaging.tracing.SolaceOpenTelemetryInstrumenter;
+import com.solace.quarkus.messaging.tracing.SolaceTrace;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.smallrye.mutiny.Uni;
@@ -44,6 +46,7 @@ public class SolaceOutgoingChannel
private final long gracefulShutdownWaitTimeout;
private final AtomicBoolean alive = new AtomicBoolean(false);
private final List failures = new ArrayList<>();
+ private final SolaceOpenTelemetryInstrumenter solaceOpenTelemetryInstrumenter;
private volatile boolean isPublisherReady = true;
private volatile MessagingService solace;
@@ -75,8 +78,14 @@ public SolaceOutgoingChannel(Vertx vertx, SolaceConnectorOutgoingConfiguration o
}
boolean lazyStart = oc.getClientLazyStart();
this.topic = Topic.of(oc.getProducerTopic().orElse(this.channel));
+ if (oc.getClientTracingEnabled()) {
+ solaceOpenTelemetryInstrumenter = SolaceOpenTelemetryInstrumenter.createForOutgoing();
+ } else {
+ solaceOpenTelemetryInstrumenter = null;
+ }
this.processor = new SenderProcessor(oc.getProducerMaxInflightMessages(), oc.getProducerWaitForPublishReceipt(),
- m -> sendMessage(solace, m, oc.getProducerWaitForPublishReceipt()).onFailure().invoke(this::reportFailure));
+ m -> sendMessage(solace, m, oc.getProducerWaitForPublishReceipt(), oc.getClientTracingEnabled()).onFailure()
+ .invoke(this::reportFailure));
this.subscriber = MultiUtils.via(processor, multi -> multi.plug(
m -> lazyStart ? m.onSubscription().call(() -> Uni.createFrom().completionStage(publisher.startAsync())) : m));
if (!lazyStart) {
@@ -91,10 +100,11 @@ public void ready() {
});
}
- private Uni sendMessage(MessagingService solace, Message> m, boolean waitForPublishReceipt) {
+ private Uni sendMessage(MessagingService solace, Message> m, boolean waitForPublishReceipt,
+ boolean isTracingEnabled) {
// TODO - Use isPublisherReady to check if publisher is in ready state before publishing. This is required when back-pressure is set to reject. We need to block this call till isPublisherReady is true
- return publishMessage(publisher, m, solace.messageBuilder(), waitForPublishReceipt)
+ return publishMessage(publisher, m, solace.messageBuilder(), waitForPublishReceipt, isTracingEnabled)
.onItem().transformToUni(receipt -> {
alive.set(true);
if (receipt != null) {
@@ -118,7 +128,7 @@ private synchronized void reportFailure(Throwable throwable) {
}
private Uni publishMessage(PersistentMessagePublisher publisher, Message> m,
- OutboundMessageBuilder msgBuilder, boolean waitForPublishReceipt) {
+ OutboundMessageBuilder msgBuilder, boolean waitForPublishReceipt, boolean isTracingEnabled) {
publishedMessagesTracker.increment();
AtomicReference topic = new AtomicReference<>(this.topic);
OutboundMessage outboundMessage;
@@ -159,6 +169,7 @@ private Uni publishMessage(PersistentMessagePublisher publisher,
topic.set(Topic.of(metadata.getDynamicDestination()));
}
});
+
Object payload = m.getPayload();
if (payload instanceof OutboundMessage) {
outboundMessage = (OutboundMessage) payload;
@@ -173,6 +184,25 @@ private Uni publishMessage(PersistentMessagePublisher publisher,
.withHTTPContentHeader(HttpHeaderValues.APPLICATION_JSON.toString(), "")
.build(Json.encode(payload));
}
+
+ if (isTracingEnabled) {
+ SolaceTrace solaceTrace = new SolaceTrace.Builder()
+ .withDestinationKind("topic")
+ .withTopic(topic.get().getName())
+ .withMessageID(outboundMessage.getApplicationMessageId())
+ .withCorrelationID(outboundMessage.getCorrelationId())
+ .withPartitionKey(
+ outboundMessage
+ .hasProperty(SolaceConstants.MessageUserPropertyConstants.QUEUE_PARTITION_KEY)
+ ? outboundMessage
+ .getProperty(
+ SolaceConstants.MessageUserPropertyConstants.QUEUE_PARTITION_KEY)
+ : null)
+ .withPayloadSize(Long.valueOf(outboundMessage.getPayloadAsBytes().length))
+ .withProperties(outboundMessage.getProperties()).build();
+ solaceOpenTelemetryInstrumenter.traceOutgoing(m, solaceTrace);
+ }
+
return Uni.createFrom(). emitter(e -> {
boolean exitExceptionally = false;
try {
diff --git a/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/tracing/SolaceAttributeExtractor.java b/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/tracing/SolaceAttributeExtractor.java
new file mode 100644
index 0000000..18366d9
--- /dev/null
+++ b/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/tracing/SolaceAttributeExtractor.java
@@ -0,0 +1,72 @@
+package com.solace.quarkus.messaging.tracing;
+
+import io.opentelemetry.api.common.AttributesBuilder;
+import io.opentelemetry.context.Context;
+import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
+import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter;
+
+public class SolaceAttributeExtractor implements AttributesExtractor {
+ private final MessagingAttributesGetter messagingAttributesGetter;
+
+ public SolaceAttributeExtractor() {
+ this.messagingAttributesGetter = new SolaceMessagingAttributesGetter();
+ }
+
+ @Override
+ public void onStart(AttributesBuilder attributesBuilder, Context context, SolaceTrace solaceTrace) {
+ attributesBuilder.put("messaging.solace.partition_number", solaceTrace.getPartitionKey());
+ }
+
+ @Override
+ public void onEnd(AttributesBuilder attributesBuilder, Context context, SolaceTrace solaceTrace, Void unused,
+ Throwable throwable) {
+
+ }
+
+ public MessagingAttributesGetter getMessagingAttributesGetter() {
+ return messagingAttributesGetter;
+ }
+
+ private static final class SolaceMessagingAttributesGetter implements MessagingAttributesGetter {
+ @Override
+ public String getSystem(final SolaceTrace solaceTrace) {
+ return "solace";
+ }
+
+ @Override
+ public String getDestinationKind(SolaceTrace solaceTrace) {
+ return solaceTrace.getDestinationKind();
+ }
+
+ @Override
+ public String getDestination(final SolaceTrace solaceTrace) {
+ return solaceTrace.getTopic();
+ }
+
+ @Override
+ public boolean isTemporaryDestination(final SolaceTrace solaceTrace) {
+ return false;
+ }
+
+ @Override
+ public String getConversationId(final SolaceTrace solaceTrace) {
+ return solaceTrace.getCorrelationId();
+ }
+
+ @Override
+ public Long getMessagePayloadSize(final SolaceTrace solaceTrace) {
+ return solaceTrace.getPayloadSize();
+ }
+
+ @Override
+ public Long getMessagePayloadCompressedSize(final SolaceTrace solaceTrace) {
+ return null;
+ }
+
+ @Override
+ public String getMessageId(final SolaceTrace solaceTrace, final Void unused) {
+ return solaceTrace.getMessageId();
+ }
+
+ }
+}
diff --git a/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/tracing/SolaceOpenTelemetryInstrumenter.java b/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/tracing/SolaceOpenTelemetryInstrumenter.java
new file mode 100644
index 0000000..4c16fcf
--- /dev/null
+++ b/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/tracing/SolaceOpenTelemetryInstrumenter.java
@@ -0,0 +1,60 @@
+package com.solace.quarkus.messaging.tracing;
+
+import org.eclipse.microprofile.reactive.messaging.Message;
+
+import io.opentelemetry.api.GlobalOpenTelemetry;
+import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
+import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder;
+import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation;
+import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor;
+import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter;
+import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingSpanNameExtractor;
+import io.smallrye.reactive.messaging.tracing.TracingUtils;
+
+public class SolaceOpenTelemetryInstrumenter {
+
+ private final Instrumenter instrumenter;
+
+ public SolaceOpenTelemetryInstrumenter(Instrumenter instrumenter) {
+ this.instrumenter = instrumenter;
+ }
+
+ public static SolaceOpenTelemetryInstrumenter createForIncoming() {
+ return createInstrumenter(true);
+ }
+
+ public static SolaceOpenTelemetryInstrumenter createForOutgoing() {
+ return createInstrumenter(false);
+ }
+
+ private static SolaceOpenTelemetryInstrumenter createInstrumenter(boolean incoming) {
+ MessageOperation messageOperation = incoming ? MessageOperation.RECEIVE : MessageOperation.SEND;
+
+ SolaceAttributeExtractor myExtractor = new SolaceAttributeExtractor();
+ MessagingAttributesGetter attributesGetter = myExtractor.getMessagingAttributesGetter();
+ var spanNameExtractor = MessagingSpanNameExtractor.create(attributesGetter, messageOperation);
+ InstrumenterBuilder builder = Instrumenter.builder(GlobalOpenTelemetry.get(),
+ "io.smallrye.reactive.messaging", spanNameExtractor);
+ var attributesExtractor = MessagingAttributesExtractor.create(attributesGetter, messageOperation);
+
+ builder
+ .addAttributesExtractor(attributesExtractor)
+ .addAttributesExtractor(myExtractor);
+
+ if (incoming) {
+ return new SolaceOpenTelemetryInstrumenter(builder.buildConsumerInstrumenter(SolaceTraceTextMapGetter.INSTANCE));
+ } else {
+ return new SolaceOpenTelemetryInstrumenter(builder.buildProducerInstrumenter(SolaceTraceTextMapSetter.INSTANCE));
+ }
+ }
+ //
+
+ public Message> traceIncoming(Message> message, SolaceTrace myTrace, boolean makeCurrent) {
+ return TracingUtils.traceIncoming(instrumenter, message, myTrace, makeCurrent);
+ }
+
+ public void traceOutgoing(Message> message, SolaceTrace myTrace) {
+ TracingUtils.traceOutgoing(instrumenter, message, myTrace);
+ }
+
+}
diff --git a/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/tracing/SolaceTrace.java b/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/tracing/SolaceTrace.java
new file mode 100644
index 0000000..cc3dba7
--- /dev/null
+++ b/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/tracing/SolaceTrace.java
@@ -0,0 +1,101 @@
+package com.solace.quarkus.messaging.tracing;
+
+import java.util.Map;
+
+public class SolaceTrace {
+ private final String destinationKind;
+ private final String topic;
+ private final String messageId;
+ private final String correlationId;
+ private final String partitionKey;
+ private final Long payloadSize;
+ private final Map messageProperties;
+
+ private SolaceTrace(String destinationKind, String topic, String messageId, String correlationId, String partitionKey,
+ Long payloadSize, Map messageProperties) {
+ this.destinationKind = destinationKind;
+ this.topic = topic;
+ this.messageId = messageId;
+ this.correlationId = correlationId;
+ this.partitionKey = partitionKey;
+ this.payloadSize = payloadSize;
+ this.messageProperties = messageProperties;
+ }
+
+ public String getDestinationKind() {
+ return destinationKind;
+ }
+
+ public String getTopic() {
+ return topic;
+ }
+
+ public String getMessageId() {
+ return messageId;
+ }
+
+ public String getCorrelationId() {
+ return correlationId;
+ }
+
+ public String getPartitionKey() {
+ return partitionKey;
+ }
+
+ public Long getPayloadSize() {
+ return payloadSize;
+ }
+
+ public Map getMessageProperties() {
+ return messageProperties;
+ }
+
+ public static class Builder {
+ private String destinationKind;
+ private String topic;
+ private String messageId;
+ private String correlationId;
+ private String partitionKey;
+ private Long payloadSize;
+ private Map properties;
+
+ public Builder withDestinationKind(String destinationKind) {
+ this.destinationKind = destinationKind;
+ return this;
+ }
+
+ public Builder withTopic(String topic) {
+ this.topic = topic;
+ return this;
+ }
+
+ public Builder withMessageID(String messageId) {
+ this.messageId = messageId;
+ return this;
+ }
+
+ public Builder withCorrelationID(String correlationId) {
+ this.correlationId = correlationId;
+ return this;
+ }
+
+ public Builder withPartitionKey(String partitionKey) {
+ this.partitionKey = partitionKey;
+ return this;
+ }
+
+ public Builder withPayloadSize(Long payloadSize) {
+ this.payloadSize = payloadSize;
+ return this;
+ }
+
+ public Builder withProperties(Map properties) {
+ this.properties = properties;
+ return this;
+ }
+
+ public SolaceTrace build() {
+ return new SolaceTrace(destinationKind, topic, messageId, correlationId, partitionKey, payloadSize, properties);
+ }
+ }
+}
diff --git a/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/tracing/SolaceTraceTextMapGetter.java b/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/tracing/SolaceTraceTextMapGetter.java
new file mode 100644
index 0000000..b4e1c60
--- /dev/null
+++ b/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/tracing/SolaceTraceTextMapGetter.java
@@ -0,0 +1,27 @@
+package com.solace.quarkus.messaging.tracing;
+
+import java.util.ArrayList;
+import java.util.Map;
+
+import io.opentelemetry.context.propagation.TextMapGetter;
+
+public enum SolaceTraceTextMapGetter implements TextMapGetter {
+ INSTANCE;
+
+ @Override
+ public Iterable keys(SolaceTrace carrier) {
+ Map headers = carrier.getMessageProperties();
+ return new ArrayList<>(headers.keySet());
+ }
+
+ @Override
+ public String get(final SolaceTrace carrier, final String key) {
+ if (carrier != null) {
+ Map properties = carrier.getMessageProperties();
+ if (properties != null) {
+ return properties.get(key);
+ }
+ }
+ return null;
+ }
+}
diff --git a/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/tracing/SolaceTraceTextMapSetter.java b/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/tracing/SolaceTraceTextMapSetter.java
new file mode 100644
index 0000000..327f583
--- /dev/null
+++ b/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/tracing/SolaceTraceTextMapSetter.java
@@ -0,0 +1,17 @@
+package com.solace.quarkus.messaging.tracing;
+
+import java.util.Map;
+
+import io.opentelemetry.context.propagation.TextMapSetter;
+
+public enum SolaceTraceTextMapSetter implements TextMapSetter {
+ INSTANCE;
+
+ @Override
+ public void set(SolaceTrace carrier, String key, String value) {
+ if (carrier != null) {
+ Map properties = carrier.getMessageProperties();
+ properties.put(key, value);
+ }
+ }
+}
diff --git a/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/tracing/TracingPropogationTest.java b/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/tracing/TracingPropogationTest.java
new file mode 100644
index 0000000..e7adf57
--- /dev/null
+++ b/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/tracing/TracingPropogationTest.java
@@ -0,0 +1,257 @@
+package com.solace.quarkus.messaging.tracing;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.stream.Collectors;
+
+import com.solace.messaging.publisher.OutboundMessage;
+import com.solace.messaging.receiver.InboundMessage;
+import com.solace.messaging.receiver.PersistentMessageReceiver;
+import com.solace.messaging.resources.Queue;
+import com.solace.messaging.resources.TopicSubscription;
+import com.solace.quarkus.messaging.SolaceProcessorTest;
+import com.solace.quarkus.messaging.SolacePublisherTest;
+import io.smallrye.mutiny.Multi;
+import jakarta.enterprise.context.ApplicationScoped;
+
+import org.eclipse.microprofile.reactive.messaging.Incoming;
+import org.eclipse.microprofile.reactive.messaging.Message;
+import org.eclipse.microprofile.reactive.messaging.Outgoing;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import com.solace.messaging.publisher.PersistentMessagePublisher;
+import com.solace.messaging.resources.Topic;
+import com.solace.quarkus.messaging.base.WeldTestBase;
+import com.solace.quarkus.messaging.incoming.SolaceInboundMetadata;
+
+import io.opentelemetry.api.GlobalOpenTelemetry;
+import io.opentelemetry.api.trace.SpanKind;
+import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator;
+import io.opentelemetry.context.propagation.ContextPropagators;
+import io.opentelemetry.sdk.OpenTelemetrySdk;
+import io.opentelemetry.sdk.common.CompletableResultCode;
+import io.opentelemetry.sdk.testing.exporter.InMemorySpanExporter;
+import io.opentelemetry.sdk.trace.SdkTracerProvider;
+import io.opentelemetry.sdk.trace.SpanProcessor;
+import io.opentelemetry.sdk.trace.data.SpanData;
+import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor;
+import io.opentelemetry.sdk.trace.samplers.Sampler;
+import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig;
+
+public class TracingPropogationTest extends WeldTestBase {
+ private SdkTracerProvider tracerProvider;
+ private InMemorySpanExporter spanExporter;
+
+ @BeforeEach
+ public void setup() {
+ GlobalOpenTelemetry.resetForTest();
+
+ spanExporter = InMemorySpanExporter.create();
+ SpanProcessor spanProcessor = SimpleSpanProcessor.create(spanExporter);
+
+ tracerProvider = SdkTracerProvider.builder()
+ .addSpanProcessor(spanProcessor)
+ .setSampler(Sampler.alwaysOn())
+ .build();
+
+ OpenTelemetrySdk.builder()
+ .setPropagators(ContextPropagators.create(W3CTraceContextPropagator.getInstance()))
+ .setTracerProvider(tracerProvider)
+ .buildAndRegisterGlobal();
+ }
+
+ @AfterAll
+ static void shutdown() {
+ GlobalOpenTelemetry.resetForTest();
+ }
+
+ @Test
+ void consumer() {
+ MapBasedConfig config = commonConfig()
+ .with("mp.messaging.incoming.in.connector", "quarkus-solace")
+ .with("mp.messaging.incoming.in.client.tracing-enabled", "true")
+ .with("mp.messaging.incoming.in.consumer.queue.name", queue)
+ .with("mp.messaging.incoming.in.consumer.queue.add-additional-subscriptions", "true")
+ .with("mp.messaging.incoming.in.consumer.queue.missing-resource-creation-strategy", "create-on-start")
+ .with("mp.messaging.incoming.in.consumer.queue.subscriptions", "quarkus/integration/test/replay/messages");
+
+ // Run app that consumes messages
+ MyConsumer app = runApplication(config, MyConsumer.class);
+
+ // Produce messages
+ PersistentMessagePublisher publisher = messagingService.createPersistentMessagePublisherBuilder()
+ .build()
+ .start();
+ Topic tp = Topic.of("quarkus/integration/test/replay/messages");
+ publisher.publish("1", tp);
+ publisher.publish("2", tp);
+ publisher.publish("3", tp);
+ publisher.publish("4", tp);
+ publisher.publish("5", tp);
+
+ // Assert on published messages
+ await().untilAsserted(() -> assertThat(app.getReceived()).contains("1", "2", "3", "4", "5"));
+
+ CompletableResultCode completableResultCode = tracerProvider.forceFlush();
+ completableResultCode.whenComplete(() -> {
+ List spans = spanExporter.getFinishedSpanItems();
+ assertEquals(5, spans.size());
+
+ assertEquals(5, spans.stream().map(SpanData::getTraceId).collect(Collectors.toSet()).size());
+
+ SpanData span = spans.get(0);
+ assertEquals(SpanKind.CONSUMER, span.getKind());
+ });
+ }
+
+ @Test
+ void publisher() {
+ MapBasedConfig config = commonConfig()
+ .with("mp.messaging.outgoing.out.connector", "quarkus-solace")
+ .with("mp.messaging.outgoing.out.client.tracing-enabled", "true")
+ .with("mp.messaging.outgoing.out.producer.topic", topic);
+
+ List expected = new CopyOnWriteArrayList<>();
+
+ // Start listening first
+ PersistentMessageReceiver receiver = messagingService.createPersistentMessageReceiverBuilder()
+ .withSubscriptions(TopicSubscription.of(topic))
+ .build(Queue.nonDurableExclusiveQueue());
+ receiver.receiveAsync(inboundMessage -> expected.add(inboundMessage.getPayloadAsString()));
+ receiver.start();
+
+ // Run app that publish messages
+ MyApp app = runApplication(config, MyApp.class);
+ // Assert on published messages
+ await().untilAsserted(() -> assertThat(app.getAcked()).contains("1", "2", "3", "4", "5"));
+ // Assert on received messages
+ await().untilAsserted(() -> assertThat(expected).contains("1", "2", "3", "4", "5"));
+
+ CompletableResultCode completableResultCode = tracerProvider.forceFlush();
+ completableResultCode.whenComplete(() -> {
+ List spans = spanExporter.getFinishedSpanItems();
+ assertEquals(5, spans.size());
+
+ assertEquals(5, spans.stream().map(SpanData::getTraceId).collect(Collectors.toSet()).size());
+
+ SpanData span = spans.get(0);
+ assertEquals(SpanKind.PRODUCER, span.getKind());
+ });
+ }
+
+ @Test
+ void processor() {
+ String processedTopic = topic + "/processed";
+ MapBasedConfig config = commonConfig()
+ .with("mp.messaging.incoming.in.connector", "quarkus-solace")
+ .with("mp.messaging.incoming.in.client.tracing-enabled", "true")
+ .with("mp.messaging.incoming.in.consumer.queue.add-additional-subscriptions", "true")
+ .with("mp.messaging.incoming.in.consumer.queue.missing-resource-creation-strategy", "create-on-start")
+ .with("mp.messaging.incoming.in.consumer.queue.subscriptions", topic)
+ .with("mp.messaging.outgoing.out.connector", "quarkus-solace")
+ .with("mp.messaging.outgoing.out.client.tracing-enabled", "true")
+ .with("mp.messaging.outgoing.out.producer.topic", processedTopic);
+
+ // Run app that processes messages
+ MyProcessor app = runApplication(config, MyProcessor.class);
+
+ List expected = new CopyOnWriteArrayList<>();
+
+ // Start listening processed messages
+ PersistentMessageReceiver receiver = messagingService.createPersistentMessageReceiverBuilder()
+ .withSubscriptions(TopicSubscription.of(processedTopic))
+ .build(Queue.nonDurableExclusiveQueue());
+ receiver.receiveAsync(inboundMessage -> expected.add(inboundMessage.getPayloadAsString()));
+ receiver.start();
+
+ // Produce messages
+ PersistentMessagePublisher publisher = messagingService.createPersistentMessagePublisherBuilder()
+ .build()
+ .start();
+ Topic tp = Topic.of(topic);
+ publisher.publish("1", tp);
+ publisher.publish("2", tp);
+ publisher.publish("3", tp);
+ publisher.publish("4", tp);
+ publisher.publish("5", tp);
+
+ // Assert on received messages
+ await().untilAsserted(() -> assertThat(app.getReceived()).contains("1", "2", "3", "4", "5"));
+ // Assert on processed messages
+ await().untilAsserted(() -> assertThat(expected).contains("1", "2", "3", "4", "5"));
+
+ CompletableResultCode completableResultCode = tracerProvider.forceFlush();
+ completableResultCode.whenComplete(() -> {
+ List spans = spanExporter.getFinishedSpanItems();
+ assertEquals(10, spans.size());
+
+ assertEquals(5, spans.stream().map(SpanData::getTraceId).collect(Collectors.toSet()).size());
+
+ SpanData span = spans.get(0);
+ assertEquals(SpanKind.CONSUMER, span.getKind());
+
+ span = spans.get(5);
+ assertEquals(SpanKind.PRODUCER, span.getKind());
+ });
+ }
+
+ @ApplicationScoped
+ static class MyConsumer {
+ private final List received = new CopyOnWriteArrayList<>();
+
+ @Incoming("in")
+ CompletionStage in(Message msg) {
+ SolaceInboundMetadata solaceInboundMetadata = msg.getMetadata(SolaceInboundMetadata.class).orElseThrow();
+ received.add(solaceInboundMetadata.getPayloadAsString());
+ return msg.ack();
+ }
+
+ public List getReceived() {
+ return received;
+ }
+ }
+
+ @ApplicationScoped
+ static class MyApp {
+ private final List acked = new CopyOnWriteArrayList<>();
+
+ @Outgoing("out")
+ Multi> out() {
+
+ return Multi.createFrom().items("1", "2", "3", "4", "5")
+ .map(payload -> Message.of(payload).withAck(() -> {
+ acked.add(payload);
+ return CompletableFuture.completedFuture(null);
+ }));
+ }
+
+ public List getAcked() {
+ return acked;
+ }
+ }
+
+ @ApplicationScoped
+ static class MyProcessor {
+ private final List received = new CopyOnWriteArrayList<>();
+
+ @Incoming("in")
+ @Outgoing("out")
+ OutboundMessage in(InboundMessage msg) {
+ String payload = msg.getPayloadAsString();
+ received.add(payload);
+ return messagingService.messageBuilder().build(payload);
+ }
+
+ public List getReceived() {
+ return received;
+ }
+ }
+}
From 3b94300ec689ff175ebadb5c21c116a5b206d557 Mon Sep 17 00:00:00 2001
From: SravanThotakura05 <83568543+SravanThotakura05@users.noreply.github.com>
Date: Wed, 7 Feb 2024 17:20:17 +0530
Subject: [PATCH 02/15] Code formatting
---
.../messaging/tracing/TracingPropogationTest.java | 14 ++++++--------
1 file changed, 6 insertions(+), 8 deletions(-)
diff --git a/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/tracing/TracingPropogationTest.java b/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/tracing/TracingPropogationTest.java
index e7adf57..9455b73 100644
--- a/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/tracing/TracingPropogationTest.java
+++ b/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/tracing/TracingPropogationTest.java
@@ -10,14 +10,6 @@
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.stream.Collectors;
-import com.solace.messaging.publisher.OutboundMessage;
-import com.solace.messaging.receiver.InboundMessage;
-import com.solace.messaging.receiver.PersistentMessageReceiver;
-import com.solace.messaging.resources.Queue;
-import com.solace.messaging.resources.TopicSubscription;
-import com.solace.quarkus.messaging.SolaceProcessorTest;
-import com.solace.quarkus.messaging.SolacePublisherTest;
-import io.smallrye.mutiny.Multi;
import jakarta.enterprise.context.ApplicationScoped;
import org.eclipse.microprofile.reactive.messaging.Incoming;
@@ -27,8 +19,13 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import com.solace.messaging.publisher.OutboundMessage;
import com.solace.messaging.publisher.PersistentMessagePublisher;
+import com.solace.messaging.receiver.InboundMessage;
+import com.solace.messaging.receiver.PersistentMessageReceiver;
+import com.solace.messaging.resources.Queue;
import com.solace.messaging.resources.Topic;
+import com.solace.messaging.resources.TopicSubscription;
import com.solace.quarkus.messaging.base.WeldTestBase;
import com.solace.quarkus.messaging.incoming.SolaceInboundMetadata;
@@ -44,6 +41,7 @@
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor;
import io.opentelemetry.sdk.trace.samplers.Sampler;
+import io.smallrye.mutiny.Multi;
import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig;
public class TracingPropogationTest extends WeldTestBase {
From fbf01560dd2786be99c5ce3a62d8b3516a162b51 Mon Sep 17 00:00:00 2001
From: SravanThotakura05 <83568543+SravanThotakura05@users.noreply.github.com>
Date: Mon, 12 Feb 2024 20:45:26 +0530
Subject: [PATCH 03/15] changed system name in tracing
---
.../quarkus/messaging/tracing/SolaceAttributeExtractor.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/tracing/SolaceAttributeExtractor.java b/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/tracing/SolaceAttributeExtractor.java
index 18366d9..0b4428c 100644
--- a/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/tracing/SolaceAttributeExtractor.java
+++ b/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/tracing/SolaceAttributeExtractor.java
@@ -30,7 +30,7 @@ public MessagingAttributesGetter getMessagingAttributesGetter
private static final class SolaceMessagingAttributesGetter implements MessagingAttributesGetter {
@Override
public String getSystem(final SolaceTrace solaceTrace) {
- return "solace";
+ return "SolacePubSub+";
}
@Override
From 843c7fb7fde54ddba3014fd98470ba93bafa90d0 Mon Sep 17 00:00:00 2001
From: SravanThotakura05 <83568543+SravanThotakura05@users.noreply.github.com>
Date: Tue, 20 Feb 2024 18:14:36 +0530
Subject: [PATCH 04/15] Configured tracing in test broker and added user
properties
---
.../messaging/incoming/SolaceIncomingChannel.java | 14 ++++++++++----
.../quarkus/messaging/base/SolaceContainer.java | 10 ++++++++++
2 files changed, 20 insertions(+), 4 deletions(-)
diff --git a/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/incoming/SolaceIncomingChannel.java b/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/incoming/SolaceIncomingChannel.java
index 6f61982..7f68298 100644
--- a/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/incoming/SolaceIncomingChannel.java
+++ b/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/incoming/SolaceIncomingChannel.java
@@ -4,9 +4,7 @@
import java.time.Duration;
import java.time.ZonedDateTime;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
+import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
@@ -124,6 +122,14 @@ public SolaceIncomingChannel(Vertx vertx, SolaceConnectorIncomingConfiguration i
solaceOpenTelemetryInstrumenter = SolaceOpenTelemetryInstrumenter.createForIncoming();
incomingMulti = incomingMulti.map(message -> {
InboundMessage consumedMessage = message.getMetadata(SolaceInboundMetadata.class).get().getMessage();
+ Map messageProperties = new HashMap<>();
+
+ messageProperties.put("messaging.solace.replication_group_message_id",
+ consumedMessage.getReplicationGroupMessageId().toString());
+ messageProperties.put("messaging.solace.priority", Integer.toString(consumedMessage.getPriority()));
+ if (consumedMessage.getProperties().size() > 0) {
+ messageProperties.putAll(consumedMessage.getProperties());
+ }
SolaceTrace solaceTrace = new SolaceTrace.Builder()
.withDestinationKind("queue")
.withTopic(consumedMessage.getDestinationName())
@@ -137,7 +143,7 @@ public SolaceIncomingChannel(Vertx vertx, SolaceConnectorIncomingConfiguration i
SolaceConstants.MessageUserPropertyConstants.QUEUE_PARTITION_KEY)
: null)
.withPayloadSize(Long.valueOf(consumedMessage.getPayloadAsBytes().length))
- .withProperties(consumedMessage.getProperties())
+ .withProperties(messageProperties)
.build();
return solaceOpenTelemetryInstrumenter.traceIncoming(message, solaceTrace, true);
});
diff --git a/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/base/SolaceContainer.java b/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/base/SolaceContainer.java
index 8beae7e..404fd65 100644
--- a/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/base/SolaceContainer.java
+++ b/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/base/SolaceContainer.java
@@ -96,6 +96,16 @@ private Transferable createConfigurationScript() {
updateConfigScript(scriptBuilder, "enable");
updateConfigScript(scriptBuilder, "configure");
+ // telemetry configuration
+ updateConfigScript(scriptBuilder, "message-vpn default");
+ updateConfigScript(scriptBuilder, "create telemetry-profile trace");
+ updateConfigScript(scriptBuilder, "trace");
+ updateConfigScript(scriptBuilder, "no shutdown");
+ updateConfigScript(scriptBuilder, "create filter default");
+ updateConfigScript(scriptBuilder, "no shutdown");
+ updateConfigScript(scriptBuilder, "create subscription \">\"");
+ updateConfigScript(scriptBuilder, "end");
+
// create replay log
updateConfigScript(scriptBuilder, "message-spool message-vpn default");
updateConfigScript(scriptBuilder, "create replay-log integration-test-replay-log");
From f511069e4db97bb306fbc250a6776c9c14c5859e Mon Sep 17 00:00:00 2001
From: SravanThotakura05 <83568543+SravanThotakura05@users.noreply.github.com>
Date: Tue, 12 Mar 2024 14:05:52 +0530
Subject: [PATCH 05/15] OAuth implementation
---
quarkus-solace-client/deployment/pom.xml | 4 +
.../quarkus/deployment/SolaceProcessor.java | 15 +-
quarkus-solace-client/runtime/pom.xml | 5 +-
.../solace/quarkus/runtime/OidcProvider.java | 59 +
.../quarkus/runtime/SolaceRecorder.java | 15 +-
.../runtime/pom.xml | 7 +
.../incoming/SolaceIncomingChannel.java | 2 +-
.../quarkus/messaging/SolaceOAuthTest.java | 172 ++
.../messaging/base/KeyCloakContainer.java | 102 +
.../messaging/base/SolaceBrokerExtension.java | 6 +-
.../messaging/base/SolaceContainer.java | 83 +-
.../runtime/src/test/resources/keycloak.crt | 23 +
.../runtime/src/test/resources/keycloak.key | 28 +
.../keycloak/realms/solace-realm.json | 2266 +++++++++++++++++
.../runtime/src/test/resources/solace.pem | 51 +
15 files changed, 2812 insertions(+), 26 deletions(-)
create mode 100644 quarkus-solace-client/runtime/src/main/java/com/solace/quarkus/runtime/OidcProvider.java
create mode 100644 quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/SolaceOAuthTest.java
create mode 100644 quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/base/KeyCloakContainer.java
create mode 100644 quarkus-solace-messaging-connector/runtime/src/test/resources/keycloak.crt
create mode 100644 quarkus-solace-messaging-connector/runtime/src/test/resources/keycloak.key
create mode 100644 quarkus-solace-messaging-connector/runtime/src/test/resources/keycloak/realms/solace-realm.json
create mode 100644 quarkus-solace-messaging-connector/runtime/src/test/resources/solace.pem
diff --git a/quarkus-solace-client/deployment/pom.xml b/quarkus-solace-client/deployment/pom.xml
index ba6846b..0d6971d 100644
--- a/quarkus-solace-client/deployment/pom.xml
+++ b/quarkus-solace-client/deployment/pom.xml
@@ -27,6 +27,10 @@
io.quarkus
quarkus-devservices-deployment
+
+ io.quarkus
+ quarkus-oidc-client-deployment
+
io.quarkus
quarkus-junit5-internal
diff --git a/quarkus-solace-client/deployment/src/main/java/com/solace/quarkus/deployment/SolaceProcessor.java b/quarkus-solace-client/deployment/src/main/java/com/solace/quarkus/deployment/SolaceProcessor.java
index 6305d4e..d95bcc9 100644
--- a/quarkus-solace-client/deployment/src/main/java/com/solace/quarkus/deployment/SolaceProcessor.java
+++ b/quarkus-solace-client/deployment/src/main/java/com/solace/quarkus/deployment/SolaceProcessor.java
@@ -10,15 +10,14 @@
import com.solace.messaging.MessagingService;
import com.solace.quarkus.MessagingServiceClientCustomizer;
+import com.solace.quarkus.runtime.OidcProvider;
import com.solace.quarkus.runtime.SolaceConfig;
import com.solace.quarkus.runtime.SolaceRecorder;
import com.solace.quarkus.runtime.observability.SolaceMetricBinder;
import com.solacesystems.jcsmp.JCSMPFactory;
import io.quarkus.arc.SyntheticCreationalContext;
-import io.quarkus.arc.deployment.SyntheticBeanBuildItem;
-import io.quarkus.arc.deployment.SyntheticBeansRuntimeInitBuildItem;
-import io.quarkus.arc.deployment.UnremovableBeanBuildItem;
+import io.quarkus.arc.deployment.*;
import io.quarkus.deployment.annotations.*;
import io.quarkus.deployment.annotations.Record;
import io.quarkus.deployment.builditem.ExtensionSslNativeSupportBuildItem;
@@ -31,13 +30,14 @@
import io.quarkus.smallrye.health.deployment.spi.HealthBuildItem;
class SolaceProcessor {
-
private static final String FEATURE = "solace-client";
private static final ParameterizedType SOLACE_CUSTOMIZER_INJECTION_TYPE = ParameterizedType.create(
DotName.createSimple(Instance.class),
new Type[] { ClassType.create(DotName.createSimple(MessagingServiceClientCustomizer.class.getName())) }, null);
+ private static final Type OIDC_PROVIDER = ClassType.create(DotName.createSimple(OidcProvider.class));
+
private static final AnnotationInstance[] EMPTY_ANNOTATIONS = new AnnotationInstance[0];
@BuildStep
@@ -59,21 +59,24 @@ ExtensionSslNativeSupportBuildItem ssl() {
@Record(ExecutionTime.RUNTIME_INIT)
ServiceStartBuildItem init(
SolaceConfig config, SolaceRecorder recorder,
- ShutdownContextBuildItem shutdown, BuildProducer syntheticBeans) {
+ ShutdownContextBuildItem shutdown, BuildProducer syntheticBeans,
+ BuildProducer additionalBeanBuildItemBuildProducer) {
Function, MessagingService> function = recorder.init(config, shutdown);
+ additionalBeanBuildItemBuildProducer.produce(AdditionalBeanBuildItem.unremovableOf(OidcProvider.class));
+
SyntheticBeanBuildItem.ExtendedBeanConfigurator solaceConfigurator = SyntheticBeanBuildItem
.configure(MessagingService.class)
.defaultBean()
.scope(ApplicationScoped.class)
.addInjectionPoint(SOLACE_CUSTOMIZER_INJECTION_TYPE, EMPTY_ANNOTATIONS)
+ .addInjectionPoint(OIDC_PROVIDER)
.createWith(function)
.unremovable()
.setRuntimeInit();
syntheticBeans.produce(solaceConfigurator.done());
-
return new ServiceStartBuildItem(FEATURE);
}
diff --git a/quarkus-solace-client/runtime/pom.xml b/quarkus-solace-client/runtime/pom.xml
index 3645f90..cedad1b 100644
--- a/quarkus-solace-client/runtime/pom.xml
+++ b/quarkus-solace-client/runtime/pom.xml
@@ -18,7 +18,10 @@
com.solace
solace-messaging-client
-
+
+ io.quarkus
+ quarkus-oidc-client
+
io.quarkus
quarkus-micrometer
diff --git a/quarkus-solace-client/runtime/src/main/java/com/solace/quarkus/runtime/OidcProvider.java b/quarkus-solace-client/runtime/src/main/java/com/solace/quarkus/runtime/OidcProvider.java
new file mode 100644
index 0000000..a3a08bf
--- /dev/null
+++ b/quarkus-solace-client/runtime/src/main/java/com/solace/quarkus/runtime/OidcProvider.java
@@ -0,0 +1,59 @@
+package com.solace.quarkus.runtime;
+
+import java.time.Duration;
+
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.enterprise.event.Observes;
+import jakarta.inject.Inject;
+
+import org.eclipse.microprofile.config.inject.ConfigProperty;
+
+import com.solace.messaging.MessagingService;
+import com.solace.messaging.config.SolaceProperties;
+
+import io.quarkus.oidc.client.OidcClient;
+import io.quarkus.oidc.client.Tokens;
+import io.quarkus.runtime.StartupEvent;
+import io.smallrye.mutiny.Multi;
+import io.smallrye.mutiny.infrastructure.Infrastructure;
+
+@ApplicationScoped
+public class OidcProvider {
+
+ @ConfigProperty(name = "quarkus.solace.oidc.refresh.interval", defaultValue = "60s")
+ Duration duration;
+
+ @Inject
+ OidcClient client;
+
+ private volatile Tokens lastToken;
+ private MessagingService service;
+
+ Tokens getToken() {
+ Tokens firstToken = client.getTokens().await().indefinitely();
+ lastToken = firstToken;
+ return firstToken;
+ }
+
+ void init(MessagingService service) {
+ this.service = service;
+ }
+
+ void startup(@Observes StartupEvent event) {
+ Multi.createFrom().ticks().every(duration)
+ .emitOn(Infrastructure.getDefaultWorkerPool())
+ // .filter(aLong -> {
+ // if (lastToken.isAccessTokenWithinRefreshInterval()) {
+ // return true;
+ // } else
+ // return false;
+ // })
+ .call(() -> client.getTokens().invoke(tokens -> {
+ lastToken = tokens;
+ }))
+ .invoke(() -> service.updateProperty(SolaceProperties.AuthenticationProperties.SCHEME_OAUTH2_ACCESS_TOKEN,
+ lastToken.getAccessToken()))
+ .subscribe().with(aLong -> {
+ });
+ }
+}
diff --git a/quarkus-solace-client/runtime/src/main/java/com/solace/quarkus/runtime/SolaceRecorder.java b/quarkus-solace-client/runtime/src/main/java/com/solace/quarkus/runtime/SolaceRecorder.java
index 01bcda6..fb6c927 100644
--- a/quarkus-solace-client/runtime/src/main/java/com/solace/quarkus/runtime/SolaceRecorder.java
+++ b/quarkus-solace-client/runtime/src/main/java/com/solace/quarkus/runtime/SolaceRecorder.java
@@ -38,11 +38,18 @@ public MessagingService apply(SyntheticCreationalContext conte
}
}
- MessagingServiceClientBuilder builder = MessagingService.builder(ConfigurationProfile.V1)
- .fromProperties(properties);
-
Instance reference = context.getInjectedReference(CUSTOMIZER);
+ OidcProvider oidcProvider = context.getInjectedReference(OidcProvider.class);
+ String authScheme = config.extra().get("authentication.scheme");
+
+ if (oidcProvider != null && authScheme != null && authScheme.equals("AUTHENTICATION_SCHEME_OAUTH2")) {
+ properties.put(SolaceProperties.AuthenticationProperties.SCHEME_OAUTH2_ACCESS_TOKEN,
+ oidcProvider.getToken().getAccessToken());
+ }
+
+ MessagingServiceClientBuilder builder = MessagingService.builder(ConfigurationProfile.V1)
+ .fromProperties(properties);
MessagingService service;
if (reference.isUnsatisfied()) {
service = builder.build();
@@ -54,12 +61,14 @@ public MessagingService apply(SyntheticCreationalContext conte
}
}
+ oidcProvider.init(service);
var tmp = service;
shutdown.addLastShutdownTask(() -> {
if (tmp.isConnected()) {
tmp.disconnect();
}
});
+
return service.connect();
}
};
diff --git a/quarkus-solace-messaging-connector/runtime/pom.xml b/quarkus-solace-messaging-connector/runtime/pom.xml
index 2bfb5fd..00ff391 100644
--- a/quarkus-solace-messaging-connector/runtime/pom.xml
+++ b/quarkus-solace-messaging-connector/runtime/pom.xml
@@ -137,6 +137,13 @@
slf4j-log4j12
test
+
+
+ org.keycloak
+ keycloak-admin-client
+ test
+
+
diff --git a/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/incoming/SolaceIncomingChannel.java b/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/incoming/SolaceIncomingChannel.java
index 7f68298..7f0c685 100644
--- a/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/incoming/SolaceIncomingChannel.java
+++ b/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/incoming/SolaceIncomingChannel.java
@@ -45,7 +45,7 @@ public class SolaceIncomingChannel implements ReceiverActivationPassivationConfi
private final SolaceAckHandler ackHandler;
private final SolaceFailureHandler failureHandler;
private final AtomicBoolean closed = new AtomicBoolean(false);
- private final AtomicBoolean alive = new AtomicBoolean(false);
+ private final AtomicBoolean alive = new AtomicBoolean(true);
private final PersistentMessageReceiver receiver;
private final Flow.Publisher extends Message>> stream;
private final ExecutorService pollerThread;
diff --git a/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/SolaceOAuthTest.java b/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/SolaceOAuthTest.java
new file mode 100644
index 0000000..63db077
--- /dev/null
+++ b/quarkus-solace-messaging-connector/runtime/src/test/java/com/solace/quarkus/messaging/SolaceOAuthTest.java
@@ -0,0 +1,172 @@
+package com.solace.quarkus.messaging;
+
+import static org.testcontainers.shaded.org.awaitility.Awaitility.await;
+
+import java.io.*;
+import java.security.KeyStore;
+import java.security.cert.CertificateFactory;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.*;
+
+import jakarta.ws.rs.client.Client;
+import jakarta.ws.rs.client.ClientBuilder;
+
+import org.apache.http.conn.ssl.DefaultHostnameVerifier;
+import org.awaitility.Awaitility;
+import org.eclipse.microprofile.reactive.messaging.Message;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.keycloak.admin.client.Keycloak;
+import org.keycloak.admin.client.KeycloakBuilder;
+import org.testcontainers.utility.MountableFile;
+
+import com.solace.messaging.MessagingService;
+import com.solace.messaging.config.SolaceProperties;
+import com.solace.messaging.config.profile.ConfigurationProfile;
+import com.solace.messaging.publisher.PersistentMessagePublisher;
+import com.solace.messaging.resources.Topic;
+import com.solace.quarkus.messaging.base.KeyCloakContainer;
+import com.solace.quarkus.messaging.base.SolaceContainer;
+import com.solace.quarkus.messaging.incoming.SolaceIncomingChannel;
+
+import io.smallrye.mutiny.Multi;
+import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig;
+import io.vertx.mutiny.core.Vertx;
+
+public class SolaceOAuthTest {
+
+ private static final String SOLACE_IMAGE = "solace/solace-pubsub-standard:latest";
+
+ private static SolaceContainer createSolaceContainer() {
+ return new SolaceContainer(SOLACE_IMAGE);
+ }
+
+ private static KeyCloakContainer keyCloakContainer;
+ private static SolaceContainer solaceContainer;
+
+ @BeforeAll
+ static void startContainers() {
+ keyCloakContainer = new KeyCloakContainer();
+ keyCloakContainer.start();
+ keyCloakContainer.createHostsFile();
+ await().until(() -> keyCloakContainer.isRunning());
+
+ solaceContainer = createSolaceContainer();
+ solaceContainer.withCredentials("user", "pass")
+ .withClientCert(MountableFile.forClasspathResource("solace.pem"),
+ MountableFile.forClasspathResource("keycloak.crt"), false)
+ .withOAuth()
+ .withExposedPorts(SolaceContainer.Service.SMF.getPort(), SolaceContainer.Service.SMF_SSL.getPort(), 1943, 8080)
+ .withPublishTopic("quarkus/integration/test/replay/messages", SolaceContainer.Service.SMF)
+ .withPublishTopic("quarkus/integration/test/default/>", SolaceContainer.Service.SMF)
+ .withPublishTopic("quarkus/integration/test/provisioned/>", SolaceContainer.Service.SMF)
+ .withPublishTopic("quarkus/integration/test/dynamic/>", SolaceContainer.Service.SMF);
+
+ solaceContainer.start();
+ await().until(() -> solaceContainer.isRunning());
+ }
+
+ private static KeyStore createKeyStore(byte[] ca, byte[] serviceCa) {
+ try {
+ KeyStore keyStore = KeyStore.getInstance("JKS");
+ keyStore.load(null);
+ CertificateFactory cf = CertificateFactory.getInstance("X.509");
+ if (ca != null) {
+ keyStore.setCertificateEntry("keycloak",
+ cf.generateCertificate(new ByteArrayInputStream(ca)));
+ }
+ if (serviceCa != null) {
+ keyStore.setCertificateEntry("service-ca",
+ cf.generateCertificate(new ByteArrayInputStream(serviceCa)));
+ }
+ return keyStore;
+ } catch (Exception ignored) {
+ return null;
+ }
+ }
+
+ private String getAccessToken() throws IOException {
+ ClassLoader classLoader = SolaceOAuthTest.class.getClassLoader();
+ InputStream is = new FileInputStream(classLoader.getResource("keycloak.crt").getFile());
+ KeyStore trustStore = createKeyStore(is.readAllBytes(), null);
+ Client resteasyClient = ClientBuilder.newBuilder()
+ .connectTimeout(30, TimeUnit.SECONDS)
+ .trustStore(trustStore)
+ .hostnameVerifier(new DefaultHostnameVerifier())
+ .build();
+
+ Keycloak keycloak = KeycloakBuilder.builder()
+ .serverUrl(keyCloakContainer.getOrigin(KeyCloakContainer.Service.HTTPS))
+ .realm("solace")
+ .clientId("solace")
+ .clientSecret("solace-secret")
+ .grantType("client_credentials")
+ .resteasyClient(resteasyClient)
+ .build();
+ return keycloak.tokenManager().getAccessTokenString();
+ }
+
+ private MessagingService getMessagingService() throws IOException {
+ Properties properties = new Properties();
+ properties.put(SolaceProperties.TransportLayerProperties.HOST,
+ solaceContainer.getOrigin(SolaceContainer.Service.SMF_SSL));
+ properties.put(SolaceProperties.ServiceProperties.VPN_NAME, solaceContainer.getVpn());
+ properties.put(SolaceProperties.AuthenticationProperties.SCHEME, "AUTHENTICATION_SCHEME_OAUTH2");
+ properties.put(SolaceProperties.TransportLayerSecurityProperties.CERT_VALIDATED, "false");
+ properties.put(SolaceProperties.TransportLayerSecurityProperties.CERT_VALIDATE_SERVERNAME, "false");
+ properties.put(SolaceProperties.AuthenticationProperties.SCHEME_OAUTH2_ACCESS_TOKEN, getAccessToken());
+
+ MessagingService messagingService = MessagingService.builder(ConfigurationProfile.V1)
+ .fromProperties(properties)
+ .build();
+ messagingService.connect();
+
+ return messagingService;
+ }
+
+ @Test
+ void oauthTest() throws IOException {
+ MapBasedConfig config = new MapBasedConfig()
+ .with("channel-name", "in")
+ .with("consumer.queue.name", "queue-" + UUID.randomUUID().getMostSignificantBits())
+ .with("consumer.queue.add-additional-subscriptions", true)
+ .with("consumer.queue.missing-resource-creation-strategy", "create-on-start")
+ .with("consumer.queue.subscriptions", SolaceContainer.INTEGRATION_TEST_QUEUE_SUBSCRIPTION);
+
+ MessagingService messagingService = getMessagingService();
+ SolaceIncomingChannel solaceIncomingChannel = new SolaceIncomingChannel(Vertx.vertx(),
+ new SolaceConnectorIncomingConfiguration(config), messagingService);
+
+ CopyOnWriteArrayList