Skip to content

Commit

Permalink
fix(trigger): add example with 'NONE' action
Browse files Browse the repository at this point in the history
  • Loading branch information
brian-mulier-p committed Jan 18, 2024
1 parent 00cdfc4 commit 307a3fd
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 15 deletions.
4 changes: 2 additions & 2 deletions src/main/java/io/kestra/plugin/aws/s3/ActionInterface.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@

public interface ActionInterface {
@Schema(
title = "The action to perform on the retrieved files."
title = "The action to perform on the retrieved files. If using 'NONE' make sure to handle the files inside your flow to avoid infinite triggering."
)
@PluginProperty(dynamic = true)
@NotNull
ActionInterface.Action getAction();

@Schema(
title = "The destination bucket and key."
title = "The destination bucket and key for `MOVE` action."
)
@PluginProperty(dynamic = true)
Copy.CopyObject getMoveTo();
Expand Down
37 changes: 36 additions & 1 deletion src/main/java/io/kestra/plugin/aws/s3/Trigger.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,41 @@
" moveTo: ",
" key: archive",
}
),
@Example(
title = "Wait for a list of files on a s3 bucket and iterate through the files. Delete files manually after processing to prevent infinite triggering.",
full = true,
code = {
"id: s3-listen",
"namespace: io.kestra.tests",
"",
"tasks:",
" - id: each",
" type: io.kestra.core.tasks.flows.EachSequential",
" tasks:",
" - id: return",
" type: io.kestra.core.tasks.debugs.Return",
" format: \"{{ taskrun.value }}\"",
" - id: delete",
" type: io.kestra.plugin.aws.s3.Delete",
" accessKeyId: \"<access-key>\"",
" secretKeyId: \"<secret-key>\"",
" region: \"eu-central-1\"",
" bucket: \"my-bucket\"",
" key: \"{{ taskrun.value }}\"",
" value: \"{{ trigger.objects | jq('.[].key') }}\"",
"",
"triggers:",
" - id: watch",
" type: io.kestra.plugin.aws.s3.Trigger",
" interval: \"PT5M\"",
" accessKeyId: \"<access-key>\"",
" secretKeyId: \"<secret-key>\"",
" region: \"eu-central-1\"",
" bucket: \"my-bucket\"",
" prefix: \"sub-dir\"",
" action: NONE",
}
)
}
)
Expand Down Expand Up @@ -134,7 +169,7 @@ public Optional<Execution> evaluate(ConditionContext conditionContext, TriggerCo
.build();
List.Output run = task.run(runContext);

if (run.getObjects().size() == 0) {
if (run.getObjects().isEmpty()) {
return Optional.empty();
}

Expand Down
30 changes: 18 additions & 12 deletions src/test/java/io/kestra/plugin/aws/s3/TriggerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,10 @@ void deleteAction() throws Exception {
executionQueue.receive(TriggerTest.class, executionWithError -> {
Execution execution = executionWithError.getLeft();

last.set(execution);
queueCount.countDown();

assertThat(execution.getFlowId(), is("s3-listen"));
if (execution.getFlowId().equals("s3-listen")) {
last.set(execution);
queueCount.countDown();
}
});


Expand All @@ -79,8 +79,11 @@ void deleteAction() throws Exception {
repositoryLoader.load(Objects.requireNonNull(TriggerTest.class.getClassLoader().getResource("flows/s3/s3-listen.yaml")));

boolean await = queueCount.await(10, TimeUnit.SECONDS);
assertThat(await, is(true));
worker.shutdown();
try {
assertThat(await, is(true));
} finally {
worker.shutdown();
}

@SuppressWarnings("unchecked")
java.util.List<S3Object> trigger = (java.util.List<S3Object>) last.get().getTrigger().getVariables().get("objects");
Expand Down Expand Up @@ -118,10 +121,10 @@ void noneAction() throws Exception {
executionQueue.receive(TriggerTest.class, executionWithError -> {
Execution execution = executionWithError.getLeft();

last.set(execution);
queueCount.countDown();

assertThat(execution.getFlowId(), is("s3-listen-none-action"));
if (execution.getFlowId().equals("s3-listen-none-action")) {
last.set(execution);
queueCount.countDown();
}
});


Expand All @@ -133,8 +136,11 @@ void noneAction() throws Exception {
repositoryLoader.load(Objects.requireNonNull(TriggerTest.class.getClassLoader().getResource("flows/s3/s3-listen-none-action.yaml")));

boolean await = queueCount.await(10, TimeUnit.SECONDS);
assertThat(await, is(true));
worker.shutdown();
try {
assertThat(await, is(true));
} finally {
worker.shutdown();
}

@SuppressWarnings("unchecked")
java.util.List<S3Object> trigger = (java.util.List<S3Object>) last.get().getTrigger().getVariables().get("objects");
Expand Down

0 comments on commit 307a3fd

Please sign in to comment.