diff --git a/docs/src/main/asciidoc/messaging.adoc b/docs/src/main/asciidoc/messaging.adoc index 66213ef4ed59e..478123e5f3a0a 100644 --- a/docs/src/main/asciidoc/messaging.adoc +++ b/docs/src/main/asciidoc/messaging.adoc @@ -562,7 +562,6 @@ public class StreamProcessor { } ---- -[[execution_model]] == Execution Model Quarkus Messaging sits on top of the xref:quarkus-reactive-architecture.adoc#engine[reactive engine] of Quarkus and leverages link:{eclipse-vertx}[Eclipse Vert.x] to dispatch messages for processing. @@ -635,100 +634,6 @@ Depending on the broker technology, this can be useful to increase the applicati while still preserving the partial order of messages received in different copies. This is the case, for example, for Kafka, where multiple consumers can consume different topic partitions. -== Context Propagation - -In Quarkus Messaging, the default mechanism for propagating context between different processing stages is the -link:https://smallrye.io/smallrye-reactive-messaging/latest/concepts/message-context[message context]. -This provides a consistent way to pass context information along with the message as it flows through different stages. - -=== Interaction with Mutiny and MicroProfile Context Propagation - -Mutiny, which is the foundation of reactive programming in Quarkus, is integrated with the MicroProfile Context Propagation. -This integration enables automatic capturing and restoring of context across asynchronous boundaries. -To learn more about context propagation in Quarkus and Mutiny, refer to the xref:context-propagation.adoc[Context Propagation] guide. - -However, Quarkus Messaging needs to coordinate multiple asynchronous boundaries. -This is why the default context propagation can result in unexpected behavior in some cases, especially using `Emitters`. - -To ensure consistent behavior, Quarkus Messaging disables the propagation of any context during message dispatching, through internal channels or connectors. -This means that Emitters won't capture the caller context, and incoming channels won't dispatch messages by activating a context (ex. the request context). - -For example, you might want to propagate the caller context from an incoming HTTP request to the message processing stage. -For emitters, instead of using the regular `Emitter` or `MutinyEmitter`, you can inject the `ContextualEmitter` to make sure the message captures the caller context. -This ensures consistent and predictable behaviour by relying on the message context handling provided by the framework. - -For example, let `RequestScopedBean` a request-scoped bean, `ContextualEmitter` can be used to dispatch messages locally through the internal channel `app`: - -[source, java] ----- -import jakarta.inject.Inject; -import jakarta.ws.rs.Consumes; -import jakarta.ws.rs.POST; -import jakarta.ws.rs.Path; -import jakarta.ws.rs.core.MediaType; - -import org.eclipse.microprofile.reactive.messaging.Channel; - -import io.quarkus.logging.Log; -import io.quarkus.smallrye.reactivemessaging.runtime.ContextualEmitter; -import io.smallrye.mutiny.Uni; -import io.vertx.core.Context; -import io.vertx.core.Vertx; - -@Path("/") -public class Resource { - - @Channel("app") - ContextualEmitter emitter; - - @Inject - RequestScopedBean requestScopedBean; - - @POST - @Path("/send") - public void send(String message) { - requestScopedBean.setValue("Hello"); - emitter.sendAndAwait(message); - } - -} ----- - -Then the request-scoped bean can be accessed in the message processing stage, regardless of the <>: - -[source, java] ----- -import jakarta.enterprise.context.ApplicationScoped; -import jakarta.inject.Inject; - -import org.eclipse.microprofile.reactive.messaging.Incoming; - -import io.quarkus.logging.Log; -import io.smallrye.reactive.messaging.annotations.Blocking; - - -@ApplicationScoped -public class Processor { - - @Inject - RequestScopedBean requestScopedBean; - - @Incoming("app") - @Blocking - public void process(String message) { - Log.infof("Message %s from request %s", message, requestScopedBean.getValue()); - } - -} ----- - -=== Request Context Activation - -In some cases, you might need to activate the request context while processing messages consumed from a broker. -While using `@ActivateRequestContext` on the `@Incoming` method is an option, it's lifecycle does not follow that of a Quarkus Messaging message. -For incoming channels, you can enable the request scope activation with the build time property `quarkus.messaging.request-scoped.enabled=true`. -This will activate the request context for each message processed by the incoming channel, and close the context once the message is processed. - == Health Checks Together with the SmallRye Health extension, Quarkus Messaging extensions provide health check support per channel. diff --git a/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/ReactiveMessagingBuildTimeConfig.java b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/ReactiveMessagingBuildTimeConfig.java index 0003fdbc1d93f..f9c105e399192 100644 --- a/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/ReactiveMessagingBuildTimeConfig.java +++ b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/ReactiveMessagingBuildTimeConfig.java @@ -27,11 +27,4 @@ public interface ReactiveMessagingBuildTimeConfig { @WithName("auto-connector-attachment") @WithDefault("true") boolean autoConnectorAttachment(); - - /** - * Whether to enable the RequestScope context on a message context - */ - @WithName("request-scoped.enabled") - @WithDefault("false") - boolean activateRequestScopeEnabled(); } diff --git a/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/SmallRyeReactiveMessagingProcessor.java b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/SmallRyeReactiveMessagingProcessor.java index e10c721f09cf6..16d9b47dea26c 100644 --- a/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/SmallRyeReactiveMessagingProcessor.java +++ b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/SmallRyeReactiveMessagingProcessor.java @@ -70,8 +70,6 @@ import io.quarkus.smallrye.reactivemessaging.deployment.items.InjectedChannelBuildItem; import io.quarkus.smallrye.reactivemessaging.deployment.items.InjectedEmitterBuildItem; import io.quarkus.smallrye.reactivemessaging.deployment.items.MediatorBuildItem; -import io.quarkus.smallrye.reactivemessaging.runtime.ContextClearedDecorator; -import io.quarkus.smallrye.reactivemessaging.runtime.ContextualEmitterFactory; import io.quarkus.smallrye.reactivemessaging.runtime.DuplicatedContextConnectorFactory; import io.quarkus.smallrye.reactivemessaging.runtime.DuplicatedContextConnectorFactoryInterceptor; import io.quarkus.smallrye.reactivemessaging.runtime.HealthCenterFilter; @@ -79,7 +77,6 @@ import io.quarkus.smallrye.reactivemessaging.runtime.QuarkusMediatorConfiguration; import io.quarkus.smallrye.reactivemessaging.runtime.QuarkusWorkerPoolRegistry; import io.quarkus.smallrye.reactivemessaging.runtime.ReactiveMessagingConfiguration; -import io.quarkus.smallrye.reactivemessaging.runtime.RequestScopedDecorator; import io.quarkus.smallrye.reactivemessaging.runtime.SmallRyeReactiveMessagingLifecycle; import io.quarkus.smallrye.reactivemessaging.runtime.SmallRyeReactiveMessagingRecorder; import io.quarkus.smallrye.reactivemessaging.runtime.SmallRyeReactiveMessagingRecorder.SmallRyeReactiveMessagingContext; @@ -115,14 +112,11 @@ FeatureBuildItem feature() { } @BuildStep - void beans(BuildProducer additionalBean, ReactiveMessagingBuildTimeConfig buildTimeConfig) { + AdditionalBeanBuildItem beans() { // We add the connector and channel qualifiers to make them part of the index. - additionalBean.produce(new AdditionalBeanBuildItem(SmallRyeReactiveMessagingLifecycle.class, Connector.class, + return new AdditionalBeanBuildItem(SmallRyeReactiveMessagingLifecycle.class, Connector.class, Channel.class, io.smallrye.reactive.messaging.annotations.Channel.class, - QuarkusWorkerPoolRegistry.class, ContextualEmitterFactory.class, ContextClearedDecorator.class)); - if (buildTimeConfig.activateRequestScopeEnabled()) { - additionalBean.produce(new AdditionalBeanBuildItem(RequestScopedDecorator.class)); - } + QuarkusWorkerPoolRegistry.class); } @BuildStep diff --git a/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/ContextClearedDecorator.java b/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/ContextClearedDecorator.java deleted file mode 100644 index d3a33014ab3a7..0000000000000 --- a/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/ContextClearedDecorator.java +++ /dev/null @@ -1,39 +0,0 @@ -package io.quarkus.smallrye.reactivemessaging.runtime; - -import java.util.List; - -import jakarta.enterprise.context.ApplicationScoped; - -import org.eclipse.microprofile.context.ThreadContext; -import org.eclipse.microprofile.reactive.messaging.Message; - -import io.smallrye.mutiny.Multi; -import io.smallrye.reactive.messaging.PublisherDecorator; - -@ApplicationScoped -public class ContextClearedDecorator implements PublisherDecorator { - - private final ThreadContext tc; - - public ContextClearedDecorator() { - tc = ThreadContext.builder() - .propagated(ThreadContext.NONE) - .cleared(ThreadContext.ALL_REMAINING) - .build(); - } - - @Override - public Multi> decorate(Multi> publisher, List channelName, - boolean isConnector) { - if (isConnector) { - return publisher.emitOn(tc.currentContextExecutor()); - } - return publisher; - } - - @Override - public int getPriority() { - // Before the io.smallrye.reactive.messaging.providers.locals.ContextDecorator which has the priority 0 - return -100; - } -} diff --git a/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/ContextualEmitter.java b/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/ContextualEmitter.java deleted file mode 100644 index eaaa3a199178e..0000000000000 --- a/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/ContextualEmitter.java +++ /dev/null @@ -1,17 +0,0 @@ -package io.quarkus.smallrye.reactivemessaging.runtime; - -import org.eclipse.microprofile.reactive.messaging.Message; - -import io.smallrye.mutiny.Uni; -import io.smallrye.reactive.messaging.EmitterType; - -public interface ContextualEmitter extends EmitterType { - - Uni send(T payload); - - void sendAndAwait(T payload); - - > Uni sendMessage(M msg); - - > void sendMessageAndAwait(M msg); -} diff --git a/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/ContextualEmitterFactory.java b/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/ContextualEmitterFactory.java deleted file mode 100644 index aca904dae2a8a..0000000000000 --- a/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/ContextualEmitterFactory.java +++ /dev/null @@ -1,36 +0,0 @@ -package io.quarkus.smallrye.reactivemessaging.runtime; - -import jakarta.enterprise.context.ApplicationScoped; -import jakarta.enterprise.inject.Produces; -import jakarta.enterprise.inject.Typed; -import jakarta.enterprise.inject.spi.InjectionPoint; -import jakarta.inject.Inject; - -import org.eclipse.microprofile.reactive.messaging.Channel; - -import io.smallrye.reactive.messaging.ChannelRegistry; -import io.smallrye.reactive.messaging.EmitterConfiguration; -import io.smallrye.reactive.messaging.EmitterFactory; -import io.smallrye.reactive.messaging.annotations.EmitterFactoryFor; -import io.smallrye.reactive.messaging.providers.extension.ChannelProducer; - -@EmitterFactoryFor(ContextualEmitter.class) -@ApplicationScoped -public class ContextualEmitterFactory implements EmitterFactory> { - - @Inject - ChannelRegistry channelRegistry; - - @Override - public ContextualEmitterImpl createEmitter(EmitterConfiguration emitterConfiguration, long l) { - return new ContextualEmitterImpl<>(emitterConfiguration, l); - } - - @Produces - @Typed(ContextualEmitter.class) - @Channel("") // Stream name is ignored during type-safe resolution - ContextualEmitter produceEmitter(InjectionPoint injectionPoint) { - String channelName = ChannelProducer.getChannelName(injectionPoint); - return channelRegistry.getEmitter(channelName, ContextualEmitter.class); - } -} diff --git a/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/ContextualEmitterImpl.java b/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/ContextualEmitterImpl.java deleted file mode 100644 index bb769d1195a93..0000000000000 --- a/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/ContextualEmitterImpl.java +++ /dev/null @@ -1,72 +0,0 @@ -package io.quarkus.smallrye.reactivemessaging.runtime; - -import static io.smallrye.reactive.messaging.providers.i18n.ProviderExceptions.ex; - -import org.eclipse.microprofile.reactive.messaging.Message; - -import io.smallrye.common.annotation.CheckReturnValue; -import io.smallrye.common.vertx.VertxContext; -import io.smallrye.mutiny.Uni; -import io.smallrye.reactive.messaging.EmitterConfiguration; -import io.smallrye.reactive.messaging.providers.extension.AbstractEmitter; -import io.smallrye.reactive.messaging.providers.locals.LocalContextMetadata; -import io.vertx.core.Context; -import io.vertx.core.Vertx; - -public class ContextualEmitterImpl extends AbstractEmitter implements ContextualEmitter { - - public ContextualEmitterImpl(EmitterConfiguration configuration, long defaultBufferSize) { - super(configuration, defaultBufferSize); - } - - @Override - public void sendAndAwait(T payload) { - sendMessage(Message.of(payload)).await().indefinitely(); - } - - @Override - public Uni send(T payload) { - return sendMessage(Message.of(payload)); - } - - @Override - public > void sendMessageAndAwait(M msg) { - sendMessage(msg).await().indefinitely(); - } - - @Override - @CheckReturnValue - public > Uni sendMessage(M msg) { - if (msg == null) { - throw ex.illegalArgumentForNullValue(); - } - - // If we are running on a Vert.x context, we need to capture the context to switch back during the emission. - Context context = Vertx.currentContext(); - Uni uni = Uni.createFrom().emitter(e -> { - try { - Message message = msg; - if (VertxContext.isDuplicatedContext(context)) { - message = message.addMetadata(new LocalContextMetadata(context)); - } - emit(message.withAck(() -> { - e.complete(null); - return msg.ack(); - }) - .withNack(t -> { - e.fail(t); - return msg.nack(t); - })); - } catch (Exception t) { - // Capture synchronous exception and nack the message. - msg.nack(t); - throw t; - } - }); - if (context != null) { - uni = uni.emitOn(runnable -> context.runOnContext(x -> runnable.run())); - } - return uni; - } - -} diff --git a/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/QuarkusWorkerPoolRegistry.java b/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/QuarkusWorkerPoolRegistry.java index 878b867b8b3d4..e93767d8aba6d 100644 --- a/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/QuarkusWorkerPoolRegistry.java +++ b/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/QuarkusWorkerPoolRegistry.java @@ -6,7 +6,6 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; import jakarta.annotation.Priority; import jakarta.enterprise.context.ApplicationScoped; @@ -26,9 +25,7 @@ import io.smallrye.reactive.messaging.providers.connectors.WorkerPoolRegistry; import io.smallrye.reactive.messaging.providers.helpers.Validation; import io.vertx.core.impl.ConcurrentHashSet; -import io.vertx.core.impl.ContextInternal; import io.vertx.mutiny.core.Context; -import io.vertx.mutiny.core.Vertx; import io.vertx.mutiny.core.WorkerExecutor; @Alternative @@ -65,53 +62,42 @@ public void terminate( } } - public Uni executeWork(Context msgContext, Uni uni, String workerName, boolean ordered) { + public Uni executeWork(Context currentContext, Uni uni, String workerName, boolean ordered) { Objects.requireNonNull(uni, "Action to execute not provided"); + if (workerName == null) { - if (msgContext != null) { - return msgContext.executeBlocking(uni, ordered); + if (currentContext != null) { + return currentContext.executeBlocking(Uni.createFrom().deferred(() -> uni), ordered); } return executionHolder.vertx().executeBlocking(uni, ordered); } else if (virtualThreadWorkers.contains(workerName)) { - return runOnVirtualThread(msgContext, uni); + return runOnVirtualThread(currentContext, uni); } else { - return runOnWorkerThread(msgContext, uni, workerName, ordered); - } - } - - private Uni runOnWorkerThread(Context msgContext, Uni uni, String workerName, boolean ordered) { - WorkerExecutor worker = getWorker(workerName); - if (msgContext != null) { - return uniOnMessageContext(worker.executeBlocking(uni, ordered), msgContext) - .onItemOrFailure().transformToUni((item, failure) -> { - return Uni.createFrom().emitter(emitter -> { - if (failure != null) { - msgContext.runOnContext(() -> emitter.fail(failure)); - } else { - msgContext.runOnContext(() -> emitter.complete(item)); - } + if (currentContext != null) { + return getWorker(workerName).executeBlocking(uni, ordered) + .onItemOrFailure().transformToUni((item, failure) -> { + return Uni.createFrom().emitter(emitter -> { + if (failure != null) { + currentContext.runOnContext(() -> emitter.fail(failure)); + } else { + currentContext.runOnContext(() -> emitter.complete(item)); + } + }); }); - }); + } + return getWorker(workerName).executeBlocking(uni, ordered); } - return worker.executeBlocking(uni, ordered); } - private static Uni uniOnMessageContext(Uni uni, Context msgContext) { - return msgContext != Vertx.currentContext() ? Uni.createFrom().deferred(() -> uni) - .runSubscriptionOn(r -> new ContextPreservingRunnable(r, msgContext).run()) - : uni; - } - - private Uni runOnVirtualThread(Context msgContext, Uni uni) { - ExecutorService vtExecutor = VirtualThreadsRecorder.getCurrent(); - return uniOnMessageContext(uni, msgContext, vtExecutor) + private Uni runOnVirtualThread(Context currentContext, Uni uni) { + return uni.runSubscriptionOn(VirtualThreadsRecorder.getCurrent()) .onItemOrFailure().transformToUni((item, failure) -> { return Uni.createFrom().emitter(emitter -> { - if (msgContext != null) { + if (currentContext != null) { if (failure != null) { - msgContext.runOnContext(() -> emitter.fail(failure)); + currentContext.runOnContext(() -> emitter.fail(failure)); } else { - msgContext.runOnContext(() -> emitter.complete(item)); + currentContext.runOnContext(() -> emitter.complete(item)); } } else { // Some method do not have a context (generator methods) @@ -125,38 +111,6 @@ private Uni runOnVirtualThread(Context msgContext, Uni uni) { }); } - private static Uni uniOnMessageContext(Uni uni, Context msgContext, ExecutorService vtExecutor) { - return msgContext != Vertx.currentContext() - ? uni.runSubscriptionOn(r -> vtExecutor.execute(new ContextPreservingRunnable(r, msgContext))) - : uni.runSubscriptionOn(vtExecutor); - } - - private static final class ContextPreservingRunnable implements Runnable { - - private final Runnable task; - private final io.vertx.core.Context context; - - public ContextPreservingRunnable(Runnable task, Context context) { - this.task = task; - this.context = context.getDelegate(); - } - - @Override - public void run() { - if (context instanceof ContextInternal) { - ContextInternal contextInternal = (ContextInternal) context; - final var previousContext = contextInternal.beginDispatch(); - try { - task.run(); - } finally { - contextInternal.endDispatch(previousContext); - } - } else { - task.run(); - } - } - } - public WorkerExecutor getWorker(String workerName) { Objects.requireNonNull(workerName, "Worker Name not specified"); diff --git a/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/RequestScopedDecorator.java b/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/RequestScopedDecorator.java deleted file mode 100644 index b6e8262d8c6e7..0000000000000 --- a/extensions/smallrye-reactive-messaging/runtime/src/main/java/io/quarkus/smallrye/reactivemessaging/runtime/RequestScopedDecorator.java +++ /dev/null @@ -1,55 +0,0 @@ -package io.quarkus.smallrye.reactivemessaging.runtime; - -import java.util.List; -import java.util.Optional; - -import jakarta.enterprise.context.ApplicationScoped; - -import org.eclipse.microprofile.reactive.messaging.Message; - -import io.quarkus.arc.Arc; -import io.quarkus.arc.InjectableContext; -import io.quarkus.arc.ManagedContext; -import io.smallrye.common.vertx.VertxContext; -import io.smallrye.mutiny.Multi; -import io.smallrye.reactive.messaging.PublisherDecorator; -import io.smallrye.reactive.messaging.providers.locals.LocalContextMetadata; - -@ApplicationScoped -public class RequestScopedDecorator implements PublisherDecorator { - @Override - public Multi> decorate(Multi> publisher, List channelName, - boolean isConnector) { - if (isConnector) { - return publisher.map(message -> { - Optional localContextMetadata = message.getMetadata(LocalContextMetadata.class); - if (localContextMetadata.isPresent() && VertxContext.isOnDuplicatedContext()) { - ManagedContext requestContext = Arc.container().requestContext(); - if (!requestContext.isActive()) { - requestContext.activate(); - InjectableContext.ContextState state = requestContext.getState(); - Message withAck = message.withAckWithMetadata(m -> message.ack(m) - .thenAccept(x -> { - requestContext.destroy(state); - requestContext.deactivate(); - })); - return withAck.withNackWithMetadata((m, t) -> withAck.nack(m, t) - .thenAccept(x -> { - requestContext.destroy(state); - requestContext.deactivate(); - })); - } - return message; - } else { - return message; - } - }); - } - return publisher; - } - - @Override - public int getPriority() { - return 100; - } -} diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml index adca102c27412..69bc1f6b44788 100644 --- a/integration-tests/pom.xml +++ b/integration-tests/pom.xml @@ -339,7 +339,6 @@ reactive-messaging-rabbitmq-dyn reactive-messaging-hibernate-reactive reactive-messaging-hibernate-orm - reactive-messaging-context-propagation rest-client resteasy-reactive-kotlin rest-client-reactive diff --git a/integration-tests/reactive-messaging-context-propagation/pom.xml b/integration-tests/reactive-messaging-context-propagation/pom.xml deleted file mode 100644 index a989b977eb943..0000000000000 --- a/integration-tests/reactive-messaging-context-propagation/pom.xml +++ /dev/null @@ -1,269 +0,0 @@ - - - - quarkus-integration-tests-parent - io.quarkus - 999-SNAPSHOT - - 4.0.0 - - quarkus-integration-test-reactive-messaging-context-propagation - Quarkus - Integration Tests - Reactive Messaging - Context Propagation - The Reactive Messaging Context Propagation integration tests module - - - true - - - - - io.quarkus - quarkus-integration-test-class-transformer - - - io.quarkus - quarkus-integration-test-shared-library - - - - - io.quarkus - quarkus-rest - - - io.quarkus - quarkus-rest-jackson - - - io.quarkus - quarkus-jsonb - - - - - io.quarkus - quarkus-smallrye-health - - - - - io.quarkus - quarkus-micrometer-registry-prometheus - - - - - io.quarkus - quarkus-kafka-client - - - io.quarkus - quarkus-messaging-kafka - - - - - - io.quarkus - quarkus-junit5 - test - - - io.quarkus - quarkus-test-vertx - - - io.rest-assured - rest-assured - test - - - jakarta.xml.bind - jakarta.xml.bind-api - - - - - io.quarkus - quarkus-test-kafka-companion - test - - - - - io.quarkus - quarkus-integration-test-class-transformer-deployment - ${project.version} - pom - test - - - * - * - - - - - io.quarkus - quarkus-kafka-client-deployment - ${project.version} - pom - test - - - * - * - - - - - io.quarkus - quarkus-rest-deployment - ${project.version} - pom - test - - - * - * - - - - - io.quarkus - quarkus-rest-jackson-deployment - ${project.version} - pom - test - - - * - * - - - - - io.quarkus - quarkus-jsonb-deployment - ${project.version} - pom - test - - - * - * - - - - - io.quarkus - quarkus-smallrye-health-deployment - ${project.version} - pom - test - - - * - * - - - - - - io.quarkus - quarkus-micrometer-registry-prometheus-deployment - ${project.version} - pom - test - - - * - * - - - - - io.quarkus - quarkus-messaging-kafka-deployment - ${project.version} - pom - test - - - * - * - - - - - io.smallrye.reactive - smallrye-reactive-messaging-api - - - org.awaitility - awaitility - test - - - - - - - io.quarkus - quarkus-maven-plugin - - - - build - - - - - - - maven-failsafe-plugin - - true - - - - - maven-surefire-plugin - - true - - - - - - - - test-kafka - - - test-containers - - - - - - maven-surefire-plugin - - false - - - - maven-failsafe-plugin - - false - - - - - - - - - diff --git a/integration-tests/reactive-messaging-context-propagation/src/main/java/io/quarkus/it/kafka/FlowerContextualResource.java b/integration-tests/reactive-messaging-context-propagation/src/main/java/io/quarkus/it/kafka/FlowerContextualResource.java deleted file mode 100644 index 95586177e2b25..0000000000000 --- a/integration-tests/reactive-messaging-context-propagation/src/main/java/io/quarkus/it/kafka/FlowerContextualResource.java +++ /dev/null @@ -1,122 +0,0 @@ -package io.quarkus.it.kafka; - -import jakarta.inject.Inject; -import jakarta.ws.rs.Consumes; -import jakarta.ws.rs.POST; -import jakarta.ws.rs.Path; -import jakarta.ws.rs.core.MediaType; - -import org.eclipse.microprofile.reactive.messaging.Channel; - -import io.quarkus.logging.Log; -import io.quarkus.smallrye.reactivemessaging.runtime.ContextualEmitter; -import io.smallrye.mutiny.Uni; -import io.vertx.core.Context; -import io.vertx.core.Vertx; - -@Path("/flowers/contextual") -public class FlowerContextualResource { - - @Channel("contextual-flower") - ContextualEmitter emitter; - - @Channel("contextual-flower-blocking") - ContextualEmitter emitterBlocking; - - @Channel("contextual-flower-blocking-named") - ContextualEmitter emitterBlockingNamed; - - @Channel("contextual-flower-virtual-thread") - ContextualEmitter emitterVT; - - @Inject - RequestBean reqBean; - - @POST - @Path("/uni") - @Consumes(MediaType.TEXT_PLAIN) - public Uni uniEventLoop(String body) { - Context ctx = Vertx.currentContext(); - Log.info(ctx + "[" + ctx.getClass() + "]"); - Log.infof("bean: %s, id: %s", reqBean, reqBean.getId()); - reqBean.setName(body != null ? body.toUpperCase() : body); - return emitter.send(body); - } - - @POST - @Path("/uni/blocking") - @Consumes(MediaType.TEXT_PLAIN) - public Uni uniBlocking(String body) { - Context ctx = Vertx.currentContext(); - Log.info(ctx + "[" + ctx.getClass() + "]"); - Log.infof("bean: %s, id: %s", reqBean, reqBean.getId()); - reqBean.setName(body != null ? body.toUpperCase() : body); - return emitterBlocking.send(body); - } - - @POST - @Path("/uni/blocking-named") - @Consumes(MediaType.TEXT_PLAIN) - public Uni uniBlockingNamed(String body) { - Context ctx = Vertx.currentContext(); - Log.info(ctx + "[" + ctx.getClass() + "]"); - Log.infof("bean: %s, id: %s", reqBean, reqBean.getId()); - reqBean.setName(body != null ? body.toUpperCase() : body); - return emitterBlockingNamed.send(body); - } - - @POST - @Path("/uni/virtual-thread") - @Consumes(MediaType.TEXT_PLAIN) - public Uni uniVT(String body) { - Context ctx = Vertx.currentContext(); - Log.info(ctx + "[" + ctx.getClass() + "]"); - Log.infof("bean: %s, id: %s", reqBean, reqBean.getId()); - reqBean.setName(body != null ? body.toUpperCase() : body); - return emitterVT.send(body); - } - - @POST - @Path("/") - @Consumes(MediaType.TEXT_PLAIN) - public void eventLoop(String body) { - Context ctx = Vertx.currentContext(); - Log.info(ctx + "[" + ctx.getClass() + "]"); - Log.infof("bean: %s, id: %s", reqBean, reqBean.getId()); - reqBean.setName(body != null ? body.toUpperCase() : body); - emitter.sendAndAwait(body); - } - - @POST - @Path("/blocking") - @Consumes(MediaType.TEXT_PLAIN) - public void blocking(String body) { - Context ctx = Vertx.currentContext(); - Log.info(ctx + "[" + ctx.getClass() + "]"); - Log.infof("bean: %s, id: %s", reqBean, reqBean.getId()); - reqBean.setName(body != null ? body.toUpperCase() : body); - emitterBlocking.sendAndAwait(body); - } - - @POST - @Path("/blocking-named") - @Consumes(MediaType.TEXT_PLAIN) - public void blockingNamed(String body) { - Context ctx = Vertx.currentContext(); - Log.info(ctx + "[" + ctx.getClass() + "]"); - Log.infof("bean: %s, id: %s", reqBean, reqBean.getId()); - reqBean.setName(body != null ? body.toUpperCase() : body); - emitterBlockingNamed.sendAndAwait(body); - } - - @POST - @Path("/virtual-thread") - @Consumes(MediaType.TEXT_PLAIN) - public void vt(String body) { - Context ctx = Vertx.currentContext(); - Log.info(ctx + "[" + ctx.getClass() + "]"); - Log.infof("bean: %s, id: %s", reqBean, reqBean.getId()); - reqBean.setName(body != null ? body.toUpperCase() : body); - emitterVT.sendAndAwait(body); - } -} diff --git a/integration-tests/reactive-messaging-context-propagation/src/main/java/io/quarkus/it/kafka/FlowerProducer.java b/integration-tests/reactive-messaging-context-propagation/src/main/java/io/quarkus/it/kafka/FlowerProducer.java deleted file mode 100644 index 7cae335760a97..0000000000000 --- a/integration-tests/reactive-messaging-context-propagation/src/main/java/io/quarkus/it/kafka/FlowerProducer.java +++ /dev/null @@ -1,46 +0,0 @@ -package io.quarkus.it.kafka; - -import java.util.List; -import java.util.concurrent.CopyOnWriteArrayList; - -import jakarta.ws.rs.Consumes; -import jakarta.ws.rs.GET; -import jakarta.ws.rs.POST; -import jakarta.ws.rs.Path; -import jakarta.ws.rs.Produces; -import jakarta.ws.rs.core.MediaType; - -import org.eclipse.microprofile.reactive.messaging.Channel; - -import io.smallrye.reactive.messaging.MutinyEmitter; - -@Path("/flowers") -public class FlowerProducer { - - List received = new CopyOnWriteArrayList<>(); - - @Channel("flowers-out") - MutinyEmitter emitter; - - @POST - @Path("/produce") - @Consumes(MediaType.TEXT_PLAIN) - public void produce(String flower) { - emitter.sendAndAwait(flower); - } - - void addReceived(String flower) { - received.add(flower); - } - - public List getReceived() { - return received; - } - - @GET - @Path("/received") - @Produces(MediaType.APPLICATION_JSON) - public List received() { - return received; - } -} diff --git a/integration-tests/reactive-messaging-context-propagation/src/main/java/io/quarkus/it/kafka/FlowerReceivers.java b/integration-tests/reactive-messaging-context-propagation/src/main/java/io/quarkus/it/kafka/FlowerReceivers.java deleted file mode 100644 index 956360e90e197..0000000000000 --- a/integration-tests/reactive-messaging-context-propagation/src/main/java/io/quarkus/it/kafka/FlowerReceivers.java +++ /dev/null @@ -1,104 +0,0 @@ -package io.quarkus.it.kafka; - -import jakarta.enterprise.context.ApplicationScoped; -import jakarta.inject.Inject; - -import org.eclipse.microprofile.reactive.messaging.Incoming; -import org.jboss.logging.Logger; - -import io.quarkus.logging.Log; -import io.quarkus.test.vertx.VirtualThreadsAssertions; -import io.smallrye.common.annotation.NonBlocking; -import io.smallrye.common.annotation.RunOnVirtualThread; -import io.smallrye.reactive.messaging.annotations.Blocking; -import io.vertx.core.Context; -import io.vertx.core.Vertx; - -@ApplicationScoped -public class FlowerReceivers { - - @Inject - RequestBean reqBean; - - @Inject - Logger logger; - - @Incoming("flower") - void process(String name) { - Context ctx = Vertx.currentContext(); - assert Context.isOnEventLoopThread(); - Log.info(ctx + "[" + ctx.getClass() + "]"); - Log.infof("bean: %s, id: %s", reqBean, reqBean.getId()); - logger.infof("Greeting, %s", reqBean.getName()); - } - - @Blocking - @Incoming("flower-blocking") - void processBlocking(String name) { - Context ctx = Vertx.currentContext(); - assert Context.isOnWorkerThread(); - Log.info(ctx + "[" + ctx.getClass() + "]"); - Log.infof("bean: %s, id: %s", reqBean, reqBean.getId()); - logger.infof("Greeting, %s", reqBean.getName()); - } - - @Blocking("named-pool") - @Incoming("flower-blocking-named") - void processBlockingNamed(String name) { - Context ctx = Vertx.currentContext(); - assert Context.isOnWorkerThread(); - Log.info(ctx + "[" + ctx.getClass() + "]"); - Log.infof("bean: %s, id: %s", reqBean, reqBean.getId()); - logger.infof("Greeting, %s", reqBean.getName()); - } - - @RunOnVirtualThread - @Incoming("flower-virtual-thread") - void processVT(String name) { - Context ctx = Vertx.currentContext(); - VirtualThreadsAssertions.assertThatItRunsOnVirtualThread(); - VirtualThreadsAssertions.assertThatItRunsOnADuplicatedContext(); - Log.info(ctx + "[" + ctx.getClass() + "]"); - Log.infof("bean: %s, id: %s", reqBean, reqBean.getId()); - logger.infof("Greeting, %s", reqBean.getName()); - } - - @Inject - FlowerProducer producer; - - @Incoming("flowers-in") - @NonBlocking - void receive(String flower) { - Context ctx = Vertx.currentContext(); - assert Context.isOnEventLoopThread(); - // assert Context.isOnWorkerThread(); - Log.info(ctx + "[" + ctx.getClass() + "]"); - Log.infof("bean: %s, id: %s", reqBean, reqBean.getId()); - System.out.println("Received io: " + flower); - producer.addReceived(flower); - } - - @Incoming("flowers-in") - @Blocking - void receiveBlocking(String flower) { - Context ctx = Vertx.currentContext(); - assert Context.isOnWorkerThread(); - Log.info(ctx + "[" + ctx.getClass() + "]"); - Log.infof("bean: %s, id: %s", reqBean, reqBean.getId()); - System.out.println("Received blocking: " + flower); - producer.addReceived(flower); - } - - @Incoming("flowers-in") - @RunOnVirtualThread - void receiveVT(String flower) { - Context ctx = Vertx.currentContext(); - VirtualThreadsAssertions.assertThatItRunsOnVirtualThread(); - VirtualThreadsAssertions.assertThatItRunsOnADuplicatedContext(); - Log.info(ctx + "[" + ctx.getClass() + "]"); - Log.infof("bean: %s, id: %s", reqBean, reqBean.getId()); - System.out.println("Received vt: " + flower); - producer.addReceived(flower); - } - -} diff --git a/integration-tests/reactive-messaging-context-propagation/src/main/java/io/quarkus/it/kafka/FlowerResource.java b/integration-tests/reactive-messaging-context-propagation/src/main/java/io/quarkus/it/kafka/FlowerResource.java deleted file mode 100644 index 19df8577c9c48..0000000000000 --- a/integration-tests/reactive-messaging-context-propagation/src/main/java/io/quarkus/it/kafka/FlowerResource.java +++ /dev/null @@ -1,135 +0,0 @@ -package io.quarkus.it.kafka; - -import jakarta.inject.Inject; -import jakarta.ws.rs.Consumes; -import jakarta.ws.rs.POST; -import jakarta.ws.rs.Path; -import jakarta.ws.rs.core.MediaType; - -import org.eclipse.microprofile.context.ThreadContext; -import org.eclipse.microprofile.reactive.messaging.Channel; - -import io.quarkus.logging.Log; -import io.smallrye.context.api.CurrentThreadContext; -import io.smallrye.mutiny.Uni; -import io.smallrye.reactive.messaging.MutinyEmitter; -import io.vertx.core.Context; -import io.vertx.core.Vertx; - -@Path("/flowers") -public class FlowerResource { - - @Channel("flower") - MutinyEmitter emitter; - - @Channel("flower-blocking") - MutinyEmitter emitterBlocking; - - @Channel("flower-blocking-named") - MutinyEmitter emitterBlockingNamed; - - @Channel("flower-virtual-thread") - MutinyEmitter emitterVT; - - @Inject - RequestBean reqBean; - - @CurrentThreadContext(cleared = ThreadContext.CDI) - void emitWithoutContext(MutinyEmitter emitter, String body) { - emitter.sendAndAwait(body); - } - - @CurrentThreadContext(cleared = ThreadContext.CDI) - Uni emitWithoutContextUni(MutinyEmitter emitter, String body) { - return emitter.send(body); - } - - @POST - @Path("/uni") - @Consumes(MediaType.TEXT_PLAIN) - public Uni uniEventLoop(String body) { - Context ctx = Vertx.currentContext(); - Log.info(ctx + "[" + ctx.getClass() + "]"); - Log.infof("bean: %s, id: %s", reqBean, reqBean.getId()); - reqBean.setName(body != null ? body.toUpperCase() : body); - return emitWithoutContextUni(emitter, body); - } - - @POST - @Path("/uni/blocking") - @Consumes(MediaType.TEXT_PLAIN) - public Uni uniBlocking(String body) { - Context ctx = Vertx.currentContext(); - Log.info(ctx + "[" + ctx.getClass() + "]"); - Log.infof("bean: %s, id: %s", reqBean, reqBean.getId()); - reqBean.setName(body != null ? body.toUpperCase() : body); - return emitWithoutContextUni(emitterBlocking, body); - } - - @POST - @Path("/uni/blocking-named") - @Consumes(MediaType.TEXT_PLAIN) - public Uni uniBlockingNamed(String body) { - Context ctx = Vertx.currentContext(); - Log.info(ctx + "[" + ctx.getClass() + "]"); - Log.infof("bean: %s, id: %s", reqBean, reqBean.getId()); - reqBean.setName(body != null ? body.toUpperCase() : body); - return emitWithoutContextUni(emitterBlockingNamed, body); - } - - @POST - @Path("/uni/virtual-thread") - @Consumes(MediaType.TEXT_PLAIN) - public Uni uniVT(String body) { - Context ctx = Vertx.currentContext(); - Log.info(ctx + "[" + ctx.getClass() + "]"); - Log.infof("bean: %s, id: %s", reqBean, reqBean.getId()); - reqBean.setName(body != null ? body.toUpperCase() : body); - return emitWithoutContextUni(emitterVT, body); - } - - @POST - @Path("/") - @Consumes(MediaType.TEXT_PLAIN) - public void eventLoop(String body) { - Context ctx = Vertx.currentContext(); - Log.info(ctx + "[" + ctx.getClass() + "]"); - Log.infof("bean: %s, id: %s", reqBean, reqBean.getId()); - reqBean.setName(body != null ? body.toUpperCase() : body); - emitWithoutContext(emitter, body); - } - - @POST - @Path("/blocking") - @Consumes(MediaType.TEXT_PLAIN) - public void blocking(String body) { - Context ctx = Vertx.currentContext(); - Log.info(ctx + "[" + ctx.getClass() + "]"); - Log.infof("bean: %s, id: %s", reqBean, reqBean.getId()); - reqBean.setName(body != null ? body.toUpperCase() : body); - emitWithoutContext(emitterBlocking, body); - } - - @POST - @Path("/blocking-named") - @Consumes(MediaType.TEXT_PLAIN) - public void blockingNamed(String body) { - Context ctx = Vertx.currentContext(); - Log.info(ctx + "[" + ctx.getClass() + "]"); - Log.infof("bean: %s, id: %s", reqBean, reqBean.getId()); - reqBean.setName(body != null ? body.toUpperCase() : body); - emitWithoutContext(emitterBlockingNamed, body); - } - - @POST - @Path("/virtual-thread") - @Consumes(MediaType.TEXT_PLAIN) - public void vt(String body) { - Context ctx = Vertx.currentContext(); - Log.info(ctx + "[" + ctx.getClass() + "]"); - Log.infof("bean: %s, id: %s", reqBean, reqBean.getId()); - reqBean.setName(body != null ? body.toUpperCase() : body); - emitWithoutContext(emitterVT, body); - } - -} diff --git a/integration-tests/reactive-messaging-context-propagation/src/main/java/io/quarkus/it/kafka/FlowersContextualReceivers.java b/integration-tests/reactive-messaging-context-propagation/src/main/java/io/quarkus/it/kafka/FlowersContextualReceivers.java deleted file mode 100644 index bc0c23020897a..0000000000000 --- a/integration-tests/reactive-messaging-context-propagation/src/main/java/io/quarkus/it/kafka/FlowersContextualReceivers.java +++ /dev/null @@ -1,63 +0,0 @@ -package io.quarkus.it.kafka; - -import jakarta.enterprise.context.ApplicationScoped; -import jakarta.inject.Inject; - -import org.eclipse.microprofile.reactive.messaging.Incoming; -import org.jboss.logging.Logger; - -import io.quarkus.logging.Log; -import io.quarkus.test.vertx.VirtualThreadsAssertions; -import io.smallrye.common.annotation.RunOnVirtualThread; -import io.smallrye.reactive.messaging.annotations.Blocking; -import io.vertx.core.Context; -import io.vertx.core.Vertx; - -@ApplicationScoped -public class FlowersContextualReceivers { - - @Inject - RequestBean reqBean; - - @Inject - Logger logger; - - @Incoming("contextual-flower") - void processContextual(String name) { - Context ctx = Vertx.currentContext(); - Log.info(ctx + "[" + ctx.getClass() + "]"); - Log.infof("bean: %s, id: %s", reqBean, reqBean.getId()); - logger.infof("Hello, %s", reqBean.getName()); - } - - @Blocking - @Incoming("contextual-flower-blocking") - void processContextualBlocking(String name) { - Context ctx = Vertx.currentContext(); - assert Context.isOnWorkerThread(); - Log.info(ctx + "[" + ctx.getClass() + "]"); - Log.infof("bean: %s, id: %s", reqBean, reqBean.getId()); - logger.infof("Hello, %s", reqBean.getName()); - } - - @Blocking("named-pool") - @Incoming("contextual-flower-blocking-named") - void processContextualBlockingNamed(String name) { - Context ctx = Vertx.currentContext(); - assert Context.isOnWorkerThread(); - Log.info(ctx + "[" + ctx.getClass() + "]"); - Log.infof("bean: %s, id: %s", reqBean, reqBean.getId()); - logger.infof("Hello, %s", reqBean.getName()); - } - - @RunOnVirtualThread - @Incoming("contextual-flower-virtual-thread") - void processContextualVT(String name) { - Context ctx = Vertx.currentContext(); - VirtualThreadsAssertions.assertThatItRunsOnVirtualThread(); - VirtualThreadsAssertions.assertThatItRunsOnADuplicatedContext(); - Log.info(ctx + "[" + ctx.getClass() + "]"); - Log.infof("bean: %s, id: %s", reqBean, reqBean.getId()); - logger.infof("Hello, %s", reqBean.getName()); - } -} diff --git a/integration-tests/reactive-messaging-context-propagation/src/main/java/io/quarkus/it/kafka/RequestBean.java b/integration-tests/reactive-messaging-context-propagation/src/main/java/io/quarkus/it/kafka/RequestBean.java deleted file mode 100644 index 99cb1c376412a..0000000000000 --- a/integration-tests/reactive-messaging-context-propagation/src/main/java/io/quarkus/it/kafka/RequestBean.java +++ /dev/null @@ -1,34 +0,0 @@ -package io.quarkus.it.kafka; - -import java.util.UUID; - -import jakarta.annotation.PostConstruct; -import jakarta.enterprise.context.RequestScoped; - -@RequestScoped -public class RequestBean { - - private final String id; - private String name; - - public RequestBean() { - this.id = UUID.randomUUID().toString(); - } - - @PostConstruct - void construct() { - System.out.println("ReqBean constructed " + this.id); - } - - public String getId() { - return id; - } - - public String getName() { - return name; - } - - public void setName(String name) { - this.name = name; - } -} diff --git a/integration-tests/reactive-messaging-context-propagation/src/main/resources/application.properties b/integration-tests/reactive-messaging-context-propagation/src/main/resources/application.properties deleted file mode 100644 index 8679577b7decf..0000000000000 --- a/integration-tests/reactive-messaging-context-propagation/src/main/resources/application.properties +++ /dev/null @@ -1,21 +0,0 @@ -quarkus.log.category.kafka.level=WARN -quarkus.log.category.\"org.apache.kafka\".level=WARN -quarkus.log.category.\"org.apache.zookeeper\".level=WARN - -# enable health check -quarkus.kafka.health.enabled=true - -kafka.auto.offset.reset=earliest - -quarkus.micrometer.binder.messaging.enabled=true -smallrye.messaging.observation.enabled=true - - -smallrye.messaging.worker.named-pool.max-concurrency=2 - -mp.messaging.incoming.flowers-in.topic=flowers -# Broadcast to 3 different consume methods -mp.messaging.incoming.flowers-in.broadcast=true -mp.messaging.outgoing.flowers-out.topic=flowers -# Uncomment the following line to enable request-scoped messaging, disabled by default: -#quarkus.messaging.request-scoped.enabled=true \ No newline at end of file diff --git a/integration-tests/reactive-messaging-context-propagation/src/test/java/io/quarkus/it/kafka/KafkaContextPropagationIT.java b/integration-tests/reactive-messaging-context-propagation/src/test/java/io/quarkus/it/kafka/KafkaContextPropagationIT.java deleted file mode 100644 index c3a144e1bfc10..0000000000000 --- a/integration-tests/reactive-messaging-context-propagation/src/test/java/io/quarkus/it/kafka/KafkaContextPropagationIT.java +++ /dev/null @@ -1,15 +0,0 @@ -package io.quarkus.it.kafka; - -import org.hamcrest.Matcher; -import org.hamcrest.Matchers; - -import io.quarkus.test.junit.QuarkusIntegrationTest; - -@QuarkusIntegrationTest -public class KafkaContextPropagationIT extends KafkaContextPropagationTest { - - @Override - protected Matcher assertBodyRequestScopedContextWasNotActive() { - return Matchers.not(Matchers.blankOrNullString()); - } -} diff --git a/integration-tests/reactive-messaging-context-propagation/src/test/java/io/quarkus/it/kafka/KafkaContextPropagationTest.java b/integration-tests/reactive-messaging-context-propagation/src/test/java/io/quarkus/it/kafka/KafkaContextPropagationTest.java deleted file mode 100644 index c840c24a9612d..0000000000000 --- a/integration-tests/reactive-messaging-context-propagation/src/test/java/io/quarkus/it/kafka/KafkaContextPropagationTest.java +++ /dev/null @@ -1,142 +0,0 @@ -package io.quarkus.it.kafka; - -import static io.restassured.RestAssured.given; -import static org.awaitility.Awaitility.await; -import static org.hamcrest.Matchers.containsString; -import static org.hamcrest.Matchers.not; - -import java.util.concurrent.TimeUnit; - -import org.hamcrest.Matcher; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.condition.EnabledForJreRange; -import org.junit.jupiter.api.condition.JRE; - -import io.quarkus.test.common.QuarkusTestResource; -import io.quarkus.test.junit.QuarkusTest; -import io.quarkus.test.kafka.KafkaCompanionResource; - -@QuarkusTest -@QuarkusTestResource(KafkaCompanionResource.class) -public class KafkaContextPropagationTest { - - @Test - void testContextPropagation() { - given().body("rose").post("/flowers/contextual").then().statusCode(204); - } - - @Test - void testContextPropagationUni() { - given().body("rose").post("/flowers/contextual/uni").then().statusCode(204); - } - - @Test - void testContextPropagationBlocking() { - given().body("rose").post("/flowers/contextual/blocking").then().statusCode(204); - } - - @Test - void testContextPropagationBlockingUni() { - given().body("rose").post("/flowers/contextual/uni/blocking").then().statusCode(204); - } - - @Test - void testContextPropagationBlockingNamed() { - given().body("rose").post("/flowers/contextual/blocking-named").then().statusCode(204); - } - - @Test - void testContextPropagationBlockingNamedUni() { - given().body("rose").post("/flowers/contextual/uni/blocking-named").then().statusCode(204); - } - - @Test - @EnabledForJreRange(min = JRE.JAVA_21) - void testContextPropagationVirtualThread() { - given().body("rose").post("/flowers/contextual/virtual-thread").then().statusCode(204); - } - - @Test - @EnabledForJreRange(min = JRE.JAVA_21) - void testContextPropagationVirtualThreadUni() { - given().body("rose").post("/flowers/contextual/uni/virtual-thread").then().statusCode(204); - } - - @Test - void testAbsenceOfContextPropagation() { - given().body("rose").post("/flowers").then() - .statusCode(500) - .body(assertBodyRequestScopedContextWasNotActive()); - } - - @Test - void testAbsenceOfContextPropagationUni() { - given().body("rose").post("/flowers/uni").then() - .statusCode(500) - .body(assertBodyRequestScopedContextWasNotActive()); - } - - @Test - void testAbsenceOfContextPropagationBlocking() { - given().body("rose").post("/flowers/blocking").then() - .statusCode(500) - .body(assertBodyRequestScopedContextWasNotActive()); - } - - @Test - void testAbsenceOfContextPropagationBlockingUni() { - given().body("rose").post("/flowers/uni/blocking").then() - .statusCode(500) - .body(assertBodyRequestScopedContextWasNotActive()); - } - - @Test - void testAbsenceOfContextPropagationBlockingNamed() { - given().body("rose").post("/flowers/blocking-named").then() - .statusCode(500) - .body(assertBodyRequestScopedContextWasNotActive()); - } - - @Test - void testAbsenceOfContextPropagationBlockingNamedUni() { - given().body("rose").post("/flowers/uni/blocking-named").then() - .statusCode(500) - .body(assertBodyRequestScopedContextWasNotActive()); - } - - @Test - @EnabledForJreRange(min = JRE.JAVA_21) - void testAbsenceOfContextPropagationVirtualThread() { - given().body("rose").post("/flowers/virtual-thread").then() - .statusCode(500) - .body(assertBodyRequestScopedContextWasNotActive()); - } - - @Test - @EnabledForJreRange(min = JRE.JAVA_21) - void testAbsenceOfContextPropagationVirtualThreadUni() { - given().body("rose").post("/flowers/uni/virtual-thread").then() - .statusCode(500) - .body(assertBodyRequestScopedContextWasNotActive()); - } - - protected Matcher assertBodyRequestScopedContextWasNotActive() { - return containsString("RequestScoped context was not active"); - } - - @Test - void testIncomingFromConnector() { - given().body("rose").post("/flowers/produce").then() - .statusCode(204); - given().body("daisy").post("/flowers/produce").then() - .statusCode(204); - given().body("peony").post("/flowers/produce").then() - .statusCode(204); - - await().pollDelay(5, TimeUnit.SECONDS).untilAsserted(() -> given().get("/flowers/received") - .then().body(not(containsString("rose")), - not(containsString("daisy")), - not(containsString("peony")))); - } - -}