Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async collectors #7

Open
jroper opened this issue May 16, 2018 · 2 comments
Open

Async collectors #7

jroper opened this issue May 16, 2018 · 2 comments
Labels
help wanted Extra attention is needed
Milestone

Comments

@jroper
Copy link
Member

jroper commented May 16, 2018

Currently we're using the JDK8 collector API for doing all terminal accumulations. It would be good to have an async alternative, eg AsyncCollector, where the accumulator and finisher functions return CompletionStage of a value when they are done. Then async alternatives of collect, reduce etc can be provided.

@jroper jroper added help wanted Extra attention is needed streams labels May 16, 2018
@jroper jroper added this to the streams-1.0 milestone Jun 19, 2018
@jroper
Copy link
Member Author

jroper commented Jul 6, 2018

Perhaps the simplest async collector is a forEachAsync, which would allow this:

CompletionStage<Void> saveObjectToDatabase(MyObject obj) {
  ...
}

ReactiveStreams.fromPublisher(somePublisher)
  .forEachAsync(this::saveObjectToDatabase)

Currently, the only way to do this is:

ReactiveStreams.fromPublisher(somePublisher)
  .flatMapCompletionStage(this::saveObjectToDatabase)
  .ignore()

The ignore step is counter intuitive here, you're not really ignoring anything, it's just that the only way to do an asynchronous action on each element (with backpressure) currently is to do a flatMapCompletionStage, and then since your database operation returns Void, you don't care about the result, so you ignore it.

The biggest use case though would be if you wanted to do some asynchronous operation, and the next operation depended on the result of the previous. Though, we could do that with an asynchronous version of a fold processing stage in #66, so maybe that's the best place to handle it.

@jroper jroper removed the streams label Aug 10, 2018
@jroper jroper modified the milestones: 1.0, 1.1 Sep 19, 2018
@Emily-Jiang Emily-Jiang modified the milestones: 1.1, 2.0 Jun 24, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
help wanted Extra attention is needed
Projects
None yet
Development

No branches or pull requests

3 participants