Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Validate optional stacks #3296

Merged
merged 23 commits into from
Sep 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
7869865
Relocate the deploy config into the properties
rtjd6554 Sep 17, 2024
b9d1946
Relocation of testing deploy to properties
rtjd6554 Sep 17, 2024
341bee3
Generation of OptionalStack class and defaultValue test
rtjd6554 Sep 17, 2024
eb7a7f9
Validate optional stacks
patchwork01 Sep 17, 2024
00d46ed
Use SleeperPropertyValues.readList for validation
patchwork01 Sep 17, 2024
456334b
Use OptionalStack enum in CDK
patchwork01 Sep 17, 2024
e86b219
Fix for compilatin error
rtjd6554 Sep 17, 2024
d1e5abe
Update for SystemTestInstance usage of enum
rtjd6554 Sep 17, 2024
794c02f
Adjust enum usage with SystemTest Instances
rtjd6554 Sep 17, 2024
b50654b
Adjust for IngestBatcherStore usage of OptionalStack
rtjd6554 Sep 17, 2024
0695e12
Adjust usages for DockerImageConfiguration
rtjd6554 Sep 17, 2024
8667f0c
Fix compilation in CleanUpDeletedSleeperInstancesTest
patchwork01 Sep 17, 2024
fdc39a2
Pass OptionalStack to DockerImageConfiguration from AdminClientProper…
patchwork01 Sep 17, 2024
298d74a
Change type of field StacksForDockerUpload.stacks
patchwork01 Sep 17, 2024
c9b6e64
Pass OptionalStack into StacksForDockerUpload
patchwork01 Sep 17, 2024
eac888a
Only use enum in DockerImageConfiguration
patchwork01 Sep 17, 2024
26f9dfd
Use OptionalStack in DeployDockerInstance
patchwork01 Sep 17, 2024
0230a8e
Update test usages of OptionalStack
patchwork01 Sep 17, 2024
d9b6181
Refactor SystemTestOptionalStacks
patchwork01 Sep 17, 2024
3ffdc43
Use OptionalStack to enable/disable in system tests
patchwork01 Sep 17, 2024
adec1ff
Test case insensitivity in OptionalStackTest
patchwork01 Sep 17, 2024
9246c61
Merge branch 'develop' into 3204-validate-optional-stacks
patchwork01 Sep 17, 2024
6e7bf2e
Update description of optional stacks property
patchwork01 Sep 17, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion example/basic/instance.properties
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@ sleeper.jars.bucket=the name of the bucket containing your jars, e.g. sleeper-<i
# instance is destroyed.
sleeper.retain.infra.after.destroy=true

# The optional stacks to deploy.
# The optional stacks to deploy. Not case sensitive.
# Valid values: [IngestStack, IngestBatcherStack, EmrServerlessBulkImportStack, EmrBulkImportStack,
# PersistentEmrBulkImportStack, EksBulkImportStack, EmrStudioStack, QueryStack, WebSocketQueryStack,
# AthenaStack, KeepLambdaWarmStack, CompactionStack, GarbageCollectorStack, PartitionSplittingStack,
# DashboardStack, TableMetricsStack]
sleeper.optional.stacks=CompactionStack,GarbageCollectorStack,IngestStack,IngestBatcherStack,PartitionSplittingStack,QueryStack,AthenaStack,EmrServerlessBulkImportStack,EmrStudioStack,DashboardStack,TableMetricsStack

# The AWS account number. This is the AWS account that the instance will be deployed to.
Expand Down
6 changes: 5 additions & 1 deletion example/full/instance.properties
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,11 @@ sleeper.stack.tag.name=DeploymentStack
# instance is destroyed.
sleeper.retain.infra.after.destroy=true

# The optional stacks to deploy.
# The optional stacks to deploy. Not case sensitive.
# Valid values: [IngestStack, IngestBatcherStack, EmrServerlessBulkImportStack, EmrBulkImportStack,
# PersistentEmrBulkImportStack, EksBulkImportStack, EmrStudioStack, QueryStack, WebSocketQueryStack,
# AthenaStack, KeepLambdaWarmStack, CompactionStack, GarbageCollectorStack, PartitionSplittingStack,
# DashboardStack, TableMetricsStack]
sleeper.optional.stacks=CompactionStack,GarbageCollectorStack,IngestStack,IngestBatcherStack,PartitionSplittingStack,QueryStack,AthenaStack,EmrServerlessBulkImportStack,EmrStudioStack,DashboardStack,TableMetricsStack

# The AWS account number. This is the AWS account that the instance will be deployed to.
Expand Down
70 changes: 24 additions & 46 deletions java/cdk/src/main/java/sleeper/cdk/SleeperCdkApp.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,13 @@
import sleeper.cdk.stack.bulkimport.EmrStudioStack;
import sleeper.cdk.stack.bulkimport.PersistentEmrBulkImportStack;
import sleeper.configuration.properties.instance.InstanceProperties;
import sleeper.configuration.properties.validation.OptionalStack;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.Set;

import static java.util.stream.Collectors.toUnmodifiableSet;
import static sleeper.configuration.properties.instance.CommonProperty.ACCOUNT;
import static sleeper.configuration.properties.instance.CommonProperty.ID;
import static sleeper.configuration.properties.instance.CommonProperty.JARS_BUCKET;
Expand Down Expand Up @@ -108,35 +109,12 @@ public SleeperCdkApp(App app, String id, StackProps props, InstanceProperties in
this.jars = jars;
}

private static final List<String> BULK_IMPORT_STACK_NAMES = Stream.of(
EmrBulkImportStack.class,
EmrServerlessBulkImportStack.class,
PersistentEmrBulkImportStack.class,
EksBulkImportStack.class)
.map(Class::getSimpleName).collect(Collectors.toList());

private static final List<String> EMR_BULK_IMPORT_STACK_NAMES = Stream.of(
EmrBulkImportStack.class,
EmrServerlessBulkImportStack.class,
PersistentEmrBulkImportStack.class)
.map(Class::getSimpleName).collect(Collectors.toList());

public static final List<String> INGEST_STACK_NAMES = Stream.of(
IngestStack.class,
EmrBulkImportStack.class,
EmrServerlessBulkImportStack.class,
PersistentEmrBulkImportStack.class,
EksBulkImportStack.class)
.map(Class::getSimpleName).collect(Collectors.toList());
public static final List<String> QUERY_STACK_NAMES = Stream.of(
QueryStack.class,
WebSocketQueryStack.class)
.map(Class::getSimpleName).collect(Collectors.toList());

@SuppressWarnings("checkstyle:methodlength")
public void create() {
// Optional stacks to be included
List<String> optionalStacks = instanceProperties.getList(OPTIONAL_STACKS);
Set<OptionalStack> optionalStacks = instanceProperties
.streamEnumList(OPTIONAL_STACKS, OptionalStack.class)
.collect(toUnmodifiableSet());

List<IMetric> errorMetrics = new ArrayList<>();
// Stack for Checking VPC configuration
Expand Down Expand Up @@ -173,25 +151,25 @@ public void create() {
instanceProperties, jars, coreStacks, transactionLogStateStoreStack, topicStack.getTopic(), errorMetrics);
new TransactionLogTransactionStack(this, "TransactionLogTransaction",
instanceProperties, jars, coreStacks, transactionLogStateStoreStack, topicStack.getTopic(), errorMetrics);
if (optionalStacks.contains(TableMetricsStack.class.getSimpleName())) {
if (optionalStacks.contains(OptionalStack.TableMetricsStack)) {
new TableMetricsStack(this, "TableMetrics", instanceProperties, jars, topicStack.getTopic(), coreStacks, errorMetrics);
}

// Stack for Athena analytics
if (optionalStacks.contains(AthenaStack.class.getSimpleName())) {
if (optionalStacks.contains(OptionalStack.AthenaStack)) {
new AthenaStack(this, "Athena", instanceProperties, jars, coreStacks);
}

if (BULK_IMPORT_STACK_NAMES.stream().anyMatch(optionalStacks::contains)) {
if (OptionalStack.BULK_IMPORT_STACKS.stream().anyMatch(optionalStacks::contains)) {
bulkImportBucketStack = new BulkImportBucketStack(this, "BulkImportBucket", instanceProperties, coreStacks);
}
if (EMR_BULK_IMPORT_STACK_NAMES.stream().anyMatch(optionalStacks::contains)) {
if (OptionalStack.EMR_BULK_IMPORT_STACKS.stream().anyMatch(optionalStacks::contains)) {
emrBulkImportCommonStack = new CommonEmrBulkImportStack(this, "BulkImportEMRCommon",
instanceProperties, coreStacks, bulkImportBucketStack);
}

// Stack to run bulk import jobs via EMR Serverless
if (optionalStacks.contains(EmrServerlessBulkImportStack.class.getSimpleName())) {
if (optionalStacks.contains(OptionalStack.EmrServerlessBulkImportStack)) {
emrServerlessBulkImportStack = new EmrServerlessBulkImportStack(this, "BulkImportEMRServerless",
instanceProperties, jars,
topicStack.getTopic(),
Expand All @@ -200,12 +178,12 @@ public void create() {
errorMetrics);

// Stack to created EMR studio to be used to access EMR Serverless
if (optionalStacks.contains(EmrStudioStack.class.getSimpleName())) {
if (optionalStacks.contains(OptionalStack.EmrStudioStack)) {
new EmrStudioStack(this, "EmrStudio", instanceProperties);
}
}
// Stack to run bulk import jobs via EMR (one cluster per bulk import job)
if (optionalStacks.contains(EmrBulkImportStack.class.getSimpleName())) {
if (optionalStacks.contains(OptionalStack.EmrBulkImportStack)) {
emrBulkImportStack = new EmrBulkImportStack(this, "BulkImportEMR",
instanceProperties, jars,
topicStack.getTopic(),
Expand All @@ -216,7 +194,7 @@ public void create() {
}

// Stack to run bulk import jobs via a persistent EMR cluster
if (optionalStacks.contains(PersistentEmrBulkImportStack.class.getSimpleName())) {
if (optionalStacks.contains(OptionalStack.PersistentEmrBulkImportStack)) {
persistentEmrBulkImportStack = new PersistentEmrBulkImportStack(this, "BulkImportPersistentEMR",
instanceProperties, jars,
topicStack.getTopic(),
Expand All @@ -227,7 +205,7 @@ public void create() {
}

// Stack to run bulk import jobs via EKS
if (optionalStacks.contains(EksBulkImportStack.class.getSimpleName())) {
if (optionalStacks.contains(OptionalStack.EksBulkImportStack)) {
eksBulkImportStack = new EksBulkImportStack(this, "BulkImportEKS",
instanceProperties, jars,
topicStack.getTopic(),
Expand All @@ -237,7 +215,7 @@ public void create() {
}

// Stack to garbage collect old files
if (optionalStacks.contains(GarbageCollectorStack.class.getSimpleName())) {
if (optionalStacks.contains(OptionalStack.GarbageCollectorStack)) {
new GarbageCollectorStack(this,
"GarbageCollector",
instanceProperties, jars,
Expand All @@ -246,7 +224,7 @@ public void create() {
errorMetrics);
}
// Stack for containers for compactions and splitting compactions
if (optionalStacks.contains(CompactionStack.class.getSimpleName())) {
if (optionalStacks.contains(OptionalStack.CompactionStack)) {
compactionStack = new CompactionStack(this,
"Compaction",
instanceProperties, jars,
Expand All @@ -256,7 +234,7 @@ public void create() {
}

// Stack to split partitions
if (optionalStacks.contains(PartitionSplittingStack.class.getSimpleName())) {
if (optionalStacks.contains(OptionalStack.PartitionSplittingStack)) {
partitionSplittingStack = new PartitionSplittingStack(this,
"PartitionSplitting",
instanceProperties, jars,
Expand All @@ -267,7 +245,7 @@ public void create() {

QueryStack queryStack = null;
// Stack to execute queries
if (QUERY_STACK_NAMES.stream().anyMatch(optionalStacks::contains)) {
if (OptionalStack.QUERY_STACKS.stream().anyMatch(optionalStacks::contains)) {
queryQueueStack = new QueryQueueStack(this, "QueryQueue",
instanceProperties,
topicStack.getTopic(), coreStacks,
Expand All @@ -279,15 +257,15 @@ public void create() {
coreStacks, queryQueueStack,
errorMetrics);
// Stack to execute queries using the web socket API
if (optionalStacks.contains(WebSocketQueryStack.class.getSimpleName())) {
if (optionalStacks.contains(OptionalStack.WebSocketQueryStack)) {
new WebSocketQueryStack(this,
"WebSocketQuery",
instanceProperties, jars,
coreStacks, queryQueueStack, queryStack);
}
}
// Stack for ingest jobs
if (optionalStacks.contains(IngestStack.class.getSimpleName())) {
if (optionalStacks.contains(OptionalStack.IngestStack)) {
ingestStack = new IngestStack(this,
"Ingest",
instanceProperties, jars,
Expand All @@ -300,7 +278,7 @@ public void create() {
ingestStacks = new IngestStacks(ingestStack, emrBulkImportStack, persistentEmrBulkImportStack, eksBulkImportStack, emrServerlessBulkImportStack);

// Stack to batch up files to ingest and create jobs
if (optionalStacks.contains(IngestBatcherStack.class.getSimpleName())) {
if (optionalStacks.contains(OptionalStack.IngestBatcherStack)) {
ingestBatcherStack = new IngestBatcherStack(this, "IngestBatcher",
instanceProperties, jars,
topicStack.getTopic(),
Expand All @@ -309,7 +287,7 @@ public void create() {
errorMetrics);
}

if (optionalStacks.contains(DashboardStack.class.getSimpleName())) {
if (optionalStacks.contains(OptionalStack.DashboardStack)) {
new DashboardStack(this,
"Dashboard",
ingestStack,
Expand All @@ -319,7 +297,7 @@ public void create() {
errorMetrics);
}

if (optionalStacks.contains(KeepLambdaWarmStack.class.getSimpleName())) {
if (optionalStacks.contains(OptionalStack.KeepLambdaWarmStack)) {
new KeepLambdaWarmStack(this,
"KeepLambdaWarmExecution",
instanceProperties,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,26 +29,28 @@
import sleeper.clients.util.cdk.CdkCommand;
import sleeper.clients.util.cdk.InvokeCdkForInstance;
import sleeper.clients.util.console.ConsoleOutput;
import sleeper.configuration.properties.SleeperPropertyValues;
import sleeper.configuration.properties.instance.InstanceProperties;
import sleeper.configuration.properties.instance.InstanceProperty;
import sleeper.configuration.properties.local.SaveLocalProperties;
import sleeper.configuration.properties.table.S3TableProperties;
import sleeper.configuration.properties.table.TableProperties;
import sleeper.configuration.properties.table.TablePropertiesProvider;
import sleeper.configuration.properties.validation.OptionalStack;
import sleeper.core.statestore.StateStore;
import sleeper.core.table.TableNotFoundException;
import sleeper.statestore.StateStoreFactory;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Stream;

import static sleeper.configuration.properties.SleeperPropertyValues.readList;
import static java.util.function.Predicate.not;
import static java.util.stream.Collectors.toUnmodifiableSet;
import static sleeper.configuration.properties.instance.CommonProperty.ID;
import static sleeper.configuration.properties.instance.CommonProperty.OPTIONAL_STACKS;
import static sleeper.configuration.properties.instance.InstanceProperties.loadPropertiesFromS3GivenInstanceId;
Expand Down Expand Up @@ -137,9 +139,13 @@ private boolean shouldUploadDockerImages(PropertiesDiff diff) {
return false;
}
PropertyDiff stackDiff = stackDiffOptional.get();
Set<String> stacksBefore = new HashSet<>(readList(stackDiff.getOldValue()));
Set<String> newStacks = new HashSet<>(readList(stackDiff.getNewValue()));
newStacks.removeAll(stacksBefore);
Set<OptionalStack> stacksBefore = SleeperPropertyValues
.streamEnumList(OPTIONAL_STACKS, stackDiff.getOldValue(), OptionalStack.class)
.collect(toUnmodifiableSet());
Set<OptionalStack> newStacks = SleeperPropertyValues
.streamEnumList(OPTIONAL_STACKS, stackDiff.getNewValue(), OptionalStack.class)
.filter(not(stacksBefore::contains))
.collect(toUnmodifiableSet());
return !dockerImageConfiguration.getStacksToDeploy(newStacks).isEmpty();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@
import sleeper.clients.util.EcrRepositoryCreator;
import sleeper.clients.util.cdk.CdkCommand;
import sleeper.clients.util.cdk.InvokeCdkForInstance;
import sleeper.configuration.deploy.DeployInstanceConfiguration;
import sleeper.configuration.deploy.DeployInstanceConfigurationFromTemplates;
import sleeper.configuration.properties.SleeperPropertiesValidationReporter;
import sleeper.configuration.properties.deploy.DeployInstanceConfiguration;
import sleeper.configuration.properties.deploy.DeployInstanceConfigurationFromTemplates;
import sleeper.configuration.properties.instance.InstanceProperties;
import sleeper.configuration.properties.local.SaveLocalProperties;
import sleeper.configuration.properties.table.TableProperties;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package sleeper.clients.deploy;

import sleeper.configuration.properties.validation.OptionalStack;

import java.util.Collection;
import java.util.List;
import java.util.Map;
Expand All @@ -28,27 +30,27 @@
import static sleeper.clients.deploy.StackDockerImage.emrServerlessImage;

public class DockerImageConfiguration {
private static final Map<String, StackDockerImage> DEFAULT_DOCKER_IMAGE_BY_STACK = Map.of(
"IngestStack", dockerBuildImage("ingest"),
"EksBulkImportStack", dockerBuildImage("bulk-import-runner"),
"CompactionStack", dockerBuildxImage("compaction-job-execution"),
"EmrServerlessBulkImportStack", emrServerlessImage("bulk-import-runner-emr-serverless"));
private static final Map<OptionalStack, StackDockerImage> DEFAULT_DOCKER_IMAGE_BY_STACK = Map.of(
OptionalStack.IngestStack, dockerBuildImage("ingest"),
OptionalStack.EksBulkImportStack, dockerBuildImage("bulk-import-runner"),
OptionalStack.CompactionStack, dockerBuildxImage("compaction-job-execution"),
OptionalStack.EmrServerlessBulkImportStack, emrServerlessImage("bulk-import-runner-emr-serverless"));

private final Map<String, StackDockerImage> imageByStack;
private final Map<OptionalStack, StackDockerImage> imageByStack;

public DockerImageConfiguration() {
this(DEFAULT_DOCKER_IMAGE_BY_STACK);
}

public DockerImageConfiguration(Map<String, StackDockerImage> imageByStack) {
public DockerImageConfiguration(Map<OptionalStack, StackDockerImage> imageByStack) {
this.imageByStack = imageByStack;
}

public List<StackDockerImage> getStacksToDeploy(Collection<String> stacks) {
public List<StackDockerImage> getStacksToDeploy(Collection<OptionalStack> stacks) {
return getStacksToDeploy(stacks, List.of());
}

public List<StackDockerImage> getStacksToDeploy(Collection<String> stacks, List<StackDockerImage> extraDockerImages) {
public List<StackDockerImage> getStacksToDeploy(Collection<OptionalStack> stacks, List<StackDockerImage> extraDockerImages) {
return Stream.concat(
stacks.stream()
.map(this::getStackImage)
Expand All @@ -57,7 +59,7 @@ public List<StackDockerImage> getStacksToDeploy(Collection<String> stacks, List<
.collect(toUnmodifiableList());
}

public Optional<StackDockerImage> getStackImage(String stack) {
private Optional<StackDockerImage> getStackImage(OptionalStack stack) {
return Optional.ofNullable(imageByStack.get(stack));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
import com.amazonaws.services.securitytoken.model.GetCallerIdentityRequest;
import software.amazon.awssdk.regions.providers.AwsRegionProvider;

import sleeper.configuration.deploy.DeployInstanceConfiguration;
import sleeper.configuration.properties.SleeperScheduleRule;
import sleeper.configuration.properties.deploy.DeployInstanceConfiguration;
import sleeper.configuration.properties.instance.InstanceProperties;

import java.nio.file.Path;
Expand Down
Loading
Loading