A PR for reactive streams support #151

Merged
merged 41 commits into from
May 27, 2024

bbakerman (Member) commented May 17, 2024

This PR is a long-running branch that allows "reactive" publishers to complete work progressively as results arrive.

A normal BatchLoader gathers ALL the value futures (for a given set of keys) and completes them in one go.

With reactive Publishers / Subscribers, each key's future can instead complete as soon as its result arrives.

Processing may therefore finish sooner when further work (such as loading dependent sub-fields) can start on early results while the rest of the batch is still loading.
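
To make the difference concrete, here is a minimal sketch of a subscriber that completes one queued future per arriving value. It is not the implementation in this branch: the class names are hypothetical, and it uses the JDK's `java.util.concurrent.Flow` interfaces as a stand-in for `org.reactivestreams` so that it compiles without dependencies. Failing the unserved keys in `onComplete` is an assumption of the sketch.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;

// Hypothetical sketch: illustrative names, not the classes added by this PR.
class ProgressiveCompletionSketch {

    /** Completes the i-th queued future as soon as the i-th value arrives. */
    static final class CompletingSubscriber<V> implements Flow.Subscriber<V> {
        // One future per requested key, in key order.
        private final List<CompletableFuture<V>> queuedFutures;
        private int next = 0;

        CompletingSubscriber(List<CompletableFuture<V>> queuedFutures) {
            this.queuedFutures = queuedFutures;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            // Ask for one value per key; request(n) requires n > 0.
            subscription.request(Math.max(1, queuedFutures.size()));
        }

        @Override
        public void onNext(V value) {
            // Progressive completion: no waiting for the rest of the batch.
            if (next < queuedFutures.size()) {
                queuedFutures.get(next++).complete(value);
            }
        }

        @Override
        public void onError(Throwable failure) {
            failRemaining(failure);
        }

        @Override
        public void onComplete() {
            // Assumption: a stream that ends early fails the keys it never served.
            failRemaining(new IllegalStateException("stream completed before all keys were served"));
        }

        private void failRemaining(Throwable failure) {
            while (next < queuedFutures.size()) {
                queuedFutures.get(next++).completeExceptionally(failure);
            }
        }
    }
}
```

Calling `onNext` once leaves the first future completed and the others pending, whereas a plain BatchLoader would complete none of them until the whole list is available.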

AlexandreCarlton and others added 2 commits May 12, 2024 17:56
**Note**: This commit, as-is, is not (yet) intended for merge. It is
created to provide a proof-of-concept and gauge interest as
polishing/testing this requires a non-trivial amount of effort.

Motivation
==========

The current DataLoader mechanism completes the corresponding
`CompletableFuture` for a given key when the corresponding value is
returned. However, DataLoader's `BatchLoader` assumes that the
underlying batch function can only return all of its requested items at
once (as an example, a SQL database query).

However, the batch function may be a service that can return items
progressively using a subscription-like architecture. Some examples
include:

 - Reactive Streams' [Subscriber](https://www.reactive-streams.org/reactive-streams-1.0.4-javadoc/org/reactivestreams/Subscriber.html), as used by Project Reactor.
 - gRPC's [StreamObserver](https://grpc.github.io/grpc-java/javadoc/io/grpc/stub/StreamObserver.html).
 - RxJava's [Flowable](https://reactivex.io/RxJava/3.x/javadoc/io/reactivex/rxjava3/core/Flowable.html).

Streaming results in this fashion offers several advantages:

 - Certain values may be returned earlier than others (for example, the
   batch function may have cached values it can return early).
 - Memory load is lessened on the batch function (which may be an
   external service), as it does not need to keep hold of the retrieved
   values before it can send them out at once.
 - We avoid having to stream individual error values: calling `onError`
   terminates the stream early instead.

Proposal
========

We provide two new `BatchLoader`s and support for them in
`java-dataloader`:

 - `ObserverBatchLoader`, with a load function that accepts:
   - a list of keys.
   - a `BatchObserver` intended as a delegate for publisher-like
     structures found in Project Reactor and Rx Java. This obviates the
     need to depend on external libraries.
 - `MappedObserverBatchLoader`, similar to `ObserverBatchLoader` but
   with an `onNext` that accepts a key _and_ value (to allow for early
   termination of streams without needing to process `null`s).
 - `*WithContext` variants for the above.

The key value-add is that the implementation of `BatchObserver`
(provided to the load functions) will immediately complete the queued
future for a given key when `onNext` is called with a value. This means
that if we have a batch function that can deliver values progressively,
we can continue evaluating the query as the values arrive. As an
arbitrary example, let's have a batch function that serves both the
reporter and project fields on a Jira issue:

```graphql
query {
  issue {
    project {
      issueTypes { ... }
    }
    reporter { ... }
  }
}
```

If the batch function can return a `project` immediately but is delayed
in returning a `reporter`, then our batch loader can return `project`
and start evaluating the `issueTypes` immediately while we load the
`reporter` in parallel. This lets query evaluation finish sooner
overall.
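
The ordering described above can be sketched with plain `CompletableFuture`s. This is only an illustration under stated assumptions: the class name, the event strings, and the values `"PROJ"` and `"alice"` are hypothetical, and the two `complete` calls stand in for a streaming batch function that delivers `project` before `reporter`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of the project/reporter example; not code from this PR.
class ProjectBeforeReporterSketch {

    static List<String> run() {
        List<String> events = new ArrayList<>();
        CompletableFuture<String> project = new CompletableFuture<>();
        CompletableFuture<String> reporter = new CompletableFuture<>();

        // Downstream work that only depends on the project.
        project.thenApply(p -> {
            events.add("evaluate issueTypes for " + p);
            return p + ":issueTypes";
        });
        reporter.thenAccept(r -> events.add("resolve reporter " + r));

        // The streaming batch function delivers the project first...
        project.complete("PROJ");
        events.add("reporter pending: " + !reporter.isDone());
        // ...and the reporter some time later.
        reporter.complete("alice");
        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The `issueTypes` stage runs as soon as `project` completes, while `reporter` is still outstanding; with an all-at-once batch function neither stage could start before both values were available.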

As mentioned above, this is not in a state to be merged - this is
intended to gauge whether this is something the maintainers would be
interested in owning. Should this be the case, the author is willing to
test/polish this pull request so that it may be merged.
@bbakerman changed the title from "A PR for reactive streams support" to "DO NOT MERGE - YET - A PR for reactive streams support" on May 17, 2024
@dondonz added this to the "Next release 3.4.0" milestone on May 17, 2024
AlexandreCarlton and others added 24 commits May 18, 2024 12:22
…er-proof-of-concept

* origin/master:
  Bump to Java 11
`reactive-streams` has become the de-facto standard for reactive
frameworks; we thus use this as a base to allow seamless interop (rather
than prompt an extra adapter layer).
This gives us more workable exceptions.
Passing an exception into `onNext` is not typically done in
reactive-land - we would instead call `onError(Throwable)`. We can thus
avoid handling this case.
This is keeping in line with the other methods found in
`DataLoaderFactory`.
Given the large number of existing tests, we copy across this existing
set for our publisher tests.

What this really indicates is that we should invest in parameterised
testing, but this is a bit painful in JUnit 4 - so we'll bump to JUnit 5
independently and parameterise when we have this available.

This is important because re-using the existing test suite reveals a
failure that we'll need to address.
This keeps in line with the original suggestion (because yours truly
couldn't read, apparently).

We also purge any remaining mention of 'observer', which was the first
swing at this code.
Multiple threads may call `onNext` - we thus (lazily) chuck a
`synchronized` on it to ensure correctness at the cost of speed.

In future, we should examine how to manage this concurrency better.
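
As a rough illustration of that trade-off, the sketch below guards a shared "next future" index with `synchronized` so that racing `onNext` calls never skip or double-complete a future. The class and method names are hypothetical, and the thread pool merely simulates an upstream that emits from several threads.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch; not the subscriber implementation from this PR.
class SynchronizedOnNextSketch {

    static final class IndexedCompleter<V> {
        private final List<CompletableFuture<V>> futures;
        private int next = 0; // guarded by the monitor of this object

        IndexedCompleter(List<CompletableFuture<V>> futures) {
            this.futures = futures;
        }

        // Without synchronized, two threads could read the same index and
        // leave a later future uncompleted.
        synchronized void onNext(V value) {
            if (next < futures.size()) {
                futures.get(next++).complete(value);
            }
        }
    }

    /** Emits keyCount values from four threads; returns how many futures completed. */
    static int completedAfterConcurrentEmits(int keyCount) {
        List<CompletableFuture<Integer>> futures = new ArrayList<>();
        for (int i = 0; i < keyCount; i++) {
            futures.add(new CompletableFuture<>());
        }
        IndexedCompleter<Integer> completer = new IndexedCompleter<>(futures);

        ExecutorService pool = Executors.newFixedThreadPool(4);
        for (int i = 0; i < keyCount; i++) {
            final int value = i;
            pool.execute(() -> completer.onNext(value));
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for emitters");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }

        int completed = 0;
        for (CompletableFuture<Integer> future : futures) {
            if (future.isDone()) {
                completed++;
            }
        }
        return completed;
    }
}
```

The lock only keeps the index consistent. Which value lands on which future still depends on arrival order, so a positional subscriber like this one relies on the upstream emitting values in key order.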
…roof-of-concept

Add a proof-of-concept for "Observer-like" batch loading
We now have the same coverage but with less code. Note that:

 - this is currently failing on 'duplicate keys when caching disabled'.
 - we still need to add tests that only make sense for the Publisher
   variants (e.g. half-completed keys).
If we did not cache the futures, then the MappedBatchPublisher
DataLoader would not work as we were only completing the last future for
a given key.
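
One reading of this fix, sketched with hypothetical names: when the same key is requested twice and each request has its own future, the subscriber has to remember every future per key and complete all of them when that key's entry arrives. The helper methods below are illustrative only and do not mirror the actual class layout in this branch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch; not the subscriber implementation from this PR.
class DuplicateKeyFuturesSketch {

    /** Groups the queued futures by key, keeping every future for a repeated key. */
    static <K, V> Map<K, List<CompletableFuture<V>>> groupByKey(
            List<K> keys, List<CompletableFuture<V>> futures) {
        Map<K, List<CompletableFuture<V>>> byKey = new HashMap<>();
        for (int i = 0; i < keys.size(); i++) {
            byKey.computeIfAbsent(keys.get(i), k -> new ArrayList<>()).add(futures.get(i));
        }
        return byKey;
    }

    /** Completes all futures waiting on the entry's key, not just the last one. */
    static <K, V> void onNext(Map<K, List<CompletableFuture<V>>> byKey, Map.Entry<K, V> entry) {
        List<CompletableFuture<V>> waiting = byKey.getOrDefault(entry.getKey(), List.of());
        for (CompletableFuture<V> future : waiting) {
            future.complete(entry.getValue());
        }
    }
}
```

A plain `Map<K, CompletableFuture<V>>` would overwrite the first future for a repeated key, which matches the "only completing the last future" symptom described above.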
…active-streams-common-publisher-impl

# Conflicts:
#	src/main/java/org/dataloader/DataLoaderHelper.java
#	src/test/java/org/dataloader/DataLoaderBatchPublisherTest.java
#	src/test/java/org/dataloader/DataLoaderMappedBatchPublisherTest.java
…lisher-impl

Making the Subscribers use a common base class
bbakerman and others added 13 commits May 23, 2024 10:32
…ra-tests-for-reactive

More tests for Publishers on reactive branch
…e-reactive-classes-out-of-dataloader-helper

Reactive streams branch move reactive classes out of dataloader helper
This is more symmetric with `MappedBatchLoader` and preserves
efficiency; we do not need to emit a `Map.Entry` for duplicate keys
(given the strong intention that this will be used to create a `Map`).
…ublishers

Have MappedBatchPublisher take in a Set<K> keys (and add README sections)
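
A minimal sketch of that shape, assuming a load function that receives a `Set` of keys and emits one `Map.Entry` per key. The `MappedLoad` interface is a hypothetical local stand-in built on the JDK's `Flow.Subscriber` rather than `org.reactivestreams.Subscriber`, and the toy publisher ignores backpressure for brevity.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Flow;

// Hypothetical sketch; MappedLoad is a local stand-in, not the library interface.
class MappedPublisherShapeSketch {

    interface MappedLoad<K, V> {
        void load(Set<K> keys, Flow.Subscriber<Map.Entry<K, V>> subscriber);
    }

    /** Loads the length of each requested key and collects the emitted entries. */
    static Map<String, Integer> loadLengths(List<String> requestedKeys) {
        // Duplicate requests collapse here, so each key is emitted at most once.
        Set<String> uniqueKeys = new LinkedHashSet<>(requestedKeys);
        Map<String, Integer> result = new LinkedHashMap<>();

        MappedLoad<String, Integer> lengths = (keys, subscriber) -> {
            subscriber.onSubscribe(new Flow.Subscription() {
                @Override public void request(long n) { /* demand ignored in this sketch */ }
                @Override public void cancel() { }
            });
            for (String key : keys) {
                subscriber.onNext(Map.entry(key, key.length()));
            }
            subscriber.onComplete();
        };

        lengths.load(uniqueKeys, new Flow.Subscriber<Map.Entry<String, Integer>>() {
            @Override public void onSubscribe(Flow.Subscription subscription) {
                subscription.request(Long.MAX_VALUE);
            }
            @Override public void onNext(Map.Entry<String, Integer> entry) {
                result.put(entry.getKey(), entry.getValue());
            }
            @Override public void onError(Throwable failure) { /* not exercised here */ }
            @Override public void onComplete() { }
        });
        return result;
    }
}
```

Because the entries are destined for a `Map`, emitting a second entry for a repeated key would only overwrite the first, which is the inefficiency the `Set` parameter avoids.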
@bbakerman changed the title from "DO NOT MERGE - YET - A PR for reactive streams support" to "A PR for reactive streams support" on May 27, 2024
@bbakerman merged commit d44070a into master on May 27, 2024
1 check passed
3 participants