Skip to content

Commit

Permalink
Revert "Context propagation for Messaging"
Browse files Browse the repository at this point in the history
This reverts commit 3153ee8.
  • Loading branch information
ozangunalp committed Jan 31, 2025
1 parent 1621e6b commit 7ed508f
Show file tree
Hide file tree
Showing 20 changed files with 25 additions and 1,350 deletions.
95 changes: 0 additions & 95 deletions docs/src/main/asciidoc/messaging.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -562,7 +562,6 @@ public class StreamProcessor {
}
----

[[execution_model]]
== Execution Model

Quarkus Messaging sits on top of the xref:quarkus-reactive-architecture.adoc#engine[reactive engine] of Quarkus and leverages link:{eclipse-vertx}[Eclipse Vert.x] to dispatch messages for processing.
Expand Down Expand Up @@ -635,100 +634,6 @@ Depending on the broker technology, this can be useful to increase the applicati
while still preserving the partial order of messages received in different copies.
This is the case, for example, for Kafka, where multiple consumers can consume different topic partitions.

== Context Propagation

In Quarkus Messaging, the default mechanism for propagating context between different processing stages is the
link:https://smallrye.io/smallrye-reactive-messaging/latest/concepts/message-context[message context].
This provides a consistent way to pass context information along with the message as it flows through different stages.

=== Interaction with Mutiny and MicroProfile Context Propagation

Mutiny, which is the foundation of reactive programming in Quarkus, is integrated with the MicroProfile Context Propagation.
This integration enables automatic capturing and restoring of context across asynchronous boundaries.
To learn more about context propagation in Quarkus and Mutiny, refer to the xref:context-propagation.adoc[Context Propagation] guide.

However, Quarkus Messaging needs to coordinate multiple asynchronous boundaries.
This is why the default context propagation can result in unexpected behavior in some cases, especially using `Emitters`.

To ensure consistent behavior, Quarkus Messaging disables the propagation of any context during message dispatching, through internal channels or connectors.
This means that Emitters won't capture the caller context, and incoming channels won't dispatch messages by activating a context (ex. the request context).

For example, you might want to propagate the caller context from an incoming HTTP request to the message processing stage.
For emitters, instead of using the regular `Emitter` or `MutinyEmitter`, you can inject the `ContextualEmitter` to make sure the message captures the caller context.
This ensures consistent and predictable behaviour by relying on the message context handling provided by the framework.

For example, let `RequestScopedBean` a request-scoped bean, `ContextualEmitter` can be used to dispatch messages locally through the internal channel `app`:

[source, java]
----
import jakarta.inject.Inject;
import jakarta.ws.rs.Consumes;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.core.MediaType;
import org.eclipse.microprofile.reactive.messaging.Channel;
import io.quarkus.logging.Log;
import io.quarkus.smallrye.reactivemessaging.runtime.ContextualEmitter;
import io.smallrye.mutiny.Uni;
import io.vertx.core.Context;
import io.vertx.core.Vertx;
@Path("/")
public class Resource {
@Channel("app")
ContextualEmitter<String> emitter;
@Inject
RequestScopedBean requestScopedBean;
@POST
@Path("/send")
public void send(String message) {
requestScopedBean.setValue("Hello");
emitter.sendAndAwait(message);
}
}
----

Then the request-scoped bean can be accessed in the message processing stage, regardless of the <<execution_model>>:

[source, java]
----
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import io.quarkus.logging.Log;
import io.smallrye.reactive.messaging.annotations.Blocking;
@ApplicationScoped
public class Processor {
@Inject
RequestScopedBean requestScopedBean;
@Incoming("app")
@Blocking
public void process(String message) {
Log.infof("Message %s from request %s", message, requestScopedBean.getValue());
}
}
----

=== Request Context Activation

In some cases, you might need to activate the request context while processing messages consumed from a broker.
While using `@ActivateRequestContext` on the `@Incoming` method is an option, it's lifecycle does not follow that of a Quarkus Messaging message.
For incoming channels, you can enable the request scope activation with the build time property `quarkus.messaging.request-scoped.enabled=true`.
This will activate the request context for each message processed by the incoming channel, and close the context once the message is processed.

== Health Checks

Together with the SmallRye Health extension, Quarkus Messaging extensions provide health check support per channel.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,4 @@ public interface ReactiveMessagingBuildTimeConfig {
@WithName("auto-connector-attachment")
@WithDefault("true")
boolean autoConnectorAttachment();

/**
* Whether to enable the RequestScope context on a message context
*/
@WithName("request-scoped.enabled")
@WithDefault("false")
boolean activateRequestScopeEnabled();
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,16 +70,13 @@
import io.quarkus.smallrye.reactivemessaging.deployment.items.InjectedChannelBuildItem;
import io.quarkus.smallrye.reactivemessaging.deployment.items.InjectedEmitterBuildItem;
import io.quarkus.smallrye.reactivemessaging.deployment.items.MediatorBuildItem;
import io.quarkus.smallrye.reactivemessaging.runtime.ContextClearedDecorator;
import io.quarkus.smallrye.reactivemessaging.runtime.ContextualEmitterFactory;
import io.quarkus.smallrye.reactivemessaging.runtime.DuplicatedContextConnectorFactory;
import io.quarkus.smallrye.reactivemessaging.runtime.DuplicatedContextConnectorFactoryInterceptor;
import io.quarkus.smallrye.reactivemessaging.runtime.HealthCenterFilter;
import io.quarkus.smallrye.reactivemessaging.runtime.HealthCenterInterceptor;
import io.quarkus.smallrye.reactivemessaging.runtime.QuarkusMediatorConfiguration;
import io.quarkus.smallrye.reactivemessaging.runtime.QuarkusWorkerPoolRegistry;
import io.quarkus.smallrye.reactivemessaging.runtime.ReactiveMessagingConfiguration;
import io.quarkus.smallrye.reactivemessaging.runtime.RequestScopedDecorator;
import io.quarkus.smallrye.reactivemessaging.runtime.SmallRyeReactiveMessagingLifecycle;
import io.quarkus.smallrye.reactivemessaging.runtime.SmallRyeReactiveMessagingRecorder;
import io.quarkus.smallrye.reactivemessaging.runtime.SmallRyeReactiveMessagingRecorder.SmallRyeReactiveMessagingContext;
Expand Down Expand Up @@ -115,14 +112,11 @@ FeatureBuildItem feature() {
}

@BuildStep
void beans(BuildProducer<AdditionalBeanBuildItem> additionalBean, ReactiveMessagingBuildTimeConfig buildTimeConfig) {
AdditionalBeanBuildItem beans() {
// We add the connector and channel qualifiers to make them part of the index.
additionalBean.produce(new AdditionalBeanBuildItem(SmallRyeReactiveMessagingLifecycle.class, Connector.class,
return new AdditionalBeanBuildItem(SmallRyeReactiveMessagingLifecycle.class, Connector.class,
Channel.class, io.smallrye.reactive.messaging.annotations.Channel.class,
QuarkusWorkerPoolRegistry.class, ContextualEmitterFactory.class, ContextClearedDecorator.class));
if (buildTimeConfig.activateRequestScopeEnabled()) {
additionalBean.produce(new AdditionalBeanBuildItem(RequestScopedDecorator.class));
}
QuarkusWorkerPoolRegistry.class);
}

@BuildStep
Expand Down

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

Loading

0 comments on commit 7ed508f

Please sign in to comment.