Skip to content

Commit

Permalink
Merge pull request #26 from SolaceCoEExt/fix-issues
Browse files Browse the repository at this point in the history
Added health status - started, ready, alive
  • Loading branch information
ozangunalp authored Jan 11, 2024
2 parents 2366810 + 0aa38bc commit c0518cf
Show file tree
Hide file tree
Showing 5 changed files with 320 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import static io.smallrye.reactive.messaging.providers.locals.ContextAwareMessage.captureContextMetadata;

import java.util.concurrent.CompletionStage;
import java.util.function.Consumer;

import org.eclipse.microprofile.reactive.messaging.Metadata;

Expand All @@ -22,16 +23,18 @@ public class SolaceInboundMessage<T> implements ContextAwareMessage<T>, Metadata
private final SolaceFailureHandler nackHandler;
private final T payload;
private final IncomingMessagesUnsignedCounterBarrier unacknowledgedMessageTracker;
private final Consumer<Throwable> reportFailure;
private Metadata metadata;

public SolaceInboundMessage(InboundMessage message, SolaceAckHandler ackHandler, SolaceFailureHandler nackHandler,
IncomingMessagesUnsignedCounterBarrier unacknowledgedMessageTracker) {
IncomingMessagesUnsignedCounterBarrier unacknowledgedMessageTracker, Consumer<Throwable> reportFailure) {
this.msg = message;
this.unacknowledgedMessageTracker = unacknowledgedMessageTracker;
this.payload = (T) convertPayload();
this.ackHandler = ackHandler;
this.nackHandler = nackHandler;
this.metadata = captureContextMetadata(new SolaceInboundMetadata(message));
this.reportFailure = reportFailure;
}

public InboundMessage getMessage() {
Expand Down Expand Up @@ -88,6 +91,7 @@ public CompletionStage<Void> ack() {
@Override
public CompletionStage<Void> nack(Throwable reason, Metadata nackMetadata) {
this.unacknowledgedMessageTracker.decrement();
this.reportFailure.accept(reason);
return nackHandler.handle(this, reason, nackMetadata);

// if (solaceErrorTopicPublisherHandler == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,15 @@

import java.time.Duration;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

import org.eclipse.microprofile.reactive.messaging.Message;

Expand Down Expand Up @@ -42,16 +45,20 @@ public class SolaceIncomingChannel implements ReceiverActivationPassivationConfi
private final SolaceAckHandler ackHandler;
private final SolaceFailureHandler failureHandler;
private final AtomicBoolean closed = new AtomicBoolean(false);
private final AtomicBoolean alive = new AtomicBoolean(false);
private final PersistentMessageReceiver receiver;
private final Flow.Publisher<? extends Message<?>> stream;
private final ExecutorService pollerThread;
private final boolean gracefulShutdown;
private final long gracefulShutdownWaitTimeout;
private final List<Throwable> failures = new ArrayList<>();
private volatile MessagingService solace;

// Assuming we won't ever exceed the limit of an unsigned long...
private final IncomingMessagesUnsignedCounterBarrier unacknowledgedMessageTracker = new IncomingMessagesUnsignedCounterBarrier();

public SolaceIncomingChannel(Vertx vertx, SolaceConnectorIncomingConfiguration ic, MessagingService solace) {
this.solace = solace;
this.channel = ic.getChannel();
this.context = Context.newInstance(((VertxInternal) vertx.getDelegate()).createEventLoopContext());
this.gracefulShutdown = ic.getClientGracefulShutdown();
Expand Down Expand Up @@ -108,15 +115,26 @@ public SolaceIncomingChannel(Vertx vertx, SolaceConnectorIncomingConfiguration i
.until(__ -> closed.get())
.emitOn(context::runOnContext)
.map(consumed -> new SolaceInboundMessage<>(consumed, ackHandler, failureHandler,
unacknowledgedMessageTracker))
.plug(m -> lazyStart ? m.onSubscription().call(() -> Uni.createFrom().completionStage(receiver.startAsync()))
unacknowledgedMessageTracker, this::reportFailure))
.plug(m -> lazyStart
? m.onSubscription()
.call(() -> Uni.createFrom().completionStage(receiver.startAsync()))
: m)
.onFailure().retry().withBackOff(Duration.ofSeconds(1)).atMost(3);
.onItem().invoke(() -> alive.set(true))
.onFailure().retry().withBackOff(Duration.ofSeconds(1)).atMost(3).onFailure().invoke((t) -> {
failures.add(t);
alive.set(false);
});
if (!lazyStart) {
receiver.start();
}
}

private void reportFailure(Throwable throwable) {
failures.add(throwable);
alive.set(false);
}

private SolaceFailureHandler createFailureHandler(SolaceConnectorIncomingConfiguration ic, MessagingService solace) {
String strategy = ic.getConsumerQueueFailureStrategy();
SolaceFailureHandler.Strategy actualStrategy = SolaceFailureHandler.Strategy.from(strategy);
Expand Down Expand Up @@ -204,15 +222,26 @@ public void close() {
}

public void isStarted(HealthReport.HealthReportBuilder builder) {

builder.add(channel, solace.isConnected());
}

public void isReady(HealthReport.HealthReportBuilder builder) {

builder.add(channel, solace.isConnected() && receiver != null && receiver.isRunning());
}

public void isAlive(HealthReport.HealthReportBuilder builder) {
List<Throwable> reportedFailures;
if (!failures.isEmpty()) {
synchronized (this) {
reportedFailures = new ArrayList<>(failures);
}

builder.add(channel, solace.isConnected() && alive.get(),
reportedFailures.stream().map(Throwable::getMessage).collect(Collectors.joining()));
failures.removeAll(reportedFailures);
} else {
builder.add(channel, solace.isConnected() && alive.get());
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
package io.quarkiverse.solace.outgoing;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

import org.eclipse.microprofile.reactive.messaging.Message;

Expand Down Expand Up @@ -37,12 +41,16 @@ public class SolaceOutgoingChannel
private final SenderProcessor processor;
private final boolean gracefulShutdown;
private final long gracefulShutdownWaitTimeout;
private final AtomicBoolean alive = new AtomicBoolean(false);
private final List<Throwable> failures = new ArrayList<>();
private volatile boolean isPublisherReady = true;
private volatile MessagingService solace;

// Assuming we won't ever exceed the limit of an unsigned long...
private final OutgoingMessagesUnsignedCounterBarrier publishedMessagesTracker = new OutgoingMessagesUnsignedCounterBarrier();

public SolaceOutgoingChannel(Vertx vertx, SolaceConnectorOutgoingConfiguration oc, MessagingService solace) {
this.solace = solace;
this.channel = oc.getChannel();
PersistentMessagePublisherBuilder builder = solace.createPersistentMessagePublisherBuilder();
switch (oc.getProducerBackPressureStrategy()) {
Expand All @@ -67,7 +75,10 @@ public SolaceOutgoingChannel(Vertx vertx, SolaceConnectorOutgoingConfiguration o
boolean lazyStart = oc.getClientLazyStart();
this.topic = Topic.of(oc.getProducerTopic().orElse(this.channel));
this.processor = new SenderProcessor(oc.getProducerMaxInflightMessages(), oc.getProducerWaitForPublishReceipt(),
m -> sendMessage(solace, m, oc.getProducerWaitForPublishReceipt()));
m -> sendMessage(solace, m, oc.getProducerWaitForPublishReceipt()).onFailure().invoke((t) -> {
failures.add(t);
alive.set(false);
}));
this.subscriber = MultiUtils.via(processor, multi -> multi.plug(
m -> lazyStart ? m.onSubscription().call(() -> Uni.createFrom().completionStage(publisher.startAsync())) : m));
if (!lazyStart) {
Expand All @@ -87,12 +98,17 @@ private Uni<Void> sendMessage(MessagingService solace, Message<?> m, boolean wai
// TODO - Use isPublisherReady to check if publisher is in ready state before publishing. This is required when back-pressure is set to reject. We need to block this call till isPublisherReady is true
return publishMessage(publisher, m, solace.messageBuilder(), waitForPublishReceipt)
.onItem().transformToUni(receipt -> {
alive.set(true);
if (receipt != null) {
OutgoingMessageMetadata.setResultOnMessage(m, receipt);
}
return Uni.createFrom().completionStage(m.getAck());
})
.onFailure().recoverWithUni(t -> Uni.createFrom().completionStage(m.nack(t)));
.onFailure().recoverWithUni(t -> {
failures.add(t);
alive.set(false);
return Uni.createFrom().completionStage(m.nack(t));
});

}

Expand Down Expand Up @@ -215,15 +231,25 @@ public void onPublishReceipt(PublishReceipt publishReceipt) {
}

public void isStarted(HealthReport.HealthReportBuilder builder) {

builder.add(channel, solace.isConnected());
}

public void isReady(HealthReport.HealthReportBuilder builder) {

builder.add(channel, solace.isConnected() && this.publisher != null && this.publisher.isReady());
}

public void isAlive(HealthReport.HealthReportBuilder builder) {

List<Throwable> reportedFailures;
if (!failures.isEmpty()) {
synchronized (this) {
reportedFailures = new ArrayList<>(failures);
}
builder.add(channel, solace.isConnected() && alive.get(),
reportedFailures.stream().map(Throwable::getMessage).collect(Collectors.joining()));
failures.removeAll(reportedFailures);
} else {
builder.add(channel, solace.isConnected() && alive.get());
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
package io.quarkiverse.solace.health;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CopyOnWriteArrayList;

import jakarta.enterprise.context.ApplicationScoped;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.junit.jupiter.api.Test;

import com.solace.messaging.publisher.PersistentMessagePublisher;
import com.solace.messaging.receiver.InboundMessage;
import com.solace.messaging.resources.Topic;

import io.quarkiverse.solace.base.WeldTestBase;
import io.quarkiverse.solace.incoming.SolaceInboundMessage;
import io.smallrye.reactive.messaging.health.HealthReport;
import io.smallrye.reactive.messaging.test.common.config.MapBasedConfig;

public class SolaceConsumerHealthTest extends WeldTestBase {

@Test
void solaceConsumerHealthCheck() {
MapBasedConfig config = new MapBasedConfig()
.with("mp.messaging.incoming.in.connector", "quarkus-solace")
.with("mp.messaging.incoming.in.consumer.queue.name", queue)
.with("mp.messaging.incoming.in.consumer.queue.add-additional-subscriptions", "true")
.with("mp.messaging.incoming.in.consumer.queue.missing-resource-creation-strategy", "create-on-start")
.with("mp.messaging.incoming.in.consumer.queue.subscriptions", topic);

// Run app that consumes messages
MyConsumer app = runApplication(config, MyConsumer.class);

await().until(() -> isStarted() && isReady());

// Produce messages
PersistentMessagePublisher publisher = messagingService.createPersistentMessagePublisherBuilder()
.build()
.start();
Topic tp = Topic.of(topic);
publisher.publish("1", tp);
publisher.publish("2", tp);
publisher.publish("3", tp);
publisher.publish("4", tp);
publisher.publish("5", tp);

await().until(() -> isAlive());

HealthReport startup = getHealth().getStartup();
HealthReport liveness = getHealth().getLiveness();
HealthReport readiness = getHealth().getReadiness();

assertThat(startup.isOk()).isTrue();
assertThat(liveness.isOk()).isTrue();
assertThat(readiness.isOk()).isTrue();
assertThat(startup.getChannels()).hasSize(1);
assertThat(liveness.getChannels()).hasSize(1);
assertThat(readiness.getChannels()).hasSize(1);

}

@Test
void solaceConsumerLivenessCheck() {
MapBasedConfig config = new MapBasedConfig()
.with("mp.messaging.incoming.in.connector", "quarkus-solace")
.with("mp.messaging.incoming.in.consumer.queue.name", queue)
.with("mp.messaging.incoming.in.consumer.queue.add-additional-subscriptions", "true")
.with("mp.messaging.incoming.in.consumer.queue.missing-resource-creation-strategy", "create-on-start")
.with("mp.messaging.incoming.in.consumer.queue.subscriptions", topic);

// Run app that consumes messages
MyErrorConsumer app = runApplication(config, MyErrorConsumer.class);

await().until(() -> isStarted() && isReady());

// Produce messages
PersistentMessagePublisher publisher = messagingService.createPersistentMessagePublisherBuilder()
.build()
.start();
Topic tp = Topic.of(topic);
publisher.publish("1", tp);
publisher.publish("2", tp);

await().until(() -> isAlive());

HealthReport startup = getHealth().getStartup();
HealthReport liveness = getHealth().getLiveness();
HealthReport readiness = getHealth().getReadiness();

assertThat(startup.isOk()).isTrue();
assertThat(liveness.isOk()).isTrue();
assertThat(readiness.isOk()).isTrue();
assertThat(startup.getChannels()).hasSize(1);
assertThat(liveness.getChannels()).hasSize(1);
assertThat(readiness.getChannels()).hasSize(1);

publisher.publish("3", tp);
await().until(() -> {
HealthReport healthReport = getHealth().getLiveness();
return (healthReport.isOk() == false && !healthReport.getChannels().get(0).getMessage().isEmpty());
});

publisher.publish("4", tp);
publisher.publish("5", tp);
await().until(() -> getHealth().getLiveness().isOk() == true);
}

@ApplicationScoped
static class MyConsumer {
private final List<String> received = new CopyOnWriteArrayList<>();

@Incoming("in")
void in(InboundMessage msg) {
received.add(msg.getPayloadAsString());
}

public List<String> getReceived() {
return received;
}
}

@ApplicationScoped
static class MyErrorConsumer {
private final List<String> received = new CopyOnWriteArrayList<>();

@Incoming("in")
CompletionStage<Void> in(SolaceInboundMessage<byte[]> msg) {
String payload = new String(msg.getPayload(), StandardCharsets.UTF_8);
if (payload.equals("3")) {
return msg.nack(new IllegalArgumentException("Nacking message with payload 3"));
}

return msg.ack();
}
}
}
Loading

0 comments on commit c0518cf

Please sign in to comment.