diff --git a/spec/src/main/asciidoc/architecture.asciidoc b/spec/src/main/asciidoc/architecture.asciidoc index c6b9c479..9c2a6d90 100644 --- a/spec/src/main/asciidoc/architecture.asciidoc +++ b/spec/src/main/asciidoc/architecture.asciidoc @@ -1,5 +1,5 @@ // -// Copyright (c) 2018, 2020 Contributors to the Eclipse Foundation +// Copyright (c) 2018, 2021 Contributors to the Eclipse Foundation // // See the NOTICE file(s) distributed with this work for additional // information regarding copyright ownership. @@ -1331,6 +1331,44 @@ The connector is responsible for the acknowledgment (positive or negative) of th * An outgoing connector must acknowledge the incoming `org.eclipse.microprofile.reactive.messaging.Message` once it has successfully dispatched the message. * An outgoing connector must acknowledge negatively the incoming `org.eclipse.microprofile.reactive.messaging.Message` if it cannot be dispatched. +== Health + +When MicroProfile Reactive Messaging is used in an environment where MicroProfile Health is enabled, implicit readiness +and liveness checks `messaging` are produced. + +[source, json] +---- +{ + "status": "UP", + "checks": [ + { + "name": "messaging", + "status": "UP" + } + ] +} +---- + +=== Readiness +State of the readiness check `messaging` is DOWN until subscribers of all messaging channels are subscribed to their +respective publishers and `onSubscribe` signal is detected on all channels. + +=== Liveness + +State of the liveness check `messaging` is UP until `cancel` or `onError` signals are detected on any of the messaging +channels. + +=== Excluding channels + +For channels which are not desired to be checked for liveness or readiness, exclusion is configurable with +`mp.messaging.health.live.exclude` or `mp.messaging.health.ready.exclude` config keys. + +[source, properties] +---- +mp.messaging.health.live.exclude=, +mp.messaging.health.ready.exclude=, +---- + == Metrics When MicroProfile Reactive Messaging is used in an environment where MicroProfile Metrics is enabled, the Reactive Messaging implementation automatically produces metrics. diff --git a/tck/pom.xml b/tck/pom.xml index 22d170ca..e83b0dff 100644 --- a/tck/pom.xml +++ b/tck/pom.xml @@ -34,6 +34,7 @@ 1.4.1.Final 1.2.6 + 1.1.6 @@ -120,6 +121,19 @@ awaitility + + jakarta.json + jakarta.json-api + ${version.glassfish.json} + provided + + + + org.glassfish + jakarta.json + ${version.glassfish.json} + + org.jboss.shrinkwrap shrinkwrap-api diff --git a/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/ChannelRegister.java b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/ChannelRegister.java new file mode 100644 index 00000000..453c63ff --- /dev/null +++ b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/ChannelRegister.java @@ -0,0 +1,184 @@ +/** + * Copyright (c) 2021 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.eclipse.microprofile.reactive.messaging.tck.health; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; + +import javax.enterprise.context.ApplicationScoped; + +import org.junit.Assert; + +/** + * Helper bean for keeping references of all the messaging channels + * used in the health test scenario. + */ +@ApplicationScoped +public class ChannelRegister { + + private final Map> channelMap = new HashMap<>(); + private final ReentrantLock mapLock = new ReentrantLock(); + + /** + * Return representation of messaging channel by its name. + * + * @param channelName name of the messaging channel + * @param

payload type of the channel + * @return the channel with references to its publisher and subscriber + */ + @SuppressWarnings("unchecked") +

Channel

get(String channelName) { + try { + mapLock.lock(); + channelMap.putIfAbsent(channelName, new Channel

()); + return (Channel

) channelMap.get(channelName); + } + finally { + mapLock.unlock(); + } + } + + /** + * Return all the channels used in the health test scenario. + * + * @return all the channels + */ + Collection> getAllChannels() { + try { + mapLock.lock(); + return Collections.unmodifiableCollection(channelMap.values()); + } + finally { + mapLock.unlock(); + } + } + + /** + * Iterate over all registered channels, on each's publisher invoke onSubscribe signal + * and block until any onSubscribe signal is received on its subscriber. + */ + void readyAll() { + getAllChannels().forEach(Channel::ready); + } + + /** + * Iterate over all registered channels, on each's publisher invoke onError signal + * and block until any error signal is received on its subscriber. + */ + void failAll() { + getAllChannels().forEach(Channel::fail); + } + + /** + * Iterate over all registered channels, on each's publisher invoke onComplete signal + * and block until any complete signal is received on its subscriber. + */ + void completeAll() { + getAllChannels().forEach(Channel::complete); + } + + /** + * Iterate over all registered channels, on each's subscriber subscription invoke cancel signal + * and block until any cancel signal is received on its publisher. + */ + void cancelAll() { + getAllChannels().forEach(Channel::cancel); + } + + /** + * Represents a messaging channel used in the health test scenario. + * @param type of the channel's payload + */ + public static class Channel { + private final TestPublisher publisher; + private final TestSubscriber subscriber; + + public Channel() { + publisher = new TestPublisher<>(HealthBase.TIMEOUT_MILLIS, TimeUnit.MILLISECONDS); + subscriber = new TestSubscriber<>(HealthBase.TIMEOUT_MILLIS, TimeUnit.MILLISECONDS); + } + + /** + * Access this channel's publisher. + * + * @return this channel's test publisher + */ + public TestPublisher publisher() { + return publisher; + } + + /** + * Access this channel's subscriber. + * + * @return this channel's test subscriber + */ + public TestSubscriber subscriber() { + return subscriber; + } + + /** + * Trigger onSubscribe signal and block until received by subscriber. + * + * @return this channel + */ + public Channel ready() { + this.publisher().ready(); + this.subscriber().awaitSubscription(); + return this; + } + + /** + * Trigger onComplete signal and block until received by subscriber. + * + * @return this channel + */ + public Channel complete() { + this.publisher().complete(); + this.subscriber().awaitCompletion(); + return this; + } + + /** + * Trigger onError signal and block until received by subscriber. + * + * @return this channel + */ + public Channel fail() { + Exception exception = new Exception("BOOM!!!"); + this.publisher().fail(exception); + Assert.assertEquals(exception.getMessage(), this.subscriber().awaitError().getMessage()); + return this; + } + + /** + * Trigger cancel signal and block until received by publisher. + * + * @return this channel + */ + public Channel cancel() { + this.subscriber().awaitSubscription().cancel(); + this.publisher().awaitCancel(); + return this; + } + } +} diff --git a/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthAllExcludedTestBean.java b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthAllExcludedTestBean.java new file mode 100644 index 00000000..3ca26490 --- /dev/null +++ b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthAllExcludedTestBean.java @@ -0,0 +1,48 @@ +/** + * Copyright (c) 2021 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.eclipse.microprofile.reactive.messaging.tck.health; + +import javax.enterprise.context.ApplicationScoped; +import javax.inject.Inject; + +import org.eclipse.microprofile.reactive.messaging.Incoming; +import org.eclipse.microprofile.reactive.messaging.Outgoing; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; + +@ApplicationScoped +public class HealthAllExcludedTestBean { + + public static final String ALL_EXCLUDED_CHANNEL = "excluded-channel"; + + @Inject + private ChannelRegister channelRegisterBean; + + + @Incoming(ALL_EXCLUDED_CHANNEL) + public Subscriber consumeInnerChannel() { + return channelRegisterBean.get(ALL_EXCLUDED_CHANNEL).subscriber(); + } + + @Outgoing(ALL_EXCLUDED_CHANNEL) + public Publisher produceInnerChannel() { + return channelRegisterBean.get(ALL_EXCLUDED_CHANNEL).publisher(); + } + +} diff --git a/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthAssertions.java b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthAssertions.java new file mode 100644 index 00000000..5b5ac4f3 --- /dev/null +++ b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthAssertions.java @@ -0,0 +1,179 @@ +/** + * Copyright (c) 2021 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.eclipse.microprofile.reactive.messaging.tck.health; + +import java.io.IOException; +import java.io.InputStream; +import java.net.HttpURLConnection; +import java.net.URL; +import java.util.List; +import java.util.stream.Collectors; + +import javax.json.Json; +import javax.json.JsonArray; +import javax.json.JsonObject; +import javax.json.JsonReader; +import javax.json.JsonValue; + +import static org.junit.Assert.assertEquals; + +/** + * Health check response representation with ability to assert it's state. + */ +class HealthAssertions { + + private int responseCode; + private JsonObject jsonResponse; + + /** + * Connect health endpoint and download response to be able to assert it later. + * + * @param url of the health check endpoint + * @return new instance of the health assertions + */ + static HealthAssertions create(String url) { + try { + HttpURLConnection con = (HttpURLConnection) new URL(url).openConnection(); + con.setRequestMethod("GET"); + con.setConnectTimeout(HealthBase.TIMEOUT_MILLIS); + con.setReadTimeout(HealthBase.TIMEOUT_MILLIS); + + con.connect(); + HealthAssertions healthAssertions = new HealthAssertions(); + healthAssertions.responseCode = con.getResponseCode(); + + InputStream is = con.getErrorStream(); + if(is == null){ + is = con.getInputStream(); + } + JsonReader jsonReader = Json.createReader(is); + healthAssertions.jsonResponse = jsonReader.readObject(); + is.close(); + con.disconnect(); + return healthAssertions; + } + catch (IOException e) { + throw new RuntimeException(e); + } + } + + /** + * Assert health check to have desired HTTP status code. + * + * @param expectedStatus 200 or 503 + * @return this health response assertion + */ + HealthAssertions assertResponseCode(int expected) { + assertEquals(expected, this.responseCode); + return this; + } + + /** + * Assert health check to have HTTP status code 200. + * + * @return this health response assertion + */ + HealthAssertions assertResponseCodeUp(){ + assertResponseCode(200); + return this; + } + + /** + * Assert health check to have HTTP status code 503. + * + * @return this health response assertion + */ + HealthAssertions assertResponseCodeDown(){ + assertResponseCode(503); + return this; + } + + /** + * Assert messaging check to have desired status. + *

{@code
+     * {
+     *   "status": "UP",
+     *   "checks": [
+     *     {
+     *       "name": "messaging",
+     *       "status": "EXPECTED_STATUS"
+     *     }
+     *   ]
+     * }
+     * }
+ * + * @param expectedStatus UP or DOWN + * @return this health response assertion + */ + HealthAssertions assertMessagingStatus(String expectedStatus){ + JsonArray checks = this.jsonResponse.getJsonArray("checks"); + List messagingObjects = checks.stream() + .map(JsonValue::asJsonObject) + .filter(o -> o.containsKey("name") && "messaging".equals(o.getString("name"))) + .collect(Collectors.toList()); + assertEquals(1, messagingObjects.size()); + JsonObject messagingObject = messagingObjects.get(0); + assertEquals(expectedStatus, messagingObject.getString("status")); + return this; + } + + /** + * Assert messaging check to have desired status UP. + *
{@code
+     * {
+     *   "status": "UP",
+     *   "checks": [
+     *     {
+     *       "name": "messaging",
+     *       "status": "UP"
+     *     }
+     *   ]
+     * }
+     * }
+ * + * @param expectedStatus UP or DOWN + * @return this health response assertion + */ + HealthAssertions assertMessagingStatusUp(){ + assertMessagingStatus("UP"); + return this; + } + + /** + * Assert messaging check to have desired status DOWN. + *
{@code
+     * {
+     *   "status": "DOWN",
+     *   "checks": [
+     *     {
+     *       "name": "messaging",
+     *       "status": "DOWN"
+     *     }
+     *   ]
+     * }
+     * }
+ * + * @param expectedStatus UP or DOWN + * @return this health response assertion + */ + HealthAssertions assertMessagingStatusDown() { + assertMessagingStatus("DOWN"); + return this; + } +} diff --git a/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthBase.java b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthBase.java new file mode 100644 index 00000000..7de9d6a4 --- /dev/null +++ b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthBase.java @@ -0,0 +1,77 @@ +/** + * Copyright (c) 2021 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.eclipse.microprofile.reactive.messaging.tck.health; + +import java.net.URI; +import java.util.ServiceLoader; + +import org.eclipse.microprofile.reactive.messaging.tck.ArchiveExtender; +import org.eclipse.microprofile.reactive.messaging.tck.metrics.ConfigAsset; +import org.eclipse.microprofile.reactive.messaging.tck.metrics.TestConnector; +import org.jboss.arquillian.test.api.ArquillianResource; +import org.jboss.shrinkwrap.api.ShrinkWrap; +import org.jboss.shrinkwrap.api.asset.EmptyAsset; +import org.jboss.shrinkwrap.api.spec.JavaArchive; +import org.jboss.shrinkwrap.api.spec.WebArchive; + +public class HealthBase { + + public static final int TIMEOUT_MILLIS = 5000; + + public static final String ALL_EXCLUDED_CHANNEL = "excluded-channel"; + public static final String LIVE_EXCLUDED_CHANNEL = "live-excluded-channel"; + public static final String READY_EXCLUDED_CHANNEL = "ready-excluded-channel"; + public static final String CHANNEL_CONNECTOR_IN = "channel-connector-in"; + public static final String CHANNEL_CONNECTOR_OUT = "channel-connector-out"; + public static final String CHANNEL_INNER = "inner-channel"; + + protected static WebArchive prepareArchive(Class... classes) { + ConfigAsset config = new ConfigAsset() + .put("mp.messaging.incoming.channel-connector-in.connector", TestConnector.ID) + .put("mp.messaging.outgoing.channel-connector-out.connector", TestConnector.ID) + .put("mp.messaging.health.ready.exclude", READY_EXCLUDED_CHANNEL + "," + ALL_EXCLUDED_CHANNEL) + .put("mp.messaging.health.live.exclude", LIVE_EXCLUDED_CHANNEL + "," + ALL_EXCLUDED_CHANNEL); + + JavaArchive testJar = ShrinkWrap + .create(JavaArchive.class, "healthTest.jar") + .addAsManifestResource(EmptyAsset.INSTANCE, "beans.xml") + .addAsManifestResource(config, "microprofile-config.properties") + .addClasses(HealthTestBean.class, HealthTestConnector.class, ChannelRegister.class) + .addClasses(classes) + .as(JavaArchive.class); + + ServiceLoader.load(ArchiveExtender.class).iterator().forEachRemaining(ext -> ext.extend(testJar)); + + return ShrinkWrap + .create(WebArchive.class, "healthTest.war") + .addAsLibrary(testJar); + } + + @ArquillianResource + private URI uri; + + protected HealthAssertions getLiveness() { + return HealthAssertions.create(uri + "/health/live"); + } + + protected HealthAssertions getReadiness() { + return HealthAssertions.create(uri + "/health/ready"); + } + +} diff --git a/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthCancelledConnectorInChannelTest.java b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthCancelledConnectorInChannelTest.java new file mode 100644 index 00000000..11b66ba2 --- /dev/null +++ b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthCancelledConnectorInChannelTest.java @@ -0,0 +1,145 @@ +/** + * Copyright (c) 2021 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.eclipse.microprofile.reactive.messaging.tck.health; + +import javax.inject.Inject; + +import org.jboss.arquillian.container.test.api.Deployment; +import org.jboss.arquillian.container.test.api.RunAsClient; +import org.jboss.arquillian.junit.Arquillian; +import org.jboss.arquillian.junit.InSequence; +import org.jboss.shrinkwrap.api.spec.WebArchive; +import org.junit.Test; +import org.junit.runner.RunWith; + +/** + * Cancelled connector channel scenario: + *
    + *
  • 1. Assert {@code /health/ready} is DOWN as onSubscribe has not been issued on any channel yet
  • + *
  • 2. Assert {@code /health/live} is UP as not onError nor cancel signal has not been issued on any channel
  • + *
  • 3. Send onSubscribe signal to one of the channels
  • + *
  • 4. Assert {@code /health/ready} is DOWN as onSubscribe has not been issued on ALL the channels yet
  • + *
  • 5. Send onSubscribe signal all of the channels
  • + *
  • 6. Assert {@code /health/ready} is UP as onSubscribe has been issued on ALL the channels
  • + *
  • 7. Assert {@code /health/live} is UP as not onError nor cancel signal has not been issued on any channel
  • + *
  • 8. Send cancel signal to one of the channels
  • + *
  • 9. Assert {@code /health/live} is DOWN as cancel signal has been issued on one channel
  • + *
  • 10. Assert {@code /health/ready} is still UP
  • + *
  • 11. Send cancel signal to all the other channels
  • + *
  • 12. Assert {@code /health/live} is still DOWN
  • + *
  • 13. Assert {@code /health/ready} is still UP
  • + *
+ */ +@RunWith(Arquillian.class) +public class HealthCancelledConnectorInChannelTest extends HealthBase { + + @Inject + private ChannelRegister channelRegister; + + @Deployment + public static WebArchive deployment() { + return prepareArchive(); + } + + @Test + @InSequence(1) + @RunAsClient + public void testNotReady() { + getReadiness() + .assertResponseCodeDown() + .assertMessagingStatusDown(); + } + + @Test + @InSequence(2) + @RunAsClient + public void testLivenessBeforeReady() { + getLiveness() + .assertResponseCodeUp() + .assertMessagingStatusUp(); + } + + @Test + @InSequence(3) + @RunAsClient + public void testPartialReady() { + channelRegister.get(CHANNEL_CONNECTOR_IN).ready(); + getReadiness() + .assertResponseCodeDown() + .assertMessagingStatusDown(); + } + + @Test + @InSequence(4) + @RunAsClient + public void testReadiness() { + channelRegister.readyAll(); + getReadiness() + .assertResponseCodeUp() + .assertMessagingStatusUp(); + } + + @Test + @InSequence(5) + @RunAsClient + public void testLiveness() { + getLiveness() + .assertResponseCodeUp() + .assertMessagingStatusUp(); + } + + @Test + @InSequence(6) + @RunAsClient + public void testOneCancelledChannelLiveness() { + channelRegister.get(CHANNEL_CONNECTOR_IN).cancel(); + getLiveness() + .assertResponseCodeDown() + .assertMessagingStatusDown(); + } + + @Test + @InSequence(7) + @RunAsClient + public void testOneErroredChannelReadiness() { + getReadiness() + .assertResponseCodeUp() + .assertMessagingStatusUp(); + } + + + @Test + @InSequence(8) + @RunAsClient + public void testAllErroredChannelsLiveness() { + channelRegister.cancelAll(); + getLiveness() + .assertResponseCodeDown() + .assertMessagingStatusDown(); + } + + @Test + @InSequence(9) + @RunAsClient + public void testAllErroredChannelsReadiness() { + getReadiness() + .assertResponseCodeUp() + .assertMessagingStatusUp(); + } +} diff --git a/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthCancelledConnectorOutChannelTest.java b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthCancelledConnectorOutChannelTest.java new file mode 100644 index 00000000..984f3c87 --- /dev/null +++ b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthCancelledConnectorOutChannelTest.java @@ -0,0 +1,145 @@ +/** + * Copyright (c) 2021 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.eclipse.microprofile.reactive.messaging.tck.health; + +import javax.inject.Inject; + +import org.jboss.arquillian.container.test.api.Deployment; +import org.jboss.arquillian.container.test.api.RunAsClient; +import org.jboss.arquillian.junit.Arquillian; +import org.jboss.arquillian.junit.InSequence; +import org.jboss.shrinkwrap.api.spec.WebArchive; +import org.junit.Test; +import org.junit.runner.RunWith; + +/** + * Cancelled connector channel scenario: + *
    + *
  • 1. Assert {@code /health/ready} is DOWN as onSubscribe has not been issued on any channel yet
  • + *
  • 2. Assert {@code /health/live} is UP as not onError nor cancel signal has not been issued on any channel
  • + *
  • 3. Send onSubscribe signal to one of the channels
  • + *
  • 4. Assert {@code /health/ready} is DOWN as onSubscribe has not been issued on ALL the channels yet
  • + *
  • 5. Send onSubscribe signal all of the channels
  • + *
  • 6. Assert {@code /health/ready} is UP as onSubscribe has been issued on ALL the channels
  • + *
  • 7. Assert {@code /health/live} is UP as not onError nor cancel signal has not been issued on any channel
  • + *
  • 8. Send cancel signal to one of the channels
  • + *
  • 9. Assert {@code /health/live} is DOWN as cancel signal has been issued on one channel
  • + *
  • 10. Assert {@code /health/ready} is still UP
  • + *
  • 11. Send cancel signal to all the other channels
  • + *
  • 12. Assert {@code /health/live} is still DOWN
  • + *
  • 13. Assert {@code /health/ready} is still UP
  • + *
+ */ +@RunWith(Arquillian.class) +public class HealthCancelledConnectorOutChannelTest extends HealthBase { + + @Inject + private ChannelRegister channelRegister; + + @Deployment + public static WebArchive deployment() { + return prepareArchive(); + } + + @Test + @InSequence(1) + @RunAsClient + public void testNotReady() { + getReadiness() + .assertResponseCodeDown() + .assertMessagingStatusDown(); + } + + @Test + @InSequence(2) + @RunAsClient + public void testLivenessBeforeReady() { + getLiveness() + .assertResponseCodeUp() + .assertMessagingStatusUp(); + } + + @Test + @InSequence(3) + @RunAsClient + public void testPartialReady() { + channelRegister.get(CHANNEL_CONNECTOR_OUT).ready(); + getReadiness() + .assertResponseCodeDown() + .assertMessagingStatusDown(); + } + + @Test + @InSequence(4) + @RunAsClient + public void testReadiness() { + channelRegister.readyAll(); + getReadiness() + .assertResponseCodeUp() + .assertMessagingStatusUp(); + } + + @Test + @InSequence(5) + @RunAsClient + public void testLiveness() { + getLiveness() + .assertResponseCodeUp() + .assertMessagingStatusUp(); + } + + @Test + @InSequence(6) + @RunAsClient + public void testOneCancelledChannelLiveness() { + channelRegister.get(CHANNEL_CONNECTOR_OUT).cancel(); + getLiveness() + .assertResponseCodeDown() + .assertMessagingStatusDown(); + } + + @Test + @InSequence(7) + @RunAsClient + public void testOneErroredChannelReadiness() { + getReadiness() + .assertResponseCodeUp() + .assertMessagingStatusUp(); + } + + + @Test + @InSequence(8) + @RunAsClient + public void testAllErroredChannelsLiveness() { + channelRegister.cancelAll(); + getLiveness() + .assertResponseCodeDown() + .assertMessagingStatusDown(); + } + + @Test + @InSequence(9) + @RunAsClient + public void testAllErroredChannelsReadiness() { + getReadiness() + .assertResponseCodeUp() + .assertMessagingStatusUp(); + } +} diff --git a/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthCancelledInnerChannelTest.java b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthCancelledInnerChannelTest.java new file mode 100644 index 00000000..62907d9c --- /dev/null +++ b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthCancelledInnerChannelTest.java @@ -0,0 +1,144 @@ +/** + * Copyright (c) 2021 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.eclipse.microprofile.reactive.messaging.tck.health; + +import javax.inject.Inject; + +import org.jboss.arquillian.container.test.api.Deployment; +import org.jboss.arquillian.container.test.api.RunAsClient; +import org.jboss.arquillian.junit.Arquillian; +import org.jboss.arquillian.junit.InSequence; +import org.jboss.shrinkwrap.api.spec.WebArchive; +import org.junit.Test; +import org.junit.runner.RunWith; + +/** + * Cancelled inner channel scenario: + *
    + *
  • 1. Assert {@code /health/ready} is DOWN as onSubscribe has not been issued on any channel yet
  • + *
  • 2. Assert {@code /health/live} is UP as not onError nor cancel signal has not been issued on any channel
  • + *
  • 3. Send onSubscribe signal to one of the channels
  • + *
  • 4. Assert {@code /health/ready} is DOWN as onSubscribe has not been issued on ALL the channels yet
  • + *
  • 5. Send onSubscribe signal all of the channels
  • + *
  • 6. Assert {@code /health/ready} is UP as onSubscribe has been issued on ALL the channels
  • + *
  • 7. Assert {@code /health/live} is UP as not onError nor cancel signal has not been issued on any channel
  • + *
  • 8. Send cancel signal to one of the channels
  • + *
  • 9. Assert {@code /health/live} is DOWN as cancel signal has been issued on one channel
  • + *
  • 10. Assert {@code /health/ready} is still UP
  • + *
  • 11. Send cancel signal to all the other channels
  • + *
  • 12. Assert {@code /health/live} is still DOWN
  • + *
  • 13. Assert {@code /health/ready} is still UP
  • + *
+ */ +@RunWith(Arquillian.class) +public class HealthCancelledInnerChannelTest extends HealthBase { + + @Inject + private ChannelRegister channelRegister; + + @Deployment + public static WebArchive deployment() { + return prepareArchive(); + } + + @Test + @InSequence(1) + @RunAsClient + public void testNotReady() { + getReadiness() + .assertResponseCodeDown() + .assertMessagingStatusDown(); + } + + @Test + @InSequence(2) + @RunAsClient + public void testLivenessBeforeReady() { + getLiveness() + .assertResponseCodeUp() + .assertMessagingStatusUp(); + } + + @Test + @InSequence(3) + @RunAsClient + public void testPartialReady() { + channelRegister.get(CHANNEL_INNER).ready(); + getReadiness() + .assertResponseCodeDown() + .assertMessagingStatusDown(); + } + + @Test + @InSequence(4) + @RunAsClient + public void testReadiness() { + channelRegister.readyAll(); + getReadiness() + .assertResponseCodeUp() + .assertMessagingStatusUp(); + } + + @Test + @InSequence(5) + @RunAsClient + public void testLiveness() { + getLiveness() + .assertResponseCodeUp() + .assertMessagingStatusUp(); + } + + @Test + @InSequence(6) + @RunAsClient + public void testOneCancelledChannelLiveness() { + channelRegister.get(CHANNEL_INNER).cancel(); + getLiveness() + .assertResponseCodeDown() + .assertMessagingStatusDown(); + } + + @Test + @InSequence(7) + @RunAsClient + public void testOneErroredChannelReadiness() { + getReadiness() + .assertResponseCodeUp() + .assertMessagingStatusUp(); + } + + @Test + @InSequence(8) + @RunAsClient + public void testAllErroredChannelsLiveness() { + channelRegister.cancelAll(); + getLiveness() + .assertResponseCodeDown() + .assertMessagingStatusDown(); + } + + @Test + @InSequence(9) + @RunAsClient + public void testAllErroredChannelsReadiness() { + getReadiness() + .assertResponseCodeUp() + .assertMessagingStatusUp(); + } +} diff --git a/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthErroredAndCancelledTest.java b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthErroredAndCancelledTest.java new file mode 100644 index 00000000..fe4cfbea --- /dev/null +++ b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthErroredAndCancelledTest.java @@ -0,0 +1,147 @@ +/** + * Copyright (c) 2021 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.eclipse.microprofile.reactive.messaging.tck.health; + +import javax.inject.Inject; + +import org.jboss.arquillian.container.test.api.Deployment; +import org.jboss.arquillian.container.test.api.RunAsClient; +import org.jboss.arquillian.junit.Arquillian; +import org.jboss.arquillian.junit.InSequence; +import org.jboss.shrinkwrap.api.spec.WebArchive; +import org.junit.Test; +import org.junit.runner.RunWith; + +/** + * Combination of failed and cancelled channels scenario: + *
    + *
  • 1. Assert {@code /health/ready} is DOWN as onSubscribe has not been issued on any channel yet
  • + *
  • 2. Assert {@code /health/live} is UP as not onError nor cancel signal has not been issued on any channel
  • + *
  • 3. Send onSubscribe signal to one of the channels
  • + *
  • 4. Assert {@code /health/ready} is DOWN as onSubscribe has not been issued on ALL the channels yet
  • + *
  • 5. Send onSubscribe signal all of the channels
  • + *
  • 6. Assert {@code /health/ready} is UP as onSubscribe has been issued on ALL the channels
  • + *
  • 7. Assert {@code /health/live} is UP as not onError nor cancel signal has not been issued on any channel
  • + *
  • 8. Send onError signal to one of the channels and cancel to another
  • + *
  • 9. Assert {@code /health/live} is DOWN as onError signal has been issued on one channel
  • + *
  • 10. Assert {@code /health/ready} is still UP
  • + *
  • 11. Send onError signal to the remaining channels
  • + *
  • 12. Assert {@code /health/live} is still DOWN
  • + *
  • 13. Assert {@code /health/ready} is still UP
  • + *
+ */ +@RunWith(Arquillian.class) +public class HealthErroredAndCancelledTest extends HealthBase { + + @Inject + private ChannelRegister channelRegister; + + @Deployment + public static WebArchive deployment() { + return prepareArchive(); + } + + @Test + @InSequence(1) + @RunAsClient + public void testNotReady() { + getReadiness() + .assertResponseCodeDown() + .assertMessagingStatusDown(); + } + + @Test + @InSequence(2) + @RunAsClient + public void testLivenessBeforeReady() { + getLiveness() + .assertResponseCodeUp() + .assertMessagingStatusUp(); + } + + @Test + @InSequence(3) + @RunAsClient + public void testPartialReady() { + channelRegister.get(CHANNEL_INNER).ready(); + getReadiness() + .assertResponseCodeDown() + .assertMessagingStatusDown(); + } + + @Test + @InSequence(4) + @RunAsClient + public void testReadiness() { + channelRegister.readyAll(); + getReadiness() + .assertResponseCodeUp() + .assertMessagingStatusUp(); + } + + @Test + @InSequence(5) + @RunAsClient + public void testLiveness() { + getLiveness() + .assertResponseCodeUp() + .assertMessagingStatusUp(); + } + + @Test + @InSequence(6) + @RunAsClient + public void testOneErroredChannelLiveness() { + channelRegister.get(CHANNEL_INNER).fail(); + channelRegister.get(CHANNEL_CONNECTOR_IN).cancel(); + getLiveness() + .assertResponseCodeDown() + .assertMessagingStatusDown(); + } + + @Test + @InSequence(7) + @RunAsClient + public void testOneErroredChannelReadiness() { + getReadiness() + .assertResponseCodeUp() + .assertMessagingStatusUp(); + } + + @Test + @InSequence(8) + @RunAsClient + public void testAllErroredChannelsLiveness() { + channelRegister.get(CHANNEL_CONNECTOR_OUT).fail(); + getLiveness() + .assertResponseCodeDown() + .assertMessagingStatusDown(); + } + + @Test + @InSequence(9) + @RunAsClient + public void testAllErroredChannelsReadiness() { + getReadiness() + .assertResponseCodeUp() + .assertMessagingStatusUp(); + } + + +} diff --git a/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthErroredConnectorInChannelTest.java b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthErroredConnectorInChannelTest.java new file mode 100644 index 00000000..474daefd --- /dev/null +++ b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthErroredConnectorInChannelTest.java @@ -0,0 +1,144 @@ +/** + * Copyright (c) 2021 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.eclipse.microprofile.reactive.messaging.tck.health; + +import javax.inject.Inject; + +import org.jboss.arquillian.container.test.api.Deployment; +import org.jboss.arquillian.container.test.api.RunAsClient; +import org.jboss.arquillian.junit.Arquillian; +import org.jboss.arquillian.junit.InSequence; +import org.jboss.shrinkwrap.api.spec.WebArchive; +import org.junit.Test; +import org.junit.runner.RunWith; + +/** + * Failed connector channel scenario: + *
    + *
  • 1. Assert {@code /health/ready} is DOWN as onSubscribe has not been issued on any channel yet
  • + *
  • 2. Assert {@code /health/live} is UP as not onError nor cancel signal has not been issued on any channel
  • + *
  • 3. Send onSubscribe signal to one of the channels
  • + *
  • 4. Assert {@code /health/ready} is DOWN as onSubscribe has not been issued on ALL the channels yet
  • + *
  • 5. Send onSubscribe signal all of the channels
  • + *
  • 6. Assert {@code /health/ready} is UP as onSubscribe has been issued on ALL the channels
  • + *
  • 7. Assert {@code /health/live} is UP as not onError nor cancel signal has not been issued on any channel
  • + *
  • 8. Send onError signal to channel with publisher provided by connector
  • + *
  • 9. Assert {@code /health/live} is DOWN as onError signal has been issued on one channel
  • + *
  • 10. Assert {@code /health/ready} is still UP
  • + *
  • 11. Send onError signal to all the other channels
  • + *
  • 12. Assert {@code /health/live} is still DOWN
  • + *
  • 13. Assert {@code /health/ready} is still UP
  • + *
+ */ +@RunWith(Arquillian.class) +public class HealthErroredConnectorInChannelTest extends HealthBase { + + @Inject + private ChannelRegister channelRegister; + + @Deployment + public static WebArchive deployment() { + return prepareArchive(); + } + + @Test + @InSequence(1) + @RunAsClient + public void testNotReady() { + getReadiness() + .assertResponseCodeDown() + .assertMessagingStatusDown(); + } + + @Test + @InSequence(2) + @RunAsClient + public void testLivenessBeforeReady() { + getLiveness() + .assertResponseCodeUp() + .assertMessagingStatusUp(); + } + + @Test + @InSequence(3) + @RunAsClient + public void testPartialReady() { + channelRegister.get(CHANNEL_CONNECTOR_IN).ready(); + getReadiness() + .assertResponseCodeDown() + .assertMessagingStatusDown(); + } + + @Test + @InSequence(4) + @RunAsClient + public void testReadiness() { + channelRegister.readyAll(); + getReadiness() + .assertResponseCodeUp() + .assertMessagingStatusUp(); + } + + @Test + @InSequence(5) + @RunAsClient + public void testLiveness() { + getLiveness() + .assertResponseCodeUp() + .assertMessagingStatusUp(); + } + + @Test + @InSequence(6) + @RunAsClient + public void testOneErroredChannelLiveness() { + channelRegister.get(CHANNEL_CONNECTOR_IN).fail(); + getLiveness() + .assertResponseCodeDown() + .assertMessagingStatusDown(); + } + + @Test + @InSequence(7) + @RunAsClient + public void testOneErroredChannelReadiness() { + getReadiness() + .assertResponseCodeUp() + .assertMessagingStatusUp(); + } + + @Test + @InSequence(8) + @RunAsClient + public void testAllErroredChannelsLiveness() { + channelRegister.failAll(); + getLiveness() + .assertResponseCodeDown() + .assertMessagingStatusDown(); + } + + @Test + @InSequence(9) + @RunAsClient + public void testAllErroredChannelsReadiness() { + getReadiness() + .assertResponseCodeUp() + .assertMessagingStatusUp(); + } +} diff --git a/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthErroredConnectorOutChannelTest.java b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthErroredConnectorOutChannelTest.java new file mode 100644 index 00000000..467f56c4 --- /dev/null +++ b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthErroredConnectorOutChannelTest.java @@ -0,0 +1,144 @@ +/** + * Copyright (c) 2021 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.eclipse.microprofile.reactive.messaging.tck.health; + +import javax.inject.Inject; + +import org.jboss.arquillian.container.test.api.Deployment; +import org.jboss.arquillian.container.test.api.RunAsClient; +import org.jboss.arquillian.junit.Arquillian; +import org.jboss.arquillian.junit.InSequence; +import org.jboss.shrinkwrap.api.spec.WebArchive; +import org.junit.Test; +import org.junit.runner.RunWith; + +/** + * Failed connector channel scenario: + *
    + *
  • 1. Assert {@code /health/ready} is DOWN as onSubscribe has not been issued on any channel yet
  • + *
  • 2. Assert {@code /health/live} is UP as not onError nor cancel signal has not been issued on any channel
  • + *
  • 3. Send onSubscribe signal to one of the channels
  • + *
  • 4. Assert {@code /health/ready} is DOWN as onSubscribe has not been issued on ALL the channels yet
  • + *
  • 5. Send onSubscribe signal all of the channels
  • + *
  • 6. Assert {@code /health/ready} is UP as onSubscribe has been issued on ALL the channels
  • + *
  • 7. Assert {@code /health/live} is UP as not onError nor cancel signal has not been issued on any channel
  • + *
  • 8. Send onError signal to channel with subscriber provided by connector
  • + *
  • 9. Assert {@code /health/live} is DOWN as onError signal has been issued on one channel
  • + *
  • 10. Assert {@code /health/ready} is still UP
  • + *
  • 11. Send onError signal to all the other channels
  • + *
  • 12. Assert {@code /health/live} is still DOWN
  • + *
  • 13. Assert {@code /health/ready} is still UP
  • + *
+ */ +@RunWith(Arquillian.class) +public class HealthErroredConnectorOutChannelTest extends HealthBase { + + @Inject + private ChannelRegister channelRegister; + + @Deployment + public static WebArchive deployment() { + return prepareArchive(); + } + + @Test + @InSequence(1) + @RunAsClient + public void testNotReady() { + getReadiness() + .assertResponseCodeDown() + .assertMessagingStatusDown(); + } + + @Test + @InSequence(2) + @RunAsClient + public void testLivenessBeforeReady() { + getLiveness() + .assertResponseCodeUp() + .assertMessagingStatusUp(); + } + + @Test + @InSequence(3) + @RunAsClient + public void testPartialReady() { + channelRegister.get(CHANNEL_CONNECTOR_OUT).ready(); + getReadiness() + .assertResponseCodeDown() + .assertMessagingStatusDown(); + } + + @Test + @InSequence(4) + @RunAsClient + public void testReadiness() { + channelRegister.readyAll(); + getReadiness() + .assertResponseCodeUp() + .assertMessagingStatusUp(); + } + + @Test + @InSequence(5) + @RunAsClient + public void testLiveness() { + getLiveness() + .assertResponseCodeUp() + .assertMessagingStatusUp(); + } + + @Test + @InSequence(6) + @RunAsClient + public void testOneErroredChannelLiveness() { + channelRegister.get(CHANNEL_CONNECTOR_OUT).fail(); + getLiveness() + .assertResponseCodeDown() + .assertMessagingStatusDown(); + } + + @Test + @InSequence(7) + @RunAsClient + public void testOneErroredChannelReadiness() { + getReadiness() + .assertResponseCodeUp() + .assertMessagingStatusUp(); + } + + @Test + @InSequence(8) + @RunAsClient + public void testAllErroredChannelsLiveness() { + channelRegister.failAll(); + getLiveness() + .assertResponseCodeDown() + .assertMessagingStatusDown(); + } + + @Test + @InSequence(9) + @RunAsClient + public void testAllErroredChannelsReadiness() { + getReadiness() + .assertResponseCodeUp() + .assertMessagingStatusUp(); + } +} diff --git a/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthErroredInnerChannelTest.java b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthErroredInnerChannelTest.java new file mode 100644 index 00000000..f462484d --- /dev/null +++ b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthErroredInnerChannelTest.java @@ -0,0 +1,146 @@ +/** + * Copyright (c) 2021 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.eclipse.microprofile.reactive.messaging.tck.health; + +import javax.inject.Inject; + +import org.jboss.arquillian.container.test.api.Deployment; +import org.jboss.arquillian.container.test.api.RunAsClient; +import org.jboss.arquillian.junit.Arquillian; +import org.jboss.arquillian.junit.InSequence; +import org.jboss.shrinkwrap.api.spec.WebArchive; +import org.junit.Test; +import org.junit.runner.RunWith; + +/** + * Failed inner channel scenario: + *
    + *
  • 1. Assert {@code /health/ready} is DOWN as onSubscribe has not been issued on any channel yet
  • + *
  • 2. Assert {@code /health/live} is UP as not onError nor cancel signal has not been issued on any channel
  • + *
  • 3. Send onSubscribe signal to one of the channels
  • + *
  • 4. Assert {@code /health/ready} is DOWN as onSubscribe has not been issued on ALL the channels yet
  • + *
  • 5. Send onSubscribe signal all of the channels
  • + *
  • 6. Assert {@code /health/ready} is UP as onSubscribe has been issued on ALL the channels
  • + *
  • 7. Assert {@code /health/live} is UP as not onError nor cancel signal has not been issued on any channel
  • + *
  • 8. Send onError signal to one of the channels
  • + *
  • 9. Assert {@code /health/live} is DOWN as onError signal has been issued on one channel
  • + *
  • 10. Assert {@code /health/ready} is still UP
  • + *
  • 11. Send onError signal to all the other channels
  • + *
  • 12. Assert {@code /health/live} is still DOWN
  • + *
  • 13. Assert {@code /health/ready} is still UP
  • + *
+ */ +@RunWith(Arquillian.class) +public class HealthErroredInnerChannelTest extends HealthBase { + + @Inject + private ChannelRegister channelRegister; + + @Deployment + public static WebArchive deployment() { + return prepareArchive(); + } + + @Test + @InSequence(1) + @RunAsClient + public void testNotReady() { + getReadiness() + .assertResponseCodeDown() + .assertMessagingStatusDown(); + } + + @Test + @InSequence(2) + @RunAsClient + public void testLivenessBeforeReady() { + getLiveness() + .assertResponseCodeUp() + .assertMessagingStatusUp(); + } + + @Test + @InSequence(3) + @RunAsClient + public void testPartialReady() { + channelRegister.get(CHANNEL_INNER).ready(); + getReadiness() + .assertResponseCodeDown() + .assertMessagingStatusDown(); + } + + @Test + @InSequence(4) + @RunAsClient + public void testReadiness() { + channelRegister.readyAll(); + getReadiness() + .assertResponseCodeUp() + .assertMessagingStatusUp(); + } + + @Test + @InSequence(5) + @RunAsClient + public void testLiveness() { + getLiveness() + .assertResponseCodeUp() + .assertMessagingStatusUp(); + } + + @Test + @InSequence(6) + @RunAsClient + public void testOneErroredChannelLiveness() { + channelRegister.get(CHANNEL_INNER).fail(); + getLiveness() + .assertResponseCodeDown() + .assertMessagingStatusDown(); + } + + @Test + @InSequence(7) + @RunAsClient + public void testOneErroredChannelReadiness() { + getReadiness() + .assertResponseCodeUp() + .assertMessagingStatusUp(); + } + + @Test + @InSequence(8) + @RunAsClient + public void testAllErroredChannelsLiveness() { + channelRegister.failAll(); + getLiveness() + .assertResponseCodeDown() + .assertMessagingStatusDown(); + } + + @Test + @InSequence(9) + @RunAsClient + public void testAllErroredChannelsReadiness() { + getReadiness() + .assertResponseCodeUp() + .assertMessagingStatusUp(); + } + + +} diff --git a/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthExclusionTest.java b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthExclusionTest.java new file mode 100644 index 00000000..1cc1cb0b --- /dev/null +++ b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthExclusionTest.java @@ -0,0 +1,143 @@ +/** + * Copyright (c) 2021 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.eclipse.microprofile.reactive.messaging.tck.health; + +import javax.inject.Inject; + +import org.jboss.arquillian.container.test.api.Deployment; +import org.jboss.arquillian.container.test.api.RunAsClient; +import org.jboss.arquillian.junit.Arquillian; +import org.jboss.arquillian.junit.InSequence; +import org.jboss.shrinkwrap.api.spec.WebArchive; +import org.junit.Test; +import org.junit.runner.RunWith; + +/** + * Channel excluded from readiness and liveness check scenario: + *
    + *
  • 1. Assert {@code /health/ready} is DOWN as onSubscribe has not been issued on any channel yet
  • + *
  • 2. Send onSubscribe signal all channels except the excluded one
  • + *
  • 3. Assert {@code /health/ready} is UP as onSubscribe has been sent to all channels except the excluded one
  • + *
  • 4. Assert {@code /health/live} is UP as not onError nor cancel signal has been issued on any channel
  • + *
  • 5. Send onSubscribe signal to excluded channel
  • + *
  • 6. Send onError signal to excluded channel
  • + *
  • 7. Assert {@code /health/live} is UP as onError signal has been issued only on excluded channel
  • + *
  • 8. Send onError signal to one of the not excluded channels
  • + *
  • 9. Assert {@code /health/live} is DOWN as onError signal has been issued
  • + *
  • 10. Assert {@code /health/ready} is still UP
  • + *
  • 11. Send onError signal to the remaining channels
  • + *
  • 12. Assert {@code /health/live} is still DOWN
  • + *
  • 13. Assert {@code /health/ready} is still UP
  • + *
+ */ +@RunWith(Arquillian.class) +public class HealthExclusionTest extends HealthBase{ + + @Inject + private ChannelRegister channelRegister; + + @Deployment + public static WebArchive deployment() { + return prepareArchive(HealthAllExcludedTestBean.class); + } + + + @Test + @InSequence(1) + @RunAsClient + public void testNotReady() { + getReadiness() + .assertResponseCodeDown() + .assertMessagingStatusDown(); + } + + @Test + @InSequence(2) + @RunAsClient + public void testReadiness() { + channelRegister.get(CHANNEL_CONNECTOR_IN).ready(); + channelRegister.get(CHANNEL_CONNECTOR_OUT).ready(); + channelRegister.get(CHANNEL_INNER).ready(); + // ALL_EXCLUDED_CHANNEL is excluded by config + getReadiness() + .assertResponseCodeUp() + .assertMessagingStatusUp(); + } + + @Test + @InSequence(3) + @RunAsClient + public void testLiveness() { + getLiveness() + .assertResponseCodeUp() + .assertMessagingStatusUp(); + } + + @Test + @InSequence(4) + @RunAsClient + public void testLivenessWithFailedExcluded() { + channelRegister.get(ALL_EXCLUDED_CHANNEL).ready(); + channelRegister.get(ALL_EXCLUDED_CHANNEL).fail(); + getLiveness() + .assertResponseCodeUp() + .assertMessagingStatusUp(); + } + + @Test + @InSequence(5) + @RunAsClient + public void testOneErroredChannelLiveness() { + channelRegister.get(CHANNEL_CONNECTOR_IN).fail(); + getLiveness() + .assertResponseCodeDown() + .assertMessagingStatusDown(); + } + + @Test + @InSequence(6) + @RunAsClient + public void testOneErroredChannelReadiness() { + getReadiness() + .assertResponseCodeUp() + .assertMessagingStatusUp(); + } + + @Test + @InSequence(7) + @RunAsClient + public void testAllErroredChannelsLiveness() { + channelRegister.get(CHANNEL_CONNECTOR_OUT).fail(); + channelRegister.get(CHANNEL_INNER).fail(); + getLiveness() + .assertResponseCodeDown() + .assertMessagingStatusDown(); + } + + @Test + @InSequence(9) + @RunAsClient + public void testAllErroredChannelsReadiness() { + getReadiness() + .assertResponseCodeUp() + .assertMessagingStatusUp(); + } + + +} diff --git a/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthLiveExcludedTestBean.java b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthLiveExcludedTestBean.java new file mode 100644 index 00000000..a5c7d840 --- /dev/null +++ b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthLiveExcludedTestBean.java @@ -0,0 +1,48 @@ +/** + * Copyright (c) 2021 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.eclipse.microprofile.reactive.messaging.tck.health; + +import javax.enterprise.context.ApplicationScoped; +import javax.inject.Inject; + +import org.eclipse.microprofile.reactive.messaging.Incoming; +import org.eclipse.microprofile.reactive.messaging.Outgoing; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; + +import static org.eclipse.microprofile.reactive.messaging.tck.health.HealthBase.LIVE_EXCLUDED_CHANNEL; + +@ApplicationScoped +public class HealthLiveExcludedTestBean { + + @Inject + private ChannelRegister channelRegisterBean; + + + @Incoming(LIVE_EXCLUDED_CHANNEL) + public Subscriber consumeInnerChannel() { + return channelRegisterBean.get(LIVE_EXCLUDED_CHANNEL).subscriber(); + } + + @Outgoing(LIVE_EXCLUDED_CHANNEL) + public Publisher produceInnerChannel() { + return channelRegisterBean.get(LIVE_EXCLUDED_CHANNEL).publisher(); + } + +} diff --git a/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthLiveExclusionTest.java b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthLiveExclusionTest.java new file mode 100644 index 00000000..ccd3dcf0 --- /dev/null +++ b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthLiveExclusionTest.java @@ -0,0 +1,155 @@ +/** + * Copyright (c) 2021 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.eclipse.microprofile.reactive.messaging.tck.health; + +import javax.inject.Inject; + +import org.jboss.arquillian.container.test.api.Deployment; +import org.jboss.arquillian.container.test.api.RunAsClient; +import org.jboss.arquillian.junit.Arquillian; +import org.jboss.arquillian.junit.InSequence; +import org.jboss.shrinkwrap.api.spec.WebArchive; +import org.junit.Test; +import org.junit.runner.RunWith; + +/** + * Channel excluded from liveness check scenario: + *
    + *
  • 1. Assert {@code /health/ready} is DOWN as onSubscribe has not been issued on any channel yet
  • + *
  • 2. Send onSubscribe signal all channels except the one excluded from liveness check
  • + *
  • 3. Assert {@code /health/ready} is DOWN as onSubscribe has not been issued + * on all the channels, the one is excluded only from liveness
  • + *
  • 4. Send onSubscribe signal to remaining channel
  • + *
  • 5. Assert {@code /health/ready} is UP as onSubscribe has been sent to all channels
  • + *
  • 6. Assert {@code /health/live} is UP as not onError nor cancel signal has been issued on any channel
  • + *
  • 7. Send onError signal to excluded channel
  • + *
  • 8. Assert {@code /health/live} is UP as onError signal has been issued only on excluded channel
  • + *
  • 9. Send onError signal to one of the not excluded channels
  • + *
  • 10. Assert {@code /health/live} is DOWN as onError signal has been issued
  • + *
  • 11. Assert {@code /health/ready} is still UP
  • + *
  • 12. Send onError signal to the remaining channels
  • + *
  • 13. Assert {@code /health/live} is still DOWN
  • + *
  • 14. Assert {@code /health/ready} is still UP
  • + *
+ */ +@RunWith(Arquillian.class) +public class HealthLiveExclusionTest extends HealthBase{ + + @Inject + private ChannelRegister channelRegister; + + @Deployment + public static WebArchive deployment() { + return prepareArchive(HealthLiveExcludedTestBean.class); + } + + @Test + @InSequence(1) + @RunAsClient + public void testNotReady() { + getReadiness() + .assertResponseCodeDown() + .assertMessagingStatusDown(); + } + + @Test + @InSequence(2) + @RunAsClient + public void testPartialReadiness() { + channelRegister.get(CHANNEL_CONNECTOR_IN).ready(); + channelRegister.get(CHANNEL_CONNECTOR_OUT).ready(); + channelRegister.get(CHANNEL_INNER).ready(); + // LIVE_EXCLUDED_CHANNEL is not excluded from readiness check + getReadiness() + .assertResponseCodeDown() + .assertMessagingStatusDown(); + } + + @Test + @InSequence(3) + @RunAsClient + public void testReadiness() { + channelRegister.get(LIVE_EXCLUDED_CHANNEL).ready(); + getReadiness() + .assertResponseCodeUp() + .assertMessagingStatusUp(); + } + + @Test + @InSequence(4) + @RunAsClient + public void testLiveness() { + getLiveness() + .assertResponseCodeUp() + .assertMessagingStatusUp(); + } + + @Test + @InSequence(5) + @RunAsClient + public void testLivenessWithFailedExcluded() { + channelRegister.get(LIVE_EXCLUDED_CHANNEL).fail(); + // LIVE_EXCLUDED_CHANNEL is excluded from liveness check + getLiveness() + .assertResponseCodeUp() + .assertMessagingStatusUp(); + } + + @Test + @InSequence(6) + @RunAsClient + public void testOneErroredChannelLiveness() { + channelRegister.get(CHANNEL_CONNECTOR_IN).fail(); + getLiveness() + .assertResponseCodeDown() + .assertMessagingStatusDown(); + } + + @Test + @InSequence(7) + @RunAsClient + public void testOneErroredChannelReadiness() { + getReadiness() + .assertResponseCodeUp() + .assertMessagingStatusUp(); + } + + @Test + @InSequence(8) + @RunAsClient + public void testAllErroredChannelsLiveness() { + channelRegister.get(CHANNEL_CONNECTOR_IN).fail(); + channelRegister.get(CHANNEL_CONNECTOR_OUT).fail(); + channelRegister.get(CHANNEL_INNER).fail(); + getLiveness() + .assertResponseCodeDown() + .assertMessagingStatusDown(); + } + + @Test + @InSequence(9) + @RunAsClient + public void testAllErroredChannelsReadiness() { + getReadiness() + .assertResponseCodeUp() + .assertMessagingStatusUp(); + } + + +} diff --git a/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthReadyExcludedTestBean.java b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthReadyExcludedTestBean.java new file mode 100644 index 00000000..24f212a6 --- /dev/null +++ b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthReadyExcludedTestBean.java @@ -0,0 +1,48 @@ +/** + * Copyright (c) 2021 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.eclipse.microprofile.reactive.messaging.tck.health; + +import javax.enterprise.context.ApplicationScoped; +import javax.inject.Inject; + +import org.eclipse.microprofile.reactive.messaging.Incoming; +import org.eclipse.microprofile.reactive.messaging.Outgoing; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; + +import static org.eclipse.microprofile.reactive.messaging.tck.health.HealthBase.READY_EXCLUDED_CHANNEL; + +@ApplicationScoped +public class HealthReadyExcludedTestBean { + + @Inject + private ChannelRegister channelRegisterBean; + + + @Incoming(READY_EXCLUDED_CHANNEL) + public Subscriber consumeInnerChannel() { + return channelRegisterBean.get(READY_EXCLUDED_CHANNEL).subscriber(); + } + + @Outgoing(READY_EXCLUDED_CHANNEL) + public Publisher produceInnerChannel() { + return channelRegisterBean.get(READY_EXCLUDED_CHANNEL).publisher(); + } + +} diff --git a/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthReadyExclusionTest.java b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthReadyExclusionTest.java new file mode 100644 index 00000000..7f23c5f5 --- /dev/null +++ b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthReadyExclusionTest.java @@ -0,0 +1,129 @@ +/** + * Copyright (c) 2021 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.eclipse.microprofile.reactive.messaging.tck.health; + +import javax.inject.Inject; + +import org.jboss.arquillian.container.test.api.Deployment; +import org.jboss.arquillian.container.test.api.RunAsClient; +import org.jboss.arquillian.junit.Arquillian; +import org.jboss.arquillian.junit.InSequence; +import org.jboss.shrinkwrap.api.spec.WebArchive; +import org.junit.Test; +import org.junit.runner.RunWith; + +/** + * Channel excluded from readiness check scenario: + *
    + *
  • 1. Assert {@code /health/ready} is DOWN as onSubscribe has not been issued on any channel yet
  • + *
  • 2. Send onSubscribe signal to all channels except the one excluded from readiness check
  • + *
  • 3. Assert {@code /health/ready} is UP as onSubscribe has been issued to all except the excluded channel
  • + *
  • 4. Assert {@code /health/live} is UP as not onError nor cancel signal has been issued on any channel
  • + *
  • 5. Send onSubscribe signal to remaining channel
  • + *
  • 6. Send onError signal to excluded channel
  • + *
  • 7. Assert {@code /health/live} is DOWN as onError signal has been issued channel excluded only from readiness
  • + *
  • 8. Assert {@code /health/ready} is UP as onSubscribe has been sent to all channels
  • + *
  • 9. Send onError signal to the remaining channels
  • + *
  • 10. Assert {@code /health/live} is still DOWN
  • + *
  • 11. Assert {@code /health/ready} is still UP
  • + *
+ */ +@RunWith(Arquillian.class) +public class HealthReadyExclusionTest extends HealthBase{ + + @Inject + private ChannelRegister channelRegister; + + @Deployment + public static WebArchive deployment() { + return prepareArchive(HealthReadyExcludedTestBean.class); + } + + @Test + @InSequence(1) + @RunAsClient + public void testNotReady() { + getReadiness() + .assertResponseCodeDown() + .assertMessagingStatusDown(); + } + + @Test + @InSequence(2) + @RunAsClient + public void testReadiness() { + channelRegister.get(CHANNEL_CONNECTOR_IN).ready(); + channelRegister.get(CHANNEL_CONNECTOR_OUT).ready(); + channelRegister.get(CHANNEL_INNER).ready(); + // READY_EXCLUDED_CHANNEL is excluded by config + getReadiness() + .assertResponseCodeUp() + .assertMessagingStatusUp(); + } + + @Test + @InSequence(3) + @RunAsClient + public void testLiveness() { + getLiveness() + .assertResponseCodeUp() + .assertMessagingStatusUp(); + } + + @Test + @InSequence(4) + @RunAsClient + public void testOneErroredChannelLiveness() { + channelRegister.get(READY_EXCLUDED_CHANNEL).ready(); + channelRegister.get(CHANNEL_CONNECTOR_IN).fail(); + getLiveness() + .assertResponseCodeDown() + .assertMessagingStatusDown(); + } + + @Test + @InSequence(5) + @RunAsClient + public void testOneErroredChannelReadiness() { + getReadiness() + .assertResponseCodeUp() + .assertMessagingStatusUp(); + } + + @Test + @InSequence(6) + @RunAsClient + public void testAllErroredChannelsLiveness() { + channelRegister.get(CHANNEL_CONNECTOR_IN).fail(); + channelRegister.get(CHANNEL_CONNECTOR_OUT).fail(); + channelRegister.get(CHANNEL_INNER).fail(); + getLiveness() + .assertResponseCodeDown() + .assertMessagingStatusDown(); + } + + @Test + @InSequence(7) + @RunAsClient + public void testAllErroredChannelsReadiness() { + getReadiness() + .assertResponseCodeUp() + .assertMessagingStatusUp(); + } +} diff --git a/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthTestBean.java b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthTestBean.java new file mode 100644 index 00000000..4669342e --- /dev/null +++ b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthTestBean.java @@ -0,0 +1,59 @@ +/** + * Copyright (c) 2021 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.eclipse.microprofile.reactive.messaging.tck.health; + +import javax.enterprise.context.ApplicationScoped; +import javax.inject.Inject; + +import org.eclipse.microprofile.reactive.messaging.Incoming; +import org.eclipse.microprofile.reactive.messaging.Outgoing; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; + +import static org.eclipse.microprofile.reactive.messaging.tck.health.HealthBase.CHANNEL_CONNECTOR_IN; +import static org.eclipse.microprofile.reactive.messaging.tck.health.HealthBase.CHANNEL_CONNECTOR_OUT; +import static org.eclipse.microprofile.reactive.messaging.tck.health.HealthBase.CHANNEL_INNER; + +@ApplicationScoped +public class HealthTestBean { + + @Inject + private ChannelRegister channelRegisterBean; + + @Incoming(CHANNEL_CONNECTOR_IN) + public Subscriber consumeConnectorChannel() { + return channelRegisterBean.get(CHANNEL_CONNECTOR_IN).subscriber(); + } + + @Outgoing(CHANNEL_CONNECTOR_OUT) + public Publisher produceConnectorChannel() { + return channelRegisterBean.get(CHANNEL_CONNECTOR_OUT).publisher(); + } + + @Incoming(CHANNEL_INNER) + public Subscriber consumeInnerChannel() { + return channelRegisterBean.get(CHANNEL_INNER).subscriber(); + } + + @Outgoing(CHANNEL_INNER) + public Publisher produceInnerChannel() { + return channelRegisterBean.get(CHANNEL_INNER).publisher(); + } + +} diff --git a/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthTestConnector.java b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthTestConnector.java new file mode 100644 index 00000000..5017ac95 --- /dev/null +++ b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/HealthTestConnector.java @@ -0,0 +1,57 @@ +/** + * Copyright (c) 2021 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.eclipse.microprofile.reactive.messaging.tck.health; + +import org.eclipse.microprofile.config.Config; +import org.eclipse.microprofile.reactive.messaging.Message; +import org.eclipse.microprofile.reactive.messaging.spi.Connector; +import org.eclipse.microprofile.reactive.messaging.spi.ConnectorFactory; +import org.eclipse.microprofile.reactive.messaging.spi.IncomingConnectorFactory; +import org.eclipse.microprofile.reactive.messaging.spi.OutgoingConnectorFactory; +import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder; +import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams; +import org.eclipse.microprofile.reactive.streams.operators.SubscriberBuilder; + +import javax.inject.Inject; + +@Connector(HealthTestConnector.ID) +public class HealthTestConnector implements OutgoingConnectorFactory, IncomingConnectorFactory { + + public static final String ID = "test-connector"; + + @Inject + private ChannelRegister channelRegisterBean; + + @Override + public PublisherBuilder> getPublisherBuilder(final Config config) { + return ReactiveStreams.fromPublisher( + channelRegisterBean.get(config.getValue(ConnectorFactory.CHANNEL_NAME_ATTRIBUTE, String.class)) + .publisher() + ).map(Message::of); + } + + @Override + public SubscriberBuilder, Void> getSubscriberBuilder(final Config config) { + return ReactiveStreams.>builder() + .map(Message::getPayload) + .onError(Throwable::printStackTrace) + .to(channelRegisterBean.get(config.getValue(ConnectorFactory.CHANNEL_NAME_ATTRIBUTE, String.class)) + .subscriber()); + } +} diff --git a/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/TestPublisher.java b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/TestPublisher.java new file mode 100644 index 00000000..17e0dcf7 --- /dev/null +++ b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/TestPublisher.java @@ -0,0 +1,164 @@ +/** + * Copyright (c) 2021 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.eclipse.microprofile.reactive.messaging.tck.health; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; + +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +/** + * Simple manual publisher with guarded state, not thread safe, sequential non-concurrent calls are expected. + * @param payload type + */ +class TestPublisher implements Publisher { + + private final CompletableFuture> subscriberFuture = new CompletableFuture<>(); + private final CompletableFuture cancelFuture = new CompletableFuture<>(); + private final long timeout; + private final TimeUnit timeUnit; + private final AtomicReference state = new AtomicReference<>(State.INIT); + + private enum State { + /** + * Initial state until {@link TestPublisher#ready()} is called. + */ + INIT, + /** + * After {@link TestPublisher#ready()} is called until {@link TestPublisher#fail(Throwable)} or + * {@link TestPublisher#complete()} is invoked or cancel signal is received from downstream. + */ + READY, + /** + * After {@link TestPublisher#fail(Throwable)} is invoked. + */ + FAILED, + /** + * After cancel signal is received from downstream. + */ + CANCELLED, + /** + * After {@link TestPublisher#complete()} is invoked. + */ + COMPLETED + } + + /** + * Create new test publisher with timeout for all blocking operations. + * + * @param timeout timeout value + * @param timeUnit unit for evaluation of timeout value + */ + TestPublisher(long timeout, TimeUnit timeUnit) { + this.timeout = timeout; + this.timeUnit = timeUnit; + } + + @Override + public void subscribe(final Subscriber subscriber) { + subscriberFuture.complete(subscriber); + } + + /** + * Block until subscriber is available + * and send onSubscribe signal to downstream if publisher is in {@link State#INIT INIT} state, or do nothing. + * + * @return this test publisher + */ + TestPublisher ready() { + if (state.compareAndSet(State.INIT, State.READY)) { + awaitSubscriber().onSubscribe(new Subscription() { + @Override + public void request(final long n) { + //noop + } + + @Override + public void cancel() { + if (state.getAndUpdate(s -> s != State.COMPLETED && s != State.FAILED ? State.CANCELLED : s) + != State.CANCELLED) { + cancelFuture.complete(null); + } + } + }); + } + return this; + } + + /** + * Block until subscriber is available + * and send onError signal to downstream if publisher is in {@link State#READY READY} state, or do nothing. + * + * @return this test publisher + */ + TestPublisher fail(Throwable t) { + if (state.compareAndSet(State.READY, State.FAILED)) { + awaitSubscriber().onError(t); + } + return this; + } + + /** + * Block until subscriber is available + * and send onComplete signal to downstream if publisher is in {@link State#READY READY} state, or do nothing. + * + * @return this test publisher + */ + TestPublisher complete() { + if (state.compareAndSet(State.READY, State.COMPLETED)) { + awaitSubscriber().onComplete(); + } + return this; + } + + /** + * Block until cancel signal is received from the subscriber. + * + * @return this test publisher + */ + TestPublisher awaitCancel() { + try { + cancelFuture.get(timeout, timeUnit); + return this; + } + catch (InterruptedException | TimeoutException | ExecutionException e) { + throw new RuntimeException(e); + } + } + + /** + * Wait for a subscriber, + * block till {@link TestPublisher#subscribe(org.reactivestreams.Subscriber) subscribe} method is invoked. + * + * @return subscriber of this publisher + */ + private Subscriber awaitSubscriber() { + try { + return subscriberFuture.get(timeout, timeUnit); + } + catch (InterruptedException | TimeoutException | ExecutionException e) { + throw new RuntimeException(e); + } + } +} diff --git a/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/TestSubscriber.java b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/TestSubscriber.java new file mode 100644 index 00000000..24d5fbb0 --- /dev/null +++ b/tck/src/main/java/org/eclipse/microprofile/reactive/messaging/tck/health/TestSubscriber.java @@ -0,0 +1,142 @@ +/** + * Copyright (c) 2021 Contributors to the Eclipse Foundation + * + * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * You may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.eclipse.microprofile.reactive.messaging.tck.health; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +/** + * Test subscriber able to block until desired signal from upstream is received. + * + * @param payload type + */ +class TestSubscriber implements Subscriber { + + private final CompletableFuture subscription = new CompletableFuture<>(); + private final CompletableFuture error = new CompletableFuture<>(); + private final CompletableFuture complete = new CompletableFuture<>(); + private final long timeout; + private final TimeUnit timeoutUnit; + + /** + * Create new test subscriber with timeout for all blocking operations. + * + * @param timeout timeout value + * @param timeoutUnit unit for evaluation of timeout value + */ + TestSubscriber(long timeout, TimeUnit timeoutUnit) { + this.timeout = timeout; + this.timeoutUnit = timeoutUnit; + } + + @Override + public void onSubscribe(final Subscription subscription) { + this.subscription.complete(subscription); + } + + @Override + public void onNext(final PAYLOAD payload) { + //noop + } + + @Override + public void onError(final Throwable t) { + error.complete(t); + } + + @Override + public void onComplete() { + complete.complete(null); + } + + /** + * Block until onError signal is received from upstream. + * + * @return cause of the error signal + */ + public Throwable awaitError() { + try { + return this.error.get(timeout, timeoutUnit); + } + catch (InterruptedException | ExecutionException | TimeoutException e) { + throw new RuntimeException(e); + } + } + + /** + * Block until onComplete signal is received from upstream. + */ + public void awaitCompletion() { + try { + this.complete.get(timeout, timeoutUnit); + } + catch (InterruptedException | ExecutionException | TimeoutException e) { + throw new RuntimeException(e); + } + } + + /** + * Block until onSubscribe signal is received from upstream. + * + * @return this + */ + public ReadyManualSubscriber awaitSubscription() { + this.getSubscription(); + return new ReadyManualSubscriber(this); + } + + private Subscription getSubscription() { + try { + return this.subscription.get(timeout, timeoutUnit); + } + catch (InterruptedException | ExecutionException | TimeoutException e) { + throw new RuntimeException(e); + } + } + + /** + * Wrapped test subscriber with capability of sending cancel signal. + * + * @param payload type + */ + static class ReadyManualSubscriber { + + private final TestSubscriber testSubscriber; + + ReadyManualSubscriber(final TestSubscriber testSubscriber) { + this.testSubscriber = testSubscriber; + } + + /** + * Block until onSubscribe signal is received from upstream, + * then send cancel signal to upstream. + * + * @return this ready subscriber + */ + public ReadyManualSubscriber cancel() { + this.testSubscriber.getSubscription().cancel(); + return this; + } + } +}