Skip to content

Commit

Permalink
Allow to use DefaultAsyncClient (#237)
Browse files Browse the repository at this point in the history
* feat: add configuration to use DefaultAsyncClient

* feat: update configuration name

---------

Co-authored-by: Sébastien ALLEMAND <[email protected]>
  • Loading branch information
2 people authored and loicmathieu committed Sep 1, 2023
1 parent cddd445 commit 22f6485
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 18 deletions.
2 changes: 2 additions & 0 deletions src/main/java/io/kestra/plugin/aws/AbstractConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ public abstract class AbstractConnection extends Task implements AbstractConnect

protected String endpointOverride;

private Boolean compatibilityMode;

protected AwsCredentialsProvider credentials(RunContext runContext) throws IllegalVariableEvaluationException {
String accessKeyId = runContext.render(this.accessKeyId);
String secretKeyId = runContext.render(this.secretKeyId);
Expand Down
36 changes: 27 additions & 9 deletions src/main/java/io/kestra/plugin/aws/s3/AbstractS3.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
@EqualsAndHashCode
@Getter
@NoArgsConstructor
public abstract class AbstractS3 extends AbstractConnection {
public abstract class AbstractS3 extends AbstractConnection {

protected S3Client client(RunContext runContext) throws IllegalVariableEvaluationException {
S3ClientBuilder s3ClientBuilder = S3Client.builder()
Expand All @@ -36,18 +36,36 @@ protected S3Client client(RunContext runContext) throws IllegalVariableEvaluatio

return s3ClientBuilder.build();
}

protected S3AsyncClient asyncClient(RunContext runContext) throws IllegalVariableEvaluationException {
S3CrtAsyncClientBuilder s3ClientBuilder = S3AsyncClient.crtBuilder()
.credentialsProvider(this.credentials(runContext));

if (this.region != null) {
s3ClientBuilder.region(Region.of(runContext.render(this.region)));
}
if (this.getCompatibilityMode()) {
S3AsyncClientBuilder s3ClientBuilder = S3AsyncClient.builder()
.credentialsProvider(this.credentials(runContext));

if (this.endpointOverride != null) {
s3ClientBuilder.endpointOverride(URI.create(runContext.render(this.endpointOverride)));
if (this.region != null) {
s3ClientBuilder.region(Region.of(runContext.render(this.region)));
}

if (this.endpointOverride != null) {
s3ClientBuilder.endpointOverride(URI.create(runContext.render(this.endpointOverride)));
}
return s3ClientBuilder.build();

} else {
S3CrtAsyncClientBuilder s3ClientBuilder = S3AsyncClient.crtBuilder()
.credentialsProvider(this.credentials(runContext));

if (this.region != null) {
s3ClientBuilder.region(Region.of(runContext.render(this.region)));
}

if (this.endpointOverride != null) {
s3ClientBuilder.endpointOverride(URI.create(runContext.render(this.endpointOverride)));
}

return s3ClientBuilder.build();
}

return s3ClientBuilder.build();
}
}
12 changes: 8 additions & 4 deletions src/main/java/io/kestra/plugin/aws/s3/Download.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,7 @@
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.runners.RunContext;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.*;
import lombok.experimental.SuperBuilder;
import org.apache.commons.lang3.tuple.Pair;
import software.amazon.awssdk.services.s3.S3AsyncClient;
Expand Down Expand Up @@ -54,6 +51,13 @@ public class Download extends AbstractS3Object implements RunnableTask<Download.
@PluginProperty(dynamic = true)
protected String versionId;

@Schema(
title = "this property will use the AsynS3Client instead of the S3CrtAsynClient which maximize compatibility with S3-compatible services but restrict uploads and downloads to 2GB"
)
@PluginProperty
@Builder.Default
private Boolean compatibilityMode = false;

@Override
public Output run(RunContext runContext) throws Exception {
String bucket = runContext.render(this.bucket);
Expand Down
9 changes: 9 additions & 0 deletions src/main/java/io/kestra/plugin/aws/s3/Downloads.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.runners.RunContext;
import io.kestra.plugin.aws.s3.models.S3Object;
Expand Down Expand Up @@ -51,6 +52,14 @@ public class Downloads extends AbstractS3Object implements RunnableTask<List.Out
@Builder.Default
private Integer maxKeys = 1000;

@Schema(
title = "this property will use the AsynS3Client instead of the S3CrtAsynClient which maximize compatibility with S3-compatible services but restrict uploads and downloads to 2GB"
)
@PluginProperty
@Builder.Default
private Boolean compatibilityMode = false;


private String expectedBucketOwner;

protected String regexp;
Expand Down
14 changes: 9 additions & 5 deletions src/main/java/io/kestra/plugin/aws/s3/Upload.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,12 @@
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.runners.RunContext;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.*;
import lombok.experimental.SuperBuilder;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.PutObjectResponse;
import software.amazon.awssdk.transfer.s3.S3TransferManager;
import software.amazon.awssdk.transfer.s3.model.CompletedFileUpload;
import software.amazon.awssdk.transfer.s3.model.FileUpload;
import software.amazon.awssdk.transfer.s3.model.UploadFileRequest;

Expand Down Expand Up @@ -67,6 +63,13 @@ public class Upload extends AbstractS3Object implements RunnableTask<Upload.Outp
@PluginProperty
private Map<String, String> metadata;

@Schema(
title = "this property will use the AsynS3Client instead of the S3CrtAsynClient which maximize compatibility with S3-compatible services but restrict uploads and downloads to 2GB"
)
@PluginProperty
@Builder.Default
private Boolean compatibilityMode = false;

@Schema(
title = "If you don't specify, S3 Standard is the default storage class. Amazon S3 supports other storage classes."
)
Expand All @@ -79,6 +82,7 @@ public Output run(RunContext runContext) throws Exception {
String key = runContext.render(this.key);

try (S3AsyncClient client = this.asyncClient(runContext)) {

File tempFile = runContext.tempFile().toFile();
URI from = new URI(runContext.render(this.from));
Files.copy(runContext.uriToInputStream(from), tempFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
Expand Down

0 comments on commit 22f6485

Please sign in to comment.