Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SQS Listener EventBridgeMessage annotation and converter #1307

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions docs/src/main/asciidoc/sqs.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -639,6 +639,29 @@ public void listen(@SnsNotificationMessage List<Pojo> pojos) {
}
----

==== EventBridge Messages

Since 3.3.0, similar to the @SnsNotificationMessage, there is also a @EventBridgeMessage annotation that can be used to receive EventBridge messages. To only receive the `detail` part of the payload, you can utilize the `@EventBridgeMessage` annotation.
As with @SnsNotificationMessage, this supports both individual messages, or lists.

[source, java]
----
@SqsListener("my-queue")
public void listen(@EventBridgeMessage Pojo pojo) {
System.out.println(pojo.field);
}
----

For batch message processing, use the @EventBridgeMessage annotation with a List<Pojo> parameter.

[source, java]
----
@SqsListener("my-queue")
public void listen(@EventBridgeMessage List<Pojo> pojos) {
System.out.println(pojos.size());
}
----

===== Specifying a MessageListenerContainerFactory
A `MessageListenerContainerFactory` can be specified through the `factory` property.
Such factory will then be used to create the container for the annotated method.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright 2013-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.awspring.cloud.sqs.annotation;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
* Annotation that is used to map EventBridge messages value on an SQS Queue to a variable that is annotated. Used in
* controllers method for handling/receiving SQS notifications.
*
* @author Fredrik Jonsén
* @since 3.3.0
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.PARAMETER)
public @interface EventBridgeMessage {
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.awspring.cloud.sqs.config.SqsEndpoint;
import io.awspring.cloud.sqs.listener.SqsHeaders;
import io.awspring.cloud.sqs.support.resolver.BatchVisibilityHandlerMethodArgumentResolver;
import io.awspring.cloud.sqs.support.resolver.EventBridgeMessageArgumentResolver;
import io.awspring.cloud.sqs.support.resolver.NotificationMessageArgumentResolver;
import io.awspring.cloud.sqs.support.resolver.QueueAttributesMethodArgumentResolver;
import io.awspring.cloud.sqs.support.resolver.SqsMessageMethodArgumentResolver;
Expand Down Expand Up @@ -84,6 +85,7 @@ protected Collection<HandlerMethodArgumentResolver> createAdditionalArgumentReso
List<HandlerMethodArgumentResolver> argumentResolvers = new ArrayList<>(createAdditionalArgumentResolvers());
if (objectMapper != null) {
argumentResolvers.add(new NotificationMessageArgumentResolver(messageConverter, objectMapper));
argumentResolvers.add(new EventBridgeMessageArgumentResolver(messageConverter, objectMapper));
}
return argumentResolvers;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package io.awspring.cloud.sqs.support.converter;


import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.lang.Nullable;
import org.springframework.messaging.converter.MessageConversionException;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.converter.SmartMessageConverter;
import org.springframework.messaging.support.GenericMessage;

public class EventBridgeMessageConverter extends WrappedMessageConverter {
private static final String WRITING_CONVERSION_ERROR = "This converter only supports reading an EventBridge message and not writing them";

public EventBridgeMessageConverter(MessageConverter payloadConverter, ObjectMapper jsonMapper) {
super(payloadConverter, jsonMapper);
}

@Override
protected Object fromGenericMessage(GenericMessage<?> message, Class<?> targetClass, @Nullable Object conversionHint) {
JsonNode jsonNode;
try {
jsonNode = this.jsonMapper.readTree(message.getPayload().toString());
}
catch (Exception e) {
throw new MessageConversionException("Could not read JSON", e);
}

if (!jsonNode.has("detail")) {
throw new MessageConversionException(
"Payload: '" + message.getPayload() + "' does not contain a detail attribute", null);
}

// Unlike SNS, where the message is a nested JSON string, the message payload in EventBridge is a JSON object
JsonNode messagePayload = jsonNode.get("detail");
GenericMessage<JsonNode> genericMessage = new GenericMessage<>(messagePayload);
if (payloadConverter instanceof SmartMessageConverter payloadConverter) {
return payloadConverter.fromMessage(genericMessage, targetClass, conversionHint);
} else {
return payloadConverter.fromMessage(genericMessage, targetClass);
}
}

@Override
protected String getWritingConversionErrorMessage() {
return WRITING_CONVERSION_ERROR;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,48 +38,20 @@
* @author Wei Jiang
* @since 3.1.1
*/
public class SnsMessageConverter implements SmartMessageConverter {

private final ObjectMapper jsonMapper;

private final MessageConverter payloadConverter;
public class SnsMessageConverter extends WrappedMessageConverter {
private static final String WRITING_CONVERSION_ERROR = "This converter only supports reading a SNS notification and not writing them";

public SnsMessageConverter(MessageConverter payloadConverter, ObjectMapper jsonMapper) {
Assert.notNull(payloadConverter, "payloadConverter must not be null");
Assert.notNull(jsonMapper, "jsonMapper must not be null");
this.payloadConverter = payloadConverter;
this.jsonMapper = jsonMapper;
super(payloadConverter, jsonMapper);
}

@Override
@SuppressWarnings("unchecked")
public Object fromMessage(Message<?> message, Class<?> targetClass, @Nullable Object conversionHint) {
Assert.notNull(message, "message must not be null");
Assert.notNull(targetClass, "target class must not be null");

Object payload = message.getPayload();

if (payload instanceof List messages) {
return fromGenericMessages(messages, targetClass, conversionHint);
}
else {
return fromGenericMessage((GenericMessage<?>) message, targetClass, conversionHint);
}
protected String getWritingConversionErrorMessage() {
return WRITING_CONVERSION_ERROR;
}

private Object fromGenericMessages(List<GenericMessage<?>> messages, Class<?> targetClass,
@Nullable Object conversionHint) {
Type resolvedType = getResolvedType(targetClass, conversionHint);
Class<?> resolvedClazz = ResolvableType.forType(resolvedType).resolve();

Object hint = targetClass.isAssignableFrom(List.class) && conversionHint instanceof MethodParameter mp
? mp.nested()
: conversionHint;

return messages.stream().map(message -> fromGenericMessage(message, resolvedClazz, hint)).toList();
}

private Object fromGenericMessage(GenericMessage<?> message, Class<?> targetClass,
@Override
protected Object fromGenericMessage(GenericMessage<?> message, Class<?> targetClass,
@Nullable Object conversionHint) {
JsonNode jsonNode;
try {
Expand Down Expand Up @@ -111,41 +83,4 @@ private Object fromGenericMessage(GenericMessage<?> message, Class<?> targetClas
: this.payloadConverter.fromMessage(genericMessage, targetClass);
return convertedMessage;
}

@Override
public Object fromMessage(Message<?> message, Class<?> targetClass) {
return fromMessage(message, targetClass, null);
}

@Override
public Message<?> toMessage(Object payload, MessageHeaders headers) {
throw new UnsupportedOperationException(
"This converter only supports reading a SNS notification and not writing them");
}

@Override
public Message<?> toMessage(Object payload, MessageHeaders headers, Object conversionHint) {
throw new UnsupportedOperationException(
"This converter only supports reading a SNS notification and not writing them");
}

private static Type getResolvedType(Class<?> targetClass, @Nullable Object conversionHint) {
if (conversionHint instanceof MethodParameter param) {
param = param.nestedIfOptional();
if (Message.class.isAssignableFrom(param.getParameterType())) {
param = param.nested();
}
Type genericParameterType = param.getNestedGenericParameterType();
Class<?> contextClass = param.getContainingClass();
Type resolveType = GenericTypeResolver.resolveType(genericParameterType, contextClass);
if (resolveType instanceof ParameterizedType parameterizedType) {
return parameterizedType.getActualTypeArguments()[0];
}
else {
return resolveType;
}
}
return targetClass;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package io.awspring.cloud.sqs.support.converter;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;
import org.springframework.core.GenericTypeResolver;
import org.springframework.core.MethodParameter;
import org.springframework.core.ResolvableType;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.converter.SmartMessageConverter;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.util.Assert;

/**
* @author Fredrik Jonsen
* @since 3.3.0
*/
abstract class WrappedMessageConverter implements SmartMessageConverter {
protected final MessageConverter payloadConverter;
protected final ObjectMapper jsonMapper;

protected WrappedMessageConverter(MessageConverter payloadConverter, ObjectMapper jsonMapper) {
Assert.notNull(payloadConverter, "payloadConverter must not be null");
Assert.notNull(jsonMapper, "jsonMapper must not be null");
this.jsonMapper = jsonMapper;
this.payloadConverter = payloadConverter;
}

protected abstract String getWritingConversionErrorMessage();
protected abstract Object fromGenericMessage(GenericMessage<?> message, Class<?> targetClass, @Nullable Object conversionHint);

@Override
@SuppressWarnings("unchecked")
public Object fromMessage(Message<?> message, Class<?> targetClass, @Nullable Object conversionHint) {
Assert.notNull(message, "message must not be null");
Assert.notNull(targetClass, "target class must not be null");

Object payload = message.getPayload();

if (payload instanceof List messages) {
return fromGenericMessages(messages, targetClass, conversionHint);
}
else {
return fromGenericMessage((GenericMessage<?>) message, targetClass, conversionHint);
}
}

@Override
public Object fromMessage(Message<?> message, Class<?> targetClass) {
return fromMessage(message, targetClass, null);
}

protected Object fromGenericMessages(List<GenericMessage<?>> messages, Class<?> targetClass,
@Nullable Object conversionHint) {
Type resolvedType = getResolvedType(targetClass, conversionHint);
Class<?> resolvedClazz = ResolvableType.forType(resolvedType).resolve();

Object hint = targetClass.isAssignableFrom(List.class) && conversionHint instanceof MethodParameter mp
? mp.nested()
: conversionHint;

return messages.stream().map(message -> fromGenericMessage(message, resolvedClazz, hint)).toList();
}

protected static Type getResolvedType(Class<?> targetClass, @Nullable Object conversionHint) {
if (conversionHint instanceof MethodParameter param) {
param = param.nestedIfOptional();
if (Message.class.isAssignableFrom(param.getParameterType())) {
param = param.nested();
}
Type genericParameterType = param.getNestedGenericParameterType();
Class<?> contextClass = param.getContainingClass();
Type resolveType = GenericTypeResolver.resolveType(genericParameterType, contextClass);
if (resolveType instanceof ParameterizedType parameterizedType) {
return parameterizedType.getActualTypeArguments()[0];
}
else {
return resolveType;
}
}
return targetClass;
}

@Override
public Message<?> toMessage(Object payload, MessageHeaders headers) {
throw new UnsupportedOperationException(getWritingConversionErrorMessage());
}

@Override
public Message<?> toMessage(Object payload, MessageHeaders headers, Object conversionHint) {
throw new UnsupportedOperationException(getWritingConversionErrorMessage());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package io.awspring.cloud.sqs.support.resolver;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.awspring.cloud.sqs.annotation.EventBridgeMessage;
import io.awspring.cloud.sqs.support.converter.EventBridgeMessageConverter;
import org.springframework.core.MethodParameter;
import org.springframework.messaging.Message;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.messaging.converter.SmartMessageConverter;
import org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver;

public class EventBridgeMessageArgumentResolver implements HandlerMethodArgumentResolver {
private final SmartMessageConverter converter;

public EventBridgeMessageArgumentResolver(MessageConverter converter, ObjectMapper jsonMapper) {
this.converter = new EventBridgeMessageConverter(converter, jsonMapper);
}

@Override
public boolean supportsParameter(MethodParameter parameter) {
return parameter.hasParameterAnnotation(EventBridgeMessage.class);
}

@Override
public Object resolveArgument(MethodParameter par, Message<?> msg) {
Class<?> parameterType = par.getParameterType();
return this.converter.fromMessage(msg, parameterType, par);
}
}
Loading
Loading