From 32c42ef56bf96ddac4db4dbade2076243f286972 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Mathieu?= Date: Mon, 22 May 2023 16:20:26 +0200 Subject: [PATCH] fix(sqs): SQS Consume should delete the message after use (#156) --- src/main/java/io/kestra/plugin/aws/sqs/Consume.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/main/java/io/kestra/plugin/aws/sqs/Consume.java b/src/main/java/io/kestra/plugin/aws/sqs/Consume.java index 63d1a3e9..c814d7f0 100644 --- a/src/main/java/io/kestra/plugin/aws/sqs/Consume.java +++ b/src/main/java/io/kestra/plugin/aws/sqs/Consume.java @@ -10,6 +10,7 @@ import io.swagger.v3.oas.annotations.media.Schema; import lombok.*; import lombok.experimental.SuperBuilder; +import software.amazon.awssdk.services.sqs.model.DeleteMessageRequest; import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest; import java.io.BufferedOutputStream; @@ -69,6 +70,10 @@ public Output run(RunContext runContext) throws Exception { var msg = sqsClient.receiveMessage(receiveRequest); msg.messages().forEach(throwConsumer(m -> { FileSerde.write(outputFile, m.body()); + sqsClient.deleteMessage(DeleteMessageRequest.builder() + .queueUrl(getQueueUrl()) + .receiptHandle(m.receiptHandle()).build() + ); total.getAndIncrement(); }));