Skip to content

Commit

Permalink
chore: migrate to new kestra storage API
Browse files Browse the repository at this point in the history
  • Loading branch information
fhussonnois committed Mar 6, 2024
1 parent f27c6df commit bf80a58
Show file tree
Hide file tree
Showing 10 changed files with 19 additions and 15 deletions.
2 changes: 1 addition & 1 deletion src/main/java/io/kestra/plugin/aws/athena/Query.java
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ private Pair<URI, Long> store(List<ColumnInfo> columnInfo, List<Row> results, Ru
}

return Pair.of(
runContext.putTempFile(tempFile),
runContext.storage().putFile(tempFile),
count.get()
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ private Pair<URI, Long> store(RunContext runContext, List<Map<String, AttributeV
}

return Pair.of(
runContext.putTempFile(tempFile),
runContext.storage().putFile(tempFile),
count.get()
);
}
Expand Down
5 changes: 2 additions & 3 deletions src/main/java/io/kestra/plugin/aws/eventbridge/PutEvents.java
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ public PutEvents.Output run(RunContext runContext) throws Exception {

File tempFile = writeOutputFile(runContext, putEventsResponse, entryList);
return Output.builder()
.uri(runContext.putTempFile(tempFile))
.uri(runContext.storage().putFile(tempFile))
.failedEntryCount(putEventsResponse.failedEntryCount())
.entryCount(entryList.size())
.build();
Expand Down Expand Up @@ -168,14 +168,13 @@ private EventBridgeClient client(final RunContext runContext) throws IllegalVari
return configureSyncClient(clientConfig, EventBridgeClient.builder()).build();
}

@SuppressWarnings("unchecked")
private List<Entry> readEntryList(RunContext runContext, Object entries) throws IllegalVariableEvaluationException, URISyntaxException, IOException {
if (entries instanceof String) {
URI from = new URI(runContext.render((String) entries));
if (!from.getScheme().equals("kestra")) {
throw new IllegalArgumentException("Invalid entries parameter, must be a Kestra internal storage URI, or a list of entries.");
}
try (BufferedReader inputStream = new BufferedReader(new InputStreamReader(runContext.uriToInputStream(from)))) {
try (BufferedReader inputStream = new BufferedReader(new InputStreamReader(runContext.storage().getFile(from)))) {
return Flux.create(FileSerde.reader(inputStream, Entry.class), FluxSink.OverflowStrategy.BUFFER)
.collectList().block();
}
Expand Down
6 changes: 2 additions & 4 deletions src/main/java/io/kestra/plugin/aws/kinesis/PutRecords.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@
import lombok.experimental.SuperBuilder;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.KinesisClientBuilder;
import software.amazon.awssdk.services.kinesis.model.PutRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry;
import software.amazon.awssdk.services.kinesis.model.PutRecordsResponse;
Expand Down Expand Up @@ -138,7 +136,7 @@ public Output run(RunContext runContext) throws Exception {

File tempFile = writeOutputFile(runContext, putRecordsResponse, records);
return Output.builder()
.uri(runContext.putTempFile(tempFile))
.uri(runContext.storage().putFile(tempFile))
.failedRecordsCount(putRecordsResponse.failedRecordCount())
.recordCount(records.size())
.build();
Expand Down Expand Up @@ -174,7 +172,7 @@ private List<Record> getRecordList(Object records, RunContext runContext) throws
if (!from.getScheme().equals("kestra")) {
throw new IllegalArgumentException("Invalid records parameter, must be a Kestra internal storage URI, or a list of records.");
}
try (BufferedReader inputStream = new BufferedReader(new InputStreamReader(runContext.uriToInputStream(from)))) {
try (BufferedReader inputStream = new BufferedReader(new InputStreamReader(runContext.storage().getFile(from)))) {
return Flux.create(FileSerde.reader(inputStream, Record.class), FluxSink.OverflowStrategy.BUFFER)
.collectList().block();
}
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/kestra/plugin/aws/lambda/Invoke.java
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ Output handleContent(RunContext runContext, String functionArn, ContentType cont
var size = Files.copy(dataStream, tempFile.toPath());
// Doing the same as in S3Service.download()
runContext.metric(Counter.of("file.size", size));
var uri = runContext.putTempFile(tempFile);
var uri = runContext.storage().putFile(tempFile);
if (log.isDebugEnabled()) {
log.debug("Lambda invokation task completed {}: response type: {}, file: `{}",
functionArn, contentType, uri);
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/kestra/plugin/aws/s3/S3Service.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public static Pair<GetObjectResponse, URI> download(RunContext runContext, S3Asy

runContext.metric(Counter.of("file.size", response.contentLength()));

return Pair.of(response, runContext.putTempFile(tempFile));
return Pair.of(response, runContext.storage().putFile(tempFile));
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/kestra/plugin/aws/sns/Publish.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public Publish.Output run(RunContext runContext) throws Exception {
throw new Exception("Invalid 'from' parameter, must be a Kestra internal storage URI");
}

try (BufferedReader inputStream = new BufferedReader(new InputStreamReader(runContext.uriToInputStream(from)))) {
try (BufferedReader inputStream = new BufferedReader(new InputStreamReader(runContext.storage().getFile(from)))) {
flowable = Flux.create(FileSerde.reader(inputStream, Message.class), FluxSink.OverflowStrategy.BUFFER);
resultFlowable = this.buildFlowable(flowable, snsClient, topicArn, runContext);

Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/kestra/plugin/aws/sqs/Consume.java
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public Output run(RunContext runContext) throws Exception {
}

return Output.builder()
.uri(runContext.putTempFile(tempFile))
.uri(runContext.storage().putFile(tempFile))
.count(total.get())
.build();
}
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/io/kestra/plugin/aws/sqs/Publish.java
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ public Output run(RunContext runContext) throws Exception {
throw new Exception("Invalid from parameter, must be a Kestra internal storage URI");
}

try (BufferedReader inputStream = new BufferedReader(new InputStreamReader(runContext.uriToInputStream(from)))) {

try (BufferedReader inputStream = new BufferedReader(new InputStreamReader(runContext.storage().getFile(from)))) {
flowable = Flux.create(FileSerde.reader(inputStream, Message.class), FluxSink.OverflowStrategy.BUFFER);
resultFlowable = this.buildFlowable(flowable, sqsClient, queueUrl, runContext);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import java.net.URI;
import java.nio.file.Files;
import java.util.Optional;

import io.kestra.core.storages.Storage;
import org.apache.http.entity.ContentType;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -44,10 +46,14 @@ public class InvokeUnitTest {
@Mock(strictness = Strictness.LENIENT)
private RunContext context;

@Mock(strictness = Strictness.LENIENT)
private Storage storage;

private File tempFile;

@BeforeEach
public void setUp() throws IOException, IllegalVariableEvaluationException {
given(context.storage()).willReturn(storage);
given(context.tempFile()).willReturn(Files.createTempFile("test", "lambdainvoke"));
given(context.metric(any())).willReturn(context);
given(context.render(anyString())).willAnswer(new Answer<String>() {
Expand All @@ -56,7 +62,7 @@ public String answer(InvocationOnMock invocation) throws Throwable {
return invocation.getArgument(0, String.class).toString();
}
});
given(context.putTempFile(any(File.class))).willAnswer(new Answer<URI>() {
given(storage.putFile(any(File.class))).willAnswer(new Answer<URI>() {
@Override
public URI answer(InvocationOnMock invocation) throws Throwable {
tempFile = invocation.getArgument(0, File.class);
Expand Down

0 comments on commit bf80a58

Please sign in to comment.