From 8ac933f60d033100c206d6328f950e2a5c8aa027 Mon Sep 17 00:00:00 2001 From: Mathieu Gabelle Date: Mon, 16 Dec 2024 15:50:12 +0100 Subject: [PATCH] refactor: move Query and abstract connection to dynamic properties --- .../kestra/plugin/aws/AbstractConnection.java | 20 +++---- .../aws/AbstractConnectionInterface.java | 52 ++++++++----------- .../io/kestra/plugin/aws/athena/Query.java | 32 +++++------- .../io/kestra/plugin/aws/auth/EksToken.java | 6 +-- .../java/io/kestra/plugin/aws/cli/AwsCLI.java | 19 +++---- .../io/kestra/plugin/aws/s3/AbstractS3.java | 2 +- .../io/kestra/plugin/aws/s3/Download.java | 3 +- .../io/kestra/plugin/aws/s3/Downloads.java | 3 +- .../java/io/kestra/plugin/aws/s3/Trigger.java | 28 +++++----- .../java/io/kestra/plugin/aws/s3/Upload.java | 5 +- .../plugin/aws/sqs/RealtimeTrigger.java | 28 +++++----- .../io/kestra/plugin/aws/sqs/Trigger.java | 26 +++++----- .../kestra/plugin/aws/athena/QueryTest.java | 12 ++--- .../io/kestra/plugin/aws/cli/AwsCLITest.java | 6 +-- .../plugin/aws/dynamodb/DeleteItemTest.java | 6 +-- .../plugin/aws/dynamodb/GetItemTest.java | 6 +-- .../plugin/aws/dynamodb/PutItemTest.java | 12 ++--- .../kestra/plugin/aws/dynamodb/QueryTest.java | 30 +++++------ .../kestra/plugin/aws/dynamodb/ScanTest.java | 30 +++++------ .../plugin/aws/ecr/GetAuthTokenTest.java | 6 +-- .../plugin/aws/eventbridge/PutEventsTest.java | 24 ++++----- .../plugin/aws/kinesis/PutRecordsTest.java | 18 +++---- .../kestra/plugin/aws/lambda/InvokeTest.java | 18 +++---- .../plugin/aws/lambda/InvokeUnitTest.java | 24 ++++----- .../io/kestra/plugin/aws/s3/AbstractTest.java | 18 +++---- .../java/io/kestra/plugin/aws/s3/AllTest.java | 18 +++---- .../io/kestra/plugin/aws/s3/CopyTest.java | 6 +-- .../kestra/plugin/aws/s3/DeleteListTest.java | 6 +-- .../kestra/plugin/aws/s3/DownloadsTest.java | 12 ++--- .../io/kestra/plugin/aws/s3/UploadsTest.java | 12 ++--- .../io/kestra/plugin/aws/sns/PublishTest.java | 6 +-- .../plugin/aws/sqs/AbstractSqsTest.java | 1 + .../aws/sqs/PublishThenConsumeTest.java | 24 ++++----- .../plugin/aws/sqs/RealtimeTriggerTest.java | 6 +-- .../io/kestra/plugin/aws/sqs/TriggerTest.java | 6 +-- 35 files changed, 261 insertions(+), 270 deletions(-) diff --git a/src/main/java/io/kestra/plugin/aws/AbstractConnection.java b/src/main/java/io/kestra/plugin/aws/AbstractConnection.java index b338207..54af953 100644 --- a/src/main/java/io/kestra/plugin/aws/AbstractConnection.java +++ b/src/main/java/io/kestra/plugin/aws/AbstractConnection.java @@ -16,21 +16,21 @@ public abstract class AbstractConnection extends Task implements AbstractConnectionInterface { protected Property region; - protected String endpointOverride; - protected Boolean compatibilityMode; + protected Property endpointOverride; + protected Property compatibilityMode; // Configuration for StaticCredentialsProvider - protected String accessKeyId; - protected String secretKeyId; - protected String sessionToken; + protected Property accessKeyId; + protected Property secretKeyId; + protected Property sessionToken; // Configuration for AWS STS AssumeRole - protected String stsRoleArn; - protected String stsRoleExternalId; - protected String stsRoleSessionName; - protected String stsEndpointOverride; + protected Property stsRoleArn; + protected Property stsRoleExternalId; + protected Property stsRoleSessionName; + protected Property stsEndpointOverride; @Builder.Default - protected Duration stsRoleSessionDuration = AbstractConnectionInterface.AWS_MIN_STS_ROLE_SESSION_DURATION; + protected Property stsRoleSessionDuration = Property.of(AbstractConnectionInterface.AWS_MIN_STS_ROLE_SESSION_DURATION); /** * Common AWS Client configuration properties. diff --git a/src/main/java/io/kestra/plugin/aws/AbstractConnectionInterface.java b/src/main/java/io/kestra/plugin/aws/AbstractConnectionInterface.java index 08e9a42..7097d95 100644 --- a/src/main/java/io/kestra/plugin/aws/AbstractConnectionInterface.java +++ b/src/main/java/io/kestra/plugin/aws/AbstractConnectionInterface.java @@ -16,56 +16,48 @@ public interface AbstractConnectionInterface { title = "Access Key Id in order to connect to AWS.", description = "If no credentials are defined, we will use the [default credentials provider chain](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/credentials-chain.html) to fetch credentials." ) - @PluginProperty(dynamic = true) - String getAccessKeyId(); + Property getAccessKeyId(); @Schema( title = "Secret Key Id in order to connect to AWS.", description = "If no credentials are defined, we will use the [default credentials provider chain](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/credentials-chain.html) to fetch credentials." ) - @PluginProperty(dynamic = true) - String getSecretKeyId(); + Property getSecretKeyId(); @Schema( title = "AWS session token, retrieved from an AWS token service, used for authenticating that this user has received temporary permissions to access a given resource.", description = "If no credentials are defined, we will use the [default credentials provider chain](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/credentials-chain.html) to fetch credentials." ) - @PluginProperty(dynamic = true) - String getSessionToken(); + Property getSessionToken(); @Schema( title = "AWS STS Role.", description = "The Amazon Resource Name (ARN) of the role to assume. If set the task will use the `StsAssumeRoleCredentialsProvider`. If no credentials are defined, we will use the [default credentials provider chain](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/credentials-chain.html) to fetch credentials." ) - @PluginProperty(dynamic = true) - String getStsRoleArn(); + Property getStsRoleArn(); @Schema( title = "AWS STS External Id.", description = " A unique identifier that might be required when you assume a role in another account. This property is only used when an `stsRoleArn` is defined." ) - @PluginProperty(dynamic = true) - String getStsRoleExternalId(); + Property getStsRoleExternalId(); @Schema( title = "AWS STS Session name.", description = "This property is only used when an `stsRoleArn` is defined." ) - @PluginProperty(dynamic = true) - String getStsRoleSessionName(); + Property getStsRoleSessionName(); @Schema( title = "AWS STS Session duration.", description = "The duration of the role session (default: 15 minutes, i.e., PT15M). This property is only used when an `stsRoleArn` is defined." ) - @PluginProperty - Duration getStsRoleSessionDuration(); + Property getStsRoleSessionDuration(); @Schema( title = "The AWS STS endpoint with which the SDKClient should communicate." ) - @PluginProperty(dynamic = true) - String getStsEndpointOverride(); + Property getStsEndpointOverride(); @Schema( title = "AWS region with which the SDK should communicate." @@ -76,26 +68,24 @@ public interface AbstractConnectionInterface { title = "The endpoint with which the SDK should communicate.", description = "This property allows you to use a different S3 compatible storage backend." ) - @PluginProperty(dynamic = true) - String getEndpointOverride(); + Property getEndpointOverride(); - @PluginProperty(dynamic = true) - default Boolean getCompatibilityMode() { - return false; + default Property getCompatibilityMode() { + return Property.of(false); } default AbstractConnection.AwsClientConfig awsClientConfig(final RunContext runContext) throws IllegalVariableEvaluationException { return new AbstractConnection.AwsClientConfig( - runContext.render(this.getAccessKeyId()), - runContext.render(this.getSecretKeyId()), - runContext.render(this.getSessionToken()), - runContext.render(this.getStsRoleArn()), - runContext.render(this.getStsRoleExternalId()), - runContext.render(this.getStsRoleSessionName()), - runContext.render(this.getStsEndpointOverride()), - getStsRoleSessionDuration(), - this.getRegion() == null ? null : this.getRegion().as(runContext, String.class), - runContext.render(this.getEndpointOverride()) + runContext.render(this.getAccessKeyId()).as(String.class).orElse(null), + runContext.render(this.getSecretKeyId()).as(String.class).orElse(null), + runContext.render(this.getSessionToken()).as(String.class).orElse(null), + runContext.render(this.getStsRoleArn()).as(String.class).orElse(null), + runContext.render(this.getStsRoleExternalId()).as(String.class).orElse(null), + runContext.render(this.getStsRoleSessionName()).as(String.class).orElse(null), + runContext.render(this.getStsEndpointOverride()).as(String.class).orElse(null), + runContext.render(this.getStsRoleSessionDuration()).as(Duration.class).orElse(null), + runContext.render(this.getRegion()).as(String.class).orElse(null), + runContext.render(this.getEndpointOverride()).as(String.class).orElse(null) ); } } diff --git a/src/main/java/io/kestra/plugin/aws/athena/Query.java b/src/main/java/io/kestra/plugin/aws/athena/Query.java index 19fcb54..8f4ca72 100644 --- a/src/main/java/io/kestra/plugin/aws/athena/Query.java +++ b/src/main/java/io/kestra/plugin/aws/athena/Query.java @@ -5,6 +5,7 @@ import io.kestra.core.models.annotations.Plugin; import io.kestra.core.models.annotations.PluginProperty; import io.kestra.core.models.executions.metrics.Counter; +import io.kestra.core.models.property.Property; import io.kestra.core.models.tasks.Output; import io.kestra.core.models.tasks.RunnableTask; import io.kestra.core.models.tasks.common.FetchType; @@ -49,7 +50,7 @@ title = "Query an Athena table.", description = """ The query will wait for completion, except if fetchMode is set to `NONE`, and will output converted rows. - Row conversion is based on the types listed [here](https://docs.aws.amazon.com/athena/latest/ug/data-types.html). + Row conversion is based on the types listed [here](https://docs.aws.amazon.com/athena/latest/ug/data-types.html). Complex data types like array, map and struct will be converted to a string.""" ) @Plugin( @@ -78,26 +79,22 @@ ) public class Query extends AbstractConnection implements RunnableTask { @Schema(title = "Athena catalog.") - @PluginProperty(dynamic = true) - private String catalog; + private Property catalog; @Schema(title = "Athena database.") @NotNull - @PluginProperty(dynamic = true) - private String database; + private Property database; @Schema( title = "Athena output location.", description = "The query results will be stored in this output location. Must be an existing S3 bucket." ) @NotNull - @PluginProperty(dynamic = true) - private String outputLocation; + private Property outputLocation; @Schema(title = "Athena SQL query.") @NotNull - @PluginProperty(dynamic = true) - private String query; + private Property query; @Schema( title = "The way you want to store the data.", @@ -107,15 +104,13 @@ public class Query extends AbstractConnection implements RunnableTask fetchType = Property.of(FetchType.STORE); @Schema(title = "Whether to skip the first row which is usually the header.") @NotNull - @PluginProperty @Builder.Default - private boolean skipHeader = true; + private Property skipHeader = Property.of(true); private static DateTimeFormatter dateFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd"); @@ -125,23 +120,24 @@ public class Query extends AbstractConnection implements RunnableTask results = getQueryResultsResults.resultSet().rows(); - if (skipHeader && results != null && !results.isEmpty()) { + if (runContext.render(skipHeader).as(Boolean.class).orElseThrow() && results != null && !results.isEmpty()) { // we skip the first row, this is usually needed as by default Athena returns the header as the first row results = results.subList(1, results.size()); } diff --git a/src/main/java/io/kestra/plugin/aws/auth/EksToken.java b/src/main/java/io/kestra/plugin/aws/auth/EksToken.java index d898c62..3e6cce5 100644 --- a/src/main/java/io/kestra/plugin/aws/auth/EksToken.java +++ b/src/main/java/io/kestra/plugin/aws/auth/EksToken.java @@ -70,18 +70,18 @@ public Output run(RunContext runContext) throws Exception { if(this.getRegion() == null) { throw new RuntimeException("Region is required"); } - final Region awsRegion = Region.of(this.getRegion().as(runContext, String.class)); + final Region awsRegion = Region.of(runContext.render(this.getRegion()).as(String.class).orElseThrow()); SdkHttpFullRequest requestToSign = SdkHttpFullRequest .builder() .method(SdkHttpMethod.GET) .uri(getStsRegionalEndpointUri(runContext, awsRegion)) - .appendHeader("x-k8s-aws-id", this.clusterName.as(runContext, String.class)) + .appendHeader("x-k8s-aws-id", runContext.render(this.clusterName).as(String.class).orElseThrow()) .appendRawQueryParameter("Action", "GetCallerIdentity") .appendRawQueryParameter("Version", "2011-06-15") .build(); - ZonedDateTime expirationDate = ZonedDateTime.now().plusSeconds(expirationDuration.as(runContext, Long.class)); + ZonedDateTime expirationDate = ZonedDateTime.now().plusSeconds(runContext.render(expirationDuration).as(Long.class).orElseThrow()); Aws4PresignerParams presignerParams = Aws4PresignerParams.builder() .awsCredentials(ConnectionUtils.credentialsProvider(this.awsClientConfig(runContext)).resolveCredentials()) .signingRegion(awsRegion) diff --git a/src/main/java/io/kestra/plugin/aws/cli/AwsCLI.java b/src/main/java/io/kestra/plugin/aws/cli/AwsCLI.java index fba20e2..b05e734 100644 --- a/src/main/java/io/kestra/plugin/aws/cli/AwsCLI.java +++ b/src/main/java/io/kestra/plugin/aws/cli/AwsCLI.java @@ -20,6 +20,7 @@ import lombok.*; import lombok.experimental.SuperBuilder; +import java.time.Duration; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -148,19 +149,19 @@ public ScriptOutput run(RunContext runContext) throws Exception { // hack for missing env vars supports: https://github.com/aws/aws-cli/issues/5639 if (this.stsRoleArn != null) { - allCommands.add("aws configure set role_arn " + runContext.render(this.stsRoleArn)); + allCommands.add("aws configure set role_arn " + runContext.render(this.stsRoleArn).as(String.class).orElseThrow()); } if (this.stsRoleSessionName != null) { - allCommands.add("aws configure set role_session_name " + runContext.render(this.stsRoleSessionName)); + allCommands.add("aws configure set role_session_name " + runContext.render(this.stsRoleSessionName).as(String.class).orElseThrow()); } if (this.stsRoleExternalId != null) { - allCommands.add("aws configure set external_id " + runContext.render(this.stsRoleExternalId)); + allCommands.add("aws configure set external_id " + runContext.render(this.stsRoleExternalId).as(String.class).orElseThrow()); } if (this.stsRoleSessionDuration != null) { - allCommands.add("aws configure set duration_seconds " + stsRoleSessionDuration.getSeconds()); + allCommands.add("aws configure set duration_seconds " + runContext.render(stsRoleSessionDuration).as(Duration.class).orElseThrow().getSeconds()); } if (this.stsCredentialSource != null) { @@ -206,23 +207,23 @@ private Map getEnv(RunContext runContext) throws IllegalVariable Map envs = new HashMap<>(); if (this.accessKeyId != null) { - envs.put("AWS_ACCESS_KEY_ID", runContext.render(this.accessKeyId)); + envs.put("AWS_ACCESS_KEY_ID", runContext.render(this.accessKeyId).as(String.class).orElseThrow()); } if (this.secretKeyId != null) { - envs.put("AWS_SECRET_ACCESS_KEY", runContext.render(this.secretKeyId)); + envs.put("AWS_SECRET_ACCESS_KEY", runContext.render(this.secretKeyId).as(String.class).orElseThrow()); } if (this.region != null) { - envs.put("AWS_DEFAULT_REGION", this.region.as(runContext, String.class)); + envs.put("AWS_DEFAULT_REGION", runContext.render(this.region).as(String.class).orElseThrow()); } if (this.sessionToken != null) { - envs.put("AWS_SESSION_TOKEN", runContext.render(this.sessionToken)); + envs.put("AWS_SESSION_TOKEN", runContext.render(this.sessionToken).as(String.class).orElseThrow()); } if (this.endpointOverride != null) { - envs.put("AWS_ENDPOINT_URL", runContext.render(this.endpointOverride)); + envs.put("AWS_ENDPOINT_URL", runContext.render(this.endpointOverride).as(String.class).orElseThrow()); } envs.put("AWS_DEFAULT_OUTPUT", this.outputFormat.toString()); diff --git a/src/main/java/io/kestra/plugin/aws/s3/AbstractS3.java b/src/main/java/io/kestra/plugin/aws/s3/AbstractS3.java index 0f7cb07..4f62fb5 100644 --- a/src/main/java/io/kestra/plugin/aws/s3/AbstractS3.java +++ b/src/main/java/io/kestra/plugin/aws/s3/AbstractS3.java @@ -20,7 +20,7 @@ default S3Client client(final RunContext runContext) throws IllegalVariableEvalu default S3AsyncClient asyncClient(final RunContext runContext) throws IllegalVariableEvaluationException { final AbstractConnection.AwsClientConfig clientConfig = awsClientConfig(runContext); - if (this.getCompatibilityMode()) { + if (runContext.render(this.getCompatibilityMode()).as(Boolean.class).orElse(false)) { return ConnectionUtils.configureAsyncClient(clientConfig, S3AsyncClient.builder()).build(); } else { S3CrtAsyncClientBuilder s3ClientBuilder = S3AsyncClient.crtBuilder() diff --git a/src/main/java/io/kestra/plugin/aws/s3/Download.java b/src/main/java/io/kestra/plugin/aws/s3/Download.java index 01e006d..5a5c6ac 100644 --- a/src/main/java/io/kestra/plugin/aws/s3/Download.java +++ b/src/main/java/io/kestra/plugin/aws/s3/Download.java @@ -3,6 +3,7 @@ import io.kestra.core.models.annotations.Example; import io.kestra.core.models.annotations.Plugin; import io.kestra.core.models.annotations.PluginProperty; +import io.kestra.core.models.property.Property; import io.kestra.core.models.tasks.RunnableTask; import io.kestra.core.runners.RunContext; import io.swagger.v3.oas.annotations.media.Schema; @@ -65,7 +66,7 @@ public class Download extends AbstractS3Object implements RunnableTask compatibilityMode = Property.of(false); @Override public Output run(RunContext runContext) throws Exception { diff --git a/src/main/java/io/kestra/plugin/aws/s3/Downloads.java b/src/main/java/io/kestra/plugin/aws/s3/Downloads.java index 2af7d90..d8d5fe0 100644 --- a/src/main/java/io/kestra/plugin/aws/s3/Downloads.java +++ b/src/main/java/io/kestra/plugin/aws/s3/Downloads.java @@ -4,6 +4,7 @@ import io.kestra.core.models.annotations.Example; import io.kestra.core.models.annotations.Plugin; import io.kestra.core.models.annotations.PluginProperty; +import io.kestra.core.models.property.Property; import io.kestra.core.models.tasks.RunnableTask; import io.kestra.core.runners.RunContext; import io.kestra.plugin.aws.s3.models.S3Object; @@ -67,7 +68,7 @@ public class Downloads extends AbstractS3Object implements RunnableTask compatibilityMode = Property.of(false); private String expectedBucketOwner; diff --git a/src/main/java/io/kestra/plugin/aws/s3/Trigger.java b/src/main/java/io/kestra/plugin/aws/s3/Trigger.java index 06e13bd..7d51daa 100644 --- a/src/main/java/io/kestra/plugin/aws/s3/Trigger.java +++ b/src/main/java/io/kestra/plugin/aws/s3/Trigger.java @@ -40,7 +40,7 @@ code = """ id: s3_listen namespace: company.team - + tasks: - id: each type: io.kestra.plugin.core.flow.ForEach @@ -49,7 +49,7 @@ - id: return type: io.kestra.plugin.core.debug.Return format: "{{ taskrun.value }}" - + triggers: - id: watch type: io.kestra.plugin.aws.s3.Trigger @@ -71,7 +71,7 @@ code = """ id: s3_listen namespace: company.team - + tasks: - id: each type: io.kestra.plugin.core.flow.ForEach @@ -80,7 +80,7 @@ - id: return type: io.kestra.plugin.core.debug.Return format: "{{ taskrun.value }}" - + - id: delete type: io.kestra.plugin.aws.s3.Delete accessKeyId: "" @@ -88,7 +88,7 @@ region: "eu-central-1" bucket: "my-bucket" key: "{{ taskrun.value }}" - + triggers: - id: watch type: io.kestra.plugin.aws.s3.Trigger @@ -107,15 +107,15 @@ public class Trigger extends AbstractTrigger implements PollingTriggerInterface, @Builder.Default private final Duration interval = Duration.ofSeconds(60); - protected String accessKeyId; + protected Property accessKeyId; - protected String secretKeyId; + protected Property secretKeyId; - protected String sessionToken; + protected Property sessionToken; protected Property region; - protected String endpointOverride; + protected Property endpointOverride; protected String requestPayer; @@ -144,12 +144,12 @@ public class Trigger extends AbstractTrigger implements PollingTriggerInterface, private Copy.CopyObject moveTo; // Configuration for AWS STS AssumeRole - protected String stsRoleArn; - protected String stsRoleExternalId; - protected String stsRoleSessionName; - protected String stsEndpointOverride; + protected Property stsRoleArn; + protected Property stsRoleExternalId; + protected Property stsRoleSessionName; + protected Property stsEndpointOverride; @Builder.Default - protected Duration stsRoleSessionDuration = AbstractConnectionInterface.AWS_MIN_STS_ROLE_SESSION_DURATION; + protected Property stsRoleSessionDuration = Property.of(AbstractConnectionInterface.AWS_MIN_STS_ROLE_SESSION_DURATION); @Override public Optional evaluate(ConditionContext conditionContext, TriggerContext context) throws Exception { diff --git a/src/main/java/io/kestra/plugin/aws/s3/Upload.java b/src/main/java/io/kestra/plugin/aws/s3/Upload.java index 5e1e804..43b5aca 100644 --- a/src/main/java/io/kestra/plugin/aws/s3/Upload.java +++ b/src/main/java/io/kestra/plugin/aws/s3/Upload.java @@ -4,6 +4,7 @@ import io.kestra.core.models.annotations.Plugin; import io.kestra.core.models.annotations.PluginProperty; import io.kestra.core.models.executions.metrics.Counter; +import io.kestra.core.models.property.Property; import io.kestra.core.models.tasks.RunnableTask; import io.kestra.core.runners.RunContext; import io.kestra.core.serializers.JacksonMapper; @@ -45,7 +46,7 @@ inputs: - id: myfile type: FILE - + tasks: - id: upload type: io.kestra.plugin.aws.s3.Upload @@ -206,7 +207,7 @@ public class Upload extends AbstractS3Object implements RunnableTask compatibilityMode = Property.of(false); @Override public Output run(RunContext runContext) throws Exception { diff --git a/src/main/java/io/kestra/plugin/aws/sqs/RealtimeTrigger.java b/src/main/java/io/kestra/plugin/aws/sqs/RealtimeTrigger.java index 724925c..e275565 100644 --- a/src/main/java/io/kestra/plugin/aws/sqs/RealtimeTrigger.java +++ b/src/main/java/io/kestra/plugin/aws/sqs/RealtimeTrigger.java @@ -54,7 +54,7 @@ accessKeyId: "access_key" secretKeyId: "secret_key" region: "eu-central-1" - queueUrl: https://sqs.eu-central-1.amazonaws.com/000000000000/test-queue""" + queueUrl: https://sqs.eu-central-1.amazonaws.com/000000000000/test-queue""" ) } ) @@ -62,15 +62,15 @@ public class RealtimeTrigger extends AbstractTrigger implements RealtimeTriggerI private String queueUrl; - private String accessKeyId; + private Property accessKeyId; - private String secretKeyId; + private Property secretKeyId; - private String sessionToken; + private Property sessionToken; private Property region; - private String endpointOverride; + private Property endpointOverride; @Builder.Default @PluginProperty @@ -79,12 +79,12 @@ public class RealtimeTrigger extends AbstractTrigger implements RealtimeTriggerI private SerdeType serdeType = SerdeType.STRING; // Configuration for AWS STS AssumeRole - protected String stsRoleArn; - protected String stsRoleExternalId; - protected String stsRoleSessionName; - protected String stsEndpointOverride; + protected Property stsRoleArn; + protected Property stsRoleExternalId; + protected Property stsRoleSessionName; + protected Property stsEndpointOverride; @Builder.Default - protected Duration stsRoleSessionDuration = AbstractConnectionInterface.AWS_MIN_STS_ROLE_SESSION_DURATION; + protected Property stsRoleSessionDuration = Property.of(AbstractConnectionInterface.AWS_MIN_STS_ROLE_SESSION_DURATION); // Default read timeout is 20s, so we cannot use a bigger wait time, or we would need to increase the read timeout. @PluginProperty @@ -121,11 +121,11 @@ public Publisher evaluate(ConditionContext conditionContext, TriggerC Consume task = Consume.builder() .queueUrl(runContext.render(queueUrl)) - .accessKeyId(runContext.render(accessKeyId)) - .secretKeyId(runContext.render(secretKeyId)) - .sessionToken(runContext.render(sessionToken)) + .accessKeyId(accessKeyId) + .secretKeyId(secretKeyId) + .sessionToken(sessionToken) .region(region) - .endpointOverride(runContext.render(endpointOverride)) + .endpointOverride(endpointOverride) .serdeType(this.serdeType) .stsRoleArn(this.stsRoleArn) .stsRoleSessionName(this.stsRoleSessionName) diff --git a/src/main/java/io/kestra/plugin/aws/sqs/Trigger.java b/src/main/java/io/kestra/plugin/aws/sqs/Trigger.java index 2b70f76..efc3e2b 100644 --- a/src/main/java/io/kestra/plugin/aws/sqs/Trigger.java +++ b/src/main/java/io/kestra/plugin/aws/sqs/Trigger.java @@ -57,15 +57,15 @@ public class Trigger extends AbstractTrigger implements PollingTriggerInterface, private String queueUrl; - private String accessKeyId; + private Property accessKeyId; - private String secretKeyId; + private Property secretKeyId; - private String sessionToken; + private Property sessionToken; private Property region; - private String endpointOverride; + private Property endpointOverride; @Builder.Default private final Duration interval = Duration.ofSeconds(60); @@ -85,12 +85,12 @@ public class Trigger extends AbstractTrigger implements PollingTriggerInterface, private SerdeType serdeType = SerdeType.STRING; // Configuration for AWS STS AssumeRole - protected String stsRoleArn; - protected String stsRoleExternalId; - protected String stsRoleSessionName; - protected String stsEndpointOverride; + protected Property stsRoleArn; + protected Property stsRoleExternalId; + protected Property stsRoleSessionName; + protected Property stsEndpointOverride; @Builder.Default - protected Duration stsRoleSessionDuration = AbstractConnectionInterface.AWS_MIN_STS_ROLE_SESSION_DURATION; + protected Property stsRoleSessionDuration = Property.of(AbstractConnectionInterface.AWS_MIN_STS_ROLE_SESSION_DURATION); @Override public Optional evaluate(ConditionContext conditionContext, TriggerContext context) throws Exception { @@ -99,11 +99,11 @@ public Optional evaluate(ConditionContext conditionContext, TriggerCo Consume task = Consume.builder() .queueUrl(runContext.render(queueUrl)) - .accessKeyId(runContext.render(accessKeyId)) - .secretKeyId(runContext.render(secretKeyId)) - .sessionToken(runContext.render(sessionToken)) + .accessKeyId(accessKeyId) + .secretKeyId(secretKeyId) + .sessionToken(sessionToken) .region(region) - .endpointOverride(runContext.render(endpointOverride)) + .endpointOverride(endpointOverride) .maxRecords(this.maxRecords) .maxDuration(this.maxDuration) .serdeType(this.serdeType) diff --git a/src/test/java/io/kestra/plugin/aws/athena/QueryTest.java b/src/test/java/io/kestra/plugin/aws/athena/QueryTest.java index 04f6741..b787718 100644 --- a/src/test/java/io/kestra/plugin/aws/athena/QueryTest.java +++ b/src/test/java/io/kestra/plugin/aws/athena/QueryTest.java @@ -38,12 +38,12 @@ void run() throws Exception { .id("hello") .type(Query.class.getName()) .region(Property.of("eu-west-3")) - .accessKeyId(accessKey) - .secretKeyId(secretKey) - .database("units") - .fetchType(FetchType.FETCH) - .outputLocation("s3://kestra-unit-test") - .query("select * from types") + .accessKeyId(Property.of(accessKey)) + .secretKeyId(Property.of(secretKey)) + .database(Property.of("units")) + .fetchType(Property.of(FetchType.FETCH)) + .outputLocation(Property.of("s3://kestra-unit-test")) + .query(Property.of("select * from types")) .build(); var output = query.run(runContext); diff --git a/src/test/java/io/kestra/plugin/aws/cli/AwsCLITest.java b/src/test/java/io/kestra/plugin/aws/cli/AwsCLITest.java index fd73001..c6d1035 100644 --- a/src/test/java/io/kestra/plugin/aws/cli/AwsCLITest.java +++ b/src/test/java/io/kestra/plugin/aws/cli/AwsCLITest.java @@ -40,9 +40,9 @@ void run() throws Exception { .image("amazon/aws-cli") .entryPoint(Collections.emptyList()) .build()) - .endpointOverride(localstack.getEndpointOverride(LocalStackContainer.Service.S3).toString()) - .accessKeyId(localstack.getAccessKey()) - .secretKeyId(localstack.getSecretKey()) + .endpointOverride(Property.of(localstack.getEndpointOverride(LocalStackContainer.Service.S3).toString())) + .accessKeyId(Property.of(localstack.getAccessKey())) + .secretKeyId(Property.of(localstack.getSecretKey())) .region(Property.of(localstack.getRegion())) .env(Map.of("{{ inputs.envKey }}", "{{ inputs.envValue }}")) .commands(List.of( diff --git a/src/test/java/io/kestra/plugin/aws/dynamodb/DeleteItemTest.java b/src/test/java/io/kestra/plugin/aws/dynamodb/DeleteItemTest.java index ade7d89..ffdd601 100644 --- a/src/test/java/io/kestra/plugin/aws/dynamodb/DeleteItemTest.java +++ b/src/test/java/io/kestra/plugin/aws/dynamodb/DeleteItemTest.java @@ -18,10 +18,10 @@ void run() throws Exception { var runContext = runContextFactory.of(); var delete = DeleteItem.builder() - .endpointOverride(localstack.getEndpointOverride(LocalStackContainer.Service.DYNAMODB).toString()) + .endpointOverride(Property.of(localstack.getEndpointOverride(LocalStackContainer.Service.DYNAMODB).toString())) .region(Property.of(localstack.getRegion())) - .accessKeyId(localstack.getAccessKey()) - .secretKeyId(localstack.getSecretKey()) + .accessKeyId(Property.of(localstack.getAccessKey())) + .secretKeyId(Property.of(localstack.getSecretKey())) .tableName("persons") .key(Map.of("id", "1")) .build(); diff --git a/src/test/java/io/kestra/plugin/aws/dynamodb/GetItemTest.java b/src/test/java/io/kestra/plugin/aws/dynamodb/GetItemTest.java index aa2b72a..4613689 100644 --- a/src/test/java/io/kestra/plugin/aws/dynamodb/GetItemTest.java +++ b/src/test/java/io/kestra/plugin/aws/dynamodb/GetItemTest.java @@ -17,10 +17,10 @@ void run() throws Exception { var runContext = runContextFactory.of(); var get = GetItem.builder() - .endpointOverride(localstack.getEndpointOverride(LocalStackContainer.Service.DYNAMODB).toString()) + .endpointOverride(Property.of(localstack.getEndpointOverride(LocalStackContainer.Service.DYNAMODB).toString())) .region(Property.of(localstack.getRegion())) - .accessKeyId(localstack.getAccessKey()) - .secretKeyId(localstack.getSecretKey()) + .accessKeyId(Property.of(localstack.getAccessKey())) + .secretKeyId(Property.of(localstack.getSecretKey())) .tableName("persons") .key(Map.of("id", "1")) .build(); diff --git a/src/test/java/io/kestra/plugin/aws/dynamodb/PutItemTest.java b/src/test/java/io/kestra/plugin/aws/dynamodb/PutItemTest.java index 462d07c..32b0615 100644 --- a/src/test/java/io/kestra/plugin/aws/dynamodb/PutItemTest.java +++ b/src/test/java/io/kestra/plugin/aws/dynamodb/PutItemTest.java @@ -16,10 +16,10 @@ void runMap() throws Exception { var runContext = runContextFactory.of(); var put = PutItem.builder() - .endpointOverride(localstack.getEndpointOverride(LocalStackContainer.Service.DYNAMODB).toString()) + .endpointOverride(Property.of(localstack.getEndpointOverride(LocalStackContainer.Service.DYNAMODB).toString())) .region(Property.of(localstack.getRegion())) - .accessKeyId(localstack.getAccessKey()) - .secretKeyId(localstack.getSecretKey()) + .accessKeyId(Property.of(localstack.getAccessKey())) + .secretKeyId(Property.of(localstack.getSecretKey())) .tableName("persons") .item(Map.of( "id", "1", @@ -40,10 +40,10 @@ void runString() throws Exception { var runContext = runContextFactory.of(); var put = PutItem.builder() - .endpointOverride(localstack.getEndpointOverride(LocalStackContainer.Service.DYNAMODB).toString()) + .endpointOverride(Property.of(localstack.getEndpointOverride(LocalStackContainer.Service.DYNAMODB).toString())) .region(Property.of(localstack.getRegion())) - .accessKeyId(localstack.getAccessKey()) - .secretKeyId(localstack.getSecretKey()) + .accessKeyId(Property.of(localstack.getAccessKey())) + .secretKeyId(Property.of(localstack.getSecretKey())) .tableName("persons") .item("{\"id\": \"1\", \"firstname\": \"Jane\", \"lastname\": \"Doe\"}") .build(); diff --git a/src/test/java/io/kestra/plugin/aws/dynamodb/QueryTest.java b/src/test/java/io/kestra/plugin/aws/dynamodb/QueryTest.java index 495f231..f815805 100644 --- a/src/test/java/io/kestra/plugin/aws/dynamodb/QueryTest.java +++ b/src/test/java/io/kestra/plugin/aws/dynamodb/QueryTest.java @@ -21,10 +21,10 @@ void runFetch() throws Exception { var runContext = runContextFactory.of(); var query = Query.builder() - .endpointOverride(localstack.getEndpointOverride(LocalStackContainer.Service.DYNAMODB).toString()) + .endpointOverride(Property.of(localstack.getEndpointOverride(LocalStackContainer.Service.DYNAMODB).toString())) .region(Property.of(localstack.getRegion())) - .accessKeyId(localstack.getAccessKey()) - .secretKeyId(localstack.getSecretKey()) + .accessKeyId(Property.of(localstack.getAccessKey())) + .secretKeyId(Property.of(localstack.getSecretKey())) .tableName("persons") .keyConditionExpression("id = :id") .expressionAttributeValues(Map.of(":id", "1")) @@ -48,10 +48,10 @@ void runFetchWithExpression() throws Exception { var runContext = runContextFactory.of(); var query = Query.builder() - .endpointOverride(localstack.getEndpointOverride(LocalStackContainer.Service.DYNAMODB).toString()) + .endpointOverride(Property.of(localstack.getEndpointOverride(LocalStackContainer.Service.DYNAMODB).toString())) .region(Property.of(localstack.getRegion())) - .accessKeyId(localstack.getAccessKey()) - .secretKeyId(localstack.getSecretKey()) + .accessKeyId(Property.of(localstack.getAccessKey())) + .secretKeyId(Property.of(localstack.getSecretKey())) .tableName("persons") .keyConditionExpression("id = :id") .filterExpression("lastname = :lastname") @@ -76,10 +76,10 @@ void runFetchMultipleExpression() throws Exception { var runContext = runContextFactory.of(); var query = Query.builder() - .endpointOverride(localstack.getEndpointOverride(LocalStackContainer.Service.DYNAMODB).toString()) + .endpointOverride(Property.of(localstack.getEndpointOverride(LocalStackContainer.Service.DYNAMODB).toString())) .region(Property.of(localstack.getRegion())) - .accessKeyId(localstack.getAccessKey()) - .secretKeyId(localstack.getSecretKey()) + .accessKeyId(Property.of(localstack.getAccessKey())) + .secretKeyId(Property.of(localstack.getSecretKey())) .tableName("persons") .keyConditionExpression("id = :id") .expressionAttributeValues(Map.of(":id", "1")) @@ -103,10 +103,10 @@ void runFetchOne() throws Exception { var runContext = runContextFactory.of(); var query = Query.builder() - .endpointOverride(localstack.getEndpointOverride(LocalStackContainer.Service.DYNAMODB).toString()) + .endpointOverride(Property.of(localstack.getEndpointOverride(LocalStackContainer.Service.DYNAMODB).toString())) .region(Property.of(localstack.getRegion())) - .accessKeyId(localstack.getAccessKey()) - .secretKeyId(localstack.getSecretKey()) + .accessKeyId(Property.of(localstack.getAccessKey())) + .secretKeyId(Property.of(localstack.getSecretKey())) .tableName("persons") .keyConditionExpression("id = :id") .expressionAttributeValues(Map.of(":id", "1")) @@ -131,10 +131,10 @@ void runStored() throws Exception { var runContext = runContextFactory.of(); var query = Query.builder() - .endpointOverride(localstack.getEndpointOverride(LocalStackContainer.Service.DYNAMODB).toString()) + .endpointOverride(Property.of(localstack.getEndpointOverride(LocalStackContainer.Service.DYNAMODB).toString())) .region(Property.of(localstack.getRegion())) - .accessKeyId(localstack.getAccessKey()) - .secretKeyId(localstack.getSecretKey()) + .accessKeyId(Property.of(localstack.getAccessKey())) + .secretKeyId(Property.of(localstack.getSecretKey())) .tableName("persons") .keyConditionExpression("id = :id") .expressionAttributeValues(Map.of(":id", "1")) diff --git a/src/test/java/io/kestra/plugin/aws/dynamodb/ScanTest.java b/src/test/java/io/kestra/plugin/aws/dynamodb/ScanTest.java index 888f721..952bab5 100644 --- a/src/test/java/io/kestra/plugin/aws/dynamodb/ScanTest.java +++ b/src/test/java/io/kestra/plugin/aws/dynamodb/ScanTest.java @@ -21,10 +21,10 @@ void runFetch() throws Exception { var runContext = runContextFactory.of(); var scan = Scan.builder() - .endpointOverride(localstack.getEndpointOverride(LocalStackContainer.Service.DYNAMODB).toString()) + .endpointOverride(Property.of(localstack.getEndpointOverride(LocalStackContainer.Service.DYNAMODB).toString())) .region(Property.of(localstack.getRegion())) - .accessKeyId(localstack.getAccessKey()) - .secretKeyId(localstack.getSecretKey()) + .accessKeyId(Property.of(localstack.getAccessKey())) + .secretKeyId(Property.of(localstack.getSecretKey())) .tableName("persons") .filterExpression("lastname = :lastname") .expressionAttributeValues(Map.of(":lastname", "Doe")) @@ -48,10 +48,10 @@ void runFetchNoExpression() throws Exception { var runContext = runContextFactory.of(); var scan = Scan.builder() - .endpointOverride(localstack.getEndpointOverride(LocalStackContainer.Service.DYNAMODB).toString()) + .endpointOverride(Property.of(localstack.getEndpointOverride(LocalStackContainer.Service.DYNAMODB).toString())) .region(Property.of(localstack.getRegion())) - .accessKeyId(localstack.getAccessKey()) - .secretKeyId(localstack.getSecretKey()) + .accessKeyId(Property.of(localstack.getAccessKey())) + .secretKeyId(Property.of(localstack.getSecretKey())) .tableName("persons") .fetchType(FetchType.FETCH) .build(); @@ -73,10 +73,10 @@ void runFetchMultipleExpression() throws Exception { var runContext = runContextFactory.of(); var scan = Scan.builder() - .endpointOverride(localstack.getEndpointOverride(LocalStackContainer.Service.DYNAMODB).toString()) + .endpointOverride(Property.of(localstack.getEndpointOverride(LocalStackContainer.Service.DYNAMODB).toString())) .region(Property.of(localstack.getRegion())) - .accessKeyId(localstack.getAccessKey()) - .secretKeyId(localstack.getSecretKey()) + .accessKeyId(Property.of(localstack.getAccessKey())) + .secretKeyId(Property.of(localstack.getSecretKey())) .tableName("persons") .filterExpression("lastname = :lastname and firstname = :firstname") .expressionAttributeValues(Map.of(":lastname", "Doe", ":firstname", "Jane")) @@ -100,10 +100,10 @@ void runFetchOne() throws Exception { var runContext = runContextFactory.of(); var scan = Scan.builder() - .endpointOverride(localstack.getEndpointOverride(LocalStackContainer.Service.DYNAMODB).toString()) + .endpointOverride(Property.of(localstack.getEndpointOverride(LocalStackContainer.Service.DYNAMODB).toString())) .region(Property.of(localstack.getRegion())) - .accessKeyId(localstack.getAccessKey()) - .secretKeyId(localstack.getSecretKey()) + .accessKeyId(Property.of(localstack.getAccessKey())) + .secretKeyId(Property.of(localstack.getSecretKey())) .tableName("persons") .filterExpression("lastname = :lastname") .expressionAttributeValues(Map.of(":lastname", "Doe")) @@ -128,10 +128,10 @@ void runStored() throws Exception { var runContext = runContextFactory.of(); var scan = Scan.builder() - .endpointOverride(localstack.getEndpointOverride(LocalStackContainer.Service.DYNAMODB).toString()) + .endpointOverride(Property.of(localstack.getEndpointOverride(LocalStackContainer.Service.DYNAMODB).toString())) .region(Property.of(localstack.getRegion())) - .accessKeyId(localstack.getAccessKey()) - .secretKeyId(localstack.getSecretKey()) + .accessKeyId(Property.of(localstack.getAccessKey())) + .secretKeyId(Property.of(localstack.getSecretKey())) .tableName("persons") .filterExpression("lastname = :lastname") .expressionAttributeValues(Map.of(":lastname", "Doe")) diff --git a/src/test/java/io/kestra/plugin/aws/ecr/GetAuthTokenTest.java b/src/test/java/io/kestra/plugin/aws/ecr/GetAuthTokenTest.java index 3cbcb72..d6a1a85 100644 --- a/src/test/java/io/kestra/plugin/aws/ecr/GetAuthTokenTest.java +++ b/src/test/java/io/kestra/plugin/aws/ecr/GetAuthTokenTest.java @@ -28,9 +28,9 @@ void run() throws Exception { RunContext runContext = runContextFactory.of(); GetAuthToken query = GetAuthToken.builder() - .endpointOverride(localstack.getEndpointOverride(LocalStackContainer.Service.EC2).toString()) - .accessKeyId(localstack.getAccessKey()) - .secretKeyId(localstack.getSecretKey()) + .endpointOverride(Property.of(localstack.getEndpointOverride(LocalStackContainer.Service.EC2).toString())) + .accessKeyId(Property.of(localstack.getAccessKey())) + .secretKeyId(Property.of(localstack.getSecretKey())) .region(Property.of(localstack.getRegion())) .build(); diff --git a/src/test/java/io/kestra/plugin/aws/eventbridge/PutEventsTest.java b/src/test/java/io/kestra/plugin/aws/eventbridge/PutEventsTest.java index cb9a11e..6496889 100644 --- a/src/test/java/io/kestra/plugin/aws/eventbridge/PutEventsTest.java +++ b/src/test/java/io/kestra/plugin/aws/eventbridge/PutEventsTest.java @@ -79,10 +79,10 @@ void runMap() throws Exception { )) .build(); var put = PutEvents.builder() - .endpointOverride(localstack.getEndpoint().toString()) + .endpointOverride(Property.of(localstack.getEndpoint().toString())) .region(Property.of(localstack.getRegion())) - .accessKeyId(localstack.getAccessKey()) - .secretKeyId(localstack.getSecretKey()) + .accessKeyId(Property.of(localstack.getAccessKey())) + .secretKeyId(Property.of(localstack.getSecretKey())) .entries(List.of(entry, entry2, entry3)) .build(); @@ -143,10 +143,10 @@ void runStorage() throws Exception { } var put = PutEvents.builder() - .endpointOverride(localstack.getEndpoint().toString()) + .endpointOverride(Property.of(localstack.getEndpoint().toString())) .region(Property.of(localstack.getRegion())) - .accessKeyId(localstack.getAccessKey()) - .secretKeyId(localstack.getSecretKey()) + .accessKeyId(Property.of(localstack.getAccessKey())) + .secretKeyId(Property.of(localstack.getSecretKey())) .entries(runContext.storage().putFile(tempFile).toString()) .build(); @@ -185,10 +185,10 @@ void runString() throws Exception { )) .build(); var put = PutEvents.builder() - .endpointOverride(localstack.getEndpoint().toString()) + .endpointOverride(Property.of(localstack.getEndpoint().toString())) .region(Property.of(localstack.getRegion())) - .accessKeyId(localstack.getAccessKey()) - .secretKeyId(localstack.getSecretKey()) + .accessKeyId(Property.of(localstack.getAccessKey())) + .secretKeyId(Property.of(localstack.getSecretKey())) .entries(List.of(entry, entry, entry)) .build(); @@ -215,10 +215,10 @@ void runStringUpperCase() throws Exception { )) .build(); var put = PutEvents.builder() - .endpointOverride(localstack.getEndpoint().toString()) + .endpointOverride(Property.of(localstack.getEndpoint().toString())) .region(Property.of(localstack.getRegion())) - .accessKeyId(localstack.getAccessKey()) - .secretKeyId(localstack.getSecretKey()) + .accessKeyId(Property.of(localstack.getAccessKey())) + .secretKeyId(Property.of(localstack.getSecretKey())) .entries(List.of(entry, entry, entry)) .build(); diff --git a/src/test/java/io/kestra/plugin/aws/kinesis/PutRecordsTest.java b/src/test/java/io/kestra/plugin/aws/kinesis/PutRecordsTest.java index d99cd27..351a820 100644 --- a/src/test/java/io/kestra/plugin/aws/kinesis/PutRecordsTest.java +++ b/src/test/java/io/kestra/plugin/aws/kinesis/PutRecordsTest.java @@ -117,10 +117,10 @@ void runMap() throws Exception { .data("record 3") .build(); var put = PutRecords.builder() - .endpointOverride(localstack.getEndpoint().toString()) + .endpointOverride(Property.of(localstack.getEndpoint().toString())) .region(Property.of(localstack.getRegion())) - .accessKeyId(localstack.getAccessKey()) - .secretKeyId(localstack.getSecretKey()) + .accessKeyId(Property.of(localstack.getAccessKey())) + .secretKeyId(Property.of(localstack.getSecretKey())) .streamName("streamName") .records(List.of(record, record2, record3)) .build(); @@ -169,10 +169,10 @@ void runStorage() throws Exception { } var put = PutRecords.builder() - .endpointOverride(localstack.getEndpoint().toString()) + .endpointOverride(Property.of(localstack.getEndpoint().toString())) .region(Property.of(localstack.getRegion())) - .accessKeyId(localstack.getAccessKey()) - .secretKeyId(localstack.getSecretKey()) + .accessKeyId(Property.of(localstack.getAccessKey())) + .secretKeyId(Property.of(localstack.getSecretKey())) .records(runContext.storage().putFile(tempFile).toString()) .streamName("streamName") .build(); @@ -222,10 +222,10 @@ void runStorageUpperCase() throws Exception { } var put = PutRecords.builder() - .endpointOverride(localstack.getEndpoint().toString()) + .endpointOverride(Property.of(localstack.getEndpoint().toString())) .region(Property.of(localstack.getRegion())) - .accessKeyId(localstack.getAccessKey()) - .secretKeyId(localstack.getSecretKey()) + .accessKeyId(Property.of(localstack.getAccessKey())) + .secretKeyId(Property.of(localstack.getSecretKey())) .records(runContext.storage().putFile(tempFile).toString()) .streamName("streamName") .build(); diff --git a/src/test/java/io/kestra/plugin/aws/lambda/InvokeTest.java b/src/test/java/io/kestra/plugin/aws/lambda/InvokeTest.java index cf7fd5a..6898fe5 100644 --- a/src/test/java/io/kestra/plugin/aws/lambda/InvokeTest.java +++ b/src/test/java/io/kestra/plugin/aws/lambda/InvokeTest.java @@ -28,13 +28,13 @@ public void setUp() { public void givenExistingLambda_whenInvoked_thenOutputOkMetricsOk() throws Exception { // Given var invoke = Invoke.builder() - .endpointOverride(localstack.getEndpointOverride(LocalStackContainer.Service.LAMBDA).toString()) + .endpointOverride(Property.of(localstack.getEndpointOverride(LocalStackContainer.Service.LAMBDA).toString())) .functionArn(FUNCTION_NAME) .id(InvokeTest.class.getSimpleName()) .type(InvokeTest.class.getName()) .region(Property.of(localstack.getRegion())) - .accessKeyId(localstack.getAccessKey()) - .secretKeyId(localstack.getSecretKey()) + .accessKeyId(Property.of(localstack.getAccessKey())) + .secretKeyId(Property.of(localstack.getSecretKey())) .build(); var client = invoke.client(context); @@ -62,13 +62,13 @@ public void givenExistingLambda_whenInvoked_thenOutputOkMetricsOk() throws Excep public void givenNotFoundLambda_whenInvoked_thenErrorNoMetrics() throws Exception { // Given var invoke = Invoke.builder() - .endpointOverride(localstack.getEndpointOverride(LocalStackContainer.Service.LAMBDA).toString()) + .endpointOverride(Property.of(localstack.getEndpointOverride(LocalStackContainer.Service.LAMBDA).toString())) .functionArn("Fake_ARN") .id(InvokeTest.class.getSimpleName()) .type(InvokeTest.class.getName()) .region(Property.of(localstack.getRegion())) - .accessKeyId(localstack.getAccessKey()) - .secretKeyId(localstack.getSecretKey()) + .accessKeyId(Property.of(localstack.getAccessKey())) + .secretKeyId(Property.of(localstack.getSecretKey())) .build(); var client = invoke.client(context); @@ -89,14 +89,14 @@ public void givenFailingLambda_whenInvoked_thenFailureNoMetrics() throws Excepti // ask for an error in the Lambda by function param (see test resource lambda/test.py) params.put("action", "error"); var invoke = Invoke.builder() - .endpointOverride(localstack.getEndpointOverride(LocalStackContainer.Service.LAMBDA).toString()) + .endpointOverride(Property.of(localstack.getEndpointOverride(LocalStackContainer.Service.LAMBDA).toString())) .functionArn(FUNCTION_NAME) .functionPayload(params) .id(InvokeTest.class.getSimpleName()) .type(InvokeTest.class.getName()) .region(Property.of(localstack.getRegion())) - .accessKeyId(localstack.getAccessKey()) - .secretKeyId(localstack.getSecretKey()) + .accessKeyId(Property.of(localstack.getAccessKey())) + .secretKeyId(Property.of(localstack.getSecretKey())) .build(); var client = invoke.client(context); diff --git a/src/test/java/io/kestra/plugin/aws/lambda/InvokeUnitTest.java b/src/test/java/io/kestra/plugin/aws/lambda/InvokeUnitTest.java index 8a3eedd..49f81cf 100644 --- a/src/test/java/io/kestra/plugin/aws/lambda/InvokeUnitTest.java +++ b/src/test/java/io/kestra/plugin/aws/lambda/InvokeUnitTest.java @@ -42,7 +42,7 @@ @ExtendWith(MockitoExtension.class) public class InvokeUnitTest { - + private Invoke invoke; @Mock(strictness = Strictness.LENIENT) @@ -81,10 +81,10 @@ public URI answer(InvocationOnMock invocation) throws Throwable { .functionPayload(null) // w/o paramters now .id(InvokeUnitTest.class.getSimpleName()) .type(InvokeUnitTest.class.getName()) - .accessKeyId("test_accessKeyId") - .secretKeyId("test_secretKeyId") + .accessKeyId(Property.of("test_accessKeyId")) + .secretKeyId(Property.of("test_secretKeyId")) .region(Property.of("test_region")) - .build(); + .build(); } @AfterEach @@ -99,12 +99,12 @@ void tearDown() { void testParseContentType_NoContentType_Binary() { assertEquals(ContentType.APPLICATION_OCTET_STREAM, invoke.parseContentType(Optional.empty()), "Should be binary"); } - + @Test void testParseContentType_BadContent_Binary() { assertEquals(ContentType.APPLICATION_OCTET_STREAM, invoke.parseContentType(Optional.of("fake/type")), "Should be binary"); } - + @Test void testParseContentType_JSON() { assertEquals(ContentType.APPLICATION_JSON.getMimeType().toString(), @@ -155,13 +155,13 @@ private void checkOutput(final String originalData, final Output result) throws "Content type must mach"); } - // ******** BDD usecases ******** + // ******** BDD usecases ******** @Test void givenFunctionArnNoParams_whenInvokeLambda_thenOutputWithFile( @Mock LambdaClient awsLambda, @Mock InvokeResponse awsResponse, @Mock SdkHttpResponse awsHttpResponse, - @Mock SdkBytes payload) + @Mock SdkBytes payload) throws IllegalVariableEvaluationException, IOException { //Given: functionArn and no input params, AWS Lambda clinet mocked for the expected behaviour var data = "some raw data"; @@ -171,16 +171,16 @@ void givenFunctionArnNoParams_whenInvokeLambda_thenOutputWithFile( given(awsResponse.sdkHttpResponse()).willReturn(awsHttpResponse); given(awsResponse.payload()).willReturn(payload); given(awsLambda.invoke(any(InvokeRequest.class))).willReturn(awsResponse); - + // Mock AbstractLambdaInvoke.client() to return the mocked AWS client var spyInvoke = spy(invoke); doReturn(awsLambda).when(spyInvoke).client(any()); - - // When + + // When Output res = assertDoesNotThrow(() -> { return spyInvoke.run(context); }, "No exception should be thrown"); - + // Then checkOutput(data, res); } diff --git a/src/test/java/io/kestra/plugin/aws/s3/AbstractTest.java b/src/test/java/io/kestra/plugin/aws/s3/AbstractTest.java index ef506a0..fe5ed4c 100644 --- a/src/test/java/io/kestra/plugin/aws/s3/AbstractTest.java +++ b/src/test/java/io/kestra/plugin/aws/s3/AbstractTest.java @@ -46,9 +46,9 @@ protected String createBucket(String bucket) throws Exception { .id(AllTest.class.getSimpleName()) .type(CreateBucket.class.getName()) .bucket(bucket) - .endpointOverride(localstack.getEndpointOverride(LocalStackContainer.Service.S3).toString()) - .accessKeyId(localstack.getAccessKey()) - .secretKeyId(localstack.getSecretKey()) + .endpointOverride(Property.of(localstack.getEndpointOverride(LocalStackContainer.Service.S3).toString())) + .accessKeyId(Property.of(localstack.getAccessKey())) + .secretKeyId(Property.of(localstack.getSecretKey())) .region(Property.of(localstack.getRegion())) .build(); @@ -78,9 +78,9 @@ protected String upload(String dir, String bucket) throws Exception { .id(AllTest.class.getSimpleName()) .type(Upload.class.getName()) .bucket(bucket) - .endpointOverride(localstack.getEndpointOverride(LocalStackContainer.Service.S3).toString()) - .accessKeyId(localstack.getAccessKey()) - .secretKeyId(localstack.getSecretKey()) + .endpointOverride(Property.of(localstack.getEndpointOverride(LocalStackContainer.Service.S3).toString())) + .accessKeyId(Property.of(localstack.getAccessKey())) + .secretKeyId(Property.of(localstack.getSecretKey())) .region(Property.of(localstack.getRegion())) .from(source.toString()) .key(dir + "/" + out + ".yml") @@ -95,9 +95,9 @@ protected String upload(String dir, String bucket) throws Exception { .id(ListTest.class.getSimpleName()) .type(List.class.getName()) .bucket(this.BUCKET) - .endpointOverride(localstack.getEndpointOverride(LocalStackContainer.Service.S3).toString()) - .accessKeyId(localstack.getAccessKey()) - .secretKeyId(localstack.getSecretKey()) + .endpointOverride(Property.of(localstack.getEndpointOverride(LocalStackContainer.Service.S3).toString())) + .accessKeyId(Property.of(localstack.getAccessKey())) + .secretKeyId(Property.of(localstack.getSecretKey())) .region(Property.of(localstack.getRegion())); } diff --git a/src/test/java/io/kestra/plugin/aws/s3/AllTest.java b/src/test/java/io/kestra/plugin/aws/s3/AllTest.java index db6f445..660b97b 100644 --- a/src/test/java/io/kestra/plugin/aws/s3/AllTest.java +++ b/src/test/java/io/kestra/plugin/aws/s3/AllTest.java @@ -28,9 +28,9 @@ void run() throws Exception { .id(AllTest.class.getSimpleName()) .type(Upload.class.getName()) .bucket(this.BUCKET) - .endpointOverride(localstack.getEndpointOverride(LocalStackContainer.Service.S3).toString()) - .accessKeyId(localstack.getAccessKey()) - .secretKeyId(localstack.getSecretKey()) + .endpointOverride(Property.of(localstack.getEndpointOverride(LocalStackContainer.Service.S3).toString())) + .accessKeyId(Property.of(localstack.getAccessKey())) + .secretKeyId(Property.of(localstack.getSecretKey())) .region(Property.of(localstack.getRegion())) .prefix("tasks/aws/upload/") .build(); @@ -43,9 +43,9 @@ void run() throws Exception { .id(AllTest.class.getSimpleName()) .type(Download.class.getName()) .bucket(this.BUCKET) - .endpointOverride(localstack.getEndpointOverride(LocalStackContainer.Service.S3).toString()) - .accessKeyId(localstack.getAccessKey()) - .secretKeyId(localstack.getSecretKey()) + .endpointOverride(Property.of(localstack.getEndpointOverride(LocalStackContainer.Service.S3).toString())) + .accessKeyId(Property.of(localstack.getAccessKey())) + .secretKeyId(Property.of(localstack.getSecretKey())) .region(Property.of(localstack.getRegion())) .key(key) .build(); @@ -62,9 +62,9 @@ void run() throws Exception { .id(AllTest.class.getSimpleName()) .type(Delete.class.getName()) .bucket(this.BUCKET) - .endpointOverride(localstack.getEndpointOverride(LocalStackContainer.Service.S3).toString()) - .accessKeyId(localstack.getAccessKey()) - .secretKeyId(localstack.getSecretKey()) + .endpointOverride(Property.of(localstack.getEndpointOverride(LocalStackContainer.Service.S3).toString())) + .accessKeyId(Property.of(localstack.getAccessKey())) + .secretKeyId(Property.of(localstack.getSecretKey())) .region(Property.of(localstack.getRegion())) .key(key) .build(); diff --git a/src/test/java/io/kestra/plugin/aws/s3/CopyTest.java b/src/test/java/io/kestra/plugin/aws/s3/CopyTest.java index c8e4023..e2c28d6 100644 --- a/src/test/java/io/kestra/plugin/aws/s3/CopyTest.java +++ b/src/test/java/io/kestra/plugin/aws/s3/CopyTest.java @@ -19,9 +19,9 @@ void run(Boolean delete) throws Exception { Copy task = Copy.builder() .id(CopyTest.class.getSimpleName()) .type(List.class.getName()) - .endpointOverride(localstack.getEndpointOverride(LocalStackContainer.Service.S3).toString()) - .accessKeyId(localstack.getAccessKey()) - .secretKeyId(localstack.getSecretKey()) + .endpointOverride(Property.of(localstack.getEndpointOverride(LocalStackContainer.Service.S3).toString())) + .accessKeyId(Property.of(localstack.getAccessKey())) + .secretKeyId(Property.of(localstack.getSecretKey())) .region(Property.of(localstack.getRegion())) .from(Copy.CopyObjectFrom.builder() .bucket(this.BUCKET) diff --git a/src/test/java/io/kestra/plugin/aws/s3/DeleteListTest.java b/src/test/java/io/kestra/plugin/aws/s3/DeleteListTest.java index 07f1370..63bf7e6 100644 --- a/src/test/java/io/kestra/plugin/aws/s3/DeleteListTest.java +++ b/src/test/java/io/kestra/plugin/aws/s3/DeleteListTest.java @@ -22,9 +22,9 @@ void run() throws Exception { .id(ListTest.class.getSimpleName()) .type(List.class.getName()) .bucket(this.BUCKET) - .endpointOverride(localstack.getEndpointOverride(LocalStackContainer.Service.S3).toString()) - .accessKeyId(localstack.getAccessKey()) - .secretKeyId(localstack.getSecretKey()) + .endpointOverride(Property.of(localstack.getEndpointOverride(LocalStackContainer.Service.S3).toString())) + .accessKeyId(Property.of(localstack.getAccessKey())) + .secretKeyId(Property.of(localstack.getSecretKey())) .region(Property.of(localstack.getRegion())) .concurrent(5) .build(); diff --git a/src/test/java/io/kestra/plugin/aws/s3/DownloadsTest.java b/src/test/java/io/kestra/plugin/aws/s3/DownloadsTest.java index 71303a2..65056bf 100644 --- a/src/test/java/io/kestra/plugin/aws/s3/DownloadsTest.java +++ b/src/test/java/io/kestra/plugin/aws/s3/DownloadsTest.java @@ -27,9 +27,9 @@ void delete() throws Exception { .id(DownloadsTest.class.getSimpleName()) .type(Downloads.class.getName()) .bucket(this.BUCKET) - .endpointOverride(localstack.getEndpointOverride(LocalStackContainer.Service.S3).toString()) - .accessKeyId(localstack.getAccessKey()) - .secretKeyId(localstack.getSecretKey()) + .endpointOverride(Property.of(localstack.getEndpointOverride(LocalStackContainer.Service.S3).toString())) + .accessKeyId(Property.of(localstack.getAccessKey())) + .secretKeyId(Property.of(localstack.getSecretKey())) .region(Property.of(localstack.getRegion())) .action(ActionInterface.Action.DELETE) .build(); @@ -56,9 +56,9 @@ void move() throws Exception { .id(DownloadsTest.class.getSimpleName()) .type(Downloads.class.getName()) .bucket("{{bucket}}") - .endpointOverride(localstack.getEndpointOverride(LocalStackContainer.Service.S3).toString()) - .accessKeyId(localstack.getAccessKey()) - .secretKeyId(localstack.getSecretKey()) + .endpointOverride(Property.of(localstack.getEndpointOverride(LocalStackContainer.Service.S3).toString())) + .accessKeyId(Property.of(localstack.getAccessKey())) + .secretKeyId(Property.of(localstack.getSecretKey())) .region(Property.of(localstack.getRegion())) .action(ActionInterface.Action.MOVE) .moveTo(Copy.CopyObject.builder() diff --git a/src/test/java/io/kestra/plugin/aws/s3/UploadsTest.java b/src/test/java/io/kestra/plugin/aws/s3/UploadsTest.java index 00ed744..cc72e34 100644 --- a/src/test/java/io/kestra/plugin/aws/s3/UploadsTest.java +++ b/src/test/java/io/kestra/plugin/aws/s3/UploadsTest.java @@ -25,9 +25,9 @@ void run() throws Exception { .id(AllTest.class.getSimpleName()) .type(Upload.class.getName()) .bucket(this.BUCKET) - .endpointOverride(localstack.getEndpointOverride(LocalStackContainer.Service.S3).toString()) - .accessKeyId(localstack.getAccessKey()) - .secretKeyId(localstack.getSecretKey()) + .endpointOverride(Property.of(localstack.getEndpointOverride(LocalStackContainer.Service.S3).toString())) + .accessKeyId(Property.of(localstack.getAccessKey())) + .secretKeyId(Property.of(localstack.getSecretKey())) .region(Property.of(localstack.getRegion())) .from(java.util.List.of(source1.toString(), source2.toString(), source3.toString(), source4.toString())) .key(IdUtils.create() + "/") @@ -39,9 +39,9 @@ void run() throws Exception { .id(UploadsTest.class.getSimpleName()) .type(Upload.class.getName()) .bucket(this.BUCKET) - .endpointOverride(localstack.getEndpointOverride(LocalStackContainer.Service.S3).toString()) - .accessKeyId(localstack.getAccessKey()) - .secretKeyId(localstack.getSecretKey()) + .endpointOverride(Property.of(localstack.getEndpointOverride(LocalStackContainer.Service.S3).toString())) + .accessKeyId(Property.of(localstack.getAccessKey())) + .secretKeyId(Property.of(localstack.getSecretKey())) .region(Property.of(localstack.getRegion())) .prefix(upload.getKey()) .build(); diff --git a/src/test/java/io/kestra/plugin/aws/sns/PublishTest.java b/src/test/java/io/kestra/plugin/aws/sns/PublishTest.java index d0918f7..f80a6da 100644 --- a/src/test/java/io/kestra/plugin/aws/sns/PublishTest.java +++ b/src/test/java/io/kestra/plugin/aws/sns/PublishTest.java @@ -16,11 +16,11 @@ void run() throws Exception { var runContext = runContextFactory.of(); var publish = Publish.builder() - .endpointOverride(localstack.getEndpointOverride(LocalStackContainer.Service.SNS).toString()) + .endpointOverride(Property.of(localstack.getEndpointOverride(LocalStackContainer.Service.SNS).toString())) .topicArn(TOPIC_ARN) .region(Property.of(localstack.getRegion())) - .accessKeyId(localstack.getAccessKey()) - .secretKeyId(localstack.getSecretKey()) + .accessKeyId(Property.of(localstack.getAccessKey())) + .secretKeyId(Property.of(localstack.getSecretKey())) .from( List.of( Message.builder().data("Hello World").build(), diff --git a/src/test/java/io/kestra/plugin/aws/sqs/AbstractSqsTest.java b/src/test/java/io/kestra/plugin/aws/sqs/AbstractSqsTest.java index 4eb127c..ff70d62 100644 --- a/src/test/java/io/kestra/plugin/aws/sqs/AbstractSqsTest.java +++ b/src/test/java/io/kestra/plugin/aws/sqs/AbstractSqsTest.java @@ -1,5 +1,6 @@ package io.kestra.plugin.aws.sqs; +import io.kestra.core.models.property.Property; import io.kestra.core.runners.RunContextFactory; import io.kestra.plugin.aws.AbstractLocalStackTest; import io.kestra.core.junit.annotations.KestraTest; diff --git a/src/test/java/io/kestra/plugin/aws/sqs/PublishThenConsumeTest.java b/src/test/java/io/kestra/plugin/aws/sqs/PublishThenConsumeTest.java index 6dcb229..46dd189 100644 --- a/src/test/java/io/kestra/plugin/aws/sqs/PublishThenConsumeTest.java +++ b/src/test/java/io/kestra/plugin/aws/sqs/PublishThenConsumeTest.java @@ -17,11 +17,11 @@ void runText() throws Exception { var runContext = runContextFactory.of(); var publish = Publish.builder() - .endpointOverride(localstack.getEndpointOverride(LocalStackContainer.Service.SQS).toString()) + .endpointOverride(Property.of(localstack.getEndpointOverride(LocalStackContainer.Service.SQS).toString())) .queueUrl(queueUrl()) .region(Property.of(localstack.getRegion())) - .accessKeyId(localstack.getAccessKey()) - .secretKeyId(localstack.getSecretKey()) + .accessKeyId(Property.of(localstack.getAccessKey())) + .secretKeyId(Property.of(localstack.getSecretKey())) .from( List.of( Message.builder().data("Hello World").build(), @@ -34,11 +34,11 @@ void runText() throws Exception { assertThat(publishOutput.getMessagesCount(), is(2)); var consume = Consume.builder() - .endpointOverride(localstack.getEndpointOverride(LocalStackContainer.Service.SQS).toString()) + .endpointOverride(Property.of(localstack.getEndpointOverride(LocalStackContainer.Service.SQS).toString())) .queueUrl(queueUrl()) .region(Property.of(localstack.getRegion())) - .accessKeyId(localstack.getAccessKey()) - .secretKeyId(localstack.getSecretKey()) + .accessKeyId(Property.of(localstack.getAccessKey())) + .secretKeyId(Property.of(localstack.getSecretKey())) .maxRecords(2) .build(); @@ -51,11 +51,11 @@ void runJson() throws Exception { var runContext = runContextFactory.of(); var publish = Publish.builder() - .endpointOverride(localstack.getEndpointOverride(LocalStackContainer.Service.SQS).toString()) + .endpointOverride(Property.of(localstack.getEndpointOverride(LocalStackContainer.Service.SQS).toString())) .queueUrl(queueUrl()) .region(Property.of(localstack.getRegion())) - .accessKeyId(localstack.getAccessKey()) - .secretKeyId(localstack.getSecretKey()) + .accessKeyId(Property.of(localstack.getAccessKey())) + .secretKeyId(Property.of(localstack.getSecretKey())) .from( List.of( Message.builder().data(""" @@ -70,11 +70,11 @@ void runJson() throws Exception { assertThat(publishOutput.getMessagesCount(), is(2)); var consume = Consume.builder() - .endpointOverride(localstack.getEndpointOverride(LocalStackContainer.Service.SQS).toString()) + .endpointOverride(Property.of(localstack.getEndpointOverride(LocalStackContainer.Service.SQS).toString())) .queueUrl(queueUrl()) .region(Property.of(localstack.getRegion())) - .accessKeyId(localstack.getAccessKey()) - .secretKeyId(localstack.getSecretKey()) + .accessKeyId(Property.of(localstack.getAccessKey())) + .secretKeyId(Property.of(localstack.getSecretKey())) .serdeType(SerdeType.JSON) .maxRecords(2) .build(); diff --git a/src/test/java/io/kestra/plugin/aws/sqs/RealtimeTriggerTest.java b/src/test/java/io/kestra/plugin/aws/sqs/RealtimeTriggerTest.java index 859858e..b1f8a96 100644 --- a/src/test/java/io/kestra/plugin/aws/sqs/RealtimeTriggerTest.java +++ b/src/test/java/io/kestra/plugin/aws/sqs/RealtimeTriggerTest.java @@ -71,11 +71,11 @@ void flow() throws Exception { // publish two messages to trigger the flow Publish task = Publish.builder() - .endpointOverride(localstack.getEndpointOverride(LocalStackContainer.Service.SQS).toString()) + .endpointOverride(Property.of(localstack.getEndpointOverride(LocalStackContainer.Service.SQS).toString())) .queueUrl(queueUrl()) .region(Property.of(localstack.getRegion())) - .accessKeyId(localstack.getAccessKey()) - .secretKeyId(localstack.getSecretKey()) + .accessKeyId(Property.of(localstack.getAccessKey())) + .secretKeyId(Property.of(localstack.getSecretKey())) .from( List.of( Message.builder().data("Hello World").build() diff --git a/src/test/java/io/kestra/plugin/aws/sqs/TriggerTest.java b/src/test/java/io/kestra/plugin/aws/sqs/TriggerTest.java index c9eded4..94ed86d 100644 --- a/src/test/java/io/kestra/plugin/aws/sqs/TriggerTest.java +++ b/src/test/java/io/kestra/plugin/aws/sqs/TriggerTest.java @@ -73,11 +73,11 @@ void flow() throws Exception { // publish two messages to trigger the flow Publish task = Publish.builder() - .endpointOverride(localstack.getEndpointOverride(LocalStackContainer.Service.SQS).toString()) + .endpointOverride(Property.of(localstack.getEndpointOverride(LocalStackContainer.Service.SQS).toString())) .queueUrl(queueUrl()) .region(Property.of(localstack.getRegion())) - .accessKeyId(localstack.getAccessKey()) - .secretKeyId(localstack.getSecretKey()) + .accessKeyId(Property.of(localstack.getAccessKey())) + .secretKeyId(Property.of(localstack.getSecretKey())) .from( List.of( Message.builder().data("Hello World").build(),