diff --git a/README.md b/README.md index 9ffe265..fb38e69 100644 --- a/README.md +++ b/README.md @@ -288,7 +288,7 @@ For example, let's assume you want to load users from a database, you could prob ### Returning a stream of results from your batch publisher -It may be that your batch loader function is a [Reactive Streams](https://www.reactive-streams.org/) [Publisher](https://www.reactive-streams.org/reactive-streams-1.0.3-javadoc/org/reactivestreams/Publisher.html), where values are emitted as an asynchronous stream. +It may be that your batch loader function can use a [Reactive Streams](https://www.reactive-streams.org/) [Publisher](https://www.reactive-streams.org/reactive-streams-1.0.3-javadoc/org/reactivestreams/Publisher.html), where values are emitted as an asynchronous stream. For example, let's say you wanted to load many users from a service without forcing the service to load all users into its memory (which may exert considerable pressure on it). @@ -299,7 +299,8 @@ A `org.dataloader.BatchPublisher` may be used to load this data: BatchPublisher batchPublisher = new BatchPublisher() { @Override public void load(List userIds, Subscriber userSubscriber) { - userManager.publishUsersById(userIds, userSubscriber); + Publisher userResults = userManager.streamUsersById(userIds); + userResults.subscribe(userSubscriber); } }; DataLoader userLoader = DataLoaderFactory.newPublisherDataLoader(batchPublisher); @@ -307,21 +308,28 @@ A `org.dataloader.BatchPublisher` may be used to load this data: // ... ``` -Rather than waiting for all values to be returned, this `DataLoader` will complete +Rather than waiting for all user values to be returned on one batch, this `DataLoader` will complete the `CompletableFuture` returned by `Dataloader#load(Long)` as each value is -processed. +published. + +This pattern means that data loader values can (in theory) be satisfied more quickly than if we wait for +all results in the batch to be retrieved and hence the overall result may finish more quickly. If an exception is thrown, the remaining futures yet to be completed are completed exceptionally. You *MUST* ensure that the values are streamed in the same order as the keys provided, with the same cardinality (i.e. the number of values must match the number of keys). + Failing to do so will result in incorrect data being returned from `DataLoader#load`. +`BatchPublisher` is the reactive version of `BatchLoader`. + ### Returning a mapped stream of results from your batch publisher -Your publisher may not necessarily return values in the same order in which it processes keys. +Your publisher may not necessarily return values in the same order in which it processes keys and it +may not be able to find a value for each key presented. For example, let's say your batch publisher function loads user data which is spread across shards, with some shards responding more quickly than others. @@ -332,7 +340,8 @@ In instances like these, `org.dataloader.MappedBatchPublisher` can be used. MappedBatchPublisher mappedBatchPublisher = new MappedBatchPublisher() { @Override public void load(Set userIds, Subscriber> userEntrySubscriber) { - userManager.publishUsersById(userIds, userEntrySubscriber); + Publisher> userEntries = userManager.streamUsersById(userIds); + userEntries.subscribe(userEntrySubscriber); } }; DataLoader userLoader = DataLoaderFactory.newMappedPublisherDataLoader(mappedBatchPublisher); @@ -346,6 +355,8 @@ exceptionally. Unlike the `BatchPublisher`, however, it is not necessary to return values in the same order as the provided keys, or even the same number of values. +`MappedBatchPublisher` is the reactive version of `MappedBatchLoader`. + ### Error object is not a thing in a type safe Java world In the reference JS implementation if the batch loader returns an `Error` object back from the `load()` promise is rejected diff --git a/src/test/java/ReadmeExamples.java b/src/test/java/ReadmeExamples.java index a20c0ea..9e30c90 100644 --- a/src/test/java/ReadmeExamples.java +++ b/src/test/java/ReadmeExamples.java @@ -17,6 +17,7 @@ import org.dataloader.scheduler.BatchLoaderScheduler; import org.dataloader.stats.Statistics; import org.dataloader.stats.ThreadLocalStatisticsCollector; +import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import java.time.Duration; @@ -194,7 +195,8 @@ private void batchPublisher() { BatchPublisher batchPublisher = new BatchPublisher() { @Override public void load(List userIds, Subscriber userSubscriber) { - userManager.publishUsersById(userIds, userSubscriber); + Publisher userResults = userManager.streamUsersById(userIds); + userResults.subscribe(userSubscriber); } }; DataLoader userLoader = DataLoaderFactory.newPublisherDataLoader(batchPublisher); @@ -204,7 +206,8 @@ private void mappedBatchPublisher() { MappedBatchPublisher mappedBatchPublisher = new MappedBatchPublisher() { @Override public void load(Set userIds, Subscriber> userEntrySubscriber) { - userManager.publishUsersById(userIds, userEntrySubscriber); + Publisher> userEntries = userManager.streamUsersById(userIds); + userEntries.subscribe(userEntrySubscriber); } }; DataLoader userLoader = DataLoaderFactory.newMappedPublisherDataLoader(mappedBatchPublisher); diff --git a/src/test/java/org/dataloader/fixtures/UserManager.java b/src/test/java/org/dataloader/fixtures/UserManager.java index 4fed3f7..1d2ff1f 100644 --- a/src/test/java/org/dataloader/fixtures/UserManager.java +++ b/src/test/java/org/dataloader/fixtures/UserManager.java @@ -1,6 +1,6 @@ package org.dataloader.fixtures; -import org.reactivestreams.Subscriber; +import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; import java.util.HashMap; @@ -55,12 +55,12 @@ public List loadUsersById(List userIds) { return userIds.stream().map(this::loadUserById).collect(Collectors.toList()); } - public void publishUsersById(List userIds, Subscriber userSubscriber) { - Flux.fromIterable(loadUsersById(userIds)).subscribe(userSubscriber); + public Publisher streamUsersById(List userIds) { + return Flux.fromIterable(loadUsersById(userIds)); } - public void publishUsersById(Set userIds, Subscriber> userEntrySubscriber) { - Flux.fromIterable(loadMapOfUsersByIds(null, userIds).entrySet()).subscribe(userEntrySubscriber); + public Publisher> streamUsersById(Set userIds) { + return Flux.fromIterable(loadMapOfUsersByIds(null, userIds).entrySet()); } public Map loadMapOfUsersByIds(SecurityCtx callCtx, Set userIds) {