Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue 1759 - Periodically execute lambdas to keep a warm pool ready #1925

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
6060ad9
Adding support to keep lambdas warm
ab295382 Feb 28, 2024
d01353b
Merge branch 'develop' of github.com:gchq/sleeper into 1759-periodica…
ab295382 Feb 28, 2024
e9c0f54
Revert config back and added new config for the warm lambda stack
ab295382 Feb 28, 2024
610775e
Fixing properties test
ab295382 Feb 28, 2024
9953e91
Merge branch 'develop' into 1759-periodically-execute-lambdas-to-keep…
ab295382 Feb 29, 2024
e3055ae
Removing KeepLambdaWarmStack from default optional stacks, changed t…
ab295382 Mar 1, 2024
20f85b1
Merge branch '1759-periodically-execute-lambdas-to-keep-a-warm-pool-r…
ab295382 Mar 1, 2024
b9c39bf
Merge branch 'develop' of github.com:gchq/sleeper into 1759-periodica…
ab295382 Mar 1, 2024
91d8dc9
Adding support for Sleeper Types in the WarmQueryExecutorLambda
ab295382 Mar 1, 2024
e15a0f9
Merge branch 'develop' of github.com:gchq/sleeper into 1759-periodica…
ab295382 Mar 1, 2024
a95941d
Adding documentation for the KeepLambdaWarmStack
ab295382 Mar 1, 2024
31bf46b
Merge branch 'develop' of github.com:gchq/sleeper into 1759-periodica…
ab295382 Mar 5, 2024
1efb9c8
Merge branch 'develop' of github.com:gchq/sleeper into 1759-periodica…
ab295382 Mar 5, 2024
974903a
Updating documentation and renaming properties to be consistent.
ab295382 Mar 5, 2024
63ed404
Updating the generated instance properties examples
ab295382 Mar 6, 2024
af10f45
Merge branch 'develop' into 1759-periodically-execute-lambdas-to-keep…
ab295382 Mar 7, 2024
845bb1b
Merge branch 'develop' of github.com:gchq/sleeper into 1759-periodica…
ab295382 Mar 8, 2024
ac03a9a
public class WarmQueryExecutorLambdaIT {
ab295382 Mar 8, 2024
571bbf1
Merge branch 'develop' into 1759-periodically-execute-lambdas-to-keep…
ab295382 Mar 11, 2024
a89e94f
Updating comments to Given / When / Then
ab295382 Mar 11, 2024
573fd8a
Merge branch 'develop' into 1759-periodically-execute-lambdas-to-keep…
ab295382 Mar 11, 2024
7d5896c
Merge branch 'develop' into 1759-periodically-execute-lambdas-to-keep…
ab295382 Mar 11, 2024
08400d1
Merge branch 'develop' into 1759-periodically-execute-lambdas-to-keep…
ab295382 Mar 12, 2024
ab178c0
Merge branch 'develop' of github.com:gchq/sleeper into 1759-periodica…
ab295382 Mar 12, 2024
d825584
Merged conflict fixes
ab295382 Mar 12, 2024
cd57204
Updated assert to be more explicit for WarmQueryExecutorLambdaIT
ab295382 Mar 12, 2024
ff23f38
Merge branch 'develop' into 1759-periodically-execute-lambdas-to-keep…
ab295382 Mar 12, 2024
ac44bce
Merge branch 'develop' into 1759-periodically-execute-lambdas-to-keep…
ab295382 Mar 12, 2024
dce4068
Merge branch 'develop' into 1759-periodically-execute-lambdas-to-keep…
ab295382 Mar 20, 2024
38f90dc
Merge branch 'develop' of github.com:gchq/sleeper into 1759-periodica…
ab295382 Mar 27, 2024
fd499a1
Checkstyle fixes
ab295382 Mar 22, 2024
f2b3650
More checkstyle fixes
ab295382 Mar 22, 2024
299ac0b
Changed WarmQueryExecutorLambda to exact range query
ab295382 Mar 22, 2024
f8def4d
Updated WarmQueryExecutorLambda to create a range for each row key
ab295382 Mar 25, 2024
a743d8f
Adding unittest for WarmQueryExecutorLambda and condensed teh IT tests
ab295382 Mar 26, 2024
ac6e397
Merge branch 'develop' of github.com:gchq/sleeper into 1759-periodica…
ab295382 Mar 27, 2024
daa9276
Moving the multiple row keys test to be a unit test
ab295382 Mar 27, 2024
7444552
Changing the way the expected regions are created for the tests
ab295382 Mar 28, 2024
c57d8d0
Merge branch 'develop' into 1759-periodically-execute-lambdas-to-keep…
ab295382 Mar 28, 2024
59c6326
Merge branch 'develop' into 1759-periodically-execute-lambdas-to-keep…
ab295382 Mar 28, 2024
ff2869d
Merge branch 'develop' into 1759-periodically-execute-lambdas-to-keep…
ab295382 Apr 2, 2024
924b12b
Merge branch 'develop' into 1759-periodically-execute-lambdas-to-keep…
ab295382 Apr 2, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions docs/07-data-retrieval.md
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,16 @@ To send the results to a particular SQS queue use:
You will need to give Sleeper's writing data IAM role (given by the CloudFormation
export `<instance-id>-QueryLambdaRoleArn`) permission to write to the above S3 bucket or SQS queue.

## Keep Lambda Warm Optional Stack

Lambdas inherently have a startup time usually refer to as cold start.
This can add a significant delay thus increasing a queries execution time.

To address this issue the KeepLambdaWarmStack can be enabled. This will create an Event Rule running every 5 minutes which
triggers the query lambdas thus ensuring its in a warm state. Enabling this will incur extra charges since the Lambdas are running every 5 minutes.

This can be enabled by adding `KeepLambdaWarmStack` to the optional stacks. It is not enabled by default.

## Use the Java API directly

You can also retrieve data using the Java class `QueryExecutor`.
Expand Down
4 changes: 4 additions & 0 deletions example/full/instance.properties
Original file line number Diff line number Diff line change
Expand Up @@ -942,6 +942,10 @@ sleeper.default.query.results.rowgroup.size=8388608
# The value given below is 128KiB. This value can be overridden using the query config.
sleeper.default.query.results.page.size=131072

# The rate at which the query lambda runs to keep it warm (in minutes, must be >=1). This only
# applies when the KeepLambdaWarmStack is enabled
sleeper.query.warm.lambda.period.minutes=5


## The following properties relate to the dashboard.

Expand Down
15 changes: 13 additions & 2 deletions java/cdk/src/main/java/sleeper/cdk/SleeperCdkApp.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import sleeper.cdk.stack.IngestStack;
import sleeper.cdk.stack.IngestStacks;
import sleeper.cdk.stack.IngestStatusStoreStack;
import sleeper.cdk.stack.KeepLambdaWarmStack;
import sleeper.cdk.stack.ManagedPoliciesStack;
import sleeper.cdk.stack.PartitionSplittingStack;
import sleeper.cdk.stack.PropertiesStack;
Expand Down Expand Up @@ -88,7 +89,7 @@ public class SleeperCdkApp extends Stack {
private PersistentEmrBulkImportStack persistentEmrBulkImportStack;
private EksBulkImportStack eksBulkImportStack;
private IngestStatusStoreStack ingestStatusStoreStack;
private QueryStack queryStack;
private QueryQueueStack queryQueueStack;

public SleeperCdkApp(App app, String id, StackProps props, InstanceProperties instanceProperties, BuiltJars jars) {
super(app, id, props);
Expand Down Expand Up @@ -237,9 +238,10 @@ public void create() {
topicStack.getTopic());
}

QueryStack queryStack = null;
// Stack to execute queries
if (QUERY_STACK_NAMES.stream().anyMatch(optionalStacks::contains)) {
QueryQueueStack queryQueueStack = new QueryQueueStack(this, "QueryQueue", instanceProperties);
queryQueueStack = new QueryQueueStack(this, "QueryQueue", instanceProperties);
queryStack = new QueryStack(this,
"Query",
instanceProperties, jars,
Expand Down Expand Up @@ -280,6 +282,15 @@ public void create() {
instanceProperties);
}

if (optionalStacks.contains(KeepLambdaWarmStack.class.getSimpleName())) {
new KeepLambdaWarmStack(this,
"KeepLambdaWarmExecution",
instanceProperties,
jars,
coreStacks,
queryQueueStack);
}
patchwork01 marked this conversation as resolved.
Show resolved Hide resolved

this.generateProperties();
addTags(app);
}
Expand Down
109 changes: 109 additions & 0 deletions java/cdk/src/main/java/sleeper/cdk/stack/KeepLambdaWarmStack.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Copyright 2022-2024 Crown Copyright
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package sleeper.cdk.stack;

import software.amazon.awscdk.CfnOutput;
import software.amazon.awscdk.CfnOutputProps;
import software.amazon.awscdk.Duration;
import software.amazon.awscdk.NestedStack;
import software.amazon.awscdk.services.events.Rule;
import software.amazon.awscdk.services.events.Schedule;
import software.amazon.awscdk.services.events.targets.LambdaFunction;
import software.amazon.awscdk.services.lambda.IFunction;
import software.amazon.awscdk.services.lambda.Runtime;
import software.amazon.awscdk.services.s3.Bucket;
import software.amazon.awscdk.services.s3.IBucket;
import software.constructs.Construct;

import sleeper.cdk.Utils;
import sleeper.cdk.jars.BuiltJar;
import sleeper.cdk.jars.BuiltJars;
import sleeper.cdk.jars.LambdaCode;
import sleeper.configuration.properties.SleeperScheduleRule;
import sleeper.configuration.properties.instance.InstanceProperties;

import java.util.Collections;
import java.util.Locale;

import static sleeper.cdk.Utils.createLambdaLogGroup;
import static sleeper.cdk.Utils.shouldDeployPaused;
import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.QUERY_WARM_LAMBDA_CLOUDWATCH_RULE;
import static sleeper.configuration.properties.instance.CommonProperty.ID;
import static sleeper.configuration.properties.instance.QueryProperty.QUERY_PROCESSOR_LAMBDA_MEMORY_IN_MB;
import static sleeper.configuration.properties.instance.QueryProperty.QUERY_PROCESSOR_LAMBDA_TIMEOUT_IN_SECONDS;
import static sleeper.configuration.properties.instance.QueryProperty.QUERY_WARM_LAMBDA_EXECUTION_PERIOD_IN_MINUTES;

/*
* A {@link NestedStack} to handle keeping lambdas warm. This consists of a {@link Rule} that runs periodically triggering
* a lambda {@link Function} to create the queries that are placed on the Query {@link Queue} for processing.
* This will trigger the query lambdas thus keeping them warm.
*/
public class KeepLambdaWarmStack extends NestedStack {

public KeepLambdaWarmStack(Construct scope,
String id,
InstanceProperties instanceProperties,
BuiltJars jars,
CoreStacks coreStacks,
QueryQueueStack queryQueueStack) {
super(scope, id);

String functionName = Utils.truncateTo64Characters(String.join("-", "sleeper",
instanceProperties.get(ID).toLowerCase(Locale.ROOT), "warm-query-executor"));

IBucket jarsBucket = Bucket.fromBucketName(this, "JarsBucket", jars.bucketName());
LambdaCode queryJar = jars.lambdaCode(BuiltJar.QUERY, jarsBucket);

// Keep lambda warm function
IFunction handler = queryJar.buildFunction(this, "WarmQueryExecutorLambda", builder -> builder
.functionName(functionName)
.description("Sends a message to query-executor lambda in order for it to stay warm")
.runtime(Runtime.JAVA_11)
.memorySize(instanceProperties.getInt(QUERY_PROCESSOR_LAMBDA_MEMORY_IN_MB))
.timeout(Duration.seconds(instanceProperties.getInt(QUERY_PROCESSOR_LAMBDA_TIMEOUT_IN_SECONDS)))
.handler("sleeper.query.lambda.WarmQueryExecutorLambda::handleRequest")
.environment(Utils.createDefaultEnvironment(instanceProperties))
.reservedConcurrentExecutions(1)
.logGroup(createLambdaLogGroup(this, id + "LogGroup", functionName, instanceProperties)));

// Cloudwatch rule to trigger this lambda
Rule rule = Rule.Builder
.create(this, "QueryExecutionPeriodicTrigger")
.ruleName(SleeperScheduleRule.QUERY_WARM_LAMBDA.buildRuleName(instanceProperties))
.description("A rule to periodically trigger the query execution lambda")
.enabled(!shouldDeployPaused(this))
.schedule(Schedule.rate(Duration.minutes(instanceProperties
.getInt(QUERY_WARM_LAMBDA_EXECUTION_PERIOD_IN_MINUTES))))
.targets(Collections.singletonList(LambdaFunction.Builder
.create(handler)
.build()))
.build();

queryQueueStack.grantSendMessages(handler);

coreStacks.grantReadInstanceConfig(handler);
coreStacks.grantReadTablesAndData(handler);

instanceProperties.set(QUERY_WARM_LAMBDA_CLOUDWATCH_RULE, rule.getRuleName());
CfnOutputProps ruleArn = new CfnOutputProps.Builder()
.value(rule.getRuleArn())
.exportName(instanceProperties.get(ID) + "-" + "QueryExecutionPeriodicTriggerRuleARN")
.build();
new CfnOutput(this, "QueryExecutionPeriodicTriggerRuleARN", ruleArn);

Utils.addStackTagIfSet(this, instanceProperties);
}
}
1 change: 0 additions & 1 deletion java/cdk/src/main/java/sleeper/cdk/stack/QueryStack.java
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,6 @@ public QueryStack(Construct scope,
instanceProperties.set(QUERY_TRACKER_TABLE_NAME, queryTrackingTable.getTableName());

LambdaCode queryJar = jars.lambdaCode(BuiltJar.QUERY, jarsBucket);

queryExecutorLambda = setupQueryExecutorLambda(coreStacks, queryQueueStack, instanceProperties, queryJar, jarsBucket, queryTrackingTable);
leafPartitionQueryLambda = setupLeafPartitionQueryQueueAndLambda(coreStacks, instanceProperties, queryJar, jarsBucket, queryTrackingTable);
Utils.addStackTagIfSet(this, instanceProperties);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.INGEST_CLOUDWATCH_RULE;
import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.PARTITION_SPLITTING_CLOUDWATCH_RULE;
import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.QUERY_RESULTS_BUCKET;
import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.QUERY_WARM_LAMBDA_CLOUDWATCH_RULE;
import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.TABLE_METRICS_RULE;
import static sleeper.configuration.properties.instance.CommonProperty.ACCOUNT;
import static sleeper.configuration.properties.instance.CommonProperty.ECR_REPOSITORY_PREFIX;
Expand Down Expand Up @@ -149,6 +150,7 @@ void shouldGenerateDefaultInstancePropertiesFromInstanceId() {
expected.set(INGEST_CLOUDWATCH_RULE, "test-instance-IngestTasksCreationRule");
expected.set(INGEST_BATCHER_JOB_CREATION_CLOUDWATCH_RULE, "test-instance-IngestBatcherJobCreationRule");
expected.set(TABLE_METRICS_RULE, "test-instance-MetricsPublishRule");
expected.set(QUERY_WARM_LAMBDA_CLOUDWATCH_RULE, "test-instance-QueryWarmLambdaRule");

assertThat(properties).isEqualTo(expected);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.INGEST_BATCHER_JOB_CREATION_CLOUDWATCH_RULE;
import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.INGEST_CLOUDWATCH_RULE;
import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.PARTITION_SPLITTING_CLOUDWATCH_RULE;
import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.QUERY_WARM_LAMBDA_CLOUDWATCH_RULE;
import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.TABLE_METRICS_RULE;
import static sleeper.configuration.properties.instance.CommonProperty.ID;

Expand All @@ -55,6 +56,9 @@ public class SleeperScheduleRule {
public static final SleeperScheduleRule INGEST_BATCHER_JOB_CREATION = add(
INGEST_BATCHER_JOB_CREATION_CLOUDWATCH_RULE, "%s-IngestBatcherJobCreationRule");
public static final SleeperScheduleRule TABLE_METRICS = add(TABLE_METRICS_RULE, "%s-MetricsPublishRule");
// Rule that triggers the query lambdas to keep warm
public static final SleeperScheduleRule QUERY_WARM_LAMBDA = add(
QUERY_WARM_LAMBDA_CLOUDWATCH_RULE, "%s-QueryWarmLambdaRule");

private final InstanceProperty property;
private final String nameFormat;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,10 @@ public interface CdkDefinedInstanceProperty extends InstanceProperty {
.description("The name of the table responsible for tracking query progress.")
.propertyGroup(InstancePropertyGroup.QUERY)
.build();
CdkDefinedInstanceProperty QUERY_WARM_LAMBDA_CLOUDWATCH_RULE = Index.propertyBuilder("sleeper.query.warm.lambda.rule")
.description("The name of the CloudWatch rule to trigger the query lambda to keep it warm.")
.propertyGroup(InstancePropertyGroup.QUERY)
.build();
CdkDefinedInstanceProperty LEAF_PARTITION_QUERY_QUEUE_URL = Index.propertyBuilder("sleeper.query.leaf.partition.queue.url")
.description("The URL of the queue responsible for sending a leaf partition query to sleeper.")
.propertyGroup(InstancePropertyGroup.QUERY)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,14 @@ public interface QueryProperty {
"value given below is 128KiB. This value can be overridden using the query config.")
.defaultValue("" + (128 * 1024)) // 128 KiB
.propertyGroup(InstancePropertyGroup.QUERY).build();
UserDefinedInstanceProperty QUERY_WARM_LAMBDA_EXECUTION_PERIOD_IN_MINUTES = Index
.propertyBuilder("sleeper.query.warm.lambda.period.minutes")
.description("The rate at which the query lambda runs to keep it warm (in minutes, must be >=1). " +
" This only applies when the KeepLambdaWarmStack is enabled")
.defaultValue("5")
.validationPredicate(Utils::isPositiveInteger)
.propertyGroup(InstancePropertyGroup.QUERY)
.runCdkDeployWhenChanged(true).build();

static List<UserDefinedInstanceProperty> getAll() {
return Index.INSTANCE.getAll();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import sleeper.query.output.ResultsOutput;
import sleeper.query.output.ResultsOutputConstants;
import sleeper.query.output.ResultsOutputInfo;
import sleeper.query.runner.output.NoResultsOutput;
import sleeper.query.runner.output.S3ResultsOutput;
import sleeper.query.runner.output.SQSResultsOutput;
import sleeper.query.runner.output.WebSocketResultsOutput;
Expand All @@ -53,6 +54,7 @@

import static sleeper.configuration.properties.instance.QueryProperty.QUERY_PROCESSOR_LAMBDA_RECORD_RETRIEVAL_THREADS;
import static sleeper.configuration.properties.table.TableProperty.TABLE_NAME;
import static sleeper.query.runner.output.NoResultsOutput.NO_RESULTS_OUTPUT;

public class SqsLeafPartitionQueryProcessor {
private static final Logger LOGGER = LoggerFactory.getLogger(SqsLeafPartitionQueryProcessor.class);
Expand Down Expand Up @@ -131,6 +133,8 @@ private ResultsOutput getResultsOutput(TableProperties tableProperties, Map<Stri
return new S3ResultsOutput(instanceProperties, tableProperties, resultsPublisherConfig);
} else if (WebSocketResultsOutput.DESTINATION_NAME.equals(destination)) {
return new WebSocketResultsOutput(resultsPublisherConfig);
} else if (NO_RESULTS_OUTPUT.equals(destination)) {
return new NoResultsOutput();
ab295382 marked this conversation as resolved.
Show resolved Hide resolved
} else {
LOGGER.info("Unknown results publisher from config {}", resultsPublisherConfig);
return (query, results) -> new ResultsOutputInfo(0, Collections.emptyList(),
Expand Down
Loading
Loading