Skip to content

Commit

Permalink
Merge pull request #55 from agorapulse/fix/properly-handle-parallel-e…
Browse files Browse the repository at this point in the history
…xecution

properly handle parallel execution
  • Loading branch information
musketyr authored Dec 11, 2024
2 parents d183f2b + 82c12ea commit 7cc66c5
Showing 1 changed file with 16 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.ParallelFlux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

Expand Down Expand Up @@ -74,13 +75,14 @@ public void onApplicationEvent(RefreshEvent event) {

public <B> void invoke(MethodJob<B, ?> job, B bean, JobRunContext context) {
ExecutableMethod<B, ?> method = job.getMethod();
boolean producer = !method.getReturnType().isVoid();
JobConfiguration configuration = job.getConfiguration();

if (method.getArguments().length == 0) {
context.message(null);
handleResult(configuration, context, executor(context, configuration).apply(() -> {
if (configuration.getFork() > 1) {
return Flux.range(0, configuration.getFork())
ParallelFlux<Object> resultsOfParallelExecution = Flux.range(0, configuration.getFork())
.parallel(configuration.getFork())
.runOn(getScheduler(job))
.flatMap(i -> {
Expand All @@ -102,6 +104,12 @@ public <B> void invoke(MethodJob<B, ?> job, B bean, JobRunContext context) {
return Mono.empty();
}
});

if (producer) {
return resultsOfParallelExecution.sequential(configuration.getFork());
}

return resultsOfParallelExecution.then();
}

return method.invoke(bean);
Expand Down Expand Up @@ -145,10 +153,16 @@ public <B> void invoke(MethodJob<B, ?> job, B bean, JobRunContext context) {
};

if (configuration.getFork() > 1) {
return messages
ParallelFlux<Object> parallelFlux = messages
.parallel(configuration.getFork())
.runOn(getScheduler(job))
.flatMap(messageProcessor);

if (producer) {
return parallelFlux.sequential(configuration.getFork());
}

return parallelFlux.then();
}

return messages.flatMap(messageProcessor);
Expand Down

0 comments on commit 7cc66c5

Please sign in to comment.