Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Health support #130

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 39 additions & 1 deletion spec/src/main/asciidoc/architecture.asciidoc
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//
// Copyright (c) 2018, 2020 Contributors to the Eclipse Foundation
// Copyright (c) 2018, 2021 Contributors to the Eclipse Foundation
//
// See the NOTICE file(s) distributed with this work for additional
// information regarding copyright ownership.
Expand Down Expand Up @@ -1331,6 +1331,44 @@ The connector is responsible for the acknowledgment (positive or negative) of th
* An outgoing connector must acknowledge the incoming `org.eclipse.microprofile.reactive.messaging.Message` once it has successfully dispatched the message.
* An outgoing connector must acknowledge negatively the incoming `org.eclipse.microprofile.reactive.messaging.Message` if it cannot be dispatched.

== Health

When MicroProfile Reactive Messaging is used in an environment where MicroProfile Health is enabled, implicit readiness
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

readiness might be misleading. Do you mean "ready to get traffic" or "started successfully"? Kubernetes has readiness and startup checks, and readiness is focusing on traffic, which most of the time is ignored by messaging protocol (as the probe are used for HTTP request routing).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I ve been thinking about use-cases of producing/consuming RS channels with websocket or SSE

and liveness checks `messaging` are produced.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

messaging might not be explicit enough, or conflict with user checks. What about MicroProfile Reactive Messaging?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right, I just can't find any definitive convention for naming health checks, maybe mp-reactive-messaging would make it more machine readable?


[source, json]
----
{
"status": "UP",
"checks": [
{
"name": "messaging",
"status": "UP"
}
]
}
----

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the case, we don't pass the checks, what extra data should we provide? None won't be very useful (even if these checks tend to be consumed by machines ignoring the extra data). Should we list the unsatisfied channels? Is it implementation specific?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was playing with that idea but couldn't come with any sufficient reason for requiring channel listing in the checks

=== Readiness
danielkec marked this conversation as resolved.
Show resolved Hide resolved
State of the readiness check `messaging` is DOWN until subscribers of all messaging channels are subscribed to their
respective publishers and `onSubscribe` signal is detected on all channels.

=== Liveness

State of the liveness check `messaging` is UP until `cancel` or `onError` signals are detected on any of the messaging
channels.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure about cancellations. We have seen applications using cancellations as part of their regular lifecycle.
Failure definitely.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have also seen use-cases with failures as part of its regular lifecycle (onErrorResumeWith, ...), but as I see it channel is a simple processor which is not mutating the stream(except for un/wrapping), so any cancel or onError signal passing thru channel renders it DOWN for good

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Problem with channels where retries/re-subscriptions are expected should be solvable with exclusions(inclusions?)


Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wondering about "all channels" while it seems to only require checking the final subscribers (subscriber methods, outgoing channels).

Also, about @Channel injection (unmanaged stream). It might be hard to track, as in this case, the subscription is delegated to the users. So, all the used channels may not receive their subscription, because the user has not subscribed yet (and it does not mean it's not ready).

Maybe it should focus on:

  • subscriber methods:
    • for readiness, must have received a subscription,
    • for liveness, a received failure would trigger the liveness check to be DOWN
  • outgoing connector
    • for readiness, must have received a subscription,
    • for liveness, a received failure would trigger the liveness check to be DOWN, an unrecoverable (like serialization) failure in the connector logic would trigger the liveness check to be DOWN.
  • incoming connector
    • for liveness, any unrecoverable failure would set the liveness check to DOWN
    • for readiness, it can be tricky, as we may not have a downstream subscription yet (because of unmanaged streams) (we are actually seen reports about that in Quarkus)

Intermediate channels can recover from failures, and implement retry logic, so it should not be reported as a DOWN.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would be quite a different angle than the one I was trying to take.

There are not much parts of reactive pipelines we are actually managing in the MP Messaging impls and those are the channels. Channels are basically processors which are not mutating the stream (except for un/wrapping) so if cancel or 'onError' signal passes thru it, it can be considered as failed. Any recovery made by unmanaged streams is just re-routing the stream thru different chain of operators or retry/resubscribe.

I am not sure that the health of connectors, methods and unmanaged streams should be in the scope of MP Messaging implicit health check. I would count those as business code where health and readiness can be monitored with standard MP Health API.

Also implicit checks should be consistent, if we differentiate its behavior by its pub/sub kinds, we risk user confusion. Again custom check with exclusion/inclusion can help in such cases.

=== Excluding channels

For channels which are not desired to be checked for liveness or readiness, exclusion is configurable with
`mp.messaging.health.live.exclude` or `mp.messaging.health.ready.exclude` config keys.

[source, properties]
----
mp.messaging.health.live.exclude=<channel-name>,<channel-name>
mp.messaging.health.ready.exclude=<channel-name>,<channel-name>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we follow the logic I described above, this would only make sense for subscriber methods. For connectors, it would be better to have a specific attribute and keep the connector configuration co-localized.

----

== Metrics

When MicroProfile Reactive Messaging is used in an environment where MicroProfile Metrics is enabled, the Reactive Messaging implementation automatically produces metrics.
Expand Down
14 changes: 14 additions & 0 deletions tck/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
<properties>
<arquillian.version>1.4.1.Final</arquillian.version>
<shrinkwrap-api.version>1.2.6</shrinkwrap-api.version>
<version.glassfish.json>1.1.6</version.glassfish.json>
</properties>

<dependencyManagement>
Expand Down Expand Up @@ -120,6 +121,19 @@
<artifactId>awaitility</artifactId>
</dependency>

<dependency>
<groupId>jakarta.json</groupId>
<artifactId>jakarta.json-api</artifactId>
<version>${version.glassfish.json}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.glassfish</groupId>
<artifactId>jakarta.json</artifactId>
<version>${version.glassfish.json}</version>
</dependency>

<dependency>
<groupId>org.jboss.shrinkwrap</groupId>
<artifactId>shrinkwrap-api</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
/**
* Copyright (c) 2021 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* You may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.eclipse.microprofile.reactive.messaging.tck.health;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

import javax.enterprise.context.ApplicationScoped;

import org.junit.Assert;

/**
* Helper bean for keeping references of all the messaging channels
* used in the health test scenario.
*/
@ApplicationScoped
public class ChannelRegister {

private final Map<String, Channel<?>> channelMap = new HashMap<>();
private final ReentrantLock mapLock = new ReentrantLock();

/**
* Return representation of messaging channel by its name.
*
* @param channelName name of the messaging channel
* @param <P> payload type of the channel
* @return the channel with references to its publisher and subscriber
*/
@SuppressWarnings("unchecked")
<P> Channel<P> get(String channelName) {
try {
mapLock.lock();
channelMap.putIfAbsent(channelName, new Channel<P>());
return (Channel<P>) channelMap.get(channelName);
}
finally {
mapLock.unlock();
}
}

/**
* Return all the channels used in the health test scenario.
*
* @return all the channels
*/
Collection<Channel<?>> getAllChannels() {
try {
mapLock.lock();
return Collections.unmodifiableCollection(channelMap.values());
}
finally {
mapLock.unlock();
}
}

/**
* Iterate over all registered channels, on each's publisher invoke onSubscribe signal
* and block until any onSubscribe signal is received on its subscriber.
*/
void readyAll() {
getAllChannels().forEach(Channel::ready);
}

/**
* Iterate over all registered channels, on each's publisher invoke onError signal
* and block until any error signal is received on its subscriber.
*/
void failAll() {
getAllChannels().forEach(Channel::fail);
}

/**
* Iterate over all registered channels, on each's publisher invoke onComplete signal
* and block until any complete signal is received on its subscriber.
*/
void completeAll() {
getAllChannels().forEach(Channel::complete);
}

/**
* Iterate over all registered channels, on each's subscriber subscription invoke cancel signal
* and block until any cancel signal is received on its publisher.
*/
void cancelAll() {
getAllChannels().forEach(Channel::cancel);
}

/**
* Represents a messaging channel used in the health test scenario.
* @param <PAYLOAD> type of the channel's payload
*/
public static class Channel<PAYLOAD> {
private final TestPublisher<PAYLOAD> publisher;
private final TestSubscriber<PAYLOAD> subscriber;

public Channel() {
publisher = new TestPublisher<>(HealthBase.TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
subscriber = new TestSubscriber<>(HealthBase.TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
}

/**
* Access this channel's publisher.
*
* @return this channel's test publisher
*/
public TestPublisher<PAYLOAD> publisher() {
return publisher;
}

/**
* Access this channel's subscriber.
*
* @return this channel's test subscriber
*/
public TestSubscriber<PAYLOAD> subscriber() {
return subscriber;
}

/**
* Trigger onSubscribe signal and block until received by subscriber.
*
* @return this channel
*/
public Channel<PAYLOAD> ready() {
this.publisher().ready();
this.subscriber().awaitSubscription();
return this;
}

/**
* Trigger onComplete signal and block until received by subscriber.
*
* @return this channel
*/
public Channel<PAYLOAD> complete() {
this.publisher().complete();
this.subscriber().awaitCompletion();
return this;
}

/**
* Trigger onError signal and block until received by subscriber.
*
* @return this channel
*/
public Channel<PAYLOAD> fail() {
Exception exception = new Exception("BOOM!!!");
this.publisher().fail(exception);
Assert.assertEquals(exception.getMessage(), this.subscriber().awaitError().getMessage());
return this;
}

/**
* Trigger cancel signal and block until received by publisher.
*
* @return this channel
*/
public Channel<PAYLOAD> cancel() {
this.subscriber().awaitSubscription().cancel();
this.publisher().awaitCancel();
return this;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/**
* Copyright (c) 2021 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* You may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.eclipse.microprofile.reactive.messaging.tck.health;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;

import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;

@ApplicationScoped
public class HealthAllExcludedTestBean {

public static final String ALL_EXCLUDED_CHANNEL = "excluded-channel";

@Inject
private ChannelRegister channelRegisterBean;


@Incoming(ALL_EXCLUDED_CHANNEL)
public Subscriber<String> consumeInnerChannel() {
return channelRegisterBean.<String>get(ALL_EXCLUDED_CHANNEL).subscriber();
}

@Outgoing(ALL_EXCLUDED_CHANNEL)
public Publisher<String> produceInnerChannel() {
return channelRegisterBean.<String>get(ALL_EXCLUDED_CHANNEL).publisher();
}

}
Loading