diff --git a/src/main/java/io/kestra/plugin/aws/s3/AbstractS3Object.java b/src/main/java/io/kestra/plugin/aws/s3/AbstractS3Object.java index 94d5eed..ebef4df 100644 --- a/src/main/java/io/kestra/plugin/aws/s3/AbstractS3Object.java +++ b/src/main/java/io/kestra/plugin/aws/s3/AbstractS3Object.java @@ -1,5 +1,6 @@ package io.kestra.plugin.aws.s3; +import io.kestra.core.models.property.Property; import io.kestra.plugin.aws.AbstractConnection; import lombok.EqualsAndHashCode; import lombok.Getter; @@ -13,9 +14,9 @@ @Getter @NoArgsConstructor public abstract class AbstractS3Object extends AbstractConnection implements AbstractS3ObjectInterface { - protected String requestPayer; + protected Property requestPayer; - protected String bucket; + protected Property bucket; static { // Initializing CRT will download the S3 native library into /tmp. diff --git a/src/main/java/io/kestra/plugin/aws/s3/AbstractS3ObjectInterface.java b/src/main/java/io/kestra/plugin/aws/s3/AbstractS3ObjectInterface.java index 0dbfcda..76b19b6 100644 --- a/src/main/java/io/kestra/plugin/aws/s3/AbstractS3ObjectInterface.java +++ b/src/main/java/io/kestra/plugin/aws/s3/AbstractS3ObjectInterface.java @@ -1,6 +1,7 @@ package io.kestra.plugin.aws.s3; import io.kestra.core.models.annotations.PluginProperty; +import io.kestra.core.models.property.Property; import io.swagger.v3.oas.annotations.media.Schema; import jakarta.validation.constraints.NotNull; @@ -8,13 +9,11 @@ public interface AbstractS3ObjectInterface extends AbstractS3 { @Schema( title = "The S3 bucket name." ) - @PluginProperty(dynamic = true) @NotNull - String getBucket(); + Property getBucket(); @Schema( title = "Sets the value of the RequestPayer property for this object." ) - @PluginProperty(dynamic = true) - String getRequestPayer(); + Property getRequestPayer(); } diff --git a/src/main/java/io/kestra/plugin/aws/s3/ActionInterface.java b/src/main/java/io/kestra/plugin/aws/s3/ActionInterface.java index 9d7190e..e32259e 100644 --- a/src/main/java/io/kestra/plugin/aws/s3/ActionInterface.java +++ b/src/main/java/io/kestra/plugin/aws/s3/ActionInterface.java @@ -1,6 +1,7 @@ package io.kestra.plugin.aws.s3; import io.kestra.core.models.annotations.PluginProperty; +import io.kestra.core.models.property.Property; import io.swagger.v3.oas.annotations.media.Schema; import jakarta.validation.constraints.NotNull; @@ -9,9 +10,8 @@ public interface ActionInterface { @Schema( title = "The action to perform on the retrieved files. If using 'NONE' make sure to handle the files inside your flow to avoid infinite triggering." ) - @PluginProperty(dynamic = true) @NotNull - ActionInterface.Action getAction(); + Property getAction(); @Schema( title = "The destination bucket and key for `MOVE` action." diff --git a/src/main/java/io/kestra/plugin/aws/s3/Copy.java b/src/main/java/io/kestra/plugin/aws/s3/Copy.java index fc43839..433cc88 100644 --- a/src/main/java/io/kestra/plugin/aws/s3/Copy.java +++ b/src/main/java/io/kestra/plugin/aws/s3/Copy.java @@ -3,6 +3,7 @@ import io.kestra.core.models.annotations.Example; import io.kestra.core.models.annotations.Plugin; import io.kestra.core.models.annotations.PluginProperty; +import io.kestra.core.models.property.Property; import io.kestra.core.models.tasks.RunnableTask; import io.kestra.core.runners.RunContext; import io.kestra.plugin.aws.AbstractConnection; @@ -62,27 +63,26 @@ public class Copy extends AbstractConnection implements AbstractS3, RunnableTask @Schema( title = "Whether to delete the source file after download." ) - @PluginProperty @Builder.Default - private Boolean delete = false; + private Property delete = Property.of(false); @Override public Output run(RunContext runContext) throws Exception { try (S3Client client = this.client(runContext)) { CopyObjectRequest.Builder builder = CopyObjectRequest.builder() - .sourceBucket(runContext.render(this.from.bucket)) - .sourceKey(runContext.render(this.from.key)) - .destinationBucket(runContext.render(this.to.bucket != null ? this.to.bucket : this.from.bucket)) - .destinationKey(runContext.render(this.to.key != null ? this.to.key : this.from.key)); + .sourceBucket(runContext.render(this.from.bucket).as(String.class).orElseThrow()) + .sourceKey(runContext.render(this.from.key).as(String.class).orElseThrow()) + .destinationBucket(runContext.render(this.to.bucket != null ? this.to.bucket : this.from.bucket).as(String.class).orElseThrow()) + .destinationKey(runContext.render(this.to.key != null ? this.to.key : this.from.key).as(String.class).orElseThrow()); if (this.from.versionId != null) { - builder.sourceVersionId(runContext.render(this.from.versionId)); + builder.sourceVersionId(runContext.render(this.from.versionId).as(String.class).orElseThrow()); } CopyObjectRequest request = builder.build(); CopyObjectResponse response = client.copyObject(request); - if (this.delete) { + if (runContext.render(this.delete).as(Boolean.class).orElseThrow()) { Delete.builder() .id(this.id) .type(Delete.class.getName()) @@ -96,8 +96,8 @@ public Output run(RunContext runContext) throws Exception { .stsRoleSessionDuration(this.stsRoleSessionDuration) .stsRoleArn(this.stsRoleArn) .stsEndpointOverride(this.stsEndpointOverride) - .bucket(request.sourceBucket()) - .key(request.sourceKey()) + .bucket(Property.of(request.sourceBucket())) + .key(Property.of(request.sourceKey())) .build() .run(runContext); } @@ -118,16 +118,14 @@ public static class CopyObject { @Schema( title = "The bucket name" ) - @PluginProperty(dynamic = true) @NotNull - String bucket; + Property bucket; @Schema( title = "The bucket key" ) - @PluginProperty(dynamic = true) @NotNull - String key; + Property key; } @SuperBuilder(toBuilder = true) @@ -137,8 +135,7 @@ public static class CopyObjectFrom extends CopyObject { @Schema( title = "The specific version of the object." ) - @PluginProperty(dynamic = true) - private String versionId; + private Property versionId; } @SuperBuilder diff --git a/src/main/java/io/kestra/plugin/aws/s3/CreateBucket.java b/src/main/java/io/kestra/plugin/aws/s3/CreateBucket.java index 31bc0ee..5c75f5c 100644 --- a/src/main/java/io/kestra/plugin/aws/s3/CreateBucket.java +++ b/src/main/java/io/kestra/plugin/aws/s3/CreateBucket.java @@ -3,6 +3,7 @@ import io.kestra.core.models.annotations.Example; import io.kestra.core.models.annotations.Plugin; import io.kestra.core.models.annotations.PluginProperty; +import io.kestra.core.models.property.Property; import io.kestra.core.models.tasks.RunnableTask; import io.kestra.core.runners.RunContext; import io.kestra.plugin.aws.AbstractConnection; @@ -46,91 +47,79 @@ public class CreateBucket extends AbstractConnection implements AbstractS3, Runn @Schema( description = "The S3 bucket name to create." ) - @PluginProperty(dynamic = true) @NotNull - private String bucket; + private Property bucket; @Schema( description = "Allows grantee the read, write, read ACP, and write ACP permissions on the bucket." ) - @PluginProperty(dynamic = true) - private String grantFullControl; + private Property grantFullControl; @Schema( title = "Allows grantee to list the objects in the bucket." ) - @PluginProperty(dynamic = true) - private String grantRead; + private Property grantRead; @Schema( title = "Allows grantee to list the ACL for the applicable bucket." ) - @PluginProperty(dynamic = true) - private String grantReadACP; + private Property grantReadACP; @Schema( title = "Allows grantee to create, overwrite, and delete any object in the bucket." ) - @PluginProperty(dynamic = true) - private String grantWrite; + private Property grantWrite; @Schema( title = "Allows grantee to write the ACL for the applicable bucket." ) - @PluginProperty(dynamic = true) - private String grantWriteACP; + private Property grantWriteACP; @Schema( title = "The canned ACL to apply to the bucket." ) - @PluginProperty(dynamic = true) - private String acl; + private Property acl; @Schema( title = "Specifies whether you want S3 Object Lock to be enabled for the new bucket." ) - @PluginProperty - private Boolean objectLockEnabledForBucket; + private Property objectLockEnabledForBucket; @Override public Output run(RunContext runContext) throws Exception { - String bucket = runContext.render(this.bucket); + String bucket = runContext.render(this.bucket).as(String.class).orElseThrow(); try (S3Client client = this.client(runContext)) { CreateBucketRequest.Builder builder = CreateBucketRequest.builder() .bucket(bucket); if (grantFullControl != null) { - builder.grantFullControl(runContext.render(this.grantFullControl)); + builder.grantFullControl(runContext.render(this.grantFullControl).as(String.class).orElseThrow()); } if (grantRead != null) { - builder.grantRead(runContext.render(this.grantRead)); + builder.grantRead(runContext.render(this.grantRead).as(String.class).orElseThrow()); } if (grantReadACP != null) { - builder.grantReadACP(runContext.render(this.grantReadACP)); + builder.grantReadACP(runContext.render(this.grantReadACP).as(String.class).orElseThrow()); } if (grantWrite != null) { - builder.grantWrite(runContext.render(this.grantWrite)); + builder.grantWrite(runContext.render(this.grantWrite).as(String.class).orElseThrow()); } if (grantWriteACP != null) { - builder.grantWriteACP(runContext.render(this.grantWriteACP)); + builder.grantWriteACP(runContext.render(this.grantWriteACP).as(String.class).orElseThrow()); } if (acl != null) { - builder.acl(runContext.render(this.acl)); + builder.acl(runContext.render(this.acl).as(String.class).orElseThrow()); } if (objectLockEnabledForBucket != null) { - builder.objectLockEnabledForBucket(this.objectLockEnabledForBucket); - } - - if (objectLockEnabledForBucket != null) { - builder.objectLockEnabledForBucket(this.objectLockEnabledForBucket); + builder.objectLockEnabledForBucket(runContext.render(this.objectLockEnabledForBucket).as(Boolean.class).orElseThrow()); } CreateBucketResponse response = client.createBucket(builder.build()); diff --git a/src/main/java/io/kestra/plugin/aws/s3/Delete.java b/src/main/java/io/kestra/plugin/aws/s3/Delete.java index a5b0ba0..6b341b4 100644 --- a/src/main/java/io/kestra/plugin/aws/s3/Delete.java +++ b/src/main/java/io/kestra/plugin/aws/s3/Delete.java @@ -3,6 +3,7 @@ import io.kestra.core.models.annotations.Example; import io.kestra.core.models.annotations.Plugin; import io.kestra.core.models.annotations.PluginProperty; +import io.kestra.core.models.property.Property; import io.kestra.core.models.tasks.RunnableTask; import io.kestra.core.runners.RunContext; import io.swagger.v3.oas.annotations.media.Schema; @@ -45,15 +46,13 @@ public class Delete extends AbstractS3Object implements RunnableTask key; @Schema( title = "Indicates whether S3 Object Lock should bypass Governance-mode restrictions to process this operation." ) - @PluginProperty - private Boolean bypassGovernanceRetention; + private Property bypassGovernanceRetention; @Schema( title = "The concatenation of the authentication device's serial number, a space, and the value that is displayed on " + @@ -61,19 +60,17 @@ public class Delete extends AbstractS3Object implements RunnableTask mfa; @Schema( description = "Sets the value of the RequestPayer property for this object." ) - @PluginProperty(dynamic = true) - private String requestPayer; + private Property requestPayer; @Override public Output run(RunContext runContext) throws Exception { - String bucket = runContext.render(this.bucket); - String key = runContext.render(this.key); + String bucket = runContext.render(this.bucket).as(String.class).orElseThrow(); + String key = runContext.render(this.key).as(String.class).orElseThrow(); try (S3Client client = client(runContext)) { DeleteObjectRequest.Builder builder = DeleteObjectRequest.builder() @@ -81,15 +78,15 @@ public Output run(RunContext runContext) throws Exception { .key(key); if (this.bypassGovernanceRetention != null) { - builder.bypassGovernanceRetention(this.bypassGovernanceRetention); + builder.bypassGovernanceRetention(runContext.render(this.bypassGovernanceRetention).as(Boolean.class).orElseThrow()); } if (this.mfa != null) { - builder.mfa(runContext.render(this.mfa)); + builder.mfa(runContext.render(this.mfa).as(String.class).orElseThrow()); } if (this.requestPayer != null) { - builder.requestPayer(runContext.render(this.requestPayer)); + builder.requestPayer(runContext.render(this.requestPayer).as(String.class).orElseThrow()); } DeleteObjectResponse response = client.deleteObject(builder.build()); diff --git a/src/main/java/io/kestra/plugin/aws/s3/DeleteList.java b/src/main/java/io/kestra/plugin/aws/s3/DeleteList.java index 5874f69..b6544d5 100644 --- a/src/main/java/io/kestra/plugin/aws/s3/DeleteList.java +++ b/src/main/java/io/kestra/plugin/aws/s3/DeleteList.java @@ -4,6 +4,7 @@ import io.kestra.core.models.annotations.Plugin; import io.kestra.core.models.annotations.PluginProperty; import io.kestra.core.models.executions.metrics.Counter; +import io.kestra.core.models.property.Property; import io.kestra.core.models.tasks.RunnableTask; import io.kestra.core.runners.RunContext; import io.kestra.plugin.aws.s3.models.S3Object; @@ -54,23 +55,23 @@ title = "Delete a list of keys on a S3 bucket." ) public class DeleteList extends AbstractS3Object implements RunnableTask, ListInterface { - private String prefix; + private Property prefix; - private String delimiter; + private Property delimiter; - private String marker; + private Property marker; - private String encodingType; + private Property encodingType; @Builder.Default - private Integer maxKeys = 1000; + private Property maxKeys = Property.of(1000); - private String expectedBucketOwner; + private Property expectedBucketOwner; - protected String regexp; + protected Property regexp; @Builder.Default - protected final Filter filter = Filter.BOTH; + protected final Property filter = Property.of(Filter.BOTH); @Min(2) @Schema( @@ -82,14 +83,13 @@ public class DeleteList extends AbstractS3Object implements RunnableTask errorOnEmpty = Property.of(false); @Override public Output run(RunContext runContext) throws Exception { Logger logger = runContext.logger(); - String bucket = runContext.render(this.bucket); + String bucket = runContext.render(this.bucket).as(String.class).orElseThrow(); try (S3Client client = this.client(runContext)) { @@ -122,11 +122,11 @@ public Output run(RunContext runContext) throws Exception { runContext.metric(Counter.of("count", finalResult.getLeft())); runContext.metric(Counter.of("size", finalResult.getRight())); - if (errorOnEmpty && finalResult.getLeft() == 0) { + if (runContext.render(errorOnEmpty).as(Boolean.class).orElseThrow() && finalResult.getLeft() == 0) { throw new NoSuchElementException("Unable to find any files to delete on " + - runContext.render(this.bucket) + " " + - "with regexp='" + runContext.render(this.regexp) + "', " + - "prefix='" + runContext.render(this.prefix) + "'" + runContext.render(this.bucket).as(String.class).orElseThrow() + " " + + "with regexp='" + runContext.render(this.regexp).as(String.class).orElse(null) + "', " + + "prefix='" + runContext.render(this.prefix).as(String.class).orElse(null) + "'" ); } diff --git a/src/main/java/io/kestra/plugin/aws/s3/Download.java b/src/main/java/io/kestra/plugin/aws/s3/Download.java index 5a5c6ac..89ebf24 100644 --- a/src/main/java/io/kestra/plugin/aws/s3/Download.java +++ b/src/main/java/io/kestra/plugin/aws/s3/Download.java @@ -51,27 +51,24 @@ public class Download extends AbstractS3Object implements RunnableTask key; @Schema( title = "The specific version of the object." ) - @PluginProperty(dynamic = true) - protected String versionId; + protected Property versionId; @Schema( title = "If set to true, the task will use the AWS S3 DefaultAsyncClient instead of the S3CrtAsyncClient, which better integrates with S3-compatible services but restricts uploads and downloads to 2GB." ) - @PluginProperty @Builder.Default private Property compatibilityMode = Property.of(false); @Override public Output run(RunContext runContext) throws Exception { - String bucket = runContext.render(this.bucket); - String key = runContext.render(this.key); + String bucket = runContext.render(this.bucket).as(String.class).orElseThrow(); + String key = runContext.render(this.key).as(String.class).orElseThrow(); try (S3AsyncClient client = this.asyncClient(runContext)) { GetObjectRequest.Builder builder = GetObjectRequest.builder() @@ -79,11 +76,11 @@ public Output run(RunContext runContext) throws Exception { .key(key); if (this.versionId != null) { - builder.versionId(runContext.render(this.versionId)); + builder.versionId(runContext.render(this.versionId).as(String.class).orElseThrow()); } if (this.requestPayer != null) { - builder.requestPayer(runContext.render(this.requestPayer)); + builder.requestPayer(runContext.render(this.requestPayer).as(String.class).orElseThrow()); } Pair download = S3Service.download(runContext, client, builder.build()); diff --git a/src/main/java/io/kestra/plugin/aws/s3/Downloads.java b/src/main/java/io/kestra/plugin/aws/s3/Downloads.java index d8d5fe0..848a84a 100644 --- a/src/main/java/io/kestra/plugin/aws/s3/Downloads.java +++ b/src/main/java/io/kestra/plugin/aws/s3/Downloads.java @@ -52,33 +52,32 @@ title = "Downloads multiple files from a S3 bucket." ) public class Downloads extends AbstractS3Object implements RunnableTask, ListInterface, ActionInterface { - private String prefix; + private Property prefix; - private String delimiter; + private Property delimiter; - private String marker; + private Property marker; - private String encodingType; + private Property encodingType; @Builder.Default - private Integer maxKeys = 1000; + private Property maxKeys = Property.of(1000); @Schema( title = "This property will use the AWS S3 DefaultAsyncClient instead of the S3CrtAsyncClient, which maximizes compatibility with S3-compatible services but restricts uploads and downloads to 2GB." ) - @PluginProperty @Builder.Default private Property compatibilityMode = Property.of(false); - private String expectedBucketOwner; + private Property expectedBucketOwner; - protected String regexp; + protected Property regexp; @Builder.Default - protected final Filter filter = Filter.BOTH; + protected final Property filter = Property.of(Filter.BOTH); - private ActionInterface.Action action; + private Property action; private Copy.CopyObject moveTo; @@ -116,7 +115,7 @@ public Output run(RunContext runContext) throws Exception { .stream() .map(throwFunction(object -> { GetObjectRequest.Builder builder = GetObjectRequest.builder() - .bucket(runContext.render(bucket)) + .bucket(runContext.render(bucket).as(String.class).orElseThrow()) .key(object.getKey()); Pair download = S3Service.download(runContext, client, builder.build()); diff --git a/src/main/java/io/kestra/plugin/aws/s3/List.java b/src/main/java/io/kestra/plugin/aws/s3/List.java index 3dc8f47..228aa97 100644 --- a/src/main/java/io/kestra/plugin/aws/s3/List.java +++ b/src/main/java/io/kestra/plugin/aws/s3/List.java @@ -4,6 +4,7 @@ import io.kestra.core.models.annotations.Example; import io.kestra.core.models.annotations.Plugin; import io.kestra.core.models.executions.metrics.Counter; +import io.kestra.core.models.property.Property; import io.kestra.core.models.tasks.RunnableTask; import io.kestra.core.runners.RunContext; import io.kestra.plugin.aws.s3.models.S3Object; @@ -41,23 +42,23 @@ title = "List keys on a S3 bucket." ) public class List extends AbstractS3Object implements RunnableTask, ListInterface { - private String prefix; + private Property prefix; - private String delimiter; + private Property delimiter; - private String marker; + private Property marker; - private String encodingType; + private Property encodingType; @Builder.Default - private Integer maxKeys = 1000; + private Property maxKeys = Property.of(1000); - private String expectedBucketOwner; + private Property expectedBucketOwner; - protected String regexp; + protected Property regexp; @Builder.Default - protected final Filter filter = Filter.BOTH; + protected final Property filter = Property.of(Filter.BOTH); @Override public Output run(RunContext runContext) throws Exception { @@ -69,9 +70,9 @@ public Output run(RunContext runContext) throws Exception { runContext.logger().debug( "Found '{}' keys on {} with regexp='{}', prefix={}", list.size(), - runContext.render(bucket), - runContext.render(regexp), - runContext.render(prefix) + runContext.render(bucket).as(String.class).orElseThrow(), + runContext.render(regexp).as(String.class).orElse(null), + runContext.render(prefix).as(String.class).orElse(null) ); return Output.builder() diff --git a/src/main/java/io/kestra/plugin/aws/s3/ListInterface.java b/src/main/java/io/kestra/plugin/aws/s3/ListInterface.java index b23f961..4a3e366 100644 --- a/src/main/java/io/kestra/plugin/aws/s3/ListInterface.java +++ b/src/main/java/io/kestra/plugin/aws/s3/ListInterface.java @@ -1,6 +1,7 @@ package io.kestra.plugin.aws.s3; import io.kestra.core.models.annotations.PluginProperty; +import io.kestra.core.models.property.Property; import io.swagger.v3.oas.annotations.media.Schema; import jakarta.validation.constraints.NotNull; @@ -9,48 +10,41 @@ public interface ListInterface { @Schema( title = "The S3 bucket where to download the file." ) - @PluginProperty(dynamic = true) @NotNull - String getBucket(); + Property getBucket(); @Schema( title = "Limits the response to keys that begin with the specified prefix." ) - @PluginProperty(dynamic = true) - String getPrefix(); + Property getPrefix(); @Schema( title = "A delimiter is a character you use to group keys." ) - @PluginProperty(dynamic = true) - String getDelimiter(); + Property getDelimiter(); @Schema( title = "Marker is where you want Amazon S3 to start listing from.", description = "Amazon S3 starts listing after this specified key. Marker can be any key in the bucket." ) - @PluginProperty(dynamic = true) - String getMarker(); + Property getMarker(); @Schema( title = "The EncodingType property for this object." ) - @PluginProperty(dynamic = true) - String getEncodingType(); + Property getEncodingType(); @Schema( title = "Sets the maximum number of keys returned in the response.", description = "By default, the action returns up to 1,000 key names. The response might contain fewer keys but will never contain more." ) - @PluginProperty(dynamic = true) - Integer getMaxKeys(); + Property getMaxKeys(); @Schema( title = "The account ID of the expected bucket owner.", description = "If the bucket is owned by a different account, the request fails with the HTTP status code 403 Forbidden (access denied)." ) - @PluginProperty(dynamic = true) - String getExpectedBucketOwner(); + Property getExpectedBucketOwner(); @Schema( title = "A regexp to filter on full key.", @@ -58,14 +52,12 @@ public interface ListInterface { "`regExp: .*` to match all files\n"+ "`regExp: .*2020-01-0.\\\\.csv` to match files between 01 and 09 of january ending with `.csv`" ) - @PluginProperty(dynamic = true) - String getRegexp(); + Property getRegexp(); @Schema( title = "The type of objects to filter: files, directory, or both." ) - @PluginProperty - Filter getFilter(); + Property getFilter(); enum Filter { diff --git a/src/main/java/io/kestra/plugin/aws/s3/S3Service.java b/src/main/java/io/kestra/plugin/aws/s3/S3Service.java index f0fb05d..637014e 100644 --- a/src/main/java/io/kestra/plugin/aws/s3/S3Service.java +++ b/src/main/java/io/kestra/plugin/aws/s3/S3Service.java @@ -2,6 +2,7 @@ import io.kestra.core.exceptions.IllegalVariableEvaluationException; import io.kestra.core.models.executions.metrics.Counter; +import io.kestra.core.models.property.Property; import io.kestra.core.runners.RunContext; import io.kestra.core.utils.FileUtils; import io.kestra.plugin.aws.AbstractConnectionInterface; @@ -27,6 +28,8 @@ import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; +import static io.kestra.core.utils.Rethrow.throwPredicate; + public class S3Service { public static void initCrt() { @@ -58,14 +61,15 @@ public static Pair download(RunContext runContext, S3Asy static void performAction( java.util.List s3Objects, - ActionInterface.Action action, + Property action, Copy.CopyObject moveTo, RunContext runContext, AbstractS3ObjectInterface abstractS3Object, AbstractConnectionInterface abstractS3, AbstractConnectionInterface abstractConnection ) throws Exception { - if (action == ActionInterface.Action.DELETE) { + var renderedAction = runContext.render(action).as(ActionInterface.Action.class).orElseThrow(); + if (renderedAction == ActionInterface.Action.DELETE) { for (S3Object object : s3Objects) { Delete delete = Delete.builder() .id("archive") @@ -74,7 +78,7 @@ static void performAction( .endpointOverride(abstractS3.getEndpointOverride()) .accessKeyId(abstractConnection.getAccessKeyId()) .secretKeyId(abstractConnection.getSecretKeyId()) - .key(object.getKey()) + .key(Property.of(object.getKey())) .bucket(abstractS3Object.getBucket()) .stsRoleArn(abstractConnection.getStsRoleArn()) .stsRoleExternalId(abstractConnection.getStsRoleExternalId()) @@ -84,7 +88,7 @@ static void performAction( .build(); delete.run(runContext); } - } else if (action == ActionInterface.Action.MOVE) { + } else if (renderedAction == ActionInterface.Action.MOVE) { for (S3Object object : s3Objects) { Copy copy = Copy.builder() .id("archive") @@ -100,16 +104,16 @@ static void performAction( .stsEndpointOverride(abstractConnection.getStsEndpointOverride()) .from(Copy.CopyObjectFrom.builder() .bucket(abstractS3Object.getBucket()) - .key(object.getKey()) + .key(Property.of(object.getKey())) .build() ) .to(moveTo.toBuilder() - .key(StringUtils.stripEnd(moveTo.getKey() + "/", "/") + .key(Property.of(StringUtils.stripEnd(moveTo.getKey() + "/", "/") + "/" + FilenameUtils.getName(object.getKey()) - ) + )) .build() ) - .delete(true) + .delete(Property.of(true)) .build(); copy.run(runContext); } @@ -118,41 +122,41 @@ static void performAction( public static List list(RunContext runContext, S3Client client, ListInterface list, AbstractS3Object abstractS3) throws IllegalVariableEvaluationException { ListObjectsRequest.Builder builder = ListObjectsRequest.builder() - .bucket(runContext.render(list.getBucket())) - .maxKeys(list.getMaxKeys()); + .bucket(runContext.render(list.getBucket()).as(String.class).orElseThrow()) + .maxKeys(runContext.render(list.getMaxKeys()).as(Integer.class).orElse(1000)); if (list.getPrefix() != null) { - builder.prefix(runContext.render(list.getPrefix())); + builder.prefix(runContext.render(list.getPrefix()).as(String.class).orElseThrow()); } if (list.getDelimiter() != null) { - builder.delimiter(runContext.render(list.getDelimiter())); + builder.delimiter(runContext.render(list.getDelimiter()).as(String.class).orElseThrow()); } if (list.getMarker() != null) { - builder.marker(runContext.render(list.getMarker())); + builder.marker(runContext.render(list.getMarker()).as(String.class).orElseThrow()); } if (list.getEncodingType() != null) { - builder.encodingType(runContext.render(list.getEncodingType())); + builder.encodingType(runContext.render(list.getEncodingType()).as(String.class).orElseThrow()); } if (list.getExpectedBucketOwner() != null) { - builder.expectedBucketOwner(runContext.render(list.getExpectedBucketOwner())); + builder.expectedBucketOwner(runContext.render(list.getExpectedBucketOwner()).as(String.class).orElseThrow()); } if (abstractS3.getRequestPayer() != null) { - builder.requestPayer(runContext.render(abstractS3.getRequestPayer())); + builder.requestPayer(runContext.render(abstractS3.getRequestPayer()).as(String.class).orElseThrow()); } - String regExp = runContext.render(list.getRegexp()); + String regExp = runContext.render(list.getRegexp()).as(String.class).orElse(null); ListObjectsResponse listObjectsResponse = client.listObjects(builder.build()); return listObjectsResponse .contents() .stream() - .filter(s3Object -> S3Service.filter(s3Object, regExp, list.getFilter())) + .filter(throwPredicate(s3Object -> S3Service.filter(s3Object, regExp, runContext.render(list.getFilter()).as(ListInterface.Filter.class).orElseThrow()))) .map(S3Object::of) .collect(Collectors.toList()); } diff --git a/src/main/java/io/kestra/plugin/aws/s3/Trigger.java b/src/main/java/io/kestra/plugin/aws/s3/Trigger.java index 7d51daa..b238ba4 100644 --- a/src/main/java/io/kestra/plugin/aws/s3/Trigger.java +++ b/src/main/java/io/kestra/plugin/aws/s3/Trigger.java @@ -117,29 +117,29 @@ public class Trigger extends AbstractTrigger implements PollingTriggerInterface, protected Property endpointOverride; - protected String requestPayer; + protected Property requestPayer; - protected String bucket; + protected Property bucket; - private String prefix; + private Property prefix; - private String delimiter; + private Property delimiter; - private String marker; + private Property marker; - private String encodingType; + private Property encodingType; @Builder.Default - private Integer maxKeys = 1000; + private Property maxKeys = Property.of(1000); - private String expectedBucketOwner; + private Property expectedBucketOwner; - protected String regexp; + protected Property regexp; @Builder.Default - protected final Filter filter = Filter.BOTH; + protected final Property filter = Property.of(Filter.BOTH); - private ActionInterface.Action action; + private Property action; private Copy.CopyObject moveTo; @@ -204,7 +204,7 @@ public Optional evaluate(ConditionContext conditionContext, TriggerCo .stsEndpointOverride(this.stsEndpointOverride) .requestPayer(this.requestPayer) .bucket(this.bucket) - .key(object.getKey()) + .key(Property.of(object.getKey())) .build(); Download.Output downloadOutput = download.run(runContext); diff --git a/src/main/java/io/kestra/plugin/aws/s3/Upload.java b/src/main/java/io/kestra/plugin/aws/s3/Upload.java index 43b5aca..72b8b5b 100644 --- a/src/main/java/io/kestra/plugin/aws/s3/Upload.java +++ b/src/main/java/io/kestra/plugin/aws/s3/Upload.java @@ -77,142 +77,121 @@ public class Upload extends AbstractS3Object implements RunnableTask key; @Schema( title = "A map of metadata to store with the object in S3." ) - @PluginProperty(dynamic = true) - private Map metadata; + private Property> metadata; @Schema( title = "Can be used to specify caching behavior along the request/response chain." ) - @PluginProperty(dynamic = true) - private String cacheControl; + private Property cacheControl; @Schema( title = "A standard MIME type describing the format of the contents." ) - @PluginProperty(dynamic = true) - private String contentType; + private Property contentType; @Schema( title = "Specifies what content encodings have been applied to the object.", description = "And thus, what decoding mechanisms must be applied to obtain the media-type referenced by the Content-Type header field." ) - @PluginProperty(dynamic = true) - private String contentEncoding; + private Property contentEncoding; @Schema( title = "Specifies presentational information for the object." ) - @PluginProperty(dynamic = true) - private String contentDisposition; + private Property contentDisposition; @Schema( title = "The language the content is in." ) - @PluginProperty(dynamic = true) - private String contentLanguage; + private Property contentLanguage; @Schema( title = "The size of the body in bytes.", description = "This parameter is useful when the size of the body cannot be determined automatically." ) - @PluginProperty - private Long contentLength; + private Property contentLength; @Schema( title = "The date and time after which the object is no longer cacheable." ) - @PluginProperty(dynamic = true) - private String expires; + private Property expires; @Schema( title = "The canned ACL to apply to the object." ) - @PluginProperty(dynamic = true) - private String acl; + private Property acl; @Schema( title = "If you don't specify, S3 Standard is the default storage class. Amazon S3 supports other storage classes." ) - @PluginProperty - private StorageClass storageClass; + private Property storageClass; @Schema( title = "The server-side encryption algorithm used when storing this object in Amazon S3.", description = "For example, AES256, aws:kms, aws:kms:dsse" ) - @PluginProperty - private ServerSideEncryption serverSideEncryption; + private Property serverSideEncryption; @Schema( title = "Specifies whether Amazon S3 should use an S3 Bucket Key for object encryption with server-side encryption using Key Management Service (KMS) keys (SSE-KMS).", description = "Setting this header to true causes Amazon S3 to use an S3 Bucket Key for object encryption with SSE-KMS." ) - @PluginProperty - private Boolean bucketKeyEnabled; + private Property bucketKeyEnabled; @Schema( title = "Indicates the algorithm used to create the checksum for the object when using the SDK." ) - @PluginProperty - private ChecksumAlgorithm checksumAlgorithm; + private Property checksumAlgorithm; @Schema( title = "The account ID of the expected bucket owner.", description = "If the bucket is owned by a different account, the request fails " + "with the HTTP status code `403 Forbidden` (access denied)." ) - @PluginProperty(dynamic = true) - private String expectedBucketOwner; + private Property expectedBucketOwner; @Schema( title = "The Object Lock mode that you want to apply to this object." ) - @PluginProperty - private ObjectLockMode objectLockMode; + private Property objectLockMode; @Schema( title = "Specifies whether a legal hold will be applied to this object." ) - @PluginProperty - private ObjectLockLegalHoldStatus objectLockLegalHoldStatus; + private Property objectLockLegalHoldStatus; @Schema( title = "The date and time when you want this object's Object Lock to expire. " ) - @PluginProperty(dynamic = true) - private String objectLockRetainUntilDate; + private Property objectLockRetainUntilDate; @Schema( title = "The checksum data integrity check to verify that the data received is the same data that was originally sent.", description = "Must be used in pair with `checksumAlgorithm` to defined the expect algorithm of these values" ) - @PluginProperty(dynamic = true) - private String checksum; + private Property checksum; @Schema( title = "The tag-set for the object." ) - @PluginProperty - private Map tagging; + private Property> tagging; @Schema( title = "This property will use the AWS S3 DefaultAsyncClient instead of the S3CrtAsyncClient, which maximizes compatibility with S3-compatible services but restricts uploads and downloads to 2GB. For some S3 endpoints such as CloudFlare R2, you may need to set this value to `true`." ) - @PluginProperty @Builder.Default private Property compatibilityMode = Property.of(false); @Override public Output run(RunContext runContext) throws Exception { - String bucket = runContext.render(this.bucket); - String key = runContext.render(this.key); + String bucket = runContext.render(this.bucket).as(String.class).orElseThrow(); + String key = runContext.render(this.key).as(String.class).orElseThrow(); try (S3AsyncClient client = this.asyncClient(runContext)) { PutObjectRequest.Builder builder = PutObjectRequest @@ -221,86 +200,88 @@ public Output run(RunContext runContext) throws Exception { .key(key); if (this.requestPayer != null) { - builder.requestPayer(runContext.render(this.requestPayer)); + builder.requestPayer(runContext.render(this.requestPayer).as(String.class).orElseThrow()); } if (this.metadata != null) { - builder.metadata(runContext.renderMap(this.metadata)); + builder.metadata(runContext.render(this.metadata).asMap(String.class, String.class)); } if (this.cacheControl != null) { - builder.cacheControl(runContext.render(this.cacheControl)); + builder.cacheControl(runContext.render(this.cacheControl).as(String.class).orElseThrow()); } if (this.contentType != null) { - builder.contentType(runContext.render(this.contentType)); + builder.contentType(runContext.render(this.contentType).as(String.class).orElseThrow()); } if (this.contentEncoding != null) { - builder.contentEncoding(runContext.render(this.contentEncoding)); + builder.contentEncoding(runContext.render(this.contentEncoding).as(String.class).orElseThrow()); } if (this.contentDisposition != null) { - builder.contentDisposition(runContext.render(this.contentDisposition)); + builder.contentDisposition(runContext.render(this.contentDisposition).as(String.class).orElseThrow()); } if (this.contentLanguage != null) { - builder.contentLanguage(runContext.render(this.contentLanguage)); + builder.contentLanguage(runContext.render(this.contentLanguage).as(String.class).orElseThrow()); } if (this.contentLength != null) { - builder.contentLength(this.contentLength); + builder.contentLength(runContext.render(this.contentLength).as(Long.class).orElseThrow()); } if (this.expires != null) { - builder.expires(Instant.parse(runContext.render(this.expires))); + builder.expires(Instant.parse(runContext.render(this.expires).as(String.class).orElseThrow())); } if (this.acl != null) { - builder.acl(runContext.render(this.acl)); + builder.acl(runContext.render(this.acl).as(String.class).orElseThrow()); } if (this.storageClass != null) { - builder.storageClass(this.storageClass); + builder.storageClass(runContext.render(this.storageClass).as(StorageClass.class).orElseThrow()); } if (this.serverSideEncryption != null) { - builder.serverSideEncryption(this.serverSideEncryption); + builder.serverSideEncryption(runContext.render(this.serverSideEncryption).as(ServerSideEncryption.class).orElseThrow()); } if (this.bucketKeyEnabled != null) { - builder.bucketKeyEnabled(this.bucketKeyEnabled); + builder.bucketKeyEnabled(runContext.render(this.bucketKeyEnabled).as(Boolean.class).orElseThrow()); } if (this.checksumAlgorithm != null) { - builder.checksumAlgorithm(this.checksumAlgorithm); - switch (this.checksumAlgorithm) { - case SHA1 -> builder.checksumSHA1(runContext.render(this.checksum)); - case SHA256 -> builder.checksumSHA256(runContext.render(this.checksum)); - case CRC32 -> builder.checksumCRC32(runContext.render(this.checksum)); - case CRC32_C -> builder.checksumCRC32C(runContext.render(this.checksum)); + var renderedAlgorithm = runContext.render(this.checksumAlgorithm).as(ChecksumAlgorithm.class).orElseThrow(); + var sum = runContext.render(this.checksum).as(String.class).orElse(null); + builder.checksumAlgorithm(renderedAlgorithm); + switch (renderedAlgorithm) { + case SHA1 -> builder.checksumSHA1(sum); + case SHA256 -> builder.checksumSHA256(sum); + case CRC32 -> builder.checksumCRC32(sum); + case CRC32_C -> builder.checksumCRC32C(sum); } } if (this.expectedBucketOwner != null) { - builder.expectedBucketOwner(runContext.render(this.expectedBucketOwner)); + builder.expectedBucketOwner(runContext.render(this.expectedBucketOwner).as(String.class).orElseThrow()); } if (this.objectLockMode != null) { - builder.objectLockMode(this.objectLockMode); + builder.objectLockMode(runContext.render(this.objectLockMode).as(ObjectLockMode.class).orElseThrow()); } if (this.objectLockLegalHoldStatus != null) { - builder.objectLockLegalHoldStatus(this.objectLockLegalHoldStatus); + builder.objectLockLegalHoldStatus(runContext.render(this.objectLockLegalHoldStatus).as(ObjectLockLegalHoldStatus.class).orElseThrow()); } if (this.objectLockRetainUntilDate != null) { - builder.objectLockRetainUntilDate(Instant.parse(runContext.render(this.objectLockRetainUntilDate))); + builder.objectLockRetainUntilDate(Instant.parse(runContext.render(this.objectLockRetainUntilDate).as(String.class).orElseThrow())); } if (this.tagging != null) { builder.tagging(Tagging.builder() - .tagSet(runContext.renderMap(this.tagging) + .tagSet(runContext.render(this.tagging).asMap(String.class, String.class) .entrySet() .stream() .map(e -> Tag.builder() diff --git a/src/test/java/io/kestra/plugin/aws/s3/AbstractTest.java b/src/test/java/io/kestra/plugin/aws/s3/AbstractTest.java index fe5ed4c..bf2b17e 100644 --- a/src/test/java/io/kestra/plugin/aws/s3/AbstractTest.java +++ b/src/test/java/io/kestra/plugin/aws/s3/AbstractTest.java @@ -45,7 +45,7 @@ protected String createBucket(String bucket) throws Exception { CreateBucket createBucket = CreateBucket.builder() .id(AllTest.class.getSimpleName()) .type(CreateBucket.class.getName()) - .bucket(bucket) + .bucket(Property.of(bucket)) .endpointOverride(Property.of(localstack.getEndpointOverride(LocalStackContainer.Service.S3).toString())) .accessKeyId(Property.of(localstack.getAccessKey())) .secretKeyId(Property.of(localstack.getSecretKey())) @@ -77,24 +77,24 @@ protected String upload(String dir, String bucket) throws Exception { Upload upload = Upload.builder() .id(AllTest.class.getSimpleName()) .type(Upload.class.getName()) - .bucket(bucket) + .bucket(Property.of(bucket)) .endpointOverride(Property.of(localstack.getEndpointOverride(LocalStackContainer.Service.S3).toString())) .accessKeyId(Property.of(localstack.getAccessKey())) .secretKeyId(Property.of(localstack.getSecretKey())) .region(Property.of(localstack.getRegion())) .from(source.toString()) - .key(dir + "/" + out + ".yml") + .key(Property.of(dir + "/" + out + ".yml")) .build(); upload.run(runContext(upload)); - return upload.getKey(); + return upload.getKey().toString(); } protected List.ListBuilder list() { return List.builder() .id(ListTest.class.getSimpleName()) .type(List.class.getName()) - .bucket(this.BUCKET) + .bucket(Property.of(this.BUCKET)) .endpointOverride(Property.of(localstack.getEndpointOverride(LocalStackContainer.Service.S3).toString())) .accessKeyId(Property.of(localstack.getAccessKey())) .secretKeyId(Property.of(localstack.getSecretKey())) diff --git a/src/test/java/io/kestra/plugin/aws/s3/AllTest.java b/src/test/java/io/kestra/plugin/aws/s3/AllTest.java index 660b97b..326f24c 100644 --- a/src/test/java/io/kestra/plugin/aws/s3/AllTest.java +++ b/src/test/java/io/kestra/plugin/aws/s3/AllTest.java @@ -27,12 +27,12 @@ void run() throws Exception { List list = List.builder() .id(AllTest.class.getSimpleName()) .type(Upload.class.getName()) - .bucket(this.BUCKET) + .bucket(Property.of(this.BUCKET)) .endpointOverride(Property.of(localstack.getEndpointOverride(LocalStackContainer.Service.S3).toString())) .accessKeyId(Property.of(localstack.getAccessKey())) .secretKeyId(Property.of(localstack.getSecretKey())) .region(Property.of(localstack.getRegion())) - .prefix("tasks/aws/upload/") + .prefix(Property.of("tasks/aws/upload/")) .build(); List.Output listOutput = list.run(runContext(list)); @@ -42,12 +42,12 @@ void run() throws Exception { Download download = Download.builder() .id(AllTest.class.getSimpleName()) .type(Download.class.getName()) - .bucket(this.BUCKET) + .bucket(Property.of(this.BUCKET)) .endpointOverride(Property.of(localstack.getEndpointOverride(LocalStackContainer.Service.S3).toString())) .accessKeyId(Property.of(localstack.getAccessKey())) .secretKeyId(Property.of(localstack.getSecretKey())) .region(Property.of(localstack.getRegion())) - .key(key) + .key(Property.of(key)) .build(); Download.Output run = download.run(runContext(download)); @@ -61,12 +61,12 @@ void run() throws Exception { Delete delete = Delete.builder() .id(AllTest.class.getSimpleName()) .type(Delete.class.getName()) - .bucket(this.BUCKET) + .bucket(Property.of(this.BUCKET)) .endpointOverride(Property.of(localstack.getEndpointOverride(LocalStackContainer.Service.S3).toString())) .accessKeyId(Property.of(localstack.getAccessKey())) .secretKeyId(Property.of(localstack.getSecretKey())) .region(Property.of(localstack.getRegion())) - .key(key) + .key(Property.of(key)) .build(); Delete.Output deleteOutput = delete.run(runContext(delete)); assertThat(deleteOutput.getDeleteMarker(), is(nullValue())); diff --git a/src/test/java/io/kestra/plugin/aws/s3/CopyTest.java b/src/test/java/io/kestra/plugin/aws/s3/CopyTest.java index e2c28d6..a21b5ca 100644 --- a/src/test/java/io/kestra/plugin/aws/s3/CopyTest.java +++ b/src/test/java/io/kestra/plugin/aws/s3/CopyTest.java @@ -24,28 +24,28 @@ void run(Boolean delete) throws Exception { .secretKeyId(Property.of(localstack.getSecretKey())) .region(Property.of(localstack.getRegion())) .from(Copy.CopyObjectFrom.builder() - .bucket(this.BUCKET) - .key(upload) + .bucket(Property.of(this.BUCKET)) + .key(Property.of(upload)) .build() ) .to(Copy.CopyObject.builder() - .key(move) + .key(Property.of(move)) .build() ) - .delete(delete) + .delete(Property.of(delete)) .build(); Copy.Output run = task.run(runContext(task)); assertThat(run.getKey(), is(move)); // list - List list = list().prefix(move).build(); + List list = list().prefix(Property.of(move)).build(); List.Output listOutput = list.run(runContext(list)); assertThat(listOutput.getObjects().size(), is(1)); // original is here - list = list().prefix(upload).build(); + list = list().prefix(Property.of(upload)).build(); listOutput = list.run(runContext(list)); assertThat(listOutput.getObjects().size(), is(delete ? 0 : 1)); diff --git a/src/test/java/io/kestra/plugin/aws/s3/DeleteListTest.java b/src/test/java/io/kestra/plugin/aws/s3/DeleteListTest.java index 63bf7e6..9b766f9 100644 --- a/src/test/java/io/kestra/plugin/aws/s3/DeleteListTest.java +++ b/src/test/java/io/kestra/plugin/aws/s3/DeleteListTest.java @@ -21,7 +21,7 @@ void run() throws Exception { DeleteList task = DeleteList.builder() .id(ListTest.class.getSimpleName()) .type(List.class.getName()) - .bucket(this.BUCKET) + .bucket(Property.of(this.BUCKET)) .endpointOverride(Property.of(localstack.getEndpointOverride(LocalStackContainer.Service.S3).toString())) .accessKeyId(Property.of(localstack.getAccessKey())) .secretKeyId(Property.of(localstack.getSecretKey())) diff --git a/src/test/java/io/kestra/plugin/aws/s3/DownloadsTest.java b/src/test/java/io/kestra/plugin/aws/s3/DownloadsTest.java index 65056bf..b11ecf2 100644 --- a/src/test/java/io/kestra/plugin/aws/s3/DownloadsTest.java +++ b/src/test/java/io/kestra/plugin/aws/s3/DownloadsTest.java @@ -26,12 +26,12 @@ void delete() throws Exception { Downloads task = Downloads.builder() .id(DownloadsTest.class.getSimpleName()) .type(Downloads.class.getName()) - .bucket(this.BUCKET) + .bucket(Property.of(this.BUCKET)) .endpointOverride(Property.of(localstack.getEndpointOverride(LocalStackContainer.Service.S3).toString())) .accessKeyId(Property.of(localstack.getAccessKey())) .secretKeyId(Property.of(localstack.getSecretKey())) .region(Property.of(localstack.getRegion())) - .action(ActionInterface.Action.DELETE) + .action(Property.of(ActionInterface.Action.DELETE)) .build(); Downloads.Output run = task.run(runContext(task)); @@ -55,14 +55,14 @@ void move() throws Exception { Downloads task = Downloads.builder() .id(DownloadsTest.class.getSimpleName()) .type(Downloads.class.getName()) - .bucket("{{bucket}}") + .bucket(new Property<>("{{bucket}}")) .endpointOverride(Property.of(localstack.getEndpointOverride(LocalStackContainer.Service.S3).toString())) .accessKeyId(Property.of(localstack.getAccessKey())) .secretKeyId(Property.of(localstack.getSecretKey())) .region(Property.of(localstack.getRegion())) - .action(ActionInterface.Action.MOVE) + .action(Property.of(ActionInterface.Action.MOVE)) .moveTo(Copy.CopyObject.builder() - .key("/tasks/s3-move") + .key(Property.of("/tasks/s3-move")) .build() ) .build(); @@ -72,11 +72,11 @@ void move() throws Exception { assertThat(run.getObjects().size(), is(2)); assertThat(run.getOutputFiles().size(), is(2)); - List list = list().prefix("/tasks/s3-from").build(); + List list = list().prefix(Property.of("/tasks/s3-from")).build(); List.Output listOutput = list.run(runContext(list)); assertThat(listOutput.getObjects().size(), is(0)); - list = list().prefix("/tasks/s3-move").build(); + list = list().prefix(Property.of("/tasks/s3-move")).build(); listOutput = list.run(runContext(list)); assertThat(listOutput.getObjects().size(), is(2)); } diff --git a/src/test/java/io/kestra/plugin/aws/s3/ListTest.java b/src/test/java/io/kestra/plugin/aws/s3/ListTest.java index e617bf8..4af39d7 100644 --- a/src/test/java/io/kestra/plugin/aws/s3/ListTest.java +++ b/src/test/java/io/kestra/plugin/aws/s3/ListTest.java @@ -1,5 +1,6 @@ package io.kestra.plugin.aws.s3; +import io.kestra.core.models.property.Property; import io.kestra.core.serializers.JacksonMapper; import io.kestra.core.utils.IdUtils; import org.apache.commons.lang3.StringUtils; @@ -41,22 +42,22 @@ void run() throws Exception { // Dir listing task = list() - .filter(ListInterface.Filter.FILES) - .prefix("/tasks/s3/" + dir + "/sub") + .filter(Property.of(ListInterface.Filter.FILES)) + .prefix(Property.of("/tasks/s3/" + dir + "/sub")) .build(); run = task.run(runContext(task)); assertThat(run.getObjects().size(), is(1)); // prefix task = list() - .prefix("/tasks/s3/" + dir + "/sub") + .prefix(Property.of("/tasks/s3/" + dir + "/sub")) .build(); run = task.run(runContext(task)); assertThat(run.getObjects().size(), is(1)); // regexp task = list() - .regexp("/tasks/s3/.*/" + StringUtils.substringAfterLast(lastFileName, "/")) + .regexp(Property.of("/tasks/s3/.*/" + StringUtils.substringAfterLast(lastFileName, "/"))) .build(); run = task.run(runContext(task)); assertThat(run.getObjects().size(), is(1)); diff --git a/src/test/java/io/kestra/plugin/aws/s3/TriggerTest.java b/src/test/java/io/kestra/plugin/aws/s3/TriggerTest.java index efa7ea3..dcccafd 100644 --- a/src/test/java/io/kestra/plugin/aws/s3/TriggerTest.java +++ b/src/test/java/io/kestra/plugin/aws/s3/TriggerTest.java @@ -1,6 +1,7 @@ package io.kestra.plugin.aws.s3; import io.kestra.core.models.executions.Execution; +import io.kestra.core.models.property.Property; import io.kestra.core.queues.QueueFactoryInterface; import io.kestra.core.queues.QueueInterface; import io.kestra.core.repositories.LocalFlowRepositoryLoader; @@ -43,7 +44,7 @@ class TriggerTest extends AbstractTest { void deleteAction() throws Exception { String bucket = "trigger-test"; this.createBucket(bucket); - List listTask = list().bucket(bucket).build(); + List listTask = list().bucket(Property.of(bucket)).build(); // mock flow listeners CountDownLatch queueCount = new CountDownLatch(1); @@ -100,7 +101,7 @@ void deleteAction() throws Exception { void noneAction() throws Exception { String bucket = "trigger-none-action-test"; this.createBucket(bucket); - List listTask = list().bucket(bucket).build(); + List listTask = list().bucket(Property.of(bucket)).build(); // wait for execution CountDownLatch queueCount = new CountDownLatch(1); diff --git a/src/test/java/io/kestra/plugin/aws/s3/UploadsTest.java b/src/test/java/io/kestra/plugin/aws/s3/UploadsTest.java index cc72e34..808e5f0 100644 --- a/src/test/java/io/kestra/plugin/aws/s3/UploadsTest.java +++ b/src/test/java/io/kestra/plugin/aws/s3/UploadsTest.java @@ -24,13 +24,13 @@ void run() throws Exception { Upload upload = Upload.builder() .id(AllTest.class.getSimpleName()) .type(Upload.class.getName()) - .bucket(this.BUCKET) + .bucket(Property.of(this.BUCKET)) .endpointOverride(Property.of(localstack.getEndpointOverride(LocalStackContainer.Service.S3).toString())) .accessKeyId(Property.of(localstack.getAccessKey())) .secretKeyId(Property.of(localstack.getSecretKey())) .region(Property.of(localstack.getRegion())) .from(java.util.List.of(source1.toString(), source2.toString(), source3.toString(), source4.toString())) - .key(IdUtils.create() + "/") + .key(Property.of(IdUtils.create() + "/")) .build(); upload.run(runContext(upload)); @@ -38,7 +38,7 @@ void run() throws Exception { List list = List.builder() .id(UploadsTest.class.getSimpleName()) .type(Upload.class.getName()) - .bucket(this.BUCKET) + .bucket(Property.of(this.BUCKET)) .endpointOverride(Property.of(localstack.getEndpointOverride(LocalStackContainer.Service.S3).toString())) .accessKeyId(Property.of(localstack.getAccessKey())) .secretKeyId(Property.of(localstack.getSecretKey()))