Skip to content

Commit

Permalink
improved pipe events
Browse files Browse the repository at this point in the history
  • Loading branch information
musketyr committed Nov 26, 2024
1 parent 16776c1 commit 7f5e5e4
Show file tree
Hide file tree
Showing 10 changed files with 110 additions and 93 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,13 @@
import io.lettuce.core.support.BoundedPoolConfig;
import io.micronaut.core.type.Argument;
import io.micronaut.jackson.JacksonConfiguration;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
Expand Down Expand Up @@ -61,17 +64,17 @@ public RedisQueues(ObjectMapper objectMapper, RedisClient client, RedisPoolConfi
pool = new BoundedAsyncPool<>(new ConnectionFactory(client), config);
}

@Override
@SuppressWarnings("unchecked")
public <T> void readMessages(String queueName, int maxNumberOfMessages, Duration waitTime, Argument<T> argument, Consumer<T> action) {
@Override
public <T> Publisher<T> readMessages(String queueName, int maxNumberOfMessages, Duration waitTime, Argument<T> argument) {
TransactionResult result = withTransaction(redisCommands -> {
String key = getKey(queueName);
redisCommands.zrange(key, 0, maxNumberOfMessages - 1);
redisCommands.zremrangebyrank(key, 0, maxNumberOfMessages - 1);
});

if (result == null) {
return;
return Flux.empty();
}


Expand All @@ -83,14 +86,14 @@ public <T> void readMessages(String queueName, int maxNumberOfMessages, Duration

List<String> messages = (List<String>) firstResponse;

messages.forEach(body -> {
return Flux.fromIterable(messages).handle((body, sink) -> {
try {
action.accept(objectMapper.readValue(body, JacksonConfiguration.constructType(argument, objectMapper.getTypeFactory())));
sink.next(objectMapper.readValue(body, JacksonConfiguration.constructType(argument, objectMapper.getTypeFactory())));
} catch (JsonProcessingException e) {
if (argument.equalsType(Argument.STRING)) {
action.accept((T) body);
sink.next((T) body);
} else {
throw new IllegalArgumentException("Cannot convert to " + argument + "from message\n" + body, e);
sink.error(new IllegalArgumentException("Cannot convert to " + argument + "from message\n" + body, e));
}
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ dependencies {
api "com.agorapulse:micronaut-aws-sdk-sqs:$micronautAwsSdkVersion"

implementation 'io.micronaut:micronaut-jackson-databind'
implementation 'io.micronaut.reactor:micronaut-reactor'

testImplementation project(':micronaut-worker-tck')

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,13 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import io.micronaut.core.type.Argument;
import io.micronaut.jackson.JacksonConfiguration;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.function.Consumer;
import java.util.stream.Collectors;

public class SqsQueues implements JobQueues {
Expand All @@ -42,10 +44,10 @@ public SqsQueues(SimpleQueueService simpleQueueService, ObjectMapper objectMappe
}

@Override
public <T> void readMessages(String queueName, int maxNumberOfMessages, Duration waitTime, Argument<T> argument, Consumer<T> action) {
simpleQueueService.receiveMessages(queueName, maxNumberOfMessages, 0, Math.toIntExact(waitTime.getSeconds())).forEach(m -> {
readMessageInternal(queueName, argument, action, m.getBody(), m.getReceiptHandle(), true);
});
public <T> Publisher<T> readMessages(String queueName, int maxNumberOfMessages, Duration waitTime, Argument<T> argument) {
return Flux.merge(simpleQueueService.receiveMessages(queueName, maxNumberOfMessages, 0, Math.toIntExact(waitTime.getSeconds())).stream().map(m ->
readMessageInternal(queueName, argument, m.getBody(), m.getReceiptHandle(), true)
).toList());
}

@Override
Expand All @@ -70,28 +72,27 @@ public void sendRawMessage(String queueName, Object result) {
}
}

private <T> void readMessageInternal(String queueName, Argument<T> argument, Consumer<T> action, String body, String handle, boolean tryReformat) {
private <T> Mono<T> readMessageInternal(String queueName, Argument<T> argument, String body, String handle, boolean tryReformat) {
try {
action.accept(objectMapper.readValue(body, JacksonConfiguration.constructType(argument, objectMapper.getTypeFactory())));
Mono<T> result = Mono.just(objectMapper.readValue(body, JacksonConfiguration.constructType(argument, objectMapper.getTypeFactory())));
simpleQueueService.deleteMessage(queueName, handle);
return result;
} catch (JsonProcessingException e) {
if (tryReformat) {
if (String.class.isAssignableFrom(argument.getType())) {
action.accept(argument.getType().cast(body));
Mono<T> result = Mono.just(argument.getType().cast(body));
simpleQueueService.deleteMessage(queueName, handle);
return;
return result;
}
if (Collection.class.isAssignableFrom(argument.getType())) {
if (argument.getTypeParameters().length > 0 && CharSequence.class.isAssignableFrom(argument.getTypeParameters()[0].getType())) {
String quoted = Arrays.stream(body.split(",\\s*")).map(s -> "\"" + s + "\"").collect(Collectors.joining(","));
readMessageInternal(queueName, argument, action, "[" + quoted + "]", handle, false);
return;
return readMessageInternal(queueName, argument, "[" + quoted + "]", handle, false);
}
readMessageInternal(queueName, argument, action, "[" + body + "]", handle, false);
return;
return readMessageInternal(queueName, argument, "[" + body + "]", handle, false);
}
}
throw new IllegalArgumentException("Cannot convert to " + argument + "from message\n" + body, e);
return Mono.error(new IllegalArgumentException("Cannot convert to " + argument + "from message\n" + body, e));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import com.amazonaws.services.sqs.model.AmazonSQSException
import com.amazonaws.services.sqs.model.Message
import com.fasterxml.jackson.databind.ObjectMapper
import io.micronaut.core.type.Argument
import reactor.core.publisher.Flux
import spock.lang.Shared
import spock.lang.Specification

Expand All @@ -42,10 +43,9 @@ class SqsQueuesUnitSpec extends Specification {

void 'message is deleted once read'() {
when:
List<Map<String, String>> values = []
sqsQueues.readMessages(QUEUE_NAME, MAX_MESSAGES, WAIT_TIME, Argument.mapOf(String, String)) {
values << it
}
List<Map<String, String>> values = Flux.from(
sqsQueues.readMessages(QUEUE_NAME, MAX_MESSAGES, WAIT_TIME, Argument.mapOf(String, String))
).collectList().block()
then:
values
values.size() == 2
Expand All @@ -71,10 +71,9 @@ class SqsQueuesUnitSpec extends Specification {

void 'can read legacy messages'() {
when:
List<List<Long>> values = []
sqsQueues.readMessages(QUEUE_NAME, MAX_MESSAGES, WAIT_TIME, Argument.listOf(Long)) {
values << it
}
List<List<Long>> values = Flux.from(
sqsQueues.readMessages(QUEUE_NAME, MAX_MESSAGES, WAIT_TIME, Argument.listOf(Long))
).collectList().block()
then:
values
values.size() == 2
Expand All @@ -101,10 +100,9 @@ class SqsQueuesUnitSpec extends Specification {

void 'can read legacy string messages'() {
when:
List<List<String>> values = []
sqsQueues.readMessages(QUEUE_NAME, MAX_MESSAGES, WAIT_TIME, Argument.listOf(String)) {
values << it
}
List<List<String>> values = Flux.from(
sqsQueues.readMessages(QUEUE_NAME, MAX_MESSAGES, WAIT_TIME, Argument.listOf(String))
).collectList().block()
then:
values
values.size() == 2
Expand All @@ -131,10 +129,9 @@ class SqsQueuesUnitSpec extends Specification {

void 'message not deleted on error'() {
when:
List<Map<String, String>> values = []
sqsQueues.readMessages(QUEUE_NAME, MAX_MESSAGES, WAIT_TIME, Argument.mapOf(String, String)) {
values << it
}
Flux.from(
sqsQueues.readMessages(QUEUE_NAME, MAX_MESSAGES, WAIT_TIME, Argument.mapOf(String, String))
).collectList().block()
then:
thrown IllegalArgumentException

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@
import io.micronaut.jackson.JacksonConfiguration;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import software.amazon.awssdk.services.sqs.model.SqsException;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.function.Consumer;
import java.util.stream.Collectors;

public class SqsQueues implements JobQueues {
Expand All @@ -44,10 +44,10 @@ public SqsQueues(SimpleQueueService simpleQueueService, ObjectMapper objectMappe
}

@Override
public <T> void readMessages(String queueName, int maxNumberOfMessages, Duration waitTime, Argument<T> argument, Consumer<T> action) {
simpleQueueService.receiveMessages(queueName, maxNumberOfMessages, 0, Math.toIntExact(waitTime.getSeconds())).forEach(m -> {
readMessageInternal(queueName, argument, action, m.body(), m.receiptHandle(), true);
});
public <T> Publisher<T> readMessages(String queueName, int maxNumberOfMessages, Duration waitTime, Argument<T> argument) {
return Flux.merge(simpleQueueService.receiveMessages(queueName, maxNumberOfMessages, 0, Math.toIntExact(waitTime.getSeconds())).stream().map(m ->
readMessageInternal(queueName, argument, m.body(), m.receiptHandle(), true)
).toList());
}

@Override
Expand Down Expand Up @@ -78,28 +78,27 @@ public void sendRawMessages(String queueName, Publisher<?> result) {
Flux.from(simpleQueueService.sendMessages(queueName, Flux.from(result).map(String::valueOf))).subscribe();
}

private <T> void readMessageInternal(String queueName, Argument<T> argument, Consumer<T> action, String body, String handle, boolean tryReformat) {
private <T> Mono<T> readMessageInternal(String queueName, Argument<T> argument, String body, String handle, boolean tryReformat) {
try {
action.accept(objectMapper.readValue(body, JacksonConfiguration.constructType(argument, objectMapper.getTypeFactory())));
Mono<T> result = Mono.just(objectMapper.readValue(body, JacksonConfiguration.constructType(argument, objectMapper.getTypeFactory())));
simpleQueueService.deleteMessage(queueName, handle);
return result;
} catch (JsonProcessingException e) {
if (tryReformat) {
if (String.class.isAssignableFrom(argument.getType())) {
action.accept(argument.getType().cast(body));
Mono<T> result = Mono.just(argument.getType().cast(body));
simpleQueueService.deleteMessage(queueName, handle);
return;
return result;
}
if (Collection.class.isAssignableFrom(argument.getType())) {
if (argument.getTypeParameters().length > 0 && CharSequence.class.isAssignableFrom(argument.getTypeParameters()[0].getType())) {
String quoted = Arrays.stream(body.split(",\\s*")).map(s -> "\"" + s + "\"").collect(Collectors.joining(","));
readMessageInternal(queueName, argument, action, "[" + quoted + "]", handle, false);
return;
return readMessageInternal(queueName, argument, "[" + quoted + "]", handle, false);
}
readMessageInternal(queueName, argument, action, "[" + body + "]", handle, false);
return;
return readMessageInternal(queueName, argument, "[" + body + "]", handle, false);
}
}
throw new IllegalArgumentException("Cannot convert to " + argument + "from message\n" + body, e);
return Mono.error(new IllegalArgumentException("Cannot convert to " + argument + "from message\n" + body, e));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import com.agorapulse.micronaut.amazon.awssdk.sqs.SimpleQueueService
import com.agorapulse.worker.queue.JobQueues
import com.fasterxml.jackson.databind.ObjectMapper
import io.micronaut.core.type.Argument
import reactor.core.publisher.Flux
import software.amazon.awssdk.services.sqs.model.Message
import software.amazon.awssdk.services.sqs.model.SqsException
import spock.lang.Shared
Expand All @@ -42,10 +43,9 @@ class SqsQueuesUnitSpec extends Specification {

void 'message is deleted once read'() {
when:
List<Map<String, String>> values = []
sqsQueues.readMessages(QUEUE_NAME, MAX_MESSAGES, WAIT_TIME, Argument.mapOf(String, String)) {
values << it
}
List<Map<String, String>> values = Flux.from(
sqsQueues.readMessages(QUEUE_NAME, MAX_MESSAGES, WAIT_TIME, Argument.mapOf(String, String))
).collectList().block()
then:
values
values.size() == 2
Expand All @@ -71,10 +71,9 @@ class SqsQueuesUnitSpec extends Specification {

void 'can read legacy messages'() {
when:
List<List<Long>> values = []
sqsQueues.readMessages(QUEUE_NAME, MAX_MESSAGES, WAIT_TIME, Argument.listOf(Long)) {
values << it
}
List<List<Long>> values = Flux.from(
sqsQueues.readMessages(QUEUE_NAME, MAX_MESSAGES, WAIT_TIME, Argument.listOf(Long))
).collectList().block()
then:
values
values.size() == 2
Expand All @@ -101,10 +100,9 @@ class SqsQueuesUnitSpec extends Specification {

void 'can read legacy string messages'() {
when:
List<List<String>> values = []
sqsQueues.readMessages(QUEUE_NAME, MAX_MESSAGES, WAIT_TIME, Argument.listOf(String)) {
values << it
}
List<List<String>> values = Flux.from(
sqsQueues.readMessages(QUEUE_NAME, MAX_MESSAGES, WAIT_TIME, Argument.listOf(String))
).collectList().block()
then:
values
values.size() == 2
Expand All @@ -131,10 +129,9 @@ class SqsQueuesUnitSpec extends Specification {

void 'message not deleted on error'() {
when:
List<Map<String, String>> values = []
sqsQueues.readMessages(QUEUE_NAME, MAX_MESSAGES, WAIT_TIME, Argument.mapOf(String, String)) {
values << it
}
Flux.from(
sqsQueues.readMessages(QUEUE_NAME, MAX_MESSAGES, WAIT_TIME, Argument.mapOf(String, String))
).collectList().block()
then:
thrown IllegalArgumentException

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import io.micronaut.context.ApplicationContext
import io.micronaut.core.async.publisher.Publishers
import io.micronaut.core.type.Argument
import io.micronaut.inject.qualifiers.Qualifiers
import reactor.core.publisher.Flux
import spock.lang.Specification

import jakarta.inject.Inject
Expand Down Expand Up @@ -55,14 +56,11 @@ abstract class AbstractQueuesSpec extends Specification {
void 'can send raw messages to queue'() {
given:
JobQueues queues = context.getBean(JobQueues, Qualifiers.byName(name))
List<String> messages = []
when:
queues.sendRawMessage('foo', 'one')
queues.sendRawMessages('foo', Publishers.just('two'))
and:
queues.readMessages('foo', 2, Duration.ofSeconds(1), Argument.STRING) { message ->
messages << message
}
List<String> messages = Flux.from(queues.readMessages('foo', 2, Duration.ofSeconds(1), Argument.STRING)).collectList().block()
then:
messages == ['one', 'two']
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,17 @@
import io.micronaut.context.annotation.Secondary;
import io.micronaut.context.env.Environment;
import io.micronaut.core.type.Argument;

import jakarta.inject.Named;
import jakarta.inject.Singleton;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;

@Secondary
@Singleton
Expand All @@ -46,15 +48,19 @@ public LocalQueues(Environment environment) {
}

@Override
public <T> void readMessages(String queueName, int maxNumberOfMessages, Duration waitTime, Argument<T> argument, Consumer<T> action) {
public <T> Publisher<T> readMessages(String queueName, int maxNumberOfMessages, Duration waitTime, Argument<T> argument) {
ConcurrentLinkedDeque<Object> objects = queues.get(queueName);
if (objects == null) {
return;
return Flux.empty();
}

List<T> results = new ArrayList<>();

for (int i = 0; i < maxNumberOfMessages && !objects.isEmpty(); i++) {
action.accept(environment.convertRequired(objects.removeFirst(), argument));
results.add(environment.convertRequired(objects.removeFirst(), argument));
}

return Flux.fromIterable(results);
}

@Override
Expand Down
Loading

0 comments on commit 7f5e5e4

Please sign in to comment.