From 0e1c144b7eb060b76ee599866f21338e7783589a Mon Sep 17 00:00:00 2001 From: SravanThotakura05 <83568543+SravanThotakura05@users.noreply.github.com> Date: Thu, 11 Jan 2024 11:51:14 +0530 Subject: [PATCH 1/2] Added health status - started, ready, alive --- .../solace/incoming/SolaceInboundMessage.java | 6 +- .../incoming/SolaceIncomingChannel.java | 23 ++- .../solace/outgoing/SenderProcessor.java | 7 +- .../outgoing/SolaceOutgoingChannel.java | 23 ++- .../health/SolaceConsumerHealthTest.java | 138 ++++++++++++++++++ .../health/SolaceProducerHealthCheck.java | 108 ++++++++++++++ 6 files changed, 292 insertions(+), 13 deletions(-) create mode 100644 pubsub-plus-connector/src/test/java/io/quarkiverse/solace/health/SolaceConsumerHealthTest.java create mode 100644 pubsub-plus-connector/src/test/java/io/quarkiverse/solace/health/SolaceProducerHealthCheck.java diff --git a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/SolaceInboundMessage.java b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/SolaceInboundMessage.java index 151dfec..8af05ba 100644 --- a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/SolaceInboundMessage.java +++ b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/SolaceInboundMessage.java @@ -3,6 +3,7 @@ import static io.smallrye.reactive.messaging.providers.locals.ContextAwareMessage.captureContextMetadata; import java.util.concurrent.CompletionStage; +import java.util.function.Consumer; import org.eclipse.microprofile.reactive.messaging.Metadata; @@ -22,16 +23,18 @@ public class SolaceInboundMessage implements ContextAwareMessage, Metadata private final SolaceFailureHandler nackHandler; private final T payload; private final IncomingMessagesUnsignedCounterBarrier unacknowledgedMessageTracker; + private final Consumer reportFailure; private Metadata metadata; public SolaceInboundMessage(InboundMessage message, SolaceAckHandler ackHandler, SolaceFailureHandler nackHandler, - IncomingMessagesUnsignedCounterBarrier unacknowledgedMessageTracker) { + IncomingMessagesUnsignedCounterBarrier unacknowledgedMessageTracker, Consumer reportFailure) { this.msg = message; this.unacknowledgedMessageTracker = unacknowledgedMessageTracker; this.payload = (T) convertPayload(); this.ackHandler = ackHandler; this.nackHandler = nackHandler; this.metadata = captureContextMetadata(new SolaceInboundMetadata(message)); + this.reportFailure = reportFailure; } public InboundMessage getMessage() { @@ -88,6 +91,7 @@ public CompletionStage ack() { @Override public CompletionStage nack(Throwable reason, Metadata nackMetadata) { this.unacknowledgedMessageTracker.decrement(); + this.reportFailure.accept(reason); return nackHandler.handle(this, reason, nackMetadata); // if (solaceErrorTopicPublisherHandler == null) { diff --git a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/SolaceIncomingChannel.java b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/SolaceIncomingChannel.java index 21a8d19..44446c9 100644 --- a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/SolaceIncomingChannel.java +++ b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/SolaceIncomingChannel.java @@ -42,16 +42,19 @@ public class SolaceIncomingChannel implements ReceiverActivationPassivationConfi private final SolaceAckHandler ackHandler; private final SolaceFailureHandler failureHandler; private final AtomicBoolean closed = new AtomicBoolean(false); + private final AtomicBoolean alive = new AtomicBoolean(false); private final PersistentMessageReceiver receiver; private final Flow.Publisher> stream; private final ExecutorService pollerThread; private final boolean gracefulShutdown; private final long gracefulShutdownWaitTimeout; + private volatile MessagingService solace; // Assuming we won't ever exceed the limit of an unsigned long... private final IncomingMessagesUnsignedCounterBarrier unacknowledgedMessageTracker = new IncomingMessagesUnsignedCounterBarrier(); public SolaceIncomingChannel(Vertx vertx, SolaceConnectorIncomingConfiguration ic, MessagingService solace) { + this.solace = solace; this.channel = ic.getChannel(); this.context = Context.newInstance(((VertxInternal) vertx.getDelegate()).createEventLoopContext()); this.gracefulShutdown = ic.getClientGracefulShutdown(); @@ -108,15 +111,23 @@ public SolaceIncomingChannel(Vertx vertx, SolaceConnectorIncomingConfiguration i .until(__ -> closed.get()) .emitOn(context::runOnContext) .map(consumed -> new SolaceInboundMessage<>(consumed, ackHandler, failureHandler, - unacknowledgedMessageTracker)) - .plug(m -> lazyStart ? m.onSubscription().call(() -> Uni.createFrom().completionStage(receiver.startAsync())) + unacknowledgedMessageTracker, this::reportFailure)) + .plug(m -> lazyStart + ? m.onSubscription() + .call(() -> Uni.createFrom().completionStage(receiver.startAsync())) : m) - .onFailure().retry().withBackOff(Duration.ofSeconds(1)).atMost(3); + .onItem().invoke(() -> alive.set(true)) + .onFailure().retry().withBackOff(Duration.ofSeconds(1)).atMost(3).onFailure().invoke(() -> alive.set(false)); if (!lazyStart) { receiver.start(); } } + private void reportFailure(Throwable throwable) { + // should we send cause of failure in isAlive method? + alive.set(false); + } + private SolaceFailureHandler createFailureHandler(SolaceConnectorIncomingConfiguration ic, MessagingService solace) { String strategy = ic.getConsumerQueueFailureStrategy(); SolaceFailureHandler.Strategy actualStrategy = SolaceFailureHandler.Strategy.from(strategy); @@ -204,15 +215,15 @@ public void close() { } public void isStarted(HealthReport.HealthReportBuilder builder) { - + builder.add(channel, solace.isConnected()); } public void isReady(HealthReport.HealthReportBuilder builder) { - + builder.add(channel, solace.isConnected() && receiver != null && receiver.isRunning()); } public void isAlive(HealthReport.HealthReportBuilder builder) { - + builder.add(channel, solace.isConnected() && alive.get()); } @Override diff --git a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/outgoing/SenderProcessor.java b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/outgoing/SenderProcessor.java index 8aebbef..0a49ad8 100644 --- a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/outgoing/SenderProcessor.java +++ b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/outgoing/SenderProcessor.java @@ -6,6 +6,7 @@ import java.util.concurrent.Flow.Subscriber; import java.util.concurrent.Flow.Subscription; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import java.util.function.Function; import org.eclipse.microprofile.reactive.messaging.Message; @@ -20,11 +21,14 @@ class SenderProcessor implements Processor, Message>, Subscription private final Function, Uni> send; private final AtomicReference subscription = new AtomicReference<>(); private final AtomicReference>> downstream = new AtomicReference<>(); + private final Consumer reportFailure; - public SenderProcessor(long inflights, boolean waitForCompletion, Function, Uni> send) { + public SenderProcessor(long inflights, boolean waitForCompletion, Function, Uni> send, + Consumer reportFailure) { this.inflights = inflights; this.waitForCompletion = waitForCompletion; this.send = send; + this.reportFailure = reportFailure; } @Override @@ -101,6 +105,7 @@ public void onError(Throwable throwable) { Subscriber> subscriber = downstream.getAndSet(null); if (subscriber != null) { subscriber.onError(throwable); + reportFailure.accept(throwable); } } diff --git a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/outgoing/SolaceOutgoingChannel.java b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/outgoing/SolaceOutgoingChannel.java index 4afb63c..5bc7840 100644 --- a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/outgoing/SolaceOutgoingChannel.java +++ b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/outgoing/SolaceOutgoingChannel.java @@ -2,6 +2,7 @@ import java.util.concurrent.Flow; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import org.eclipse.microprofile.reactive.messaging.Message; @@ -37,12 +38,15 @@ public class SolaceOutgoingChannel private final SenderProcessor processor; private final boolean gracefulShutdown; private final long gracefulShutdownWaitTimeout; + private final AtomicBoolean alive = new AtomicBoolean(false); private volatile boolean isPublisherReady = true; + private volatile MessagingService solace; // Assuming we won't ever exceed the limit of an unsigned long... private final OutgoingMessagesUnsignedCounterBarrier publishedMessagesTracker = new OutgoingMessagesUnsignedCounterBarrier(); public SolaceOutgoingChannel(Vertx vertx, SolaceConnectorOutgoingConfiguration oc, MessagingService solace) { + this.solace = solace; this.channel = oc.getChannel(); PersistentMessagePublisherBuilder builder = solace.createPersistentMessagePublisherBuilder(); switch (oc.getProducerBackPressureStrategy()) { @@ -67,7 +71,7 @@ public SolaceOutgoingChannel(Vertx vertx, SolaceConnectorOutgoingConfiguration o boolean lazyStart = oc.getClientLazyStart(); this.topic = Topic.of(oc.getProducerTopic().orElse(this.channel)); this.processor = new SenderProcessor(oc.getProducerMaxInflightMessages(), oc.getProducerWaitForPublishReceipt(), - m -> sendMessage(solace, m, oc.getProducerWaitForPublishReceipt())); + m -> sendMessage(solace, m, oc.getProducerWaitForPublishReceipt()), this::reportFailure); this.subscriber = MultiUtils.via(processor, multi -> multi.plug( m -> lazyStart ? m.onSubscription().call(() -> Uni.createFrom().completionStage(publisher.startAsync())) : m)); if (!lazyStart) { @@ -82,17 +86,26 @@ public void ready() { }); } + private void reportFailure(Throwable throwable) { + // should we send cause of failure in isAlive method? + alive.set(false); + } + private Uni sendMessage(MessagingService solace, Message m, boolean waitForPublishReceipt) { // TODO - Use isPublisherReady to check if publisher is in ready state before publishing. This is required when back-pressure is set to reject. We need to block this call till isPublisherReady is true return publishMessage(publisher, m, solace.messageBuilder(), waitForPublishReceipt) .onItem().transformToUni(receipt -> { + alive.set(true); if (receipt != null) { OutgoingMessageMetadata.setResultOnMessage(m, receipt); } return Uni.createFrom().completionStage(m.getAck()); }) - .onFailure().recoverWithUni(t -> Uni.createFrom().completionStage(m.nack(t))); + .onFailure().recoverWithUni(t -> { + alive.set(false); + return Uni.createFrom().completionStage(m.nack(t)); + }); } @@ -215,15 +228,15 @@ public void onPublishReceipt(PublishReceipt publishReceipt) { } public void isStarted(HealthReport.HealthReportBuilder builder) { - + builder.add(channel, solace.isConnected()); } public void isReady(HealthReport.HealthReportBuilder builder) { - + builder.add(channel, solace.isConnected() && this.publisher != null && this.publisher.isReady()); } public void isAlive(HealthReport.HealthReportBuilder builder) { - + builder.add(channel, solace.isConnected() && alive.get()); } @Override diff --git a/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/health/SolaceConsumerHealthTest.java b/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/health/SolaceConsumerHealthTest.java new file mode 100644 index 0000000..c407e88 --- /dev/null +++ b/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/health/SolaceConsumerHealthTest.java @@ -0,0 +1,138 @@ +package io.quarkiverse.solace.health; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.CopyOnWriteArrayList; + +import jakarta.enterprise.context.ApplicationScoped; + +import org.eclipse.microprofile.reactive.messaging.Incoming; +import org.junit.jupiter.api.Test; + +import com.solace.messaging.publisher.PersistentMessagePublisher; +import com.solace.messaging.receiver.InboundMessage; +import com.solace.messaging.resources.Topic; + +import io.quarkiverse.solace.base.WeldTestBase; +import io.quarkiverse.solace.incoming.SolaceInboundMessage; +import io.smallrye.reactive.messaging.health.HealthReport; +import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig; + +public class SolaceConsumerHealthTest extends WeldTestBase { + + @Test + void solaceConsumerHealthCheck() { + MapBasedConfig config = new MapBasedConfig() + .with("mp.messaging.incoming.in.connector", "quarkus-solace") + .with("mp.messaging.incoming.in.consumer.queue.name", queue) + .with("mp.messaging.incoming.in.consumer.queue.add-additional-subscriptions", "true") + .with("mp.messaging.incoming.in.consumer.queue.missing-resource-creation-strategy", "create-on-start") + .with("mp.messaging.incoming.in.consumer.queue.subscriptions", topic); + + // Run app that consumes messages + MyConsumer app = runApplication(config, MyConsumer.class); + + await().until(() -> isStarted() && isReady()); + + // Produce messages + PersistentMessagePublisher publisher = messagingService.createPersistentMessagePublisherBuilder() + .build() + .start(); + Topic tp = Topic.of(topic); + publisher.publish("1", tp); + publisher.publish("2", tp); + publisher.publish("3", tp); + publisher.publish("4", tp); + publisher.publish("5", tp); + + await().until(() -> isAlive()); + + HealthReport startup = getHealth().getStartup(); + HealthReport liveness = getHealth().getLiveness(); + HealthReport readiness = getHealth().getReadiness(); + + assertThat(startup.isOk()).isTrue(); + assertThat(liveness.isOk()).isTrue(); + assertThat(readiness.isOk()).isTrue(); + assertThat(startup.getChannels()).hasSize(1); + assertThat(liveness.getChannels()).hasSize(1); + assertThat(readiness.getChannels()).hasSize(1); + + } + + @Test + void solaceConsumerLivenessCheck() { + MapBasedConfig config = new MapBasedConfig() + .with("mp.messaging.incoming.in.connector", "quarkus-solace") + .with("mp.messaging.incoming.in.consumer.queue.name", queue) + .with("mp.messaging.incoming.in.consumer.queue.add-additional-subscriptions", "true") + .with("mp.messaging.incoming.in.consumer.queue.missing-resource-creation-strategy", "create-on-start") + .with("mp.messaging.incoming.in.consumer.queue.subscriptions", topic); + + // Run app that consumes messages + MyErrorConsumer app = runApplication(config, MyErrorConsumer.class); + + await().until(() -> isStarted() && isReady()); + + // Produce messages + PersistentMessagePublisher publisher = messagingService.createPersistentMessagePublisherBuilder() + .build() + .start(); + Topic tp = Topic.of(topic); + publisher.publish("1", tp); + publisher.publish("2", tp); + + await().until(() -> isAlive()); + + HealthReport startup = getHealth().getStartup(); + HealthReport liveness = getHealth().getLiveness(); + HealthReport readiness = getHealth().getReadiness(); + + assertThat(startup.isOk()).isTrue(); + assertThat(liveness.isOk()).isTrue(); + assertThat(readiness.isOk()).isTrue(); + assertThat(startup.getChannels()).hasSize(1); + assertThat(liveness.getChannels()).hasSize(1); + assertThat(readiness.getChannels()).hasSize(1); + + publisher.publish("3", tp); + await().until(() -> getHealth().getLiveness().isOk() == false); + + publisher.publish("4", tp); + publisher.publish("5", tp); + await().until(() -> getHealth().getLiveness().isOk() == true); + } + + @ApplicationScoped + static class MyConsumer { + private final List received = new CopyOnWriteArrayList<>(); + + @Incoming("in") + void in(InboundMessage msg) { + received.add(msg.getPayloadAsString()); + } + + public List getReceived() { + return received; + } + } + + @ApplicationScoped + static class MyErrorConsumer { + private final List received = new CopyOnWriteArrayList<>(); + + @Incoming("in") + CompletionStage in(SolaceInboundMessage msg) { + String payload = new String(msg.getPayload(), StandardCharsets.UTF_8); + if (payload.equals("3")) { + return msg.nack(new IllegalArgumentException("Nacking message with payload 3")); + } + + return msg.ack(); + } + } +} diff --git a/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/health/SolaceProducerHealthCheck.java b/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/health/SolaceProducerHealthCheck.java new file mode 100644 index 0000000..49bba93 --- /dev/null +++ b/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/health/SolaceProducerHealthCheck.java @@ -0,0 +1,108 @@ +package io.quarkiverse.solace.health; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CopyOnWriteArrayList; + +import jakarta.enterprise.context.ApplicationScoped; + +import org.eclipse.microprofile.reactive.messaging.Message; +import org.eclipse.microprofile.reactive.messaging.Outgoing; +import org.junit.jupiter.api.Test; + +import com.solace.messaging.receiver.PersistentMessageReceiver; +import com.solace.messaging.resources.Queue; +import com.solace.messaging.resources.TopicSubscription; + +import io.quarkiverse.solace.base.WeldTestBase; +import io.smallrye.mutiny.Multi; +import io.smallrye.reactive.messaging.health.HealthReport; +import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig; + +public class SolaceProducerHealthCheck extends WeldTestBase { + @Test + void publisherHealthCheck() { + MapBasedConfig config = new MapBasedConfig() + .with("mp.messaging.outgoing.out.connector", "quarkus-solace") + .with("mp.messaging.outgoing.out.producer.topic", topic); + + List expected = new CopyOnWriteArrayList<>(); + + // Start listening first + PersistentMessageReceiver receiver = messagingService.createPersistentMessageReceiverBuilder() + .withSubscriptions(TopicSubscription.of(topic)) + .build(Queue.nonDurableExclusiveQueue()); + receiver.receiveAsync(inboundMessage -> expected.add(inboundMessage.getPayloadAsString())); + receiver.start(); + + // Run app that publish messages + MyApp app = runApplication(config, MyApp.class); + + await().until(() -> isStarted() && isReady() && isAlive()); + + HealthReport startup = getHealth().getStartup(); + HealthReport liveness = getHealth().getLiveness(); + HealthReport readiness = getHealth().getReadiness(); + + assertThat(startup.isOk()).isTrue(); + assertThat(liveness.isOk()).isTrue(); + assertThat(readiness.isOk()).isTrue(); + assertThat(startup.getChannels()).hasSize(1); + assertThat(liveness.getChannels()).hasSize(1); + assertThat(readiness.getChannels()).hasSize(1); + } + + @Test + void publisherLivenessCheck() { + MapBasedConfig config = new MapBasedConfig() + .with("mp.messaging.outgoing.out.connector", "quarkus-solace") + .with("mp.messaging.outgoing.out.producer.topic", "publish/deny"); + + List expected = new CopyOnWriteArrayList<>(); + + // Start listening first + PersistentMessageReceiver receiver = messagingService.createPersistentMessageReceiverBuilder() + .withSubscriptions(TopicSubscription.of("publish/deny")) + .build(Queue.nonDurableExclusiveQueue()); + receiver.receiveAsync(inboundMessage -> expected.add(inboundMessage.getPayloadAsString())); + receiver.start(); + + // Run app that publish messages + MyApp app = runApplication(config, MyApp.class); + + await().until(() -> isStarted() && isReady()); + + HealthReport startup = getHealth().getStartup(); + HealthReport liveness = getHealth().getLiveness(); + HealthReport readiness = getHealth().getReadiness(); + + assertThat(startup.isOk()).isTrue(); + assertThat(liveness.isOk()).isFalse(); + assertThat(readiness.isOk()).isTrue(); + assertThat(startup.getChannels()).hasSize(1); + assertThat(liveness.getChannels()).hasSize(1); + assertThat(readiness.getChannels()).hasSize(1); + } + + @ApplicationScoped + static class MyApp { + private final List acked = new CopyOnWriteArrayList<>(); + + @Outgoing("out") + Multi> out() { + + return Multi.createFrom().items("1", "2", "3", "4", "5") + .map(payload -> Message.of(payload).withAck(() -> { + acked.add(payload); + return CompletableFuture.completedFuture(null); + })); + } + + public List getAcked() { + return acked; + } + } +} From 0aa38bc52d0d698ff3b0865e9f0654f327db9c11 Mon Sep 17 00:00:00 2001 From: SravanThotakura05 <83568543+SravanThotakura05@users.noreply.github.com> Date: Thu, 11 Jan 2024 14:48:47 +0530 Subject: [PATCH 2/2] Addressed comments in PR #26 --- .../incoming/SolaceIncomingChannel.java | 24 ++++++++++++++--- .../solace/outgoing/SenderProcessor.java | 7 +---- .../outgoing/SolaceOutgoingChannel.java | 27 ++++++++++++++----- .../health/SolaceConsumerHealthTest.java | 5 +++- .../health/SolaceProducerHealthCheck.java | 1 + 5 files changed, 47 insertions(+), 17 deletions(-) diff --git a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/SolaceIncomingChannel.java b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/SolaceIncomingChannel.java index 44446c9..7a5c88e 100644 --- a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/SolaceIncomingChannel.java +++ b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/incoming/SolaceIncomingChannel.java @@ -4,12 +4,15 @@ import java.time.Duration; import java.time.ZonedDateTime; +import java.util.ArrayList; import java.util.Arrays; +import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Flow; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; import org.eclipse.microprofile.reactive.messaging.Message; @@ -48,6 +51,7 @@ public class SolaceIncomingChannel implements ReceiverActivationPassivationConfi private final ExecutorService pollerThread; private final boolean gracefulShutdown; private final long gracefulShutdownWaitTimeout; + private final List failures = new ArrayList<>(); private volatile MessagingService solace; // Assuming we won't ever exceed the limit of an unsigned long... @@ -117,14 +121,17 @@ public SolaceIncomingChannel(Vertx vertx, SolaceConnectorIncomingConfiguration i .call(() -> Uni.createFrom().completionStage(receiver.startAsync())) : m) .onItem().invoke(() -> alive.set(true)) - .onFailure().retry().withBackOff(Duration.ofSeconds(1)).atMost(3).onFailure().invoke(() -> alive.set(false)); + .onFailure().retry().withBackOff(Duration.ofSeconds(1)).atMost(3).onFailure().invoke((t) -> { + failures.add(t); + alive.set(false); + }); if (!lazyStart) { receiver.start(); } } private void reportFailure(Throwable throwable) { - // should we send cause of failure in isAlive method? + failures.add(throwable); alive.set(false); } @@ -223,7 +230,18 @@ public void isReady(HealthReport.HealthReportBuilder builder) { } public void isAlive(HealthReport.HealthReportBuilder builder) { - builder.add(channel, solace.isConnected() && alive.get()); + List reportedFailures; + if (!failures.isEmpty()) { + synchronized (this) { + reportedFailures = new ArrayList<>(failures); + } + + builder.add(channel, solace.isConnected() && alive.get(), + reportedFailures.stream().map(Throwable::getMessage).collect(Collectors.joining())); + failures.removeAll(reportedFailures); + } else { + builder.add(channel, solace.isConnected() && alive.get()); + } } @Override diff --git a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/outgoing/SenderProcessor.java b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/outgoing/SenderProcessor.java index 0a49ad8..8aebbef 100644 --- a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/outgoing/SenderProcessor.java +++ b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/outgoing/SenderProcessor.java @@ -6,7 +6,6 @@ import java.util.concurrent.Flow.Subscriber; import java.util.concurrent.Flow.Subscription; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Consumer; import java.util.function.Function; import org.eclipse.microprofile.reactive.messaging.Message; @@ -21,14 +20,11 @@ class SenderProcessor implements Processor, Message>, Subscription private final Function, Uni> send; private final AtomicReference subscription = new AtomicReference<>(); private final AtomicReference>> downstream = new AtomicReference<>(); - private final Consumer reportFailure; - public SenderProcessor(long inflights, boolean waitForCompletion, Function, Uni> send, - Consumer reportFailure) { + public SenderProcessor(long inflights, boolean waitForCompletion, Function, Uni> send) { this.inflights = inflights; this.waitForCompletion = waitForCompletion; this.send = send; - this.reportFailure = reportFailure; } @Override @@ -105,7 +101,6 @@ public void onError(Throwable throwable) { Subscriber> subscriber = downstream.getAndSet(null); if (subscriber != null) { subscriber.onError(throwable); - reportFailure.accept(throwable); } } diff --git a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/outgoing/SolaceOutgoingChannel.java b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/outgoing/SolaceOutgoingChannel.java index 5bc7840..a0367b2 100644 --- a/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/outgoing/SolaceOutgoingChannel.java +++ b/pubsub-plus-connector/src/main/java/io/quarkiverse/solace/outgoing/SolaceOutgoingChannel.java @@ -1,9 +1,12 @@ package io.quarkiverse.solace.outgoing; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.Flow; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; import org.eclipse.microprofile.reactive.messaging.Message; @@ -39,6 +42,7 @@ public class SolaceOutgoingChannel private final boolean gracefulShutdown; private final long gracefulShutdownWaitTimeout; private final AtomicBoolean alive = new AtomicBoolean(false); + private final List failures = new ArrayList<>(); private volatile boolean isPublisherReady = true; private volatile MessagingService solace; @@ -71,7 +75,10 @@ public SolaceOutgoingChannel(Vertx vertx, SolaceConnectorOutgoingConfiguration o boolean lazyStart = oc.getClientLazyStart(); this.topic = Topic.of(oc.getProducerTopic().orElse(this.channel)); this.processor = new SenderProcessor(oc.getProducerMaxInflightMessages(), oc.getProducerWaitForPublishReceipt(), - m -> sendMessage(solace, m, oc.getProducerWaitForPublishReceipt()), this::reportFailure); + m -> sendMessage(solace, m, oc.getProducerWaitForPublishReceipt()).onFailure().invoke((t) -> { + failures.add(t); + alive.set(false); + })); this.subscriber = MultiUtils.via(processor, multi -> multi.plug( m -> lazyStart ? m.onSubscription().call(() -> Uni.createFrom().completionStage(publisher.startAsync())) : m)); if (!lazyStart) { @@ -86,11 +93,6 @@ public void ready() { }); } - private void reportFailure(Throwable throwable) { - // should we send cause of failure in isAlive method? - alive.set(false); - } - private Uni sendMessage(MessagingService solace, Message m, boolean waitForPublishReceipt) { // TODO - Use isPublisherReady to check if publisher is in ready state before publishing. This is required when back-pressure is set to reject. We need to block this call till isPublisherReady is true @@ -103,6 +105,7 @@ private Uni sendMessage(MessagingService solace, Message m, boolean wai return Uni.createFrom().completionStage(m.getAck()); }) .onFailure().recoverWithUni(t -> { + failures.add(t); alive.set(false); return Uni.createFrom().completionStage(m.nack(t)); }); @@ -236,7 +239,17 @@ public void isReady(HealthReport.HealthReportBuilder builder) { } public void isAlive(HealthReport.HealthReportBuilder builder) { - builder.add(channel, solace.isConnected() && alive.get()); + List reportedFailures; + if (!failures.isEmpty()) { + synchronized (this) { + reportedFailures = new ArrayList<>(failures); + } + builder.add(channel, solace.isConnected() && alive.get(), + reportedFailures.stream().map(Throwable::getMessage).collect(Collectors.joining())); + failures.removeAll(reportedFailures); + } else { + builder.add(channel, solace.isConnected() && alive.get()); + } } @Override diff --git a/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/health/SolaceConsumerHealthTest.java b/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/health/SolaceConsumerHealthTest.java index c407e88..a702b08 100644 --- a/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/health/SolaceConsumerHealthTest.java +++ b/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/health/SolaceConsumerHealthTest.java @@ -100,7 +100,10 @@ void solaceConsumerLivenessCheck() { assertThat(readiness.getChannels()).hasSize(1); publisher.publish("3", tp); - await().until(() -> getHealth().getLiveness().isOk() == false); + await().until(() -> { + HealthReport healthReport = getHealth().getLiveness(); + return (healthReport.isOk() == false && !healthReport.getChannels().get(0).getMessage().isEmpty()); + }); publisher.publish("4", tp); publisher.publish("5", tp); diff --git a/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/health/SolaceProducerHealthCheck.java b/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/health/SolaceProducerHealthCheck.java index 49bba93..ad85137 100644 --- a/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/health/SolaceProducerHealthCheck.java +++ b/pubsub-plus-connector/src/test/java/io/quarkiverse/solace/health/SolaceProducerHealthCheck.java @@ -85,6 +85,7 @@ void publisherLivenessCheck() { assertThat(startup.getChannels()).hasSize(1); assertThat(liveness.getChannels()).hasSize(1); assertThat(readiness.getChannels()).hasSize(1); + assertThat(liveness.getChannels().get(0).getMessage()).isNotEmpty(); } @ApplicationScoped