From 307a3fdc171bed56ee78dac0450c3f841cb86c27 Mon Sep 17 00:00:00 2001 From: "brian.mulier" Date: Thu, 18 Jan 2024 11:06:50 +0100 Subject: [PATCH] fix(trigger): add example with 'NONE' action --- .../kestra/plugin/aws/s3/ActionInterface.java | 4 +- .../java/io/kestra/plugin/aws/s3/Trigger.java | 37 ++++++++++++++++++- .../io/kestra/plugin/aws/s3/TriggerTest.java | 30 +++++++++------ 3 files changed, 56 insertions(+), 15 deletions(-) diff --git a/src/main/java/io/kestra/plugin/aws/s3/ActionInterface.java b/src/main/java/io/kestra/plugin/aws/s3/ActionInterface.java index 0df884e5..51f39726 100644 --- a/src/main/java/io/kestra/plugin/aws/s3/ActionInterface.java +++ b/src/main/java/io/kestra/plugin/aws/s3/ActionInterface.java @@ -7,14 +7,14 @@ public interface ActionInterface { @Schema( - title = "The action to perform on the retrieved files." + title = "The action to perform on the retrieved files. If using 'NONE' make sure to handle the files inside your flow to avoid infinite triggering." ) @PluginProperty(dynamic = true) @NotNull ActionInterface.Action getAction(); @Schema( - title = "The destination bucket and key." + title = "The destination bucket and key for `MOVE` action." ) @PluginProperty(dynamic = true) Copy.CopyObject getMoveTo(); diff --git a/src/main/java/io/kestra/plugin/aws/s3/Trigger.java b/src/main/java/io/kestra/plugin/aws/s3/Trigger.java index a3e7b2d4..a63af6ab 100644 --- a/src/main/java/io/kestra/plugin/aws/s3/Trigger.java +++ b/src/main/java/io/kestra/plugin/aws/s3/Trigger.java @@ -67,6 +67,41 @@ " moveTo: ", " key: archive", } + ), + @Example( + title = "Wait for a list of files on a s3 bucket and iterate through the files. Delete files manually after processing to prevent infinite triggering.", + full = true, + code = { + "id: s3-listen", + "namespace: io.kestra.tests", + "", + "tasks:", + " - id: each", + " type: io.kestra.core.tasks.flows.EachSequential", + " tasks:", + " - id: return", + " type: io.kestra.core.tasks.debugs.Return", + " format: \"{{ taskrun.value }}\"", + " - id: delete", + " type: io.kestra.plugin.aws.s3.Delete", + " accessKeyId: \"\"", + " secretKeyId: \"\"", + " region: \"eu-central-1\"", + " bucket: \"my-bucket\"", + " key: \"{{ taskrun.value }}\"", + " value: \"{{ trigger.objects | jq('.[].key') }}\"", + "", + "triggers:", + " - id: watch", + " type: io.kestra.plugin.aws.s3.Trigger", + " interval: \"PT5M\"", + " accessKeyId: \"\"", + " secretKeyId: \"\"", + " region: \"eu-central-1\"", + " bucket: \"my-bucket\"", + " prefix: \"sub-dir\"", + " action: NONE", + } ) } ) @@ -134,7 +169,7 @@ public Optional evaluate(ConditionContext conditionContext, TriggerCo .build(); List.Output run = task.run(runContext); - if (run.getObjects().size() == 0) { + if (run.getObjects().isEmpty()) { return Optional.empty(); } diff --git a/src/test/java/io/kestra/plugin/aws/s3/TriggerTest.java b/src/test/java/io/kestra/plugin/aws/s3/TriggerTest.java index 4faa0921..cd0711ed 100644 --- a/src/test/java/io/kestra/plugin/aws/s3/TriggerTest.java +++ b/src/test/java/io/kestra/plugin/aws/s3/TriggerTest.java @@ -64,10 +64,10 @@ void deleteAction() throws Exception { executionQueue.receive(TriggerTest.class, executionWithError -> { Execution execution = executionWithError.getLeft(); - last.set(execution); - queueCount.countDown(); - - assertThat(execution.getFlowId(), is("s3-listen")); + if (execution.getFlowId().equals("s3-listen")) { + last.set(execution); + queueCount.countDown(); + } }); @@ -79,8 +79,11 @@ void deleteAction() throws Exception { repositoryLoader.load(Objects.requireNonNull(TriggerTest.class.getClassLoader().getResource("flows/s3/s3-listen.yaml"))); boolean await = queueCount.await(10, TimeUnit.SECONDS); - assertThat(await, is(true)); - worker.shutdown(); + try { + assertThat(await, is(true)); + } finally { + worker.shutdown(); + } @SuppressWarnings("unchecked") java.util.List trigger = (java.util.List) last.get().getTrigger().getVariables().get("objects"); @@ -118,10 +121,10 @@ void noneAction() throws Exception { executionQueue.receive(TriggerTest.class, executionWithError -> { Execution execution = executionWithError.getLeft(); - last.set(execution); - queueCount.countDown(); - - assertThat(execution.getFlowId(), is("s3-listen-none-action")); + if (execution.getFlowId().equals("s3-listen-none-action")) { + last.set(execution); + queueCount.countDown(); + } }); @@ -133,8 +136,11 @@ void noneAction() throws Exception { repositoryLoader.load(Objects.requireNonNull(TriggerTest.class.getClassLoader().getResource("flows/s3/s3-listen-none-action.yaml"))); boolean await = queueCount.await(10, TimeUnit.SECONDS); - assertThat(await, is(true)); - worker.shutdown(); + try { + assertThat(await, is(true)); + } finally { + worker.shutdown(); + } @SuppressWarnings("unchecked") java.util.List trigger = (java.util.List) last.get().getTrigger().getVariables().get("objects");