diff --git a/src/main/java/io/kestra/plugin/aws/sqs/RealtimeTrigger.java b/src/main/java/io/kestra/plugin/aws/sqs/RealtimeTrigger.java index 1a522b4..c550c11 100644 --- a/src/main/java/io/kestra/plugin/aws/sqs/RealtimeTrigger.java +++ b/src/main/java/io/kestra/plugin/aws/sqs/RealtimeTrigger.java @@ -24,6 +24,7 @@ import java.time.Duration; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.Semaphore; @SuperBuilder @ToString @@ -135,21 +136,25 @@ public Publisher evaluate(ConditionContext conditionContext, TriggerC } public Flux publisher(final Consume task, - final RunContext runContext) throws Exception { - var renderedQueueUrl = runContext.render(getQueueUrl()).as(String.class).orElseThrow(); - - return Flux.create( - fluxSink -> { - try (SqsAsyncClient sqsClient = task.asyncClient(runContext, runContext.render(clientRetryMaxAttempts).as(Integer.class).orElseThrow())) { - while (isActive.get()) { - ReceiveMessageRequest receiveRequest = ReceiveMessageRequest.builder() - .queueUrl(renderedQueueUrl) - .waitTimeSeconds((int) runContext.render(waitTime).as(Duration.class).orElseThrow().toSeconds()) - .maxNumberOfMessages(runContext.render(maxNumberOfMessage).as(Integer.class).orElseThrow()) - .build(); - - sqsClient.receiveMessage(receiveRequest) - .whenComplete((messageResponse, throwable) -> { + final RunContext runContext) throws Exception { + var renderedQueueUrl = runContext.render(getQueueUrl()).as(String.class).orElseThrow(); + final Semaphore semaphore = new Semaphore(10); // Limit concurrent requests + + return Flux.create( + fluxSink -> { + try (SqsAsyncClient sqsClient = task.asyncClient(runContext, runContext.render(clientRetryMaxAttempts).as(Integer.class).orElseThrow())) { + while (isActive.get()) { + semaphore.acquire(); + + ReceiveMessageRequest receiveRequest = ReceiveMessageRequest.builder() + .queueUrl(renderedQueueUrl) + .waitTimeSeconds((int) runContext.render(waitTime).as(Duration.class).orElseThrow().toSeconds()) + .maxNumberOfMessages(runContext.render(maxNumberOfMessage).as(Integer.class).orElseThrow()) + .build(); + + sqsClient.receiveMessage(receiveRequest) + .whenComplete((messageResponse, throwable) -> { + try { if (throwable != null) { fluxSink.error(throwable); } else { @@ -164,21 +169,23 @@ public Flux publisher(final Consume task, ) ); } - }); - try { - Thread.sleep(100); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - isActive.set(false); // proactively stop polling - } - } - } catch (Throwable e) { - fluxSink.error(e); - } finally { - fluxSink.complete(); - this.waitForTermination.countDown(); + } finally { + semaphore.release(); // Release the semaphore permit + } + }); + + Thread.sleep(100); } - }); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + isActive.set(false); // proactively stop polling + } catch (Throwable e) { + fluxSink.error(e); + } finally { + fluxSink.complete(); + this.waitForTermination.countDown(); + } + }); } /**