Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue 1353 - Table metrics for all tables #1380

Merged
merged 21 commits into from
Oct 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
47976b8
Start testing code from TableMetricsLambda
patchwork01 Sep 28, 2023
e8be3c0
Adjust test helpers to allow for multiple tables
patchwork01 Sep 28, 2023
5581726
Add table metrics test for one table with multiple partitions
kr565370 Sep 28, 2023
293bf13
Add table metrics test for one file in one partition
kr565370 Sep 28, 2023
d70eae6
Merge branch '1352-s3-state-store-for-all-tables' into 1353-table-met…
kr565370 Sep 28, 2023
e4fc448
Merge branch '1352-s3-state-store-for-all-tables' into 1353-table-met…
patchwork01 Sep 28, 2023
8a8bc28
Merge branch '1352-s3-state-store-for-all-tables' into 1353-table-met…
patchwork01 Sep 28, 2023
f65c930
Move tests under nested class SingleTable
patchwork01 Sep 28, 2023
bfabbea
Add test for multiple files with different record counts
kr565370 Sep 28, 2023
6e85f57
Simplify test for multiple files with different record counts
kr565370 Sep 28, 2023
989dea5
Add test for multiple partitions with different file counts
kr565370 Sep 28, 2023
fcb3eaa
Add test for multiple partitions where one leaf has no files
kr565370 Sep 28, 2023
e988bbf
Test multiple tables
patchwork01 Sep 28, 2023
d6e45e6
Remove extra test helper
patchwork01 Sep 28, 2023
d33b180
Use TableMetrics in TableMetricsLambda
patchwork01 Sep 28, 2023
03797b0
Merge branch '1352-s3-state-store-for-all-tables' into 1353-table-met…
kr565370 Sep 28, 2023
c2d3233
Merge branch '1352-s3-state-store-for-all-tables' into 1353-table-met…
kr565370 Sep 29, 2023
eb4b42c
Create and use nested stack for TableMetricsStack
kr565370 Sep 29, 2023
776cfb3
Merge branch '1352-s3-state-store-for-all-tables' into 1353-table-met…
patchwork01 Sep 29, 2023
ea62424
Read TableMetrics as a stream
patchwork01 Sep 29, 2023
90a2a2b
Avoid collecting TableMetrics for all tables at once
patchwork01 Sep 29, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions java/cdk/src/main/java/sleeper/cdk/SleeperCdkApp.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import sleeper.cdk.stack.S3StateStoreStack;
import sleeper.cdk.stack.StateStoreStacks;
import sleeper.cdk.stack.TableDataStack;
import sleeper.cdk.stack.TableMetricsStack;
import sleeper.cdk.stack.TableStack;
import sleeper.cdk.stack.TopicStack;
import sleeper.cdk.stack.VpcStack;
Expand Down Expand Up @@ -128,6 +129,7 @@ public void create() {
stateStoreStacks = new StateStoreStacks(
new DynamoDBStateStoreStack(this, "DynamoDBStateStore", instanceProperties),
new S3StateStoreStack(this, "S3StateStore", instanceProperties, dataStack));
new TableMetricsStack(this, "TableMetrics", instanceProperties, jars, stateStoreStacks);
tableStack = new TableStack(this, "Table", instanceProperties, jars, dataStack, stateStoreStacks);

// Stack for Athena analytics
Expand Down
78 changes: 78 additions & 0 deletions java/cdk/src/main/java/sleeper/cdk/stack/TableMetricsStack.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright 2022-2023 Crown Copyright
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package sleeper.cdk.stack;

import software.amazon.awscdk.Duration;
import software.amazon.awscdk.NestedStack;
import software.amazon.awscdk.services.events.Rule;
import software.amazon.awscdk.services.events.RuleTargetInput;
import software.amazon.awscdk.services.events.Schedule;
import software.amazon.awscdk.services.events.targets.LambdaFunction;
import software.amazon.awscdk.services.lambda.IFunction;
import software.amazon.awscdk.services.lambda.Runtime;
import software.amazon.awscdk.services.s3.Bucket;
import software.amazon.awscdk.services.s3.IBucket;
import software.constructs.Construct;

import sleeper.cdk.Utils;
import sleeper.cdk.jars.BuiltJar;
import sleeper.cdk.jars.BuiltJars;
import sleeper.cdk.jars.LambdaCode;
import sleeper.configuration.properties.instance.InstanceProperties;

import java.util.Collections;

import static sleeper.cdk.Utils.shouldDeployPaused;
import static sleeper.configuration.properties.instance.CommonProperty.JARS_BUCKET;
import static sleeper.configuration.properties.instance.CommonProperty.LOG_RETENTION_IN_DAYS;
import static sleeper.configuration.properties.instance.SystemDefinedInstanceProperty.CONFIG_BUCKET;
import static sleeper.configuration.properties.instance.SystemDefinedInstanceProperty.TABLE_METRICS_RULES;

/**
 * Nested stack that deploys a single table metrics publisher lambda together with
 * the scheduled rule that triggers it.
 * <p>
 * The lambda is given read access to the config bucket and to the active files and
 * partitions held in the state stores. The name of the schedule rule is written back
 * into the instance properties under {@code TABLE_METRICS_RULES}.
 */
public class TableMetricsStack extends NestedStack {

    /**
     * Creates the metrics publisher lambda and its schedule rule.
     *
     * @param scope              the construct this nested stack is created under
     * @param id                 the construct ID of this nested stack
     * @param instanceProperties the instance properties; read for bucket names and log retention,
     *                           and updated with the name of the created schedule rule
     * @param jars               used to locate the metrics lambda code in the jars bucket
     * @param stateStoreStacks   used to grant the lambda read access to active files and partitions
     */
    public TableMetricsStack(Construct scope,
                             String id,
                             InstanceProperties instanceProperties,
                             BuiltJars jars,
                             StateStoreStacks stateStoreStacks) {
        super(scope, id);
        IBucket jarsBucket = Bucket.fromBucketName(this, "JarsBucket", instanceProperties.get(JARS_BUCKET));
        IBucket configBucket = Bucket.fromBucketName(this, "ConfigBucket", instanceProperties.get(CONFIG_BUCKET));
        LambdaCode metricsJar = jars.lambdaCode(BuiltJar.METRICS, jarsBucket);

        // Metrics generation and publishing
        // NOTE(review): the description still refers to "a Sleeper table" although only one
        // publisher is now deployed by this stack - confirm whether the wording should change.
        IFunction tableMetricsPublisher = metricsJar.buildFunction(this, "MetricsPublisher", builder -> builder
                .description("Generates metrics for a Sleeper table based on info in its state store, and publishes them to CloudWatch")
                .runtime(Runtime.JAVA_11)
                .handler("sleeper.metrics.TableMetricsLambda::handleRequest")
                .memorySize(256)
                .timeout(Duration.seconds(60))
                .logRetention(Utils.getRetentionDays(instanceProperties.getInt(LOG_RETENTION_IN_DAYS))));

        configBucket.grantRead(tableMetricsPublisher);
        stateStoreStacks.grantReadActiveFilesAndPartitions(tableMetricsPublisher);

        // Invoke the publisher once a minute, passing the config bucket name as the event text.
        // The rule is created disabled when the deployment is requested to start paused.
        Rule rule = Rule.Builder.create(this, "MetricsPublishSchedule")
                .schedule(Schedule.rate(Duration.minutes(1)))
                .targets(Collections.singletonList(
                        LambdaFunction.Builder.create(tableMetricsPublisher)
                                .event(RuleTargetInput.fromText(configBucket.getBucketName()))
                                .build()
                ))
                .enabled(!shouldDeployPaused(this))
                .build();
        instanceProperties.set(TABLE_METRICS_RULES, rule.getRuleName());

        // Tag this nested stack in the same way as the sibling TableStack does.
        Utils.addStackTagIfSet(this, instanceProperties);
    }
}
48 changes: 4 additions & 44 deletions java/cdk/src/main/java/sleeper/cdk/stack/TableStack.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,6 @@
import software.amazon.awscdk.Duration;
import software.amazon.awscdk.NestedStack;
import software.amazon.awscdk.customresources.Provider;
import software.amazon.awscdk.services.events.Rule;
import software.amazon.awscdk.services.events.RuleTargetInput;
import software.amazon.awscdk.services.events.Schedule;
import software.amazon.awscdk.services.events.targets.LambdaFunction;
import software.amazon.awscdk.services.lambda.IFunction;
import software.amazon.awscdk.services.lambda.Runtime;
import software.amazon.awscdk.services.s3.Bucket;
Expand All @@ -41,17 +37,14 @@
import sleeper.configuration.properties.table.TableProperties;

import java.io.File;
import java.util.Collections;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

import static sleeper.cdk.Utils.shouldDeployPaused;
import static sleeper.configuration.properties.instance.CommonProperty.ID;
import static sleeper.configuration.properties.instance.CommonProperty.JARS_BUCKET;
import static sleeper.configuration.properties.instance.CommonProperty.LOG_RETENTION_IN_DAYS;
import static sleeper.configuration.properties.instance.SystemDefinedInstanceProperty.CONFIG_BUCKET;
import static sleeper.configuration.properties.instance.SystemDefinedInstanceProperty.TABLE_METRICS_RULES;
import static sleeper.configuration.properties.table.TableProperty.SPLIT_POINTS_FILE;
import static sleeper.configuration.properties.table.TableProperty.SPLIT_POINTS_KEY;
import static sleeper.configuration.properties.table.TableProperty.TABLE_NAME;
Expand All @@ -68,7 +61,6 @@ public TableStack(
IBucket jarsBucket = Bucket.fromBucketName(this, "JarsBucket", instanceProperties.get(JARS_BUCKET));
IBucket configBucket = Bucket.fromBucketName(this, "ConfigBucket", instanceProperties.get(CONFIG_BUCKET));
LambdaCode jar = jars.lambdaCode(BuiltJar.CUSTOM_RESOURCES, jarsBucket);
LambdaCode metricsJar = jars.lambdaCode(BuiltJar.METRICS, jarsBucket);

String functionName = Utils.truncateTo64Characters(String.join("-", "sleeper",
instanceProperties.get(ID).toLowerCase(Locale.ROOT), "sleeper-table"));
Expand All @@ -93,26 +85,22 @@ public TableStack(

stateStoreStacks.grantReadWriteAllFilesAndPartitions(sleeperTableProvider.getOnEventHandler());

createTables(scope, instanceProperties, sleeperTableProvider, stateStoreStacks, configBucket, metricsJar);
createTables(scope, instanceProperties, sleeperTableProvider, configBucket);
Utils.addStackTagIfSet(this, instanceProperties);
}

private void createTables(Construct scope,
InstanceProperties instanceProperties,
Provider tablesProvider,
StateStoreStacks stateStoreStacks,
IBucket configBucket,
LambdaCode metricsJar) {
IBucket configBucket) {
Utils.getAllTableProperties(instanceProperties, scope).forEach(tableProperties ->
createTable(instanceProperties, tableProperties, tablesProvider, stateStoreStacks, configBucket, metricsJar));
createTable(instanceProperties, tableProperties, tablesProvider, configBucket));
}

private void createTable(InstanceProperties instanceProperties,
TableProperties tableProperties,
Provider sleeperTablesProvider,
StateStoreStacks stateStoreStacks,
IBucket configBucket,
LambdaCode metricsJar) {
IBucket configBucket) {
String tableName = tableProperties.get(TABLE_NAME);

BucketDeployment splitPoints = null;
Expand Down Expand Up @@ -148,33 +136,5 @@ private void createTable(InstanceProperties instanceProperties,
if (splitPoints != null) {
tableInitialisation.getNode().addDependency(splitPoints);
}

// Metrics generation and publishing
IFunction tableMetricsPublisher = metricsJar.buildFunction(this, tableName + "MetricsPublisher", builder -> builder
.description("Generates metrics for a Sleeper table based on info in its state store, and publishes them to CloudWatch")
.runtime(Runtime.JAVA_11)
.handler("sleeper.metrics.TableMetricsLambda::handleRequest")
.memorySize(256)
.timeout(Duration.seconds(60))
.logRetention(Utils.getRetentionDays(instanceProperties.getInt(LOG_RETENTION_IN_DAYS))));

configBucket.grantRead(tableMetricsPublisher);
stateStoreStacks.grantReadActiveFilesAndPartitions(tableMetricsPublisher);

Rule rule = Rule.Builder.create(this, tableName + "MetricsPublishSchedule")
.schedule(Schedule.rate(Duration.minutes(1)))
.targets(Collections.singletonList(
LambdaFunction.Builder.create(tableMetricsPublisher)
.event(RuleTargetInput.fromText(configBucket.getBucketName() + "|" + tableName))
.build()
))
.enabled(!shouldDeployPaused(this))
.build();
if (null == instanceProperties.get(TABLE_METRICS_RULES) || instanceProperties.get(TABLE_METRICS_RULES).isEmpty()) {
instanceProperties.set(TABLE_METRICS_RULES, rule.getRuleName());
} else {
String rulesList = instanceProperties.get(TABLE_METRICS_RULES);
instanceProperties.set(TABLE_METRICS_RULES, rulesList + "," + rule.getRuleName());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import sleeper.core.partition.PartitionTree;
import sleeper.core.partition.PartitionsBuilder;
import sleeper.core.range.Range;
import sleeper.core.schema.Schema;
import sleeper.core.statestore.FileInfo;
import sleeper.core.statestore.StateStore;

Expand All @@ -35,15 +36,27 @@ public class StateStoreTestBuilder {
private final List<Partition> partitions;
private final List<FileInfo> files = new ArrayList<>();

/**
 * Creates a builder over an existing partition tree, taking the list of
 * partitions from that tree.
 *
 * @param tree the partition tree to build a state store from
 */
private StateStoreTestBuilder(PartitionTree tree) {
    List<Partition> allPartitions = tree.getAllPartitions();
    this.tree = tree;
    this.partitions = allPartitions;
}

private StateStoreTestBuilder(PartitionsBuilder partitionsBuilder) {
tree = partitionsBuilder.buildTree();
partitions = partitionsBuilder.buildList();
this(partitionsBuilder.buildTree());
}

/**
 * Creates a state store test builder from the given partitions builder.
 *
 * @param partitionsBuilder the builder describing the partitions
 * @return a new state store test builder
 */
public static StateStoreTestBuilder from(PartitionsBuilder partitionsBuilder) {
    StateStoreTestBuilder builder = new StateStoreTestBuilder(partitionsBuilder);
    return builder;
}

/**
 * Creates a state store test builder with a single partition whose ID is "root".
 *
 * @param schema the schema the partition is built for
 * @return a new state store test builder
 */
public static StateStoreTestBuilder withSinglePartition(Schema schema) {
    String rootPartitionId = "root";
    return withSinglePartition(schema, rootPartitionId);
}

/**
 * Creates a state store test builder with a single partition with the given ID.
 *
 * @param schema      the schema the partition is built for
 * @param partitionId the ID to give the single partition
 * @return a new state store test builder
 */
public static StateStoreTestBuilder withSinglePartition(Schema schema, String partitionId) {
    PartitionsBuilder singlePartition = new PartitionsBuilder(schema).singlePartition(partitionId);
    return from(singlePartition);
}

public StateStoreTestBuilder singleFileInEachLeafPartitionWithRecords(long records) {
return addFiles(partitions.stream().filter(Partition::isLeafPartition)
.map(partition -> partitionSingleFile(partition, records)));
Expand Down
24 changes: 23 additions & 1 deletion java/metrics/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
~ limitations under the License.
-->

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>aws</artifactId>
<groupId>sleeper</groupId>
Expand Down Expand Up @@ -47,6 +48,27 @@
<artifactId>tables</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>sleeper</groupId>
<artifactId>configuration</artifactId>
<version>${project.parent.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>sleeper</groupId>
<artifactId>core</artifactId>
<version>${project.parent.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>sleeper</groupId>
<artifactId>statestore</artifactId>
<version>${project.parent.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>
</dependencies>

<build>
Expand Down
Loading