From ce6fef564365cf8b6a8a005af6de03f995cc810a Mon Sep 17 00:00:00 2001 From: musketyr Date: Wed, 11 Dec 2024 09:02:03 +0100 Subject: [PATCH] handle publisher returned from non-producer method --- .../processor/DefaultMethodJobInvoker.java | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/processor/DefaultMethodJobInvoker.java b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/processor/DefaultMethodJobInvoker.java index a452afc4..2a49709f 100644 --- a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/processor/DefaultMethodJobInvoker.java +++ b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/processor/DefaultMethodJobInvoker.java @@ -80,7 +80,7 @@ public void invoke(MethodJob job, B bean, JobRunContext context) { if (method.getArguments().length == 0) { context.message(null); - handleResult(configuration, context, executor(context, configuration).apply(() -> { + handleResult(producer, configuration, context, executor(context, configuration).apply(() -> { if (configuration.getFork() > 1) { ParallelFlux resultsOfParallelExecution = Flux.range(0, configuration.getFork()) .parallel(configuration.getFork()) @@ -115,7 +115,7 @@ public void invoke(MethodJob job, B bean, JobRunContext context) { return method.invoke(bean); })); } else if (method.getArguments().length == 1) { - handleResult(configuration, context, executor(context, configuration).apply(() -> { + handleResult(producer, configuration, context, executor(context, configuration).apply(() -> { JobConfiguration.ConsumerQueueConfiguration queueConfiguration = configuration.getConsumer(); Flux> messages = Flux.from( queues(queueConfiguration.getQueueType()).readMessages( @@ -185,7 +185,7 @@ private Function, Publisher> executor(JobRunContext context, return s -> distributedJobExecutor.execute(context, s); } - protected void handleResult(JobConfiguration configuration, JobRunContext callback, Publisher resultPublisher) { + protected void handleResult(boolean producer, JobConfiguration configuration, JobRunContext callback, Publisher resultPublisher) { Object result = Flux.from(resultPublisher).blockFirst(); if (result == null) { @@ -193,6 +193,17 @@ protected void handleResult(JobConfiguration configuration, JobRunContext callba return; } + if (!producer && result instanceof Publisher p) { + Mono.from(p) + .doOnNext(callback::result) + .doOnError(callback::error) + .doFinally(signalType -> callback.finished()) + .block(); + + return; + } + + String queueName = configuration.getProducer().getQueueName(); JobQueues sender = queues(configuration.getProducer().getQueueType());