Merge pull request #61 from agorapulse/feature/consumption-improvements
musketyr authored Feb 26, 2025
2 parents f44ecd8 + 8e33ddd commit 5a9c765
Showing 14 changed files with 463 additions and 71 deletions.
12 changes: 9 additions & 3 deletions docs/guide/src/docs/asciidoc/usage.adoc
@@ -126,19 +126,25 @@ arguments they have and their return value.

==== Quick Start

You can use two conventional annotations `@QueueProducer` and `@QueueConsumer` to publish and consume messages from a queue.
You can use three conventional annotations `@QueueProducer`, `@QueueListener` and `@QueueConsumer` to publish and consume messages from a queue.

`@QueueProducer` is used to publish messages to a queue, `@QueueListener` is used to poll messages from a queue, and `@QueueConsumer` is used to consume messages from a queue. The difference between `@QueueListener` and `@QueueConsumer` is that `@QueueListener` is triggered once, using `initialDelay`, and then polls the queue indefinitely until the application is terminated, whereas `@QueueConsumer` is triggered periodically, using `fixedRate`, and polls the queue for at most `maxMessages` messages in a single job run.

TIP: If you are in doubt, use `@QueueListener` to consume messages from the queue as fast as possible. If you need more fine-grained control, use `@QueueConsumer`.
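
Schematically, the convention looks like the sketch below. The annotations play the roles described above; the `WordMessage` record, the queue and method names, the trigger interval and the exact imports are illustrative only and should be checked against the complete, tested listing that follows.

[source,java]
.Illustrative sketch of the queue convention
----
import com.agorapulse.worker.annotation.FixedRate;
import com.agorapulse.worker.convention.QueueListener;
import com.agorapulse.worker.convention.QueueProducer;

import jakarta.inject.Singleton;
import reactor.core.publisher.Flux;

@Singleton
public class WordJobs {

    // record shared by the producer and the listener (illustrative)
    public record WordMessage(String word) { }

    @FixedRate("10m")            // the producer needs a scheduled trigger
    @QueueProducer("words")      // publish the returned messages to the "words" queue
    public Flux<WordMessage> produceWords() {
        return Flux.just(new WordMessage("Hello"), new WordMessage("World"));
    }

    @QueueListener("words")      // poll the "words" queue as fast as possible
    public void listenToWords(WordMessage message) {
        // handle each message as soon as it arrives
    }
}
----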

[source,java]
.Queue Producer and Consumer
.Queue Quick Start
----
include::{root-dir}/libs/micronaut-worker/src/test/groovy/com/agorapulse/worker/convention/QueueListenerAndProducerSpec.groovy[tag=quickstart,indent=0]
----
<1> Two jobs will communicate with each other using the given record object
<2> The producer job must have some scheduled trigger associated with it, for example cron or fixed rate
<3> Use the `@QueueProducer` annotation with the name of the queue to publish messages
<4> The producer job must return some value, ideally a `Publisher` of the given messages
<5> Use the `@QueueConsumer` annotation with the name of the queue to consume messages
<5> Use the `@QueueListener` annotation with the name of the queue to poll the messages
<6> The consumer job must have a single parameter of the same type as the producer job returns
<7> Use the `@QueueConsumer` annotation with the name of the queue to consume messages
<8> The consumer job must have a single parameter of the same type as the producer job returns

TIP: The value of `@Fork` is the same as the `maxMessages` value of the `@QueueConsumer` annotation. The value of `waitingTime` in `@QueueConsumer` is the same as the associated `@FixedRate` value.
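
To illustrate that pairing, a consumer processing at most ten messages per run, once a minute, could be declared along the following lines (a sketch only, reusing the illustrative `WordMessage` record from the sketch above; the queue and method names are made up and the exact `@QueueConsumer` attribute syntax should be verified against the listing above):

[source,java]
.Aligning @Fork and @FixedRate with maxMessages and waitingTime (illustrative)
----
@Fork(10)                       // same value as maxMessages below
@FixedRate("1m")                // same value as waitingTime below
@QueueConsumer(value = "words", maxMessages = 10, waitingTime = "1m")
public void consumeWords(WordMessage message) {
    // at most 10 messages are polled and processed in a single job run
}
----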

@@ -17,6 +17,7 @@
*/
package com.agorapulse.worker.queues.redis;

import com.agorapulse.worker.convention.QueueListener;
import com.agorapulse.worker.queue.JobQueues;
import com.agorapulse.worker.queue.QueueMessage;
import com.fasterxml.jackson.core.JsonProcessingException;
@@ -34,8 +35,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
@@ -65,52 +68,39 @@ public RedisQueues(ObjectMapper objectMapper, RedisClient client, RedisPoolConfi
pool = new BoundedAsyncPool<>(new ConnectionFactory(client), config);
}

@SuppressWarnings("unchecked")
@Override
public <T> Publisher<QueueMessage<T>> readMessages(String queueName, int maxNumberOfMessages, Duration waitTime, Argument<T> argument) {
TransactionResult result = withTransaction(redisCommands -> {
String key = getKey(queueName);
redisCommands.zrange(key, 0, maxNumberOfMessages - 1L);
redisCommands.zremrangebyrank(key, 0, maxNumberOfMessages - 1L);
});

if (result == null) {
return Flux.empty();
}


Object firstResponse = result.get(0);

if (!(firstResponse instanceof List)) {
throw new IllegalStateException("There result is not a list of Strings. Got: " + firstResponse);
}

List<String> messages = (List<String>) firstResponse;
return Flux.<List<String>, Integer>generate(() -> maxNumberOfMessages, (state, sink) -> {
List<String> messages = readMessages(queueName, state);
if (messages.isEmpty() && !QueueListener.Utils.isInfinitePoll(maxNumberOfMessages, waitTime)) {
sink.complete();
return 0;
}

return Flux.fromIterable(messages).handle((body, sink) -> {
try {
T message = objectMapper.readValue(body, JacksonConfiguration.constructType(argument, objectMapper.getTypeFactory()));
QueueMessage<T> queueMessage = QueueMessage.alwaysRequeue(
UUID.randomUUID().toString(),
message,
() -> {},
() -> sendRawMessage(queueName, body)
);
sink.next(queueMessage);
} catch (JsonProcessingException e) {
if (argument.equalsType(Argument.STRING)) {
QueueMessage<T> queueMessage = QueueMessage.alwaysRequeue(
UUID.randomUUID().toString(),
(T) body,
() -> {},
() -> sendRawMessage(queueName, body)
);
sink.next(queueMessage);
} else {
sink.error(new IllegalArgumentException("Cannot convert to " + argument + "from message\n" + body, e));
if (state > 0) {
while (QueueListener.Utils.isInfinitePoll(maxNumberOfMessages, waitTime) && messages.isEmpty()) {
try {
Thread.sleep(100);
messages = readMessages(queueName, state);
} catch (InterruptedException e) {
sink.error(e);
return 0;
}
}
sink.next(messages);
}
});

int nextRemainingMessages = state - messages.size();
if (nextRemainingMessages <= 0 && !QueueListener.Utils.isInfinitePoll(maxNumberOfMessages, waitTime)) {
sink.complete();
return 0;
}

return nextRemainingMessages;
})
.flatMap(Flux::fromIterable)
.flatMap(body -> readMessageInternalWithFallback(queueName, argument, body))
.take(maxNumberOfMessages);
}

@Override
@@ -137,6 +127,54 @@ public void sendRawMessage(String queueName, Object result) {
});
}

@SuppressWarnings("unchecked")
private List<String> readMessages(String queueName, int maxNumberOfMessages) {
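// zrange and zremrangebyrank run in a single transaction, so the batch is fetched and removed atomically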
TransactionResult result = withTransaction(redisCommands -> {
String key = getKey(queueName);
redisCommands.zrange(key, 0, maxNumberOfMessages - 1L);
redisCommands.zremrangebyrank(key, 0, maxNumberOfMessages - 1L);
});

if (result == null) {
return Collections.emptyList();
}


Object firstResponse = result.get(0);

if (!(firstResponse instanceof List)) {
throw new IllegalStateException("The result is not a list of Strings. Got: " + firstResponse);
}

return (List<String>) firstResponse;
}

private <T> Mono<QueueMessage<T>> readMessageInternalWithFallback(String queueName, Argument<T> argument, String body) {
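// try to deserialize the JSON body into the expected type; when that fails and the job expects a plain String, fall back to the raw body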
try {
return Mono.just(readMessageInternal(queueName, argument, body));
} catch (JsonProcessingException e) {
if (argument.equalsType(Argument.STRING)) {
return Mono.just(QueueMessage.alwaysRequeue(
UUID.randomUUID().toString(),
(T) body,
() -> {},
() -> sendRawMessage(queueName, body)
));
}
return Mono.error(new IllegalArgumentException("Cannot convert to " + argument + " from message\n" + body, e));
}
}

private <T> QueueMessage<T> readMessageInternal(String queueName, Argument<T> argument, String body) throws JsonProcessingException {
T message = objectMapper.readValue(body, JacksonConfiguration.constructType(argument, objectMapper.getTypeFactory()));
return QueueMessage.alwaysRequeue(
UUID.randomUUID().toString(),
message,
() -> {},
() -> sendRawMessage(queueName, body)
);
}

private String getKey(String queueName) {
return PREFIX_DATED_QUEUE + queueName;
}
@@ -47,9 +47,13 @@ class RedisQueuesSpec extends AbstractQueuesSpec implements TestPropertyProvider
@Override
Map<String, String> getProperties() {
return [
'redis.uri' : "redis://$redis.host:${redis.getMappedPort(6379)}",
'worker.jobs.send-words-job-listen.enabled': 'true',
'worker.jobs.send-words-job-hello.enabled' : 'true',
'redis.uri' : "redis://$redis.host:${redis.getMappedPort(6379)}",
'worker.jobs.send-words-job-listen.enabled' : 'true',
'worker.jobs.send-words-job-hello.enabled' : 'true',
'worker.jobs.non-blocking-job-numbers.enabled' : 'true',
'worker.jobs.non-blocking-job-consume.enabled' : 'true',
'worker.jobs.non-blocking-job-more-numbers.enabled': 'true',
'worker.jobs.non-blocking-job-consume-ones.enabled': 'true',
]
}

@@ -18,23 +18,31 @@
package com.agorapulse.worker.sqs.v1;

import com.agorapulse.micronaut.aws.sqs.SimpleQueueService;
import com.agorapulse.worker.convention.QueueListener;
import com.agorapulse.worker.queue.JobQueues;
import com.agorapulse.worker.queue.QueueMessage;
import com.amazonaws.services.sqs.model.AmazonSQSException;
import com.amazonaws.services.sqs.model.Message;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.micronaut.core.type.Argument;
import io.micronaut.jackson.JacksonConfiguration;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;

public class SqsQueues implements JobQueues {
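// SQS returns at most 10 messages per receive call and supports at most 20 seconds of long polling, so larger requests are split into repeated polls below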
private static final int MAX_MESSAGES_TO_POLL = 10;
private static final int MIN_PARALLELISM = 4;
private static final int MAX_WAITING_TIME = 20;

private final SimpleQueueService simpleQueueService;
private final ObjectMapper objectMapper;
@@ -46,16 +54,50 @@ public SqsQueues(SimpleQueueService simpleQueueService, ObjectMapper objectMappe

@Override
public <T> Publisher<QueueMessage<T>> readMessages(String queueName, int maxNumberOfMessages, Duration waitTime, Argument<T> argument) {
return Flux.merge(simpleQueueService.receiveMessages(queueName, maxNumberOfMessages, 0, Math.toIntExact(waitTime.getSeconds())).stream().map(m ->
readMessageInternal(queueName, argument, m.getBody(), true).map(
message -> QueueMessage.requeueIfDeleted(
m.getMessageId(),
message,
() -> simpleQueueService.deleteMessage(queueName, m.getReceiptHandle()),
() -> simpleQueueService.sendMessage(queueName, m.getBody())
Instant pollStart = Instant.now();
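// poll in batches of at most MAX_MESSAGES_TO_POLL until the requested number of messages has been read or the waiting time has elapsed; an infinite poll keeps emitting until cancelled downstream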
return Flux.<List<Message>, Integer>generate(() -> maxNumberOfMessages, (remainingNumberOfMessages, sink) -> {
try {
List<Message> messages = simpleQueueService.receiveMessages(
queueName,
Math.min(MAX_MESSAGES_TO_POLL, remainingNumberOfMessages),
0,
Math.min(MAX_WAITING_TIME, Math.toIntExact(waitTime.getSeconds())));

if (messages.isEmpty() && waitTime.getSeconds() <= MAX_WAITING_TIME && !QueueListener.Utils.isInfinitePoll(maxNumberOfMessages, waitTime)) {
sink.complete();
return 0;
}

sink.next(messages);

if (QueueListener.Utils.isInfinitePoll(maxNumberOfMessages, waitTime)) {
return maxNumberOfMessages;
}

int nextRemaining = remainingNumberOfMessages - messages.size();

if (nextRemaining <= 0 || pollStart.plus(waitTime).isBefore(Instant.now())) {
sink.complete();
}

return nextRemaining;
} catch (Throwable th) {
sink.error(th);
return 0;
}
})
.flatMap(Flux::fromIterable)
.flatMap(m ->
readMessageInternal(queueName, argument, m.getBody(), true).map(
message -> QueueMessage.requeueIfDeleted(
m.getMessageId(),
message,
() -> simpleQueueService.deleteMessage(queueName, m.getReceiptHandle()),
() -> simpleQueueService.sendMessage(queueName, m.getBody())
)
)
)
).toList());
.parallel(Math.max(Schedulers.DEFAULT_POOL_SIZE, MIN_PARALLELISM));
}

@Override
@@ -28,6 +28,11 @@ import io.micronaut.test.extensions.spock.annotation.MicronautTest
@Property(name = 'aws.sqs.auto-create-queue', value = 'true')
@Property(name = 'worker.jobs.send-words-job-listen.enabled', value = 'true')
@Property(name = 'worker.jobs.send-words-job-hello.enabled', value = 'true')
@Property(name = 'worker.jobs.non-blocking-job-numbers.enabled', value = 'true')
@Property(name = 'worker.jobs.non-blocking-job-consume.enabled', value = 'true')
@Property(name = 'worker.jobs.non-blocking-job-more-numbers.enabled', value = 'true')
@Property(name = 'worker.jobs.non-blocking-job-consume-ones.enabled', value = 'true')
@Property(name = 'non-blocking-job.delay', value = '500')
class SqsQueuesSpec extends AbstractQueuesSpec {

@SuppressWarnings('GetterMethodCouldBeProperty')
@@ -18,6 +18,7 @@
package com.agorapulse.worker.sqs.v2;

import com.agorapulse.micronaut.amazon.awssdk.sqs.SimpleQueueService;
import com.agorapulse.worker.convention.QueueListener;
import com.agorapulse.worker.queue.JobQueues;
import com.agorapulse.worker.queue.QueueMessage;
import com.fasterxml.jackson.core.JsonProcessingException;
@@ -27,15 +28,23 @@
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import software.amazon.awssdk.services.sqs.model.Message;
import software.amazon.awssdk.services.sqs.model.SqsException;

import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;

public class SqsQueues implements JobQueues {
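// SQS returns at most 10 messages per receive call and supports at most 20 seconds of long polling, so larger requests are split into repeated polls below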

private static final int MAX_MESSAGES_TO_POLL = 10;
private static final int MIN_PARALLELISM = 4;
private static final int MAX_WAITING_TIME = 20;

private final SimpleQueueService simpleQueueService;
private final ObjectMapper objectMapper;

@@ -46,16 +55,50 @@ public SqsQueues(SimpleQueueService simpleQueueService, ObjectMapper objectMappe

@Override
public <T> Publisher<QueueMessage<T>> readMessages(String queueName, int maxNumberOfMessages, Duration waitTime, Argument<T> argument) {
return Flux.merge(simpleQueueService.receiveMessages(queueName, maxNumberOfMessages, 0, Math.toIntExact(waitTime.getSeconds())).stream().map(m ->
readMessageInternal(queueName, argument, m.body(), true).map(
message -> QueueMessage.requeueIfDeleted(
m.messageId(),
message,
() -> simpleQueueService.deleteMessage(queueName, m.receiptHandle()),
() -> simpleQueueService.sendMessage(queueName, m.body())
Instant pollStart = Instant.now();
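// poll in batches of at most MAX_MESSAGES_TO_POLL until the requested number of messages has been read or the waiting time has elapsed; an infinite poll keeps emitting until cancelled downstream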
return Flux.<List<Message>, Integer>generate(() -> maxNumberOfMessages, (remainingNumberOfMessages, sink) -> {
try {
List<Message> messages = simpleQueueService.receiveMessages(
queueName,
Math.min(MAX_MESSAGES_TO_POLL, remainingNumberOfMessages),
0,
Math.min(MAX_WAITING_TIME, Math.toIntExact(waitTime.getSeconds())));

if (messages.isEmpty() && waitTime.getSeconds() <= MAX_WAITING_TIME && !QueueListener.Utils.isInfinitePoll(maxNumberOfMessages, waitTime)) {
sink.complete();
return 0;
}

sink.next(messages);

if (QueueListener.Utils.isInfinitePoll(maxNumberOfMessages, waitTime)) {
return maxNumberOfMessages;
}

int nextRemaining = remainingNumberOfMessages - messages.size();

if (nextRemaining <= 0 || pollStart.plus(waitTime).isBefore(Instant.now())) {
sink.complete();
}

return nextRemaining;
} catch (Throwable th) {
sink.error(th);
return 0;
}
})
.flatMap(Flux::fromIterable)
.flatMap(m ->
readMessageInternal(queueName, argument, m.body(), true).map(
message -> QueueMessage.requeueIfDeleted(
m.messageId(),
message,
() -> simpleQueueService.deleteMessage(queueName, m.receiptHandle()),
() -> simpleQueueService.sendMessage(queueName, m.body())
)
)
)
).toList());
.parallel(Math.max(Schedulers.DEFAULT_POOL_SIZE, MIN_PARALLELISM));
}

@Override
Expand Down
Loading

0 comments on commit 5a9c765

Please sign in to comment.