diff --git a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/SolaceInboundMessage.java b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/SolaceInboundMessage.java index 151dfec..8af05ba 100644 --- a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/SolaceInboundMessage.java +++ b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/SolaceInboundMessage.java @@ -3,6 +3,7 @@ import static io.smallrye.reactive.messaging.providers.locals.ContextAwareMessage.captureContextMetadata; import java.util.concurrent.CompletionStage; +import java.util.function.Consumer; import org.eclipse.microprofile.reactive.messaging.Metadata; @@ -22,16 +23,18 @@ public class SolaceInboundMessage implements ContextAwareMessage, Metadata private final SolaceFailureHandler nackHandler; private final T payload; private final IncomingMessagesUnsignedCounterBarrier unacknowledgedMessageTracker; + private final Consumer reportFailure; private Metadata metadata; public SolaceInboundMessage(InboundMessage message, SolaceAckHandler ackHandler, SolaceFailureHandler nackHandler, - IncomingMessagesUnsignedCounterBarrier unacknowledgedMessageTracker) { + IncomingMessagesUnsignedCounterBarrier unacknowledgedMessageTracker, Consumer reportFailure) { this.msg = message; this.unacknowledgedMessageTracker = unacknowledgedMessageTracker; this.payload = (T) convertPayload(); this.ackHandler = ackHandler; this.nackHandler = nackHandler; this.metadata = captureContextMetadata(new SolaceInboundMetadata(message)); + this.reportFailure = reportFailure; } public InboundMessage getMessage() { @@ -88,6 +91,7 @@ public CompletionStage ack() { @Override public CompletionStage nack(Throwable reason, Metadata nackMetadata) { this.unacknowledgedMessageTracker.decrement(); + this.reportFailure.accept(reason); return nackHandler.handle(this, reason, nackMetadata); // if (solaceErrorTopicPublisherHandler == null) { diff --git a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/SolaceIncomingChannel.java b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/SolaceIncomingChannel.java index 21a8d19..7a5c88e 100644 --- a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/SolaceIncomingChannel.java +++ b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/SolaceIncomingChannel.java @@ -4,12 +4,15 @@ import java.time.Duration; import java.time.ZonedDateTime; +import java.util.ArrayList; import java.util.Arrays; +import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Flow; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; import org.eclipse.microprofile.reactive.messaging.Message; @@ -42,16 +45,20 @@ public class SolaceIncomingChannel implements ReceiverActivationPassivationConfi private final SolaceAckHandler ackHandler; private final SolaceFailureHandler failureHandler; private final AtomicBoolean closed = new AtomicBoolean(false); + private final AtomicBoolean alive = new AtomicBoolean(false); private final PersistentMessageReceiver receiver; private final Flow.Publisher> stream; private final ExecutorService pollerThread; private final boolean gracefulShutdown; private final long gracefulShutdownWaitTimeout; + private final List failures = new ArrayList<>(); + private volatile MessagingService solace; // Assuming we won't ever exceed the limit of an unsigned long... private final IncomingMessagesUnsignedCounterBarrier unacknowledgedMessageTracker = new IncomingMessagesUnsignedCounterBarrier(); public SolaceIncomingChannel(Vertx vertx, SolaceConnectorIncomingConfiguration ic, MessagingService solace) { + this.solace = solace; this.channel = ic.getChannel(); this.context = Context.newInstance(((VertxInternal) vertx.getDelegate()).createEventLoopContext()); this.gracefulShutdown = ic.getClientGracefulShutdown(); @@ -108,15 +115,26 @@ public SolaceIncomingChannel(Vertx vertx, SolaceConnectorIncomingConfiguration i .until(__ -> closed.get()) .emitOn(context::runOnContext) .map(consumed -> new SolaceInboundMessage<>(consumed, ackHandler, failureHandler, - unacknowledgedMessageTracker)) - .plug(m -> lazyStart ? m.onSubscription().call(() -> Uni.createFrom().completionStage(receiver.startAsync())) + unacknowledgedMessageTracker, this::reportFailure)) + .plug(m -> lazyStart + ? m.onSubscription() + .call(() -> Uni.createFrom().completionStage(receiver.startAsync())) : m) - .onFailure().retry().withBackOff(Duration.ofSeconds(1)).atMost(3); + .onItem().invoke(() -> alive.set(true)) + .onFailure().retry().withBackOff(Duration.ofSeconds(1)).atMost(3).onFailure().invoke((t) -> { + failures.add(t); + alive.set(false); + }); if (!lazyStart) { receiver.start(); } } + private void reportFailure(Throwable throwable) { + failures.add(throwable); + alive.set(false); + } + private SolaceFailureHandler createFailureHandler(SolaceConnectorIncomingConfiguration ic, MessagingService solace) { String strategy = ic.getConsumerQueueFailureStrategy(); SolaceFailureHandler.Strategy actualStrategy = SolaceFailureHandler.Strategy.from(strategy); @@ -204,15 +222,26 @@ public void close() { } public void isStarted(HealthReport.HealthReportBuilder builder) { - + builder.add(channel, solace.isConnected()); } public void isReady(HealthReport.HealthReportBuilder builder) { - + builder.add(channel, solace.isConnected() && receiver != null && receiver.isRunning()); } public void isAlive(HealthReport.HealthReportBuilder builder) { + List reportedFailures; + if (!failures.isEmpty()) { + synchronized (this) { + reportedFailures = new ArrayList<>(failures); + } + builder.add(channel, solace.isConnected() && alive.get(), + reportedFailures.stream().map(Throwable::getMessage).collect(Collectors.joining())); + failures.removeAll(reportedFailures); + } else { + builder.add(channel, solace.isConnected() && alive.get()); + } } @Override diff --git a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/outgoing/SolaceOutgoingChannel.java b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/outgoing/SolaceOutgoingChannel.java index 4afb63c..a0367b2 100644 --- a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/outgoing/SolaceOutgoingChannel.java +++ b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/outgoing/SolaceOutgoingChannel.java @@ -1,8 +1,12 @@ package io.quarkiverse.solace.outgoing; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.Flow; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; import org.eclipse.microprofile.reactive.messaging.Message; @@ -37,12 +41,16 @@ public class SolaceOutgoingChannel private final SenderProcessor processor; private final boolean gracefulShutdown; private final long gracefulShutdownWaitTimeout; + private final AtomicBoolean alive = new AtomicBoolean(false); + private final List failures = new ArrayList<>(); private volatile boolean isPublisherReady = true; + private volatile MessagingService solace; // Assuming we won't ever exceed the limit of an unsigned long... private final OutgoingMessagesUnsignedCounterBarrier publishedMessagesTracker = new OutgoingMessagesUnsignedCounterBarrier(); public SolaceOutgoingChannel(Vertx vertx, SolaceConnectorOutgoingConfiguration oc, MessagingService solace) { + this.solace = solace; this.channel = oc.getChannel(); PersistentMessagePublisherBuilder builder = solace.createPersistentMessagePublisherBuilder(); switch (oc.getProducerBackPressureStrategy()) { @@ -67,7 +75,10 @@ public SolaceOutgoingChannel(Vertx vertx, SolaceConnectorOutgoingConfiguration o boolean lazyStart = oc.getClientLazyStart(); this.topic = Topic.of(oc.getProducerTopic().orElse(this.channel)); this.processor = new SenderProcessor(oc.getProducerMaxInflightMessages(), oc.getProducerWaitForPublishReceipt(), - m -> sendMessage(solace, m, oc.getProducerWaitForPublishReceipt())); + m -> sendMessage(solace, m, oc.getProducerWaitForPublishReceipt()).onFailure().invoke((t) -> { + failures.add(t); + alive.set(false); + })); this.subscriber = MultiUtils.via(processor, multi -> multi.plug( m -> lazyStart ? m.onSubscription().call(() -> Uni.createFrom().completionStage(publisher.startAsync())) : m)); if (!lazyStart) { @@ -87,12 +98,17 @@ private Uni sendMessage(MessagingService solace, Message m, boolean wai // TODO - Use isPublisherReady to check if publisher is in ready state before publishing. This is required when back-pressure is set to reject. We need to block this call till isPublisherReady is true return publishMessage(publisher, m, solace.messageBuilder(), waitForPublishReceipt) .onItem().transformToUni(receipt -> { + alive.set(true); if (receipt != null) { OutgoingMessageMetadata.setResultOnMessage(m, receipt); } return Uni.createFrom().completionStage(m.getAck()); }) - .onFailure().recoverWithUni(t -> Uni.createFrom().completionStage(m.nack(t))); + .onFailure().recoverWithUni(t -> { + failures.add(t); + alive.set(false); + return Uni.createFrom().completionStage(m.nack(t)); + }); } @@ -215,15 +231,25 @@ public void onPublishReceipt(PublishReceipt publishReceipt) { } public void isStarted(HealthReport.HealthReportBuilder builder) { - + builder.add(channel, solace.isConnected()); } public void isReady(HealthReport.HealthReportBuilder builder) { - + builder.add(channel, solace.isConnected() && this.publisher != null && this.publisher.isReady()); } public void isAlive(HealthReport.HealthReportBuilder builder) { - + List reportedFailures; + if (!failures.isEmpty()) { + synchronized (this) { + reportedFailures = new ArrayList<>(failures); + } + builder.add(channel, solace.isConnected() && alive.get(), + reportedFailures.stream().map(Throwable::getMessage).collect(Collectors.joining())); + failures.removeAll(reportedFailures); + } else { + builder.add(channel, solace.isConnected() && alive.get()); + } } @Override diff --git a/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/health/SolaceConsumerHealthTest.java b/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/health/SolaceConsumerHealthTest.java new file mode 100644 index 0000000..a702b08 --- /dev/null +++ b/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/health/SolaceConsumerHealthTest.java @@ -0,0 +1,141 @@ +package io.quarkiverse.solace.health; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.CopyOnWriteArrayList; + +import jakarta.enterprise.context.ApplicationScoped; + +import org.eclipse.microprofile.reactive.messaging.Incoming; +import org.junit.jupiter.api.Test; + +import com.solace.messaging.publisher.PersistentMessagePublisher; +import com.solace.messaging.receiver.InboundMessage; +import com.solace.messaging.resources.Topic; + +import io.quarkiverse.solace.base.WeldTestBase; +import io.quarkiverse.solace.incoming.SolaceInboundMessage; +import io.smallrye.reactive.messaging.health.HealthReport; +import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig; + +public class SolaceConsumerHealthTest extends WeldTestBase { + + @Test + void solaceConsumerHealthCheck() { + MapBasedConfig config = new MapBasedConfig() + .with("mp.messaging.incoming.in.connector", "quarkus-solace") + .with("mp.messaging.incoming.in.consumer.queue.name", queue) + .with("mp.messaging.incoming.in.consumer.queue.add-additional-subscriptions", "true") + .with("mp.messaging.incoming.in.consumer.queue.missing-resource-creation-strategy", "create-on-start") + .with("mp.messaging.incoming.in.consumer.queue.subscriptions", topic); + + // Run app that consumes messages + MyConsumer app = runApplication(config, MyConsumer.class); + + await().until(() -> isStarted() && isReady()); + + // Produce messages + PersistentMessagePublisher publisher = messagingService.createPersistentMessagePublisherBuilder() + .build() + .start(); + Topic tp = Topic.of(topic); + publisher.publish("1", tp); + publisher.publish("2", tp); + publisher.publish("3", tp); + publisher.publish("4", tp); + publisher.publish("5", tp); + + await().until(() -> isAlive()); + + HealthReport startup = getHealth().getStartup(); + HealthReport liveness = getHealth().getLiveness(); + HealthReport readiness = getHealth().getReadiness(); + + assertThat(startup.isOk()).isTrue(); + assertThat(liveness.isOk()).isTrue(); + assertThat(readiness.isOk()).isTrue(); + assertThat(startup.getChannels()).hasSize(1); + assertThat(liveness.getChannels()).hasSize(1); + assertThat(readiness.getChannels()).hasSize(1); + + } + + @Test + void solaceConsumerLivenessCheck() { + MapBasedConfig config = new MapBasedConfig() + .with("mp.messaging.incoming.in.connector", "quarkus-solace") + .with("mp.messaging.incoming.in.consumer.queue.name", queue) + .with("mp.messaging.incoming.in.consumer.queue.add-additional-subscriptions", "true") + .with("mp.messaging.incoming.in.consumer.queue.missing-resource-creation-strategy", "create-on-start") + .with("mp.messaging.incoming.in.consumer.queue.subscriptions", topic); + + // Run app that consumes messages + MyErrorConsumer app = runApplication(config, MyErrorConsumer.class); + + await().until(() -> isStarted() && isReady()); + + // Produce messages + PersistentMessagePublisher publisher = messagingService.createPersistentMessagePublisherBuilder() + .build() + .start(); + Topic tp = Topic.of(topic); + publisher.publish("1", tp); + publisher.publish("2", tp); + + await().until(() -> isAlive()); + + HealthReport startup = getHealth().getStartup(); + HealthReport liveness = getHealth().getLiveness(); + HealthReport readiness = getHealth().getReadiness(); + + assertThat(startup.isOk()).isTrue(); + assertThat(liveness.isOk()).isTrue(); + assertThat(readiness.isOk()).isTrue(); + assertThat(startup.getChannels()).hasSize(1); + assertThat(liveness.getChannels()).hasSize(1); + assertThat(readiness.getChannels()).hasSize(1); + + publisher.publish("3", tp); + await().until(() -> { + HealthReport healthReport = getHealth().getLiveness(); + return (healthReport.isOk() == false && !healthReport.getChannels().get(0).getMessage().isEmpty()); + }); + + publisher.publish("4", tp); + publisher.publish("5", tp); + await().until(() -> getHealth().getLiveness().isOk() == true); + } + + @ApplicationScoped + static class MyConsumer { + private final List received = new CopyOnWriteArrayList<>(); + + @Incoming("in") + void in(InboundMessage msg) { + received.add(msg.getPayloadAsString()); + } + + public List getReceived() { + return received; + } + } + + @ApplicationScoped + static class MyErrorConsumer { + private final List received = new CopyOnWriteArrayList<>(); + + @Incoming("in") + CompletionStage in(SolaceInboundMessage msg) { + String payload = new String(msg.getPayload(), StandardCharsets.UTF_8); + if (payload.equals("3")) { + return msg.nack(new IllegalArgumentException("Nacking message with payload 3")); + } + + return msg.ack(); + } + } +} diff --git a/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/health/SolaceProducerHealthCheck.java b/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/health/SolaceProducerHealthCheck.java new file mode 100644 index 0000000..ad85137 --- /dev/null +++ b/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/health/SolaceProducerHealthCheck.java @@ -0,0 +1,109 @@ +package io.quarkiverse.solace.health; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CopyOnWriteArrayList; + +import jakarta.enterprise.context.ApplicationScoped; + +import org.eclipse.microprofile.reactive.messaging.Message; +import org.eclipse.microprofile.reactive.messaging.Outgoing; +import org.junit.jupiter.api.Test; + +import com.solace.messaging.receiver.PersistentMessageReceiver; +import com.solace.messaging.resources.Queue; +import com.solace.messaging.resources.TopicSubscription; + +import io.quarkiverse.solace.base.WeldTestBase; +import io.smallrye.mutiny.Multi; +import io.smallrye.reactive.messaging.health.HealthReport; +import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig; + +public class SolaceProducerHealthCheck extends WeldTestBase { + @Test + void publisherHealthCheck() { + MapBasedConfig config = new MapBasedConfig() + .with("mp.messaging.outgoing.out.connector", "quarkus-solace") + .with("mp.messaging.outgoing.out.producer.topic", topic); + + List expected = new CopyOnWriteArrayList<>(); + + // Start listening first + PersistentMessageReceiver receiver = messagingService.createPersistentMessageReceiverBuilder() + .withSubscriptions(TopicSubscription.of(topic)) + .build(Queue.nonDurableExclusiveQueue()); + receiver.receiveAsync(inboundMessage -> expected.add(inboundMessage.getPayloadAsString())); + receiver.start(); + + // Run app that publish messages + MyApp app = runApplication(config, MyApp.class); + + await().until(() -> isStarted() && isReady() && isAlive()); + + HealthReport startup = getHealth().getStartup(); + HealthReport liveness = getHealth().getLiveness(); + HealthReport readiness = getHealth().getReadiness(); + + assertThat(startup.isOk()).isTrue(); + assertThat(liveness.isOk()).isTrue(); + assertThat(readiness.isOk()).isTrue(); + assertThat(startup.getChannels()).hasSize(1); + assertThat(liveness.getChannels()).hasSize(1); + assertThat(readiness.getChannels()).hasSize(1); + } + + @Test + void publisherLivenessCheck() { + MapBasedConfig config = new MapBasedConfig() + .with("mp.messaging.outgoing.out.connector", "quarkus-solace") + .with("mp.messaging.outgoing.out.producer.topic", "publish/deny"); + + List expected = new CopyOnWriteArrayList<>(); + + // Start listening first + PersistentMessageReceiver receiver = messagingService.createPersistentMessageReceiverBuilder() + .withSubscriptions(TopicSubscription.of("publish/deny")) + .build(Queue.nonDurableExclusiveQueue()); + receiver.receiveAsync(inboundMessage -> expected.add(inboundMessage.getPayloadAsString())); + receiver.start(); + + // Run app that publish messages + MyApp app = runApplication(config, MyApp.class); + + await().until(() -> isStarted() && isReady()); + + HealthReport startup = getHealth().getStartup(); + HealthReport liveness = getHealth().getLiveness(); + HealthReport readiness = getHealth().getReadiness(); + + assertThat(startup.isOk()).isTrue(); + assertThat(liveness.isOk()).isFalse(); + assertThat(readiness.isOk()).isTrue(); + assertThat(startup.getChannels()).hasSize(1); + assertThat(liveness.getChannels()).hasSize(1); + assertThat(readiness.getChannels()).hasSize(1); + assertThat(liveness.getChannels().get(0).getMessage()).isNotEmpty(); + } + + @ApplicationScoped + static class MyApp { + private final List acked = new CopyOnWriteArrayList<>(); + + @Outgoing("out") + Multi> out() { + + return Multi.createFrom().items("1", "2", "3", "4", "5") + .map(payload -> Message.of(payload).withAck(() -> { + acked.add(payload); + return CompletableFuture.completedFuture(null); + })); + } + + public List getAcked() { + return acked; + } + } +}