Skip to content

Commit

Permalink
fix(trigger): allow 'NONE' action
Browse files Browse the repository at this point in the history
  • Loading branch information
brian-mulier-p committed Jan 18, 2024
1 parent 4105e25 commit 00cdfc4
Show file tree
Hide file tree
Showing 6 changed files with 96 additions and 16 deletions.
3 changes: 2 additions & 1 deletion src/main/java/io/kestra/plugin/aws/s3/ActionInterface.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ public interface ActionInterface {

enum Action {
MOVE,
DELETE
DELETE,
NONE
}
}
2 changes: 1 addition & 1 deletion src/main/java/io/kestra/plugin/aws/s3/Downloads.java
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public List.Output run(RunContext runContext) throws Exception {
.collect(Collectors.toList());


S3Service.archive(
S3Service.performAction(
run.getObjects(),
this.action,
this.moveTo,
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/kestra/plugin/aws/s3/S3Service.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public static Pair<GetObjectResponse, URI> download(RunContext runContext, S3Asy
}
}

static void archive(
static void performAction(
java.util.List<S3Object> s3Objects,
ActionInterface.Action action,
Copy.CopyObject moveTo,
Expand Down
3 changes: 1 addition & 2 deletions src/main/java/io/kestra/plugin/aws/s3/Trigger.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import io.kestra.core.models.triggers.TriggerContext;
import io.kestra.core.models.triggers.TriggerOutput;
import io.kestra.core.runners.RunContext;
import io.kestra.core.utils.IdUtils;
import io.kestra.plugin.aws.AbstractConnectionInterface;
import io.kestra.plugin.aws.s3.models.S3Object;
import io.swagger.v3.oas.annotations.media.Schema;
Expand Down Expand Up @@ -160,7 +159,7 @@ public Optional<Execution> evaluate(ConditionContext conditionContext, TriggerCo
}))
.collect(Collectors.toList());

S3Service.archive(
S3Service.performAction(
run.getObjects(),
this.action,
this.moveTo,
Expand Down
83 changes: 72 additions & 11 deletions src/test/java/io/kestra/plugin/aws/s3/TriggerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,64 @@ class TriggerTest extends AbstractTest {
protected LocalFlowRepositoryLoader repositoryLoader;

@Test
void flow() throws Exception {
try {
this.createBucket("trigger-test");
} catch (Exception ignored) {
void deleteAction() throws Exception {
String bucket = "trigger-test";
this.createBucket(bucket);
List listTask = list().bucket(bucket).build();

// mock flow listeners
CountDownLatch queueCount = new CountDownLatch(1);

// scheduler
Worker worker = new Worker(applicationContext, 8, null);
try (
AbstractScheduler scheduler = new DefaultScheduler(
this.applicationContext,
this.flowListenersService,
this.triggerState
);
) {
AtomicReference<Execution> last = new AtomicReference<>();

// wait for execution
executionQueue.receive(TriggerTest.class, executionWithError -> {
Execution execution = executionWithError.getLeft();

last.set(execution);
queueCount.countDown();

assertThat(execution.getFlowId(), is("s3-listen"));
});


upload("trigger/s3", bucket);
upload("trigger/s3", bucket);

worker.run();
scheduler.run();
repositoryLoader.load(Objects.requireNonNull(TriggerTest.class.getClassLoader().getResource("flows/s3/s3-listen.yaml")));

boolean await = queueCount.await(10, TimeUnit.SECONDS);
assertThat(await, is(true));
worker.shutdown();

@SuppressWarnings("unchecked")
java.util.List<S3Object> trigger = (java.util.List<S3Object>) last.get().getTrigger().getVariables().get("objects");

assertThat(trigger.size(), is(2));

int remainingFilesOnBucket = listTask.run(runContext(listTask))
.getObjects()
.size();
assertThat(remainingFilesOnBucket, is(0));
}
}

@Test
void noneAction() throws Exception {
String bucket = "trigger-none-action-test";
this.createBucket(bucket);
List listTask = list().bucket(bucket).build();

// mock flow listeners
CountDownLatch queueCount = new CountDownLatch(1);
Expand All @@ -63,27 +115,36 @@ void flow() throws Exception {
AtomicReference<Execution> last = new AtomicReference<>();

// wait for execution
executionQueue.receive(TriggerTest.class, execution -> {
last.set(execution.getLeft());
executionQueue.receive(TriggerTest.class, executionWithError -> {
Execution execution = executionWithError.getLeft();

last.set(execution);
queueCount.countDown();
assertThat(execution.getLeft().getFlowId(), is("s3-listen"));

assertThat(execution.getFlowId(), is("s3-listen-none-action"));
});


upload("trigger/s3", "trigger-test");
upload("trigger/s3", "trigger-test");
upload("trigger/s3", bucket);
upload("trigger/s3", bucket);

worker.run();
scheduler.run();
repositoryLoader.load(Objects.requireNonNull(TriggerTest.class.getClassLoader().getResource("flows/s3")));
repositoryLoader.load(Objects.requireNonNull(TriggerTest.class.getClassLoader().getResource("flows/s3/s3-listen-none-action.yaml")));

queueCount.await(1, TimeUnit.MINUTES);
boolean await = queueCount.await(10, TimeUnit.SECONDS);
assertThat(await, is(true));
worker.shutdown();

@SuppressWarnings("unchecked")
java.util.List<S3Object> trigger = (java.util.List<S3Object>) last.get().getTrigger().getVariables().get("objects");

assertThat(trigger.size(), is(2));

int remainingFilesOnBucket = listTask.run(runContext(listTask))
.getObjects()
.size();
assertThat(remainingFilesOnBucket, is(2));
}
}
}
19 changes: 19 additions & 0 deletions src/test/resources/flows/s3/s3-listen-none-action.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
id: s3-listen-none-action
namespace: io.kestra.tests

triggers:
- id: watch
type: io.kestra.plugin.aws.s3.Trigger
bucket: trigger-none-action-test
interval: PT10S
action: NONE
# We need to use this endpoint for Localstack and DNS style path, see https://github.com/localstack/localstack/issues/2631
endpointOverride: http://s3.localhost.localstack.cloud:4566
region: us-east-1
accessKeyId: accesskey
secretKeyId: secretkey

tasks:
- id: end
type: io.kestra.core.tasks.debugs.Return
format: "{{task.id}} > {{taskrun.startDate}}"

0 comments on commit 00cdfc4

Please sign in to comment.