Skip to content

Commit

Permalink
refactor: move s3 to dynamic properties
Browse files Browse the repository at this point in the history
  • Loading branch information
mgabelle committed Dec 16, 2024
1 parent 99bb1af commit cc42920
Show file tree
Hide file tree
Showing 22 changed files with 219 additions and 260 deletions.
5 changes: 3 additions & 2 deletions src/main/java/io/kestra/plugin/aws/s3/AbstractS3Object.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.kestra.plugin.aws.s3;

import io.kestra.core.models.property.Property;
import io.kestra.plugin.aws.AbstractConnection;
import lombok.EqualsAndHashCode;
import lombok.Getter;
Expand All @@ -13,9 +14,9 @@
@Getter
@NoArgsConstructor
public abstract class AbstractS3Object extends AbstractConnection implements AbstractS3ObjectInterface {
protected String requestPayer;
protected Property<String> requestPayer;

protected String bucket;
protected Property<String> bucket;

static {
// Initializing CRT will download the S3 native library into /tmp.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,19 @@
package io.kestra.plugin.aws.s3;

import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.property.Property;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.constraints.NotNull;

public interface AbstractS3ObjectInterface extends AbstractS3 {
@Schema(
title = "The S3 bucket name."
)
@PluginProperty(dynamic = true)
@NotNull
String getBucket();
Property<String> getBucket();

@Schema(
title = "Sets the value of the RequestPayer property for this object."
)
@PluginProperty(dynamic = true)
String getRequestPayer();
Property<String> getRequestPayer();
}
4 changes: 2 additions & 2 deletions src/main/java/io/kestra/plugin/aws/s3/ActionInterface.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.kestra.plugin.aws.s3;

import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.property.Property;
import io.swagger.v3.oas.annotations.media.Schema;

import jakarta.validation.constraints.NotNull;
Expand All @@ -9,9 +10,8 @@ public interface ActionInterface {
@Schema(
title = "The action to perform on the retrieved files. If using 'NONE' make sure to handle the files inside your flow to avoid infinite triggering."
)
@PluginProperty(dynamic = true)
@NotNull
ActionInterface.Action getAction();
Property<Action> getAction();

@Schema(
title = "The destination bucket and key for `MOVE` action."
Expand Down
29 changes: 13 additions & 16 deletions src/main/java/io/kestra/plugin/aws/s3/Copy.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.runners.RunContext;
import io.kestra.plugin.aws.AbstractConnection;
Expand Down Expand Up @@ -62,27 +63,26 @@ public class Copy extends AbstractConnection implements AbstractS3, RunnableTask
@Schema(
title = "Whether to delete the source file after download."
)
@PluginProperty
@Builder.Default
private Boolean delete = false;
private Property<Boolean> delete = Property.of(false);

@Override
public Output run(RunContext runContext) throws Exception {
try (S3Client client = this.client(runContext)) {
CopyObjectRequest.Builder builder = CopyObjectRequest.builder()
.sourceBucket(runContext.render(this.from.bucket))
.sourceKey(runContext.render(this.from.key))
.destinationBucket(runContext.render(this.to.bucket != null ? this.to.bucket : this.from.bucket))
.destinationKey(runContext.render(this.to.key != null ? this.to.key : this.from.key));
.sourceBucket(runContext.render(this.from.bucket).as(String.class).orElseThrow())
.sourceKey(runContext.render(this.from.key).as(String.class).orElseThrow())
.destinationBucket(runContext.render(this.to.bucket != null ? this.to.bucket : this.from.bucket).as(String.class).orElseThrow())
.destinationKey(runContext.render(this.to.key != null ? this.to.key : this.from.key).as(String.class).orElseThrow());

if (this.from.versionId != null) {
builder.sourceVersionId(runContext.render(this.from.versionId));
builder.sourceVersionId(runContext.render(this.from.versionId).as(String.class).orElseThrow());
}

CopyObjectRequest request = builder.build();
CopyObjectResponse response = client.copyObject(request);

if (this.delete) {
if (runContext.render(this.delete).as(Boolean.class).orElseThrow()) {
Delete.builder()
.id(this.id)
.type(Delete.class.getName())
Expand All @@ -96,8 +96,8 @@ public Output run(RunContext runContext) throws Exception {
.stsRoleSessionDuration(this.stsRoleSessionDuration)
.stsRoleArn(this.stsRoleArn)
.stsEndpointOverride(this.stsEndpointOverride)
.bucket(request.sourceBucket())
.key(request.sourceKey())
.bucket(Property.of(request.sourceBucket()))
.key(Property.of(request.sourceKey()))
.build()
.run(runContext);
}
Expand All @@ -118,16 +118,14 @@ public static class CopyObject {
@Schema(
title = "The bucket name"
)
@PluginProperty(dynamic = true)
@NotNull
String bucket;
Property<String> bucket;

@Schema(
title = "The bucket key"
)
@PluginProperty(dynamic = true)
@NotNull
String key;
Property<String> key;
}

@SuperBuilder(toBuilder = true)
Expand All @@ -137,8 +135,7 @@ public static class CopyObjectFrom extends CopyObject {
@Schema(
title = "The specific version of the object."
)
@PluginProperty(dynamic = true)
private String versionId;
private Property<String> versionId;
}

@SuperBuilder
Expand Down
45 changes: 17 additions & 28 deletions src/main/java/io/kestra/plugin/aws/s3/CreateBucket.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.runners.RunContext;
import io.kestra.plugin.aws.AbstractConnection;
Expand Down Expand Up @@ -46,91 +47,79 @@ public class CreateBucket extends AbstractConnection implements AbstractS3, Runn
@Schema(
description = "The S3 bucket name to create."
)
@PluginProperty(dynamic = true)
@NotNull
private String bucket;
private Property<String> bucket;

@Schema(
description = "Allows grantee the read, write, read ACP, and write ACP permissions on the bucket."
)
@PluginProperty(dynamic = true)
private String grantFullControl;
private Property<String> grantFullControl;

@Schema(
title = "Allows grantee to list the objects in the bucket."
)
@PluginProperty(dynamic = true)
private String grantRead;
private Property<String> grantRead;

@Schema(
title = "Allows grantee to list the ACL for the applicable bucket."
)
@PluginProperty(dynamic = true)
private String grantReadACP;
private Property<String> grantReadACP;

@Schema(
title = "Allows grantee to create, overwrite, and delete any object in the bucket."
)
@PluginProperty(dynamic = true)
private String grantWrite;
private Property<String> grantWrite;

@Schema(
title = "Allows grantee to write the ACL for the applicable bucket."
)
@PluginProperty(dynamic = true)
private String grantWriteACP;
private Property<String> grantWriteACP;

@Schema(
title = "The canned ACL to apply to the bucket."
)
@PluginProperty(dynamic = true)
private String acl;
private Property<String> acl;

@Schema(
title = "Specifies whether you want S3 Object Lock to be enabled for the new bucket."
)
@PluginProperty
private Boolean objectLockEnabledForBucket;
private Property<Boolean> objectLockEnabledForBucket;

@Override
public Output run(RunContext runContext) throws Exception {
String bucket = runContext.render(this.bucket);
String bucket = runContext.render(this.bucket).as(String.class).orElseThrow();

try (S3Client client = this.client(runContext)) {
CreateBucketRequest.Builder builder = CreateBucketRequest.builder()
.bucket(bucket);

if (grantFullControl != null) {
builder.grantFullControl(runContext.render(this.grantFullControl));
builder.grantFullControl(runContext.render(this.grantFullControl).as(String.class).orElseThrow());
}

if (grantRead != null) {
builder.grantRead(runContext.render(this.grantRead));
builder.grantRead(runContext.render(this.grantRead).as(String.class).orElseThrow());
}

if (grantReadACP != null) {
builder.grantReadACP(runContext.render(this.grantReadACP));
builder.grantReadACP(runContext.render(this.grantReadACP).as(String.class).orElseThrow());
}

if (grantWrite != null) {
builder.grantWrite(runContext.render(this.grantWrite));
builder.grantWrite(runContext.render(this.grantWrite).as(String.class).orElseThrow());
}


if (grantWriteACP != null) {
builder.grantWriteACP(runContext.render(this.grantWriteACP));
builder.grantWriteACP(runContext.render(this.grantWriteACP).as(String.class).orElseThrow());
}

if (acl != null) {
builder.acl(runContext.render(this.acl));
builder.acl(runContext.render(this.acl).as(String.class).orElseThrow());
}

if (objectLockEnabledForBucket != null) {
builder.objectLockEnabledForBucket(this.objectLockEnabledForBucket);
}

if (objectLockEnabledForBucket != null) {
builder.objectLockEnabledForBucket(this.objectLockEnabledForBucket);
builder.objectLockEnabledForBucket(runContext.render(this.objectLockEnabledForBucket).as(Boolean.class).orElseThrow());
}

CreateBucketResponse response = client.createBucket(builder.build());
Expand Down
23 changes: 10 additions & 13 deletions src/main/java/io/kestra/plugin/aws/s3/Delete.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.runners.RunContext;
import io.swagger.v3.oas.annotations.media.Schema;
Expand Down Expand Up @@ -45,51 +46,47 @@ public class Delete extends AbstractS3Object implements RunnableTask<Delete.Outp
@Schema(
title = "The key to delete."
)
@PluginProperty(dynamic = true)
@NotNull
private String key;
private Property<String> key;

@Schema(
title = "Indicates whether S3 Object Lock should bypass Governance-mode restrictions to process this operation."
)
@PluginProperty
private Boolean bypassGovernanceRetention;
private Property<Boolean> bypassGovernanceRetention;

@Schema(
title = "The concatenation of the authentication device's serial number, a space, and the value that is displayed on " +
"your authentication device.",
description = "Required to permanently delete a versioned object if versioning is configured " +
"with MFA delete enabled."
)
@PluginProperty(dynamic = true)
private String mfa;
private Property<String> mfa;

@Schema(
description = "Sets the value of the RequestPayer property for this object."
)
@PluginProperty(dynamic = true)
private String requestPayer;
private Property<String> requestPayer;

@Override
public Output run(RunContext runContext) throws Exception {
String bucket = runContext.render(this.bucket);
String key = runContext.render(this.key);
String bucket = runContext.render(this.bucket).as(String.class).orElseThrow();
String key = runContext.render(this.key).as(String.class).orElseThrow();

try (S3Client client = client(runContext)) {
DeleteObjectRequest.Builder builder = DeleteObjectRequest.builder()
.bucket(bucket)
.key(key);

if (this.bypassGovernanceRetention != null) {
builder.bypassGovernanceRetention(this.bypassGovernanceRetention);
builder.bypassGovernanceRetention(runContext.render(this.bypassGovernanceRetention).as(Boolean.class).orElseThrow());
}

if (this.mfa != null) {
builder.mfa(runContext.render(this.mfa));
builder.mfa(runContext.render(this.mfa).as(String.class).orElseThrow());
}

if (this.requestPayer != null) {
builder.requestPayer(runContext.render(this.requestPayer));
builder.requestPayer(runContext.render(this.requestPayer).as(String.class).orElseThrow());
}

DeleteObjectResponse response = client.deleteObject(builder.build());
Expand Down
Loading

0 comments on commit cc42920

Please sign in to comment.