Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support ReactiveStreams #166

Open
FroMage opened this issue Jun 26, 2019 · 9 comments · May be fixed by #180
Open

Support ReactiveStreams #166

FroMage opened this issue Jun 26, 2019 · 9 comments · May be fixed by #180
Labels
enhancement New feature or request

Comments

@FroMage
Copy link
Contributor

FroMage commented Jun 26, 2019

Although the plugin system of MP-CP and RxJava2 make it possible to have automatic context propagation for RxJava2, it appears some ReactiveStreams implementations don't use RxJava2 or Reactor but built their own implementations, which don't necessarily have a plugin system and so must be manually embarked into context propagation.

We could add those methods to ThreadContext:

public <T> Subscriber<T> wrapSubscriber(Subscriber<T> subscriber);

At the cost of a dependency to ReactiveStreams, but that's an industry standard and has 4 interfaces. We could add support for the JDK 9's Flow.Subscriber too, but I don't think we can add a dependency to JDK9.

@FroMage FroMage added the enhancement New feature or request label Jun 26, 2019
@FroMage
Copy link
Contributor Author

FroMage commented Jun 26, 2019

FTR, the MongoDB driver has its own RS implementation, so it's quite a popular library.

@njr-11
Copy link
Contributor

njr-11 commented Jun 27, 2019

Discussed this issue with @nmittles on the MP Context Propagation call. Here is a summary of our thoughts:

This is a good idea. The duplicate interfaces in Reactive Streams and Flows are unfortunate. Long term direction will likely end up being the Flow interface that is built in to the JDK, and it will then be undesirable for the spec to have the extra dependency on reactive streams if we did this, especially for those who aren't using reactive streams. We should really avoid direct dependencies on non-JDK interfaces like this. Java EE Concurrency's ContextService.createContextualProxy actually solves the problem generically for any interface, but it comes at the cost of compiler warnings that must be suppressed when interfaces are parameterized (which applies in the case of Subscriber<T>) and also at the cost of having to deal with the possibility that the contextualized object might Serializable, which has important implications to captured context which MicroProfile Context Propagation currently avoids dealing with by restricting which interfaces can be contextualized. For this, let's do some investigating into whether there is a better way of writing an API method for general any-interface contextualization that can provide a better solution for parameterized interfaces. We ought to be able deal with the Serializable problem simply by documenting a restriction on it against contextualizing Serializable, for implementations to enforce.

@FroMage
Copy link
Contributor Author

FroMage commented Jun 27, 2019

Well, I disagree that it's very bad to add a dependency on a tiny lib of 4 interfaces, and the fact of the matter is that ATM nobody implements the Flow version, so if we're looking for usefulness we can skip the JDK9 version for quite some time.

@kenfinnigan
Copy link

Given other discussions at the Architecture level discussing switching to a JDK 11 base, as opposed to JDK 8, I see it as being a couple of years before we based on a JDK that includes Flow.

To that end, I don't think worrying about an "extra dependency" several years down the road should prevent this work from happening now

@njr-11
Copy link
Contributor

njr-11 commented Jun 27, 2019

I'm not suggesting that we prevent this work from being done, just that we explore a more generic solution that solves it for both Flow and ReactiveStreams plus other generic interfaces that can benefit from contextualization.

@aguibert
Copy link
Contributor

I think it is pretty safe to add a dependency on org.reactive-streams, because MP has already officially endorsed this API by building MP Reactive Streams Operators on top of it, and it's adopted very widely beyond MP as well.

I agree with @kenfinnigan and @FroMage that we shouldn't worry about Flow because we will require JDK 8 interop for a long time still.

To illustrate what I think what @njr-11 is proposing -- instead of having:

  • ThreadContext.withContextCapture(CompletableFuture<T> stage)
  • ThreadContext.withContextCapture(CompletionStage<T> stage)
  • [proposed] ThreadContext.wrapSubscriber(Subscriber<T> subscriber) (IMO we should rename to withContextCapture for consistency)

We combine these into 1 generic method such as:

  • ThreadContext.withContextCapture(Object instance, Class<?>... interfaces)

I prefer the type-safe individual methods, which IMO makes the API easier to understand/use. However, we could also add the generic method as a safety-net if any new interfaces pop up in the future in between spec releases. To avoid method signature conflicts we could adjust the signature of the generic method to:
ThreadContext.withContextCapture(Object instance, Class<?> interface, Class<?>... moreInterfaces)
This would also firm up the API a bit because it forces at least 1 interface to be supplied.

@njr-11
Copy link
Contributor

njr-11 commented Jun 27, 2019

I was thinking of something more like this, which is generic, but still properly typed (we can figure out the correct method name later),

    /**
     * <p>Proxies the an object instance with context that is captured from the thread that invokes
     * <code>contextualize</code>.</p>
     *
     * <p>When interface methods are invoked on the proxy instance,
     * context is first established on the thread that will run the method,
     * then the method of the provided instance is invoked.
     * Finally, the previous context is restored on the thread, and the result of the
     * method is returned to the invoker.</p>
     *
     * @param <T>      an interface that is implemented by the instance.
     * @param <U>      type of supplied instance.
     * @param instance instance to contextualize.
     * @return contextualized proxy for interfaces that are implemented by the supplied instance.
     * @throws IllegalArgumentException if an already-contextualized instance is supplied to this method,
     *                 or if the supplied instance does not implement any interfaces that can be proxied.
     * @throws UnsupportedOperationException if the instance implements <code>java.io.Serializable</code>
     *                 due to lack of support for serializing and deserializing captured thread context.
     */
    public <T, U extends T> T contextualize(U instance);

The above would make the following possible, without any compiler warnings, and without introducing any dependencies to the spec,

Subscriber<String> contextualSubscriber = threadContext.contextualize(mySubscriber);

If we can introduce something generic and simple that doesn't require dependencies, I don't see why we would take a different approach that does.

@FroMage
Copy link
Contributor Author

FroMage commented Jun 28, 2019

OK, thanks, I understand now. But I'm not sure if we can implement context capturing generically. See CompletionStage for example of how we can't and need a lot of specialised code. For Subscriber it would be possible, because we capture on creation and restore on any method call, but if we wanted to also provide it for Publisher to make it automatic for every subscriber we could not because we don't want to capture and restore but just forward any calls to subscribe to our Subscriber wrapper.

@njr-11
Copy link
Contributor

njr-11 commented Jun 28, 2019

I agree - the general pattern covers a core set of common usage, which is contextualizing interface methods (works well for Subscriber, and many other interfaces). There are certainly a number of special-cased patterns such as our withContextCapture(stage/future) methods which cause the resulting object to do something special with respect to context propagation (which for withContextCapture(CompletionStage) means creating context-enhanced dependent stages). The addition of a general method would not serve as a replacement for withContextCapture, and I'd suggest that we keep the distinction in naming patterns between withContextCapture vs the simpler and more common pattern so as to illustrate the difference. There is another action we can consider taking as well, which is to explicitly reject an attempt to supply a CompletionStage to the general pattern so that no one accidentally misuses it if they didn't read the JavaDoc.

njr-11 added a commit to njr-11/microprofile-context-propagation that referenced this issue Sep 24, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

Successfully merging a pull request may close this issue.

4 participants