From 7f5e5e4bdd6c9dbbeeec37774b1cef9f44fc03d6 Mon Sep 17 00:00:00 2001 From: musketyr Date: Tue, 26 Nov 2024 15:35:35 +0100 Subject: [PATCH] improved pipe events --- .../worker/queues/redis/RedisQueues.java | 17 ++++++---- .../micronaut-worker-queues-sqs-v1.gradle | 1 + .../agorapulse/worker/sqs/v1/SqsQueues.java | 29 +++++++++-------- .../worker/sqs/v1/SqsQueuesUnitSpec.groovy | 29 ++++++++--------- .../agorapulse/worker/sqs/v2/SqsQueues.java | 27 ++++++++-------- .../worker/sqs/v2/SqsQueuesUnitSpec.groovy | 29 ++++++++--------- .../tck/queue/AbstractQueuesSpec.groovy | 6 ++-- .../agorapulse/worker/local/LocalQueues.java | 16 +++++++--- .../processor/DefaultMethodJobInvoker.java | 32 +++++++++---------- .../agorapulse/worker/queue/JobQueues.java | 17 +++++++++- 10 files changed, 110 insertions(+), 93 deletions(-) diff --git a/libs/micronaut-worker-queues-redis/src/main/java/com/agorapulse/worker/queues/redis/RedisQueues.java b/libs/micronaut-worker-queues-redis/src/main/java/com/agorapulse/worker/queues/redis/RedisQueues.java index c7e3806d..a85e6ef3 100644 --- a/libs/micronaut-worker-queues-redis/src/main/java/com/agorapulse/worker/queues/redis/RedisQueues.java +++ b/libs/micronaut-worker-queues-redis/src/main/java/com/agorapulse/worker/queues/redis/RedisQueues.java @@ -29,10 +29,13 @@ import io.lettuce.core.support.BoundedPoolConfig; import io.micronaut.core.type.Argument; import io.micronaut.jackson.JacksonConfiguration; +import org.reactivestreams.Publisher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import reactor.core.publisher.Flux; import java.time.Duration; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutionException; import java.util.function.Consumer; @@ -61,9 +64,9 @@ public RedisQueues(ObjectMapper objectMapper, RedisClient client, RedisPoolConfi pool = new BoundedAsyncPool<>(new ConnectionFactory(client), config); } - @Override @SuppressWarnings("unchecked") - public void readMessages(String queueName, int maxNumberOfMessages, Duration waitTime, Argument argument, Consumer action) { + @Override + public Publisher readMessages(String queueName, int maxNumberOfMessages, Duration waitTime, Argument argument) { TransactionResult result = withTransaction(redisCommands -> { String key = getKey(queueName); redisCommands.zrange(key, 0, maxNumberOfMessages - 1); @@ -71,7 +74,7 @@ public void readMessages(String queueName, int maxNumberOfMessages, Duration }); if (result == null) { - return; + return Flux.empty(); } @@ -83,14 +86,14 @@ public void readMessages(String queueName, int maxNumberOfMessages, Duration List messages = (List) firstResponse; - messages.forEach(body -> { + return Flux.fromIterable(messages).handle((body, sink) -> { try { - action.accept(objectMapper.readValue(body, JacksonConfiguration.constructType(argument, objectMapper.getTypeFactory()))); + sink.next(objectMapper.readValue(body, JacksonConfiguration.constructType(argument, objectMapper.getTypeFactory()))); } catch (JsonProcessingException e) { if (argument.equalsType(Argument.STRING)) { - action.accept((T) body); + sink.next((T) body); } else { - throw new IllegalArgumentException("Cannot convert to " + argument + "from message\n" + body, e); + sink.error(new IllegalArgumentException("Cannot convert to " + argument + "from message\n" + body, e)); } } }); diff --git a/libs/micronaut-worker-queues-sqs-v1/micronaut-worker-queues-sqs-v1.gradle b/libs/micronaut-worker-queues-sqs-v1/micronaut-worker-queues-sqs-v1.gradle index 88bbdbe9..873316a0 100644 --- a/libs/micronaut-worker-queues-sqs-v1/micronaut-worker-queues-sqs-v1.gradle +++ b/libs/micronaut-worker-queues-sqs-v1/micronaut-worker-queues-sqs-v1.gradle @@ -20,6 +20,7 @@ dependencies { api "com.agorapulse:micronaut-aws-sdk-sqs:$micronautAwsSdkVersion" implementation 'io.micronaut:micronaut-jackson-databind' + implementation 'io.micronaut.reactor:micronaut-reactor' testImplementation project(':micronaut-worker-tck') diff --git a/libs/micronaut-worker-queues-sqs-v1/src/main/java/com/agorapulse/worker/sqs/v1/SqsQueues.java b/libs/micronaut-worker-queues-sqs-v1/src/main/java/com/agorapulse/worker/sqs/v1/SqsQueues.java index e0d452cb..a6cffe05 100644 --- a/libs/micronaut-worker-queues-sqs-v1/src/main/java/com/agorapulse/worker/sqs/v1/SqsQueues.java +++ b/libs/micronaut-worker-queues-sqs-v1/src/main/java/com/agorapulse/worker/sqs/v1/SqsQueues.java @@ -24,11 +24,13 @@ import com.fasterxml.jackson.databind.ObjectMapper; import io.micronaut.core.type.Argument; import io.micronaut.jackson.JacksonConfiguration; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; import java.time.Duration; import java.util.Arrays; import java.util.Collection; -import java.util.function.Consumer; import java.util.stream.Collectors; public class SqsQueues implements JobQueues { @@ -42,10 +44,10 @@ public SqsQueues(SimpleQueueService simpleQueueService, ObjectMapper objectMappe } @Override - public void readMessages(String queueName, int maxNumberOfMessages, Duration waitTime, Argument argument, Consumer action) { - simpleQueueService.receiveMessages(queueName, maxNumberOfMessages, 0, Math.toIntExact(waitTime.getSeconds())).forEach(m -> { - readMessageInternal(queueName, argument, action, m.getBody(), m.getReceiptHandle(), true); - }); + public Publisher readMessages(String queueName, int maxNumberOfMessages, Duration waitTime, Argument argument) { + return Flux.merge(simpleQueueService.receiveMessages(queueName, maxNumberOfMessages, 0, Math.toIntExact(waitTime.getSeconds())).stream().map(m -> + readMessageInternal(queueName, argument, m.getBody(), m.getReceiptHandle(), true) + ).toList()); } @Override @@ -70,28 +72,27 @@ public void sendRawMessage(String queueName, Object result) { } } - private void readMessageInternal(String queueName, Argument argument, Consumer action, String body, String handle, boolean tryReformat) { + private Mono readMessageInternal(String queueName, Argument argument, String body, String handle, boolean tryReformat) { try { - action.accept(objectMapper.readValue(body, JacksonConfiguration.constructType(argument, objectMapper.getTypeFactory()))); + Mono result = Mono.just(objectMapper.readValue(body, JacksonConfiguration.constructType(argument, objectMapper.getTypeFactory()))); simpleQueueService.deleteMessage(queueName, handle); + return result; } catch (JsonProcessingException e) { if (tryReformat) { if (String.class.isAssignableFrom(argument.getType())) { - action.accept(argument.getType().cast(body)); + Mono result = Mono.just(argument.getType().cast(body)); simpleQueueService.deleteMessage(queueName, handle); - return; + return result; } if (Collection.class.isAssignableFrom(argument.getType())) { if (argument.getTypeParameters().length > 0 && CharSequence.class.isAssignableFrom(argument.getTypeParameters()[0].getType())) { String quoted = Arrays.stream(body.split(",\\s*")).map(s -> "\"" + s + "\"").collect(Collectors.joining(",")); - readMessageInternal(queueName, argument, action, "[" + quoted + "]", handle, false); - return; + return readMessageInternal(queueName, argument, "[" + quoted + "]", handle, false); } - readMessageInternal(queueName, argument, action, "[" + body + "]", handle, false); - return; + return readMessageInternal(queueName, argument, "[" + body + "]", handle, false); } } - throw new IllegalArgumentException("Cannot convert to " + argument + "from message\n" + body, e); + return Mono.error(new IllegalArgumentException("Cannot convert to " + argument + "from message\n" + body, e)); } } } diff --git a/libs/micronaut-worker-queues-sqs-v1/src/test/groovy/com/agorapulse/worker/sqs/v1/SqsQueuesUnitSpec.groovy b/libs/micronaut-worker-queues-sqs-v1/src/test/groovy/com/agorapulse/worker/sqs/v1/SqsQueuesUnitSpec.groovy index df042d92..451fcd74 100644 --- a/libs/micronaut-worker-queues-sqs-v1/src/test/groovy/com/agorapulse/worker/sqs/v1/SqsQueuesUnitSpec.groovy +++ b/libs/micronaut-worker-queues-sqs-v1/src/test/groovy/com/agorapulse/worker/sqs/v1/SqsQueuesUnitSpec.groovy @@ -23,6 +23,7 @@ import com.amazonaws.services.sqs.model.AmazonSQSException import com.amazonaws.services.sqs.model.Message import com.fasterxml.jackson.databind.ObjectMapper import io.micronaut.core.type.Argument +import reactor.core.publisher.Flux import spock.lang.Shared import spock.lang.Specification @@ -42,10 +43,9 @@ class SqsQueuesUnitSpec extends Specification { void 'message is deleted once read'() { when: - List> values = [] - sqsQueues.readMessages(QUEUE_NAME, MAX_MESSAGES, WAIT_TIME, Argument.mapOf(String, String)) { - values << it - } + List> values = Flux.from( + sqsQueues.readMessages(QUEUE_NAME, MAX_MESSAGES, WAIT_TIME, Argument.mapOf(String, String)) + ).collectList().block() then: values values.size() == 2 @@ -71,10 +71,9 @@ class SqsQueuesUnitSpec extends Specification { void 'can read legacy messages'() { when: - List> values = [] - sqsQueues.readMessages(QUEUE_NAME, MAX_MESSAGES, WAIT_TIME, Argument.listOf(Long)) { - values << it - } + List> values = Flux.from( + sqsQueues.readMessages(QUEUE_NAME, MAX_MESSAGES, WAIT_TIME, Argument.listOf(Long)) + ).collectList().block() then: values values.size() == 2 @@ -101,10 +100,9 @@ class SqsQueuesUnitSpec extends Specification { void 'can read legacy string messages'() { when: - List> values = [] - sqsQueues.readMessages(QUEUE_NAME, MAX_MESSAGES, WAIT_TIME, Argument.listOf(String)) { - values << it - } + List> values = Flux.from( + sqsQueues.readMessages(QUEUE_NAME, MAX_MESSAGES, WAIT_TIME, Argument.listOf(String)) + ).collectList().block() then: values values.size() == 2 @@ -131,10 +129,9 @@ class SqsQueuesUnitSpec extends Specification { void 'message not deleted on error'() { when: - List> values = [] - sqsQueues.readMessages(QUEUE_NAME, MAX_MESSAGES, WAIT_TIME, Argument.mapOf(String, String)) { - values << it - } + Flux.from( + sqsQueues.readMessages(QUEUE_NAME, MAX_MESSAGES, WAIT_TIME, Argument.mapOf(String, String)) + ).collectList().block() then: thrown IllegalArgumentException diff --git a/libs/micronaut-worker-queues-sqs-v2/src/main/java/com/agorapulse/worker/sqs/v2/SqsQueues.java b/libs/micronaut-worker-queues-sqs-v2/src/main/java/com/agorapulse/worker/sqs/v2/SqsQueues.java index 8070ead6..7c6758cb 100644 --- a/libs/micronaut-worker-queues-sqs-v2/src/main/java/com/agorapulse/worker/sqs/v2/SqsQueues.java +++ b/libs/micronaut-worker-queues-sqs-v2/src/main/java/com/agorapulse/worker/sqs/v2/SqsQueues.java @@ -25,12 +25,12 @@ import io.micronaut.jackson.JacksonConfiguration; import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; import software.amazon.awssdk.services.sqs.model.SqsException; import java.time.Duration; import java.util.Arrays; import java.util.Collection; -import java.util.function.Consumer; import java.util.stream.Collectors; public class SqsQueues implements JobQueues { @@ -44,10 +44,10 @@ public SqsQueues(SimpleQueueService simpleQueueService, ObjectMapper objectMappe } @Override - public void readMessages(String queueName, int maxNumberOfMessages, Duration waitTime, Argument argument, Consumer action) { - simpleQueueService.receiveMessages(queueName, maxNumberOfMessages, 0, Math.toIntExact(waitTime.getSeconds())).forEach(m -> { - readMessageInternal(queueName, argument, action, m.body(), m.receiptHandle(), true); - }); + public Publisher readMessages(String queueName, int maxNumberOfMessages, Duration waitTime, Argument argument) { + return Flux.merge(simpleQueueService.receiveMessages(queueName, maxNumberOfMessages, 0, Math.toIntExact(waitTime.getSeconds())).stream().map(m -> + readMessageInternal(queueName, argument, m.body(), m.receiptHandle(), true) + ).toList()); } @Override @@ -78,28 +78,27 @@ public void sendRawMessages(String queueName, Publisher result) { Flux.from(simpleQueueService.sendMessages(queueName, Flux.from(result).map(String::valueOf))).subscribe(); } - private void readMessageInternal(String queueName, Argument argument, Consumer action, String body, String handle, boolean tryReformat) { + private Mono readMessageInternal(String queueName, Argument argument, String body, String handle, boolean tryReformat) { try { - action.accept(objectMapper.readValue(body, JacksonConfiguration.constructType(argument, objectMapper.getTypeFactory()))); + Mono result = Mono.just(objectMapper.readValue(body, JacksonConfiguration.constructType(argument, objectMapper.getTypeFactory()))); simpleQueueService.deleteMessage(queueName, handle); + return result; } catch (JsonProcessingException e) { if (tryReformat) { if (String.class.isAssignableFrom(argument.getType())) { - action.accept(argument.getType().cast(body)); + Mono result = Mono.just(argument.getType().cast(body)); simpleQueueService.deleteMessage(queueName, handle); - return; + return result; } if (Collection.class.isAssignableFrom(argument.getType())) { if (argument.getTypeParameters().length > 0 && CharSequence.class.isAssignableFrom(argument.getTypeParameters()[0].getType())) { String quoted = Arrays.stream(body.split(",\\s*")).map(s -> "\"" + s + "\"").collect(Collectors.joining(",")); - readMessageInternal(queueName, argument, action, "[" + quoted + "]", handle, false); - return; + return readMessageInternal(queueName, argument, "[" + quoted + "]", handle, false); } - readMessageInternal(queueName, argument, action, "[" + body + "]", handle, false); - return; + return readMessageInternal(queueName, argument, "[" + body + "]", handle, false); } } - throw new IllegalArgumentException("Cannot convert to " + argument + "from message\n" + body, e); + return Mono.error(new IllegalArgumentException("Cannot convert to " + argument + "from message\n" + body, e)); } } diff --git a/libs/micronaut-worker-queues-sqs-v2/src/test/groovy/com/agorapulse/worker/sqs/v2/SqsQueuesUnitSpec.groovy b/libs/micronaut-worker-queues-sqs-v2/src/test/groovy/com/agorapulse/worker/sqs/v2/SqsQueuesUnitSpec.groovy index a916ef55..8bf6a9a7 100644 --- a/libs/micronaut-worker-queues-sqs-v2/src/test/groovy/com/agorapulse/worker/sqs/v2/SqsQueuesUnitSpec.groovy +++ b/libs/micronaut-worker-queues-sqs-v2/src/test/groovy/com/agorapulse/worker/sqs/v2/SqsQueuesUnitSpec.groovy @@ -21,6 +21,7 @@ import com.agorapulse.micronaut.amazon.awssdk.sqs.SimpleQueueService import com.agorapulse.worker.queue.JobQueues import com.fasterxml.jackson.databind.ObjectMapper import io.micronaut.core.type.Argument +import reactor.core.publisher.Flux import software.amazon.awssdk.services.sqs.model.Message import software.amazon.awssdk.services.sqs.model.SqsException import spock.lang.Shared @@ -42,10 +43,9 @@ class SqsQueuesUnitSpec extends Specification { void 'message is deleted once read'() { when: - List> values = [] - sqsQueues.readMessages(QUEUE_NAME, MAX_MESSAGES, WAIT_TIME, Argument.mapOf(String, String)) { - values << it - } + List> values = Flux.from( + sqsQueues.readMessages(QUEUE_NAME, MAX_MESSAGES, WAIT_TIME, Argument.mapOf(String, String)) + ).collectList().block() then: values values.size() == 2 @@ -71,10 +71,9 @@ class SqsQueuesUnitSpec extends Specification { void 'can read legacy messages'() { when: - List> values = [] - sqsQueues.readMessages(QUEUE_NAME, MAX_MESSAGES, WAIT_TIME, Argument.listOf(Long)) { - values << it - } + List> values = Flux.from( + sqsQueues.readMessages(QUEUE_NAME, MAX_MESSAGES, WAIT_TIME, Argument.listOf(Long)) + ).collectList().block() then: values values.size() == 2 @@ -101,10 +100,9 @@ class SqsQueuesUnitSpec extends Specification { void 'can read legacy string messages'() { when: - List> values = [] - sqsQueues.readMessages(QUEUE_NAME, MAX_MESSAGES, WAIT_TIME, Argument.listOf(String)) { - values << it - } + List> values = Flux.from( + sqsQueues.readMessages(QUEUE_NAME, MAX_MESSAGES, WAIT_TIME, Argument.listOf(String)) + ).collectList().block() then: values values.size() == 2 @@ -131,10 +129,9 @@ class SqsQueuesUnitSpec extends Specification { void 'message not deleted on error'() { when: - List> values = [] - sqsQueues.readMessages(QUEUE_NAME, MAX_MESSAGES, WAIT_TIME, Argument.mapOf(String, String)) { - values << it - } + Flux.from( + sqsQueues.readMessages(QUEUE_NAME, MAX_MESSAGES, WAIT_TIME, Argument.mapOf(String, String)) + ).collectList().block() then: thrown IllegalArgumentException diff --git a/libs/micronaut-worker-tck/src/main/groovy/com/agorapulse/worker/tck/queue/AbstractQueuesSpec.groovy b/libs/micronaut-worker-tck/src/main/groovy/com/agorapulse/worker/tck/queue/AbstractQueuesSpec.groovy index 5e83dbb8..a4fd29b8 100644 --- a/libs/micronaut-worker-tck/src/main/groovy/com/agorapulse/worker/tck/queue/AbstractQueuesSpec.groovy +++ b/libs/micronaut-worker-tck/src/main/groovy/com/agorapulse/worker/tck/queue/AbstractQueuesSpec.groovy @@ -22,6 +22,7 @@ import io.micronaut.context.ApplicationContext import io.micronaut.core.async.publisher.Publishers import io.micronaut.core.type.Argument import io.micronaut.inject.qualifiers.Qualifiers +import reactor.core.publisher.Flux import spock.lang.Specification import jakarta.inject.Inject @@ -55,14 +56,11 @@ abstract class AbstractQueuesSpec extends Specification { void 'can send raw messages to queue'() { given: JobQueues queues = context.getBean(JobQueues, Qualifiers.byName(name)) - List messages = [] when: queues.sendRawMessage('foo', 'one') queues.sendRawMessages('foo', Publishers.just('two')) and: - queues.readMessages('foo', 2, Duration.ofSeconds(1), Argument.STRING) { message -> - messages << message - } + List messages = Flux.from(queues.readMessages('foo', 2, Duration.ofSeconds(1), Argument.STRING)).collectList().block() then: messages == ['one', 'two'] } diff --git a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/local/LocalQueues.java b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/local/LocalQueues.java index 1103b3ed..fc68554a 100644 --- a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/local/LocalQueues.java +++ b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/local/LocalQueues.java @@ -22,15 +22,17 @@ import io.micronaut.context.annotation.Secondary; import io.micronaut.context.env.Environment; import io.micronaut.core.type.Argument; - import jakarta.inject.Named; import jakarta.inject.Singleton; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; + import java.time.Duration; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ConcurrentMap; -import java.util.function.Consumer; @Secondary @Singleton @@ -46,15 +48,19 @@ public LocalQueues(Environment environment) { } @Override - public void readMessages(String queueName, int maxNumberOfMessages, Duration waitTime, Argument argument, Consumer action) { + public Publisher readMessages(String queueName, int maxNumberOfMessages, Duration waitTime, Argument argument) { ConcurrentLinkedDeque objects = queues.get(queueName); if (objects == null) { - return; + return Flux.empty(); } + List results = new ArrayList<>(); + for (int i = 0; i < maxNumberOfMessages && !objects.isEmpty(); i++) { - action.accept(environment.convertRequired(objects.removeFirst(), argument)); + results.add(environment.convertRequired(objects.removeFirst(), argument)); } + + return Flux.fromIterable(results); } @Override diff --git a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/processor/DefaultMethodJobInvoker.java b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/processor/DefaultMethodJobInvoker.java index 66212646..d549943b 100644 --- a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/processor/DefaultMethodJobInvoker.java +++ b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/processor/DefaultMethodJobInvoker.java @@ -26,11 +26,10 @@ import io.micronaut.context.BeanContext; import io.micronaut.inject.ExecutableMethod; import io.micronaut.inject.qualifiers.Qualifiers; +import jakarta.inject.Singleton; import org.reactivestreams.Publisher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - -import jakarta.inject.Singleton; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -55,7 +54,6 @@ public DefaultMethodJobInvoker( this.distributedJobExecutor = distributedJobExecutor; } - @SuppressWarnings("unchecked") public void invoke(MethodJob job, B bean, JobRunContext callback) { ExecutableMethod method = job.getMethod(); JobConfiguration configuration = job.getConfiguration(); @@ -82,16 +80,18 @@ public void invoke(MethodJob job, B bean, JobRunContext callback) { handleResult(configuration, callback, executor.apply(() -> method.invoke(bean))); } else if (method.getArguments().length == 1) { JobConfiguration.ConsumerQueueConfiguration queueConfiguration = configuration.getConsumer(); - queues(queueConfiguration.getQueueType()).readMessages( - queueConfiguration.getQueueName(), - queueConfiguration.getMaxMessages() < 1 ? 1 : queueConfiguration.getMaxMessages(), - Optional.ofNullable(queueConfiguration.getWaitingTime()).orElse(Duration.ZERO), - method.getArguments()[0], - message -> { - callback.message(message); - handleResult(configuration, callback, executor.apply(() -> method.invoke(bean, message))); - } - ); + Publisher results = Flux.from( + queues(queueConfiguration.getQueueType()).readMessages( + queueConfiguration.getQueueName(), + queueConfiguration.getMaxMessages() < 1 ? 1 : queueConfiguration.getMaxMessages(), + Optional.ofNullable(queueConfiguration.getWaitingTime()).orElse(Duration.ZERO), + method.getArguments()[0] + ) + ) + .doOnNext(callback::message) + .flatMap(message -> executor.apply(() -> method.invoke(bean, message))); + + handleResult(configuration, callback, results); } else { LOGGER.error("Too many arguments for " + method + "! The job method wasn't executed!"); } @@ -143,9 +143,9 @@ protected void handleResult(JobConfiguration configuration, JobRunContext callba private JobQueues queues(String qualifier) { return context.findBean( - JobQueues.class, - qualifier == null ? null : Qualifiers.byName(qualifier) - ) + JobQueues.class, + qualifier == null ? null : Qualifiers.byName(qualifier) + ) .orElseGet(() -> context.getBean(JobQueues.class)); } diff --git a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/queue/JobQueues.java b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/queue/JobQueues.java index e9576bbe..76670684 100644 --- a/libs/micronaut-worker/src/main/java/com/agorapulse/worker/queue/JobQueues.java +++ b/libs/micronaut-worker/src/main/java/com/agorapulse/worker/queue/JobQueues.java @@ -26,7 +26,22 @@ public interface JobQueues { - void readMessages(String queueName, int maxNumberOfMessages, Duration waitTime, Argument argument, Consumer action); + /** + * Reads messages from the queue and processes them with the given action. + * @param queueName the name of the queue + * @param maxNumberOfMessages the maximal number of messages to read + * @param waitTime the maximal time to wait for the messages + * @param argument the argument type + * @param action the action to process the message + * @param the type of the message + * @deprecated Use {@link #readMessages(String, int, Duration, Argument)} instead + */ + @Deprecated(forRemoval = true) + default void readMessages(String queueName, int maxNumberOfMessages, Duration waitTime, Argument argument, Consumer action) { + Flux.from(readMessages(queueName, maxNumberOfMessages, waitTime, argument)).subscribe(action::accept); + } + + Publisher readMessages(String queueName, int maxNumberOfMessages, Duration waitTime, Argument argument); default void sendRawMessages(String queueName, Publisher result) { Flux.from(result).subscribe(message -> sendRawMessage(queueName, message));