Skip to content

Commit

Permalink
refactor: move Lambda to dynamic properties
Browse files Browse the repository at this point in the history
  • Loading branch information
mgabelle committed Dec 17, 2024
1 parent eebaace commit 6ee4ffb
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 23 deletions.
20 changes: 11 additions & 9 deletions src/main/java/io/kestra/plugin/aws/lambda/Invoke.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.executions.metrics.Counter;
import io.kestra.core.models.executions.metrics.Timer;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.runners.RunContext;
import io.kestra.core.serializers.JacksonMapper;
Expand Down Expand Up @@ -98,22 +99,23 @@ public class Invoke extends AbstractConnection implements RunnableTask<Output> {
private static final ObjectMapper OBJECT_MAPPER = JacksonMapper.ofJson();

@Schema(title = "The Lambda function name.")
@PluginProperty(dynamic = true)
@NotNull
private String functionArn;
private Property<String> functionArn;

@Schema(
title = "Function request payload.",
description = "Request payload. It's a map of string -> object."
)
@PluginProperty(dynamic = true)
private Map<String, Object> functionPayload;
)
private Property<Map<String, Object>> functionPayload;

@Override
public Output run(RunContext runContext) throws Exception {
final long start = System.nanoTime();
var functionArn = runContext.render(this.functionArn);
var requestPayload = this.functionPayload != null ? runContext.render(this.functionPayload) : null;
var functionArn = runContext.render(this.functionArn).as(String.class).orElseThrow();
var requestPayload = runContext.render(this.functionPayload).asMap(String.class, Object.class).isEmpty() ?
null :
runContext.render(this.functionPayload).asMap(String.class, Object.class);

try (var lambda = client(runContext)) {
var builder = InvokeRequest.builder().functionName(functionArn);
if (requestPayload != null && requestPayload.size() > 0) {
Expand Down Expand Up @@ -205,7 +207,7 @@ void handleError(String functionArn, ContentType contentType, SdkBytes payload)
log.debug("Lambda function error for {}: response type: {}, response payload: {}",
functionArn, contentType, errorPayload);
}
if (errorPayload != null
if (errorPayload != null
&& ContentType.APPLICATION_JSON.getMimeType().equals(contentType.getMimeType())) {
throw new LambdaInvokeException(
"Lambda Invoke task responded with error for function: " + functionArn
Expand All @@ -229,7 +231,7 @@ Output handleContent(RunContext runContext, String functionArn, ContentType cont
runContext.metric(Counter.of("file.size", size));
var uri = runContext.storage().putFile(tempFile);
if (log.isDebugEnabled()) {
log.debug("Lambda invokation task completed {}: response type: {}, file: `{}",
log.debug("Lambda invokation task completed {}: response type: {}, file: `{}",
functionArn, contentType, uri);
}
return Output.builder()
Expand Down
8 changes: 4 additions & 4 deletions src/test/java/io/kestra/plugin/aws/lambda/InvokeTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public void givenExistingLambda_whenInvoked_thenOutputOkMetricsOk() throws Excep
// Given
var invoke = Invoke.builder()
.endpointOverride(Property.of(localstack.getEndpointOverride(LocalStackContainer.Service.LAMBDA).toString()))
.functionArn(FUNCTION_NAME)
.functionArn(Property.of(FUNCTION_NAME))
.id(InvokeTest.class.getSimpleName())
.type(InvokeTest.class.getName())
.region(Property.of(localstack.getRegion()))
Expand Down Expand Up @@ -63,7 +63,7 @@ public void givenNotFoundLambda_whenInvoked_thenErrorNoMetrics() throws Exceptio
// Given
var invoke = Invoke.builder()
.endpointOverride(Property.of(localstack.getEndpointOverride(LocalStackContainer.Service.LAMBDA).toString()))
.functionArn("Fake_ARN")
.functionArn(Property.of("Fake_ARN"))
.id(InvokeTest.class.getSimpleName())
.type(InvokeTest.class.getName())
.region(Property.of(localstack.getRegion()))
Expand All @@ -90,8 +90,8 @@ public void givenFailingLambda_whenInvoked_thenFailureNoMetrics() throws Excepti
params.put("action", "error");
var invoke = Invoke.builder()
.endpointOverride(Property.of(localstack.getEndpointOverride(LocalStackContainer.Service.LAMBDA).toString()))
.functionArn(FUNCTION_NAME)
.functionPayload(params)
.functionArn(Property.of(FUNCTION_NAME))
.functionPayload(Property.of(params))
.id(InvokeTest.class.getSimpleName())
.type(InvokeTest.class.getName())
.region(Property.of(localstack.getRegion()))
Expand Down
38 changes: 28 additions & 10 deletions src/test/java/io/kestra/plugin/aws/lambda/InvokeUnitTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,19 @@
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.*;

import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.nio.file.Files;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;

import io.kestra.core.models.property.Property;
import io.kestra.core.runners.RunContextProperty;
import io.kestra.core.storages.Storage;
import org.apache.http.entity.ContentType;
import org.junit.jupiter.api.AfterEach;
Expand Down Expand Up @@ -48,6 +51,9 @@ public class InvokeUnitTest {
@Mock(strictness = Strictness.LENIENT)
private RunContext context;

@Mock(strictness = Strictness.LENIENT)
private RunContextProperty runContextProperty;

@Mock(strictness = Strictness.LENIENT)
private Storage storage;

Expand All @@ -56,18 +62,30 @@ public class InvokeUnitTest {

private File tempFile;

private String testValue;

@BeforeEach
void setUp() throws IOException, IllegalVariableEvaluationException {
given(context.storage()).willReturn(storage);
given(context.workingDir()).willReturn(workingDir);
given(context.workingDir().createTempFile()).willReturn(Files.createTempFile("test", "lambdainvoke"));
given(context.metric(any())).willReturn(context);
given(context.render(anyString())).willAnswer(new Answer<String>() {
given(context.render(any(Property.class))).willAnswer(new Answer<RunContextProperty<String>>() {
@Override
public String answer(InvocationOnMock invocation) throws Throwable {
return invocation.getArgument(0, String.class).toString();
public RunContextProperty<String> answer(InvocationOnMock invocation) throws Throwable {
testValue = invocation.getArgument(0, Property.class).toString();
return runContextProperty;
}
});
given(runContextProperty.as(String.class)).willAnswer(new Answer<Optional<String>>() {
@Override
public Optional<String> answer(InvocationOnMock invocation) throws Throwable {
return Optional.of(testValue);
}
});

given(runContextProperty.asMap(any(), any())).willAnswer((Answer<Map>) invocation -> Collections.emptyMap());

given(storage.putFile(any(File.class))).willAnswer(new Answer<URI>() {
@Override
public URI answer(InvocationOnMock invocation) throws Throwable {
Expand All @@ -77,8 +95,8 @@ public URI answer(InvocationOnMock invocation) throws Throwable {
});

invoke = Invoke.builder()
.functionArn("test_function_arn")
.functionPayload(null) // w/o paramters now
.functionArn(Property.of("test_function_arn"))
.functionPayload(Property.of((Map.of()))) // w/o paramters now
.id(InvokeUnitTest.class.getSimpleName())
.type(InvokeUnitTest.class.getName())
.accessKeyId(Property.of("test_accessKeyId"))
Expand Down Expand Up @@ -115,7 +133,7 @@ void testParseContentType_JSON() {
@Test
void testReadError_NotJsonType(@Mock SdkBytes bytes) {
assertThrows(LambdaInvokeException.class, () -> {
invoke.handleError(invoke.getFunctionArn(), ContentType.APPLICATION_OCTET_STREAM, bytes);
invoke.handleError(invoke.getFunctionArn().toString(), ContentType.APPLICATION_OCTET_STREAM, bytes);
}, "Should throw an error"
);
}
Expand All @@ -126,7 +144,7 @@ void testReadError_FromJsonMessage(@Mock SdkBytes bytes) {
given(bytes.asUtf8String()).willReturn(
"{\"errorMessage\": \"" + errorText + "\", \"errorType\": \"KeyError\"}");
Throwable throwable = assertThrows(LambdaInvokeException.class, () -> {
invoke.handleError(invoke.getFunctionArn(), ContentType.APPLICATION_JSON, bytes);
invoke.handleError(invoke.getFunctionArn().toString(), ContentType.APPLICATION_JSON, bytes);
}, "Should throw an error");
assertTrue(throwable.getMessage().indexOf(errorText) > 0,
"Exception message should contain an original message");
Expand All @@ -138,7 +156,7 @@ void testHandleContent_SaveFile_ReturnOutput(@Mock SdkBytes bytes) throws IOExce
var data = "some raw data";
given(bytes.asInputStream()).willReturn(new ByteArrayInputStream(data.getBytes()));

Output res = invoke.handleContent(context, invoke.getFunctionArn(), ContentType.APPLICATION_OCTET_STREAM, bytes);
Output res = invoke.handleContent(context, invoke.getFunctionArn().toString(), ContentType.APPLICATION_OCTET_STREAM, bytes);

checkOutput(data, res);
}
Expand Down

0 comments on commit 6ee4ffb

Please sign in to comment.