Skip to content

Commit

Permalink
refactor: move DynamoDB to dynamic properties
Browse files Browse the repository at this point in the history
  • Loading branch information
mgabelle committed Dec 16, 2024
1 parent 8ac933f commit de13b2e
Show file tree
Hide file tree
Showing 11 changed files with 85 additions and 90 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.executions.metrics.Counter;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.common.FetchOutput;
import io.kestra.core.models.tasks.common.FetchType;
import io.kestra.core.runners.RunContext;
Expand Down Expand Up @@ -40,9 +41,8 @@
@NoArgsConstructor
public abstract class AbstractDynamoDb extends AbstractConnection {
@Schema(title = "The DynamoDB table name.")
@PluginProperty(dynamic = true)
@NotNull
protected String tableName;
protected Property<String> tableName;

protected DynamoDbClient client(final RunContext runContext) throws IllegalVariableEvaluationException {
final AwsClientConfig clientConfig = awsClientConfig(runContext);
Expand Down Expand Up @@ -109,7 +109,7 @@ protected AttributeValue objectFrom(Object value) {
return AttributeValue.fromS(value.toString());
}

protected FetchOutput fetchOutputs(List<Map<String, AttributeValue>> items, FetchType fetchType, RunContext runContext) throws IOException {
protected FetchOutput fetchOutputs(List<Map<String, AttributeValue>> items, FetchType fetchType, RunContext runContext) throws IOException, IllegalVariableEvaluationException {
var outputBuilder = FetchOutput.builder();
switch (fetchType) {
case FETCH:
Expand Down Expand Up @@ -139,7 +139,7 @@ protected FetchOutput fetchOutputs(List<Map<String, AttributeValue>> items, Fetc

runContext.metric(Counter.of(
"records", output.getSize(),
"tableName", getTableName()
"tableName", runContext.render(getTableName()).as(String.class).orElseThrow()
));

return output;
Expand Down
11 changes: 6 additions & 5 deletions src/main/java/io/kestra/plugin/aws/dynamodb/DeleteItem.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.models.tasks.VoidOutput;
import io.kestra.core.runners.RunContext;
Expand Down Expand Up @@ -41,7 +42,7 @@
secretKeyId: "<secret-key>"
region: "eu-central-1"
tableName: "persons"
key:
key:
id: "1"
"""
)
Expand All @@ -52,16 +53,16 @@ public class DeleteItem extends AbstractDynamoDb implements RunnableTask<VoidOut
title = "The DynamoDB item key.",
description = "The DynamoDB item identifier."
)
@PluginProperty
private Map<String, Object> key;
private Property<Map<String, Object>> key;

@Override
public VoidOutput run(RunContext runContext) throws Exception {
try (var dynamoDb = client(runContext)) {
Map<String, AttributeValue> key = valueMapFrom(getKey());
var renderedKey = runContext.render(this.key).asMap(String.class, Object.class);
Map<String, AttributeValue> key = valueMapFrom(renderedKey);

var deleteRequest = DeleteItemRequest.builder()
.tableName(runContext.render(this.getTableName()))
.tableName(runContext.render(this.getTableName()).as(String.class).orElseThrow())
.key(key)
.build();

Expand Down
10 changes: 5 additions & 5 deletions src/main/java/io/kestra/plugin/aws/dynamodb/GetItem.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.runners.RunContext;
import io.swagger.v3.oas.annotations.media.Schema;
Expand Down Expand Up @@ -37,7 +38,7 @@
secretKeyId: "<secret-key>"
region: "eu-central-1"
tableName: "persons"
key:
key:
id: "1"
"""
)
Expand All @@ -48,16 +49,15 @@ public class GetItem extends AbstractDynamoDb implements RunnableTask<GetItem.Ou
title = "The DynamoDB item key.",
description = "The DynamoDB item identifier."
)
@PluginProperty(dynamic = true)
private Map<String, Object> key;
private Property<Map<String, Object>> key;

@Override
public Output run(RunContext runContext) throws Exception {
try (var dynamoDb = client(runContext)) {
Map<String, AttributeValue> key = valueMapFrom(runContext.render(this.key));
Map<String, AttributeValue> key = valueMapFrom(runContext.render(this.key).asMap(String.class, Object.class));

var getRequest = GetItemRequest.builder()
.tableName(runContext.render(this.tableName))
.tableName(runContext.render(this.tableName).as(String.class).orElseThrow())
.key(key)
.build();

Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/kestra/plugin/aws/dynamodb/PutItem.java
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public VoidOutput run(RunContext runContext) throws Exception {
var item = valueMapFrom(fields);

var putRequest = PutItemRequest.builder()
.tableName(runContext.render(this.tableName))
.tableName(runContext.render(this.tableName).as(String.class).orElseThrow())
.item(item)
.build();
dynamoDb.putItem(putRequest);
Expand Down
28 changes: 12 additions & 16 deletions src/main/java/io/kestra/plugin/aws/dynamodb/Query.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.models.tasks.common.FetchOutput;
import io.kestra.core.models.tasks.common.FetchType;
Expand Down Expand Up @@ -75,56 +76,51 @@ public class Query extends AbstractDynamoDb implements RunnableTask<FetchOutput>
+ "NONE do nothing."
)
@Builder.Default
@PluginProperty
private FetchType fetchType = FetchType.STORE;
private Property<FetchType> fetchType = Property.of(FetchType.STORE);

@Schema(
title = "Maximum numbers of returned results."
)
@PluginProperty
private Integer limit;
private Property<Integer> limit;

@Schema(
title = "Query key condition expression."
)
@PluginProperty(dynamic = true)
@NotNull
private String keyConditionExpression;
private Property<String> keyConditionExpression;

@Schema(
title = "Query expression attributes.",
description = "It's a map of string -> object."
)
@PluginProperty(dynamic = true)
@NotNull
private Map<String, Object> expressionAttributeValues;
private Property<Map<String, Object>> expressionAttributeValues;

@Schema(
title = "Query filter expression.",
description = "Query filter expression."
)
@PluginProperty(dynamic = true)
private String filterExpression;
private Property<String> filterExpression;

@Override
public FetchOutput run(RunContext runContext) throws Exception {
try (var dynamoDb = client(runContext)) {
var queryBuilder = QueryRequest.builder()
.tableName(runContext.render(this.getTableName()))
.keyConditionExpression(runContext.render(keyConditionExpression))
.expressionAttributeValues(valueMapFrom(expressionAttributeValues));
.tableName(runContext.render(this.getTableName()).as(String.class).orElseThrow())
.keyConditionExpression(runContext.render(keyConditionExpression).as(String.class).orElseThrow())
.expressionAttributeValues(valueMapFrom(runContext.render(expressionAttributeValues).asMap(String.class, Object.class)));

if(limit != null) {
queryBuilder.limit(limit);
queryBuilder.limit(runContext.render(limit).as(Integer.class).orElseThrow());
}
if(filterExpression != null){
queryBuilder.filterExpression(runContext.render(filterExpression));
queryBuilder.filterExpression(runContext.render(filterExpression).as(String.class).orElseThrow());
}

var query = queryBuilder.build();
var items = dynamoDb.query(query).items();

return this.fetchOutputs(items, this.fetchType, runContext);
return this.fetchOutputs(items, runContext.render(this.fetchType).as(FetchType.class).orElseThrow(), runContext);
}
}
}
26 changes: 12 additions & 14 deletions src/main/java/io/kestra/plugin/aws/dynamodb/Scan.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.models.tasks.common.FetchOutput;
import io.kestra.core.models.tasks.common.FetchType;
Expand Down Expand Up @@ -71,52 +72,49 @@ public class Scan extends AbstractDynamoDb implements RunnableTask<FetchOutput>
+ "NONE do nothing."
)
@Builder.Default
@PluginProperty
private FetchType fetchType = FetchType.STORE;
private Property<FetchType> fetchType = Property.of(FetchType.STORE);

@Schema(
title = "Maximum numbers of returned results."
)
@PluginProperty
private Integer limit;
private Property<Integer> limit;

@Schema(
title = "Scan filter expression.",
description = "When used, `expressionAttributeValues` property must also be provided."
)
@PluginProperty(dynamic = true)
private String filterExpression;
private Property<String> filterExpression;

@Schema(
title = "Scan expression attributes.",
description = "It's a map of string -> object."
)
@PluginProperty(dynamic = true)
private Map<String, Object> expressionAttributeValues;
private Property<Map<String, Object>> expressionAttributeValues;


@Override
public FetchOutput run(RunContext runContext) throws Exception {
try (var dynamoDb = client(runContext)) {
var scanBuilder = ScanRequest.builder()
.tableName(runContext.render(this.getTableName()));
.tableName(runContext.render(this.getTableName()).as(String.class).orElseThrow());

if(limit != null) {
scanBuilder.limit(limit);
scanBuilder.limit(runContext.render(limit).as(Integer.class).orElseThrow());
}
if(filterExpression != null){
if(expressionAttributeValues == null){
var attributes = runContext.render(expressionAttributeValues).asMap(String.class, Object.class);
if(attributes.isEmpty()){
throw new IllegalArgumentException("'expressionAttributeValues' must be provided when 'filterExpression' is used");
}
scanBuilder.filterExpression(runContext.render(filterExpression));
scanBuilder.expressionAttributeValues(valueMapFrom(expressionAttributeValues));
scanBuilder.filterExpression(runContext.render(filterExpression).as(String.class).orElseThrow());
scanBuilder.expressionAttributeValues(valueMapFrom(attributes));
}


var scan = scanBuilder.build();
var items = dynamoDb.scan(scan).items();

return this.fetchOutputs(items, this.fetchType, runContext);
return this.fetchOutputs(items, runContext.render(this.fetchType).as(FetchType.class).orElseThrow(), runContext);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ void run() throws Exception {
.region(Property.of(localstack.getRegion()))
.accessKeyId(Property.of(localstack.getAccessKey()))
.secretKeyId(Property.of(localstack.getSecretKey()))
.tableName("persons")
.key(Map.of("id", "1"))
.tableName(Property.of("persons"))
.key(Property.of(Map.of("id", "1")))
.build();

createTable(runContext, delete);
Expand Down
4 changes: 2 additions & 2 deletions src/test/java/io/kestra/plugin/aws/dynamodb/GetItemTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ void run() throws Exception {
.region(Property.of(localstack.getRegion()))
.accessKeyId(Property.of(localstack.getAccessKey()))
.secretKeyId(Property.of(localstack.getSecretKey()))
.tableName("persons")
.key(Map.of("id", "1"))
.tableName(Property.of("persons"))
.key(Property.of(Map.of("id", "1")))
.build();

createTable(runContext, get);
Expand Down
4 changes: 2 additions & 2 deletions src/test/java/io/kestra/plugin/aws/dynamodb/PutItemTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ void runMap() throws Exception {
.region(Property.of(localstack.getRegion()))
.accessKeyId(Property.of(localstack.getAccessKey()))
.secretKeyId(Property.of(localstack.getSecretKey()))
.tableName("persons")
.tableName(Property.of("persons"))
.item(Map.of(
"id", "1",
"firstname", "John",
Expand All @@ -44,7 +44,7 @@ void runString() throws Exception {
.region(Property.of(localstack.getRegion()))
.accessKeyId(Property.of(localstack.getAccessKey()))
.secretKeyId(Property.of(localstack.getSecretKey()))
.tableName("persons")
.tableName(Property.of("persons"))
.item("{\"id\": \"1\", \"firstname\": \"Jane\", \"lastname\": \"Doe\"}")
.build();

Expand Down
42 changes: 21 additions & 21 deletions src/test/java/io/kestra/plugin/aws/dynamodb/QueryTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ void runFetch() throws Exception {
.region(Property.of(localstack.getRegion()))
.accessKeyId(Property.of(localstack.getAccessKey()))
.secretKeyId(Property.of(localstack.getSecretKey()))
.tableName("persons")
.keyConditionExpression("id = :id")
.expressionAttributeValues(Map.of(":id", "1"))
.fetchType(FetchType.FETCH)
.tableName(Property.of("persons"))
.keyConditionExpression(Property.of("id = :id"))
.expressionAttributeValues(Property.of(Map.of(":id", "1")))
.fetchType(Property.of(FetchType.FETCH))
.build();

createTable(runContext, query);
Expand All @@ -52,11 +52,11 @@ void runFetchWithExpression() throws Exception {
.region(Property.of(localstack.getRegion()))
.accessKeyId(Property.of(localstack.getAccessKey()))
.secretKeyId(Property.of(localstack.getSecretKey()))
.tableName("persons")
.keyConditionExpression("id = :id")
.filterExpression("lastname = :lastname")
.expressionAttributeValues(Map.of(":id", "1", ":lastname", "Doe"))
.fetchType(FetchType.FETCH)
.tableName(Property.of("persons"))
.keyConditionExpression(Property.of("id = :id"))
.filterExpression(Property.of("lastname = :lastname"))
.expressionAttributeValues(Property.of(Map.of(":id", "1", ":lastname", "Doe")))
.fetchType(Property.of(FetchType.FETCH))
.build();

createTable(runContext, query);
Expand All @@ -80,10 +80,10 @@ void runFetchMultipleExpression() throws Exception {
.region(Property.of(localstack.getRegion()))
.accessKeyId(Property.of(localstack.getAccessKey()))
.secretKeyId(Property.of(localstack.getSecretKey()))
.tableName("persons")
.keyConditionExpression("id = :id")
.expressionAttributeValues(Map.of(":id", "1"))
.fetchType(FetchType.FETCH)
.tableName(Property.of("persons"))
.keyConditionExpression(Property.of("id = :id"))
.expressionAttributeValues(Property.of(Map.of(":id", "1")))
.fetchType(Property.of(FetchType.FETCH))
.build();

createTable(runContext, query);
Expand All @@ -107,10 +107,10 @@ void runFetchOne() throws Exception {
.region(Property.of(localstack.getRegion()))
.accessKeyId(Property.of(localstack.getAccessKey()))
.secretKeyId(Property.of(localstack.getSecretKey()))
.tableName("persons")
.keyConditionExpression("id = :id")
.expressionAttributeValues(Map.of(":id", "1"))
.fetchType(FetchType.FETCH_ONE)
.tableName(Property.of("persons"))
.keyConditionExpression(Property.of("id = :id"))
.expressionAttributeValues(Property.of(Map.of(":id", "1")))
.fetchType(Property.of(FetchType.FETCH_ONE))
.build();

createTable(runContext, query);
Expand All @@ -135,10 +135,10 @@ void runStored() throws Exception {
.region(Property.of(localstack.getRegion()))
.accessKeyId(Property.of(localstack.getAccessKey()))
.secretKeyId(Property.of(localstack.getSecretKey()))
.tableName("persons")
.keyConditionExpression("id = :id")
.expressionAttributeValues(Map.of(":id", "1"))
.fetchType(FetchType.STORE)
.tableName(Property.of("persons"))
.keyConditionExpression(Property.of("id = :id"))
.expressionAttributeValues(Property.of(Map.of(":id", "1")))
.fetchType(Property.of(FetchType.STORE))
.build();

createTable(runContext, query);
Expand Down
Loading

0 comments on commit de13b2e

Please sign in to comment.