From 00cdfc4480b10ee8c5267300102030ac4d849ad1 Mon Sep 17 00:00:00 2001 From: "brian.mulier" Date: Thu, 18 Jan 2024 10:41:29 +0100 Subject: [PATCH] fix(trigger): allow 'NONE' action part of kestra-io/kestra#1842 --- .../kestra/plugin/aws/s3/ActionInterface.java | 3 +- .../io/kestra/plugin/aws/s3/Downloads.java | 2 +- .../io/kestra/plugin/aws/s3/S3Service.java | 2 +- .../java/io/kestra/plugin/aws/s3/Trigger.java | 3 +- .../io/kestra/plugin/aws/s3/TriggerTest.java | 83 ++++++++++++++++--- .../flows/s3/s3-listen-none-action.yaml | 19 +++++ 6 files changed, 96 insertions(+), 16 deletions(-) create mode 100644 src/test/resources/flows/s3/s3-listen-none-action.yaml diff --git a/src/main/java/io/kestra/plugin/aws/s3/ActionInterface.java b/src/main/java/io/kestra/plugin/aws/s3/ActionInterface.java index 733b939a..0df884e5 100644 --- a/src/main/java/io/kestra/plugin/aws/s3/ActionInterface.java +++ b/src/main/java/io/kestra/plugin/aws/s3/ActionInterface.java @@ -21,6 +21,7 @@ public interface ActionInterface { enum Action { MOVE, - DELETE + DELETE, + NONE } } diff --git a/src/main/java/io/kestra/plugin/aws/s3/Downloads.java b/src/main/java/io/kestra/plugin/aws/s3/Downloads.java index 7b97459c..a7cce3be 100644 --- a/src/main/java/io/kestra/plugin/aws/s3/Downloads.java +++ b/src/main/java/io/kestra/plugin/aws/s3/Downloads.java @@ -109,7 +109,7 @@ public List.Output run(RunContext runContext) throws Exception { .collect(Collectors.toList()); - S3Service.archive( + S3Service.performAction( run.getObjects(), this.action, this.moveTo, diff --git a/src/main/java/io/kestra/plugin/aws/s3/S3Service.java b/src/main/java/io/kestra/plugin/aws/s3/S3Service.java index 1f169311..7349f343 100644 --- a/src/main/java/io/kestra/plugin/aws/s3/S3Service.java +++ b/src/main/java/io/kestra/plugin/aws/s3/S3Service.java @@ -48,7 +48,7 @@ public static Pair download(RunContext runContext, S3Asy } } - static void archive( + static void performAction( java.util.List s3Objects, ActionInterface.Action action, Copy.CopyObject moveTo, diff --git a/src/main/java/io/kestra/plugin/aws/s3/Trigger.java b/src/main/java/io/kestra/plugin/aws/s3/Trigger.java index fb906cf7..a3e7b2d4 100644 --- a/src/main/java/io/kestra/plugin/aws/s3/Trigger.java +++ b/src/main/java/io/kestra/plugin/aws/s3/Trigger.java @@ -11,7 +11,6 @@ import io.kestra.core.models.triggers.TriggerContext; import io.kestra.core.models.triggers.TriggerOutput; import io.kestra.core.runners.RunContext; -import io.kestra.core.utils.IdUtils; import io.kestra.plugin.aws.AbstractConnectionInterface; import io.kestra.plugin.aws.s3.models.S3Object; import io.swagger.v3.oas.annotations.media.Schema; @@ -160,7 +159,7 @@ public Optional evaluate(ConditionContext conditionContext, TriggerCo })) .collect(Collectors.toList()); - S3Service.archive( + S3Service.performAction( run.getObjects(), this.action, this.moveTo, diff --git a/src/test/java/io/kestra/plugin/aws/s3/TriggerTest.java b/src/test/java/io/kestra/plugin/aws/s3/TriggerTest.java index ad22adaa..4faa0921 100644 --- a/src/test/java/io/kestra/plugin/aws/s3/TriggerTest.java +++ b/src/test/java/io/kestra/plugin/aws/s3/TriggerTest.java @@ -41,12 +41,64 @@ class TriggerTest extends AbstractTest { protected LocalFlowRepositoryLoader repositoryLoader; @Test - void flow() throws Exception { - try { - this.createBucket("trigger-test"); - } catch (Exception ignored) { + void deleteAction() throws Exception { + String bucket = "trigger-test"; + this.createBucket(bucket); + List listTask = list().bucket(bucket).build(); + // mock flow listeners + CountDownLatch queueCount = new CountDownLatch(1); + + // scheduler + Worker worker = new Worker(applicationContext, 8, null); + try ( + AbstractScheduler scheduler = new DefaultScheduler( + this.applicationContext, + this.flowListenersService, + this.triggerState + ); + ) { + AtomicReference last = new AtomicReference<>(); + + // wait for execution + executionQueue.receive(TriggerTest.class, executionWithError -> { + Execution execution = executionWithError.getLeft(); + + last.set(execution); + queueCount.countDown(); + + assertThat(execution.getFlowId(), is("s3-listen")); + }); + + + upload("trigger/s3", bucket); + upload("trigger/s3", bucket); + + worker.run(); + scheduler.run(); + repositoryLoader.load(Objects.requireNonNull(TriggerTest.class.getClassLoader().getResource("flows/s3/s3-listen.yaml"))); + + boolean await = queueCount.await(10, TimeUnit.SECONDS); + assertThat(await, is(true)); + worker.shutdown(); + + @SuppressWarnings("unchecked") + java.util.List trigger = (java.util.List) last.get().getTrigger().getVariables().get("objects"); + + assertThat(trigger.size(), is(2)); + + int remainingFilesOnBucket = listTask.run(runContext(listTask)) + .getObjects() + .size(); + assertThat(remainingFilesOnBucket, is(0)); } + } + + @Test + void noneAction() throws Exception { + String bucket = "trigger-none-action-test"; + this.createBucket(bucket); + List listTask = list().bucket(bucket).build(); // mock flow listeners CountDownLatch queueCount = new CountDownLatch(1); @@ -63,27 +115,36 @@ void flow() throws Exception { AtomicReference last = new AtomicReference<>(); // wait for execution - executionQueue.receive(TriggerTest.class, execution -> { - last.set(execution.getLeft()); + executionQueue.receive(TriggerTest.class, executionWithError -> { + Execution execution = executionWithError.getLeft(); + last.set(execution); queueCount.countDown(); - assertThat(execution.getLeft().getFlowId(), is("s3-listen")); + + assertThat(execution.getFlowId(), is("s3-listen-none-action")); }); - upload("trigger/s3", "trigger-test"); - upload("trigger/s3", "trigger-test"); + upload("trigger/s3", bucket); + upload("trigger/s3", bucket); worker.run(); scheduler.run(); - repositoryLoader.load(Objects.requireNonNull(TriggerTest.class.getClassLoader().getResource("flows/s3"))); + repositoryLoader.load(Objects.requireNonNull(TriggerTest.class.getClassLoader().getResource("flows/s3/s3-listen-none-action.yaml"))); - queueCount.await(1, TimeUnit.MINUTES); + boolean await = queueCount.await(10, TimeUnit.SECONDS); + assertThat(await, is(true)); + worker.shutdown(); @SuppressWarnings("unchecked") java.util.List trigger = (java.util.List) last.get().getTrigger().getVariables().get("objects"); assertThat(trigger.size(), is(2)); + + int remainingFilesOnBucket = listTask.run(runContext(listTask)) + .getObjects() + .size(); + assertThat(remainingFilesOnBucket, is(2)); } } } diff --git a/src/test/resources/flows/s3/s3-listen-none-action.yaml b/src/test/resources/flows/s3/s3-listen-none-action.yaml new file mode 100644 index 00000000..8e9ca3f9 --- /dev/null +++ b/src/test/resources/flows/s3/s3-listen-none-action.yaml @@ -0,0 +1,19 @@ +id: s3-listen-none-action +namespace: io.kestra.tests + +triggers: + - id: watch + type: io.kestra.plugin.aws.s3.Trigger + bucket: trigger-none-action-test + interval: PT10S + action: NONE + # We need to use this endpoint for Localstack and DNS style path, see https://github.com/localstack/localstack/issues/2631 + endpointOverride: http://s3.localhost.localstack.cloud:4566 + region: us-east-1 + accessKeyId: accesskey + secretKeyId: secretkey + +tasks: + - id: end + type: io.kestra.core.tasks.debugs.Return + format: "{{task.id}} > {{taskrun.startDate}}"