Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue 3202 - Make optional stacks redeployable except EKS #3467

Merged
merged 17 commits into from
Oct 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion example/basic/instance.properties
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ sleeper.retain.infra.after.destroy=true
# PersistentEmrBulkImportStack, EksBulkImportStack, EmrStudioStack, QueryStack, WebSocketQueryStack,
# AthenaStack, KeepLambdaWarmStack, CompactionStack, GarbageCollectorStack, PartitionSplittingStack,
# DashboardStack, TableMetricsStack]
sleeper.optional.stacks=CompactionStack,GarbageCollectorStack,IngestStack,IngestBatcherStack,PartitionSplittingStack,QueryStack,AthenaStack,EmrServerlessBulkImportStack,EmrStudioStack,DashboardStack,TableMetricsStack
sleeper.optional.stacks=IngestStack,IngestBatcherStack,EmrServerlessBulkImportStack,EmrStudioStack,QueryStack,AthenaStack,CompactionStack,GarbageCollectorStack,PartitionSplittingStack,DashboardStack,TableMetricsStack

# The AWS account number. This is the AWS account that the instance will be deployed to.
sleeper.account=1234567890
Expand Down
2 changes: 1 addition & 1 deletion example/full/instance.properties
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ sleeper.retain.infra.after.destroy=true
# PersistentEmrBulkImportStack, EksBulkImportStack, EmrStudioStack, QueryStack, WebSocketQueryStack,
# AthenaStack, KeepLambdaWarmStack, CompactionStack, GarbageCollectorStack, PartitionSplittingStack,
# DashboardStack, TableMetricsStack]
sleeper.optional.stacks=CompactionStack,GarbageCollectorStack,IngestStack,IngestBatcherStack,PartitionSplittingStack,QueryStack,AthenaStack,EmrServerlessBulkImportStack,EmrStudioStack,DashboardStack,TableMetricsStack
sleeper.optional.stacks=IngestStack,IngestBatcherStack,EmrServerlessBulkImportStack,EmrStudioStack,QueryStack,AthenaStack,CompactionStack,GarbageCollectorStack,PartitionSplittingStack,DashboardStack,TableMetricsStack

# The AWS account number. This is the AWS account that the instance will be deployed to.
sleeper.account=1234567890
Expand Down
14 changes: 9 additions & 5 deletions java/cdk/src/main/java/sleeper/cdk/SleeperCdkApp.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import sleeper.cdk.stack.IngestStatusStoreStack;
import sleeper.cdk.stack.InstanceRolesStack;
import sleeper.cdk.stack.KeepLambdaWarmStack;
import sleeper.cdk.stack.LoggingStack;
import sleeper.cdk.stack.ManagedPoliciesStack;
import sleeper.cdk.stack.PartitionSplittingStack;
import sleeper.cdk.stack.PropertiesStack;
Expand Down Expand Up @@ -117,15 +118,18 @@ public void create() {
.collect(toUnmodifiableSet());

List<IMetric> errorMetrics = new ArrayList<>();

LoggingStack loggingStack = new LoggingStack(this, "Logging", instanceProperties);

// Stack for Checking VPC configuration
new VpcStack(this, "Vpc", instanceProperties, jars);
new VpcStack(this, "Vpc", instanceProperties, jars, loggingStack);

// Topic stack
TopicStack topicStack = new TopicStack(this, "Topic", instanceProperties);

// Stacks for tables
ManagedPoliciesStack policiesStack = new ManagedPoliciesStack(this, "Policies", instanceProperties);
TableDataStack dataStack = new TableDataStack(this, "TableData", instanceProperties, policiesStack, jars);
TableDataStack dataStack = new TableDataStack(this, "TableData", instanceProperties, loggingStack, policiesStack, jars);
TransactionLogStateStoreStack transactionLogStateStoreStack = new TransactionLogStateStoreStack(
this, "TransactionLogStateStore", instanceProperties, dataStack);
StateStoreStacks stateStoreStacks = new StateStoreStacks(
Expand All @@ -136,15 +140,15 @@ public void create() {
instanceProperties, policiesStack).getResources();
CompactionStatusStoreResources compactionStatusStore = new CompactionStatusStoreStack(this, "CompactionStatusStore",
instanceProperties, policiesStack).getResources();
ConfigBucketStack configBucketStack = new ConfigBucketStack(this, "Configuration", instanceProperties, policiesStack, jars);
ConfigBucketStack configBucketStack = new ConfigBucketStack(this, "Configuration", instanceProperties, loggingStack, policiesStack, jars);
TableIndexStack tableIndexStack = new TableIndexStack(this, "TableIndex", instanceProperties, policiesStack);
StateStoreCommitterStack stateStoreCommitterStack = new StateStoreCommitterStack(this, "StateStoreCommitter",
instanceProperties, jars,
configBucketStack, tableIndexStack,
loggingStack, configBucketStack, tableIndexStack,
stateStoreStacks, ingestStatusStore, compactionStatusStore,
policiesStack, topicStack.getTopic(), errorMetrics);
coreStacks = new CoreStacks(
configBucketStack, tableIndexStack, policiesStack, stateStoreStacks, dataStack,
loggingStack, configBucketStack, tableIndexStack, policiesStack, stateStoreStacks, dataStack,
stateStoreCommitterStack, ingestStatusStore, compactionStatusStore);

new TransactionLogSnapshotStack(this, "TransactionLogSnapshot",
Expand Down
11 changes: 6 additions & 5 deletions java/cdk/src/main/java/sleeper/cdk/stack/AthenaStack.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
import java.util.Map;
import java.util.Objects;

import static sleeper.cdk.util.Utils.createLambdaLogGroup;
import static sleeper.core.properties.instance.AthenaProperty.ATHENA_COMPOSITE_HANDLER_CLASSES;
import static sleeper.core.properties.instance.AthenaProperty.ATHENA_COMPOSITE_HANDLER_MEMORY;
import static sleeper.core.properties.instance.AthenaProperty.ATHENA_COMPOSITE_HANDLER_TIMEOUT_IN_SECONDS;
Expand Down Expand Up @@ -78,7 +77,7 @@ public AthenaStack(
.removalPolicy(RemovalPolicy.DESTROY)
.build();

AutoDeleteS3Objects.autoDeleteForBucket(this, customResourcesJar, instanceProperties, spillBucket);
AutoDeleteS3Objects.autoDeleteForBucket(this, instanceProperties, coreStacks, customResourcesJar, spillBucket, bucketName);

IKey spillMasterKey = createSpillMasterKey(this, instanceProperties);

Expand Down Expand Up @@ -113,7 +112,7 @@ public AthenaStack(
.build();

for (String className : handlerClasses) {
IFunction handler = createConnector(className, instanceProperties, jarCode, env, memory, timeout);
IFunction handler = createConnector(className, instanceProperties, coreStacks, jarCode, env, memory, timeout);

jarsBucket.grantRead(handler);

Expand Down Expand Up @@ -151,7 +150,9 @@ private static IKey createSpillMasterKey(Construct scope, InstanceProperties ins
}
}

private IFunction createConnector(String className, InstanceProperties instanceProperties, LambdaCode jar, Map<String, String> env, Integer memory, Integer timeout) {
private IFunction createConnector(
String className, InstanceProperties instanceProperties, CoreStacks coreStacks,
LambdaCode jar, Map<String, String> env, Integer memory, Integer timeout) {
String instanceId = Utils.cleanInstanceId(instanceProperties);
String simpleClassName = getSimpleClassName(className);

Expand All @@ -162,7 +163,7 @@ private IFunction createConnector(String className, InstanceProperties instanceP
.memorySize(memory)
.timeout(Duration.seconds(timeout))
.runtime(Runtime.JAVA_11)
.logGroup(createLambdaLogGroup(this, simpleClassName + "AthenaCompositeHandlerLogGroup", functionName, instanceProperties))
.logGroup(coreStacks.getLogGroupByFunctionName(functionName))
.handler(className)
.environment(env));

Expand Down
25 changes: 12 additions & 13 deletions java/cdk/src/main/java/sleeper/cdk/stack/CompactionStack.java
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@
import java.util.stream.Collectors;

import static sleeper.cdk.util.Utils.createAlarmForDlq;
import static sleeper.cdk.util.Utils.createLambdaLogGroup;
import static sleeper.cdk.util.Utils.shouldDeployPaused;
import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.COMPACTION_AUTO_SCALING_GROUP;
import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.COMPACTION_CLUSTER;
Expand Down Expand Up @@ -278,7 +277,7 @@ private void lambdaToCreateCompactionJobsBatchedViaSQS(
.handler("sleeper.compaction.job.creation.lambda.CreateCompactionJobsTriggerLambda::handleRequest")
.environment(environmentVariables)
.reservedConcurrentExecutions(1)
.logGroup(createLambdaLogGroup(this, "CompactionJobsCreationTriggerLogGroup", triggerFunctionName, instanceProperties)));
.logGroup(coreStacks.getLogGroupByFunctionName(triggerFunctionName)));

IFunction handlerFunction = jobCreatorJar.buildFunction(this, "CompactionJobsCreationHandler", builder -> builder
.functionName(functionName)
Expand All @@ -289,7 +288,7 @@ private void lambdaToCreateCompactionJobsBatchedViaSQS(
.handler("sleeper.compaction.job.creation.lambda.CreateCompactionJobsLambda::handleRequest")
.environment(environmentVariables)
.reservedConcurrentExecutions(instanceProperties.getInt(COMPACTION_JOB_CREATION_LAMBDA_CONCURRENCY_RESERVED))
.logGroup(createLambdaLogGroup(this, "CompactionJobsCreationHandlerLogGroup", functionName, instanceProperties)));
.logGroup(coreStacks.getLogGroupByFunctionName(functionName)));

// Send messages from the trigger function to the handler function
Queue jobCreationQueue = sqsQueueForCompactionJobCreation(coreStacks, topic, errorMetrics);
Expand Down Expand Up @@ -396,17 +395,17 @@ private void ecsClusterForCompactionTasks(
FargateTaskDefinition fargateTaskDefinition = compactionFargateTaskDefinition();
String fargateTaskDefinitionFamily = fargateTaskDefinition.getFamily();
instanceProperties.set(COMPACTION_TASK_FARGATE_DEFINITION_FAMILY, fargateTaskDefinitionFamily);
ContainerDefinitionOptions fargateContainerDefinitionOptions = createFargateContainerDefinition(containerImage,
environmentVariables, instanceProperties);
ContainerDefinitionOptions fargateContainerDefinitionOptions = createFargateContainerDefinition(
coreStacks, containerImage, environmentVariables, instanceProperties);
fargateTaskDefinition.addContainer(ContainerConstants.COMPACTION_CONTAINER_NAME,
fargateContainerDefinitionOptions);
grantPermissions.accept(fargateTaskDefinition);
} else {
Ec2TaskDefinition ec2TaskDefinition = compactionEC2TaskDefinition();
String ec2TaskDefinitionFamily = ec2TaskDefinition.getFamily();
instanceProperties.set(COMPACTION_TASK_EC2_DEFINITION_FAMILY, ec2TaskDefinitionFamily);
ContainerDefinitionOptions ec2ContainerDefinitionOptions = createEC2ContainerDefinition(containerImage,
environmentVariables, instanceProperties);
ContainerDefinitionOptions ec2ContainerDefinitionOptions = createEC2ContainerDefinition(
coreStacks, containerImage, environmentVariables, instanceProperties);
ec2TaskDefinition.addContainer(ContainerConstants.COMPACTION_CONTAINER_NAME, ec2ContainerDefinitionOptions);
grantPermissions.accept(ec2TaskDefinition);
addEC2CapacityProvider(cluster, vpc, coreStacks, taskCreatorJar);
Expand Down Expand Up @@ -551,20 +550,20 @@ private Ec2TaskDefinition compactionEC2TaskDefinition() {
}

private ContainerDefinitionOptions createFargateContainerDefinition(
ContainerImage image, Map<String, String> environment, InstanceProperties instanceProperties) {
CoreStacks coreStacks, ContainerImage image, Map<String, String> environment, InstanceProperties instanceProperties) {
String architecture = instanceProperties.get(COMPACTION_TASK_CPU_ARCHITECTURE).toUpperCase(Locale.ROOT);
CompactionTaskRequirements requirements = CompactionTaskRequirements.getArchRequirements(architecture, instanceProperties);
return ContainerDefinitionOptions.builder()
.image(image)
.environment(environment)
.cpu(requirements.getCpu())
.memoryLimitMiB(requirements.getMemoryLimitMiB())
.logging(Utils.createECSContainerLogDriver(this, instanceProperties, "FargateCompactionTasks"))
.logging(Utils.createECSContainerLogDriver(coreStacks, "FargateCompactionTasks"))
.build();
}

private ContainerDefinitionOptions createEC2ContainerDefinition(
ContainerImage image, Map<String, String> environment, InstanceProperties instanceProperties) {
CoreStacks coreStacks, ContainerImage image, Map<String, String> environment, InstanceProperties instanceProperties) {
String architecture = instanceProperties.get(COMPACTION_TASK_CPU_ARCHITECTURE).toUpperCase(Locale.ROOT);
CompactionTaskRequirements requirements = CompactionTaskRequirements.getArchRequirements(architecture, instanceProperties);
return ContainerDefinitionOptions.builder()
Expand All @@ -575,7 +574,7 @@ private ContainerDefinitionOptions createEC2ContainerDefinition(
// container allocation failing when we need almost entire resources
// of machine
.memoryLimitMiB((int) (requirements.getMemoryLimitMiB() * 0.95))
.logging(Utils.createECSContainerLogDriver(this, instanceProperties, "EC2CompactionTasks"))
.logging(Utils.createECSContainerLogDriver(coreStacks, "EC2CompactionTasks"))
.build();
}

Expand All @@ -593,7 +592,7 @@ private IFunction lambdaForCustomTerminationPolicy(CoreStacks coreStacks, Lambda
.description("Custom termination policy for ECS auto scaling group. Only terminate empty instances.")
.environment(environmentVariables)
.handler("sleeper.compaction.task.creation.SafeTerminationLambda::handleRequest")
.logGroup(createLambdaLogGroup(this, "CompactionTerminatorLogGroup", functionName, instanceProperties))
.logGroup(coreStacks.getLogGroupByFunctionName(functionName))
.memorySize(512)
.runtime(software.amazon.awscdk.services.lambda.Runtime.JAVA_11)
.timeout(Duration.seconds(10)));
Expand Down Expand Up @@ -626,7 +625,7 @@ private void lambdaToCreateCompactionTasks(
.handler("sleeper.compaction.task.creation.RunCompactionTasksLambda::eventHandler")
.environment(Utils.createDefaultEnvironment(instanceProperties))
.reservedConcurrentExecutions(1)
.logGroup(createLambdaLogGroup(this, "CompactionTasksCreatorLogGroup", functionName, instanceProperties)));
.logGroup(coreStacks.getLogGroupByFunctionName(functionName)));

// Grant this function permission to read from the S3 bucket
coreStacks.grantReadInstanceConfig(handler);
Expand Down
11 changes: 6 additions & 5 deletions java/cdk/src/main/java/sleeper/cdk/stack/ConfigBucketStack.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,13 @@ public class ConfigBucketStack extends NestedStack {
private final IBucket configBucket;

public ConfigBucketStack(
Construct scope, String id, InstanceProperties instanceProperties, ManagedPoliciesStack policiesStack, BuiltJars jars) {
Construct scope, String id, InstanceProperties instanceProperties,
LoggingStack loggingStack, ManagedPoliciesStack policiesStack, BuiltJars jars) {
super(scope, id);

String bucketName = String.join("-", "sleeper",
Utils.cleanInstanceId(instanceProperties), "config");
configBucket = Bucket.Builder.create(this, "ConfigBucket")
.bucketName(String.join("-", "sleeper",
Utils.cleanInstanceId(instanceProperties), "config"))
.bucketName(bucketName)
.versioned(false)
.encryption(BucketEncryption.S3_MANAGED)
.blockPublicAccess(BlockPublicAccess.BLOCK_ALL)
Expand All @@ -54,7 +55,7 @@ public ConfigBucketStack(

instanceProperties.set(CONFIG_BUCKET, configBucket.getBucketName());

AutoDeleteS3Objects.autoDeleteForBucket(this, jars, instanceProperties, configBucket);
AutoDeleteS3Objects.autoDeleteForBucket(this, instanceProperties, loggingStack, jars, configBucket, bucketName);

configBucket.grantRead(policiesStack.getDirectIngestPolicyForGrants());
configBucket.grantRead(policiesStack.getIngestByQueuePolicyForGrants());
Expand Down
21 changes: 20 additions & 1 deletion java/cdk/src/main/java/sleeper/cdk/stack/CoreStacks.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import software.amazon.awscdk.services.iam.IRole;
import software.amazon.awscdk.services.iam.ManagedPolicy;
import software.amazon.awscdk.services.lambda.IFunction;
import software.amazon.awscdk.services.logs.ILogGroup;
import software.amazon.awscdk.services.sqs.IQueue;

import javax.annotation.Nullable;
Expand All @@ -29,6 +30,7 @@

public class CoreStacks {

private final LoggingStack loggingStack;
private final ConfigBucketStack configBucketStack;
private final TableIndexStack tableIndexStack;
private final ManagedPoliciesStack policiesStack;
Expand All @@ -38,11 +40,12 @@ public class CoreStacks {
private final IngestStatusStoreResources ingestStatusStore;
private final CompactionStatusStoreResources compactionStatusStore;

public CoreStacks(ConfigBucketStack configBucketStack, TableIndexStack tableIndexStack,
public CoreStacks(LoggingStack loggingStack, ConfigBucketStack configBucketStack, TableIndexStack tableIndexStack,
ManagedPoliciesStack policiesStack, StateStoreStacks stateStoreStacks, TableDataStack dataStack,
StateStoreCommitterStack stateStoreCommitterStack,
IngestStatusStoreResources ingestStatusStore,
CompactionStatusStoreResources compactionStatusStore) {
this.loggingStack = loggingStack;
this.configBucketStack = configBucketStack;
this.tableIndexStack = tableIndexStack;
this.policiesStack = policiesStack;
Expand All @@ -53,6 +56,22 @@ public CoreStacks(ConfigBucketStack configBucketStack, TableIndexStack tableInde
this.compactionStatusStore = compactionStatusStore;
}

/**
 * Retrieves the log group registered for a Lambda function. Delegates to the logging stack.
 *
 * @param functionName the deployed Lambda function name the log group was registered under
 * @return the log group for that function
 */
public ILogGroup getLogGroupByFunctionName(String functionName) {
return loggingStack.getLogGroupByFunctionName(functionName);
}

/**
 * Retrieves the log group registered for a custom resource provider's Lambda function.
 * Delegates to the logging stack.
 *
 * @param functionName the deployed provider function name the log group was registered under
 * @return the provider log group for that function
 */
public ILogGroup getProviderLogGroupByFunctionName(String functionName) {
return loggingStack.getProviderLogGroupByFunctionName(functionName);
}

/**
 * Retrieves the log group registered under an ECS log driver ID. Delegates to the logging stack.
 *
 * @param id the identifier the ECS container log driver's log group was registered under
 * @return the log group for that log driver
 */
public ILogGroup getLogGroupByECSLogDriverId(String id) {
return loggingStack.getLogGroupByECSLogDriverId(id);
}

/**
 * Retrieves the log group registered under a Step Functions state machine ID.
 * Delegates to the logging stack.
 *
 * @param id the identifier the state machine's log group was registered under
 * @return the log group for that state machine
 */
public ILogGroup getLogGroupByStateMachineId(String id) {
return loggingStack.getLogGroupByStateMachineId(id);
}

/**
 * Grants read access to the instance configuration bucket. Delegates to the config bucket stack.
 *
 * @param grantee the IAM principal to grant read access to
 */
public void grantReadInstanceConfig(IGrantable grantee) {
configBucketStack.grantRead(grantee);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import java.util.List;

import static sleeper.cdk.util.Utils.createAlarmForDlq;
import static sleeper.cdk.util.Utils.createLambdaLogGroup;
import static sleeper.cdk.util.Utils.shouldDeployPaused;
import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.GARBAGE_COLLECTOR_CLOUDWATCH_RULE;
import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.GARBAGE_COLLECTOR_DLQ_ARN;
Expand Down Expand Up @@ -92,7 +91,7 @@ public GarbageCollectorStack(
.reservedConcurrentExecutions(1)
.memorySize(instanceProperties.getInt(TABLE_BATCHING_LAMBDAS_MEMORY_IN_MB))
.timeout(Duration.seconds(instanceProperties.getInt(TABLE_BATCHING_LAMBDAS_TIMEOUT_IN_SECONDS)))
.logGroup(createLambdaLogGroup(this, "GarbageCollectorTriggerLogGroup", triggerFunctionName, instanceProperties)));
.logGroup(coreStacks.getLogGroupByFunctionName(triggerFunctionName)));
IFunction handlerFunction = gcJar.buildFunction(this, "GarbageCollectorLambda", builder -> builder
.functionName(functionName)
.description("Scan the state store looking for files that need deleting and delete them")
Expand All @@ -102,7 +101,7 @@ public GarbageCollectorStack(
.handler("sleeper.garbagecollector.GarbageCollectorLambda::handleRequest")
.environment(Utils.createDefaultEnvironment(instanceProperties))
.reservedConcurrentExecutions(instanceProperties.getInt(GARBAGE_COLLECTOR_LAMBDA_CONCURRENCY_RESERVED))
.logGroup(createLambdaLogGroup(this, "GarbageCollectorLambdaLogGroup", functionName, instanceProperties)));
.logGroup(coreStacks.getLogGroupByFunctionName(functionName)));
instanceProperties.set(GARBAGE_COLLECTOR_LAMBDA_FUNCTION, triggerFunction.getFunctionName());

// Grant this function permission delete files from the data bucket and
Expand Down
Loading
Loading