Skip to content

Commit

Permalink
fix(app): kestra-io#567 Semaphore in sqs while loop
Browse files Browse the repository at this point in the history
Attempt to limit the requests hitting aws.
  • Loading branch information
mmmoli committed Dec 18, 2024
1 parent 115f116 commit 4f8251a
Showing 1 changed file with 36 additions and 29 deletions.
65 changes: 36 additions & 29 deletions src/main/java/io/kestra/plugin/aws/sqs/RealtimeTrigger.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.Semaphore;

@SuperBuilder
@ToString
Expand Down Expand Up @@ -135,21 +136,25 @@ public Publisher<Execution> evaluate(ConditionContext conditionContext, TriggerC
}

public Flux<Message> publisher(final Consume task,
final RunContext runContext) throws Exception {
var renderedQueueUrl = runContext.render(getQueueUrl()).as(String.class).orElseThrow();

return Flux.create(
fluxSink -> {
try (SqsAsyncClient sqsClient = task.asyncClient(runContext, runContext.render(clientRetryMaxAttempts).as(Integer.class).orElseThrow())) {
while (isActive.get()) {
ReceiveMessageRequest receiveRequest = ReceiveMessageRequest.builder()
.queueUrl(renderedQueueUrl)
.waitTimeSeconds((int) runContext.render(waitTime).as(Duration.class).orElseThrow().toSeconds())
.maxNumberOfMessages(runContext.render(maxNumberOfMessage).as(Integer.class).orElseThrow())
.build();

sqsClient.receiveMessage(receiveRequest)
.whenComplete((messageResponse, throwable) -> {
final RunContext runContext) throws Exception {
var renderedQueueUrl = runContext.render(getQueueUrl()).as(String.class).orElseThrow();
final Semaphore semaphore = new Semaphore(10); // Limit concurrent requests

return Flux.create(
fluxSink -> {
try (SqsAsyncClient sqsClient = task.asyncClient(runContext, runContext.render(clientRetryMaxAttempts).as(Integer.class).orElseThrow())) {
while (isActive.get()) {
semaphore.acquire();

ReceiveMessageRequest receiveRequest = ReceiveMessageRequest.builder()
.queueUrl(renderedQueueUrl)
.waitTimeSeconds((int) runContext.render(waitTime).as(Duration.class).orElseThrow().toSeconds())
.maxNumberOfMessages(runContext.render(maxNumberOfMessage).as(Integer.class).orElseThrow())
.build();

sqsClient.receiveMessage(receiveRequest)
.whenComplete((messageResponse, throwable) -> {
try {
if (throwable != null) {
fluxSink.error(throwable);
} else {
Expand All @@ -164,21 +169,23 @@ public Flux<Message> publisher(final Consume task,
)
);
}
});
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
isActive.set(false); // proactively stop polling
}
}
} catch (Throwable e) {
fluxSink.error(e);
} finally {
fluxSink.complete();
this.waitForTermination.countDown();
} finally {
semaphore.release(); // Release the semaphore permit
}
});

Thread.sleep(100);
}
});
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
isActive.set(false); // proactively stop polling
} catch (Throwable e) {
fluxSink.error(e);
} finally {
fluxSink.complete();
this.waitForTermination.countDown();
}
});
}

/**
Expand Down

0 comments on commit 4f8251a

Please sign in to comment.