diff --git a/example/basic/instance.properties b/example/basic/instance.properties index 7a2ad83532..1a32596fbd 100644 --- a/example/basic/instance.properties +++ b/example/basic/instance.properties @@ -23,7 +23,7 @@ sleeper.retain.infra.after.destroy=true # PersistentEmrBulkImportStack, EksBulkImportStack, EmrStudioStack, QueryStack, WebSocketQueryStack, # AthenaStack, KeepLambdaWarmStack, CompactionStack, GarbageCollectorStack, PartitionSplittingStack, # DashboardStack, TableMetricsStack] -sleeper.optional.stacks=CompactionStack,GarbageCollectorStack,IngestStack,IngestBatcherStack,PartitionSplittingStack,QueryStack,AthenaStack,EmrServerlessBulkImportStack,EmrStudioStack,DashboardStack,TableMetricsStack +sleeper.optional.stacks=IngestStack,IngestBatcherStack,EmrServerlessBulkImportStack,EmrStudioStack,QueryStack,AthenaStack,CompactionStack,GarbageCollectorStack,PartitionSplittingStack,DashboardStack,TableMetricsStack # The AWS account number. This is the AWS account that the instance will be deployed to. sleeper.account=1234567890 diff --git a/example/full/instance.properties b/example/full/instance.properties index 0794f902be..ec409e3423 100644 --- a/example/full/instance.properties +++ b/example/full/instance.properties @@ -28,7 +28,7 @@ sleeper.retain.infra.after.destroy=true # PersistentEmrBulkImportStack, EksBulkImportStack, EmrStudioStack, QueryStack, WebSocketQueryStack, # AthenaStack, KeepLambdaWarmStack, CompactionStack, GarbageCollectorStack, PartitionSplittingStack, # DashboardStack, TableMetricsStack] -sleeper.optional.stacks=CompactionStack,GarbageCollectorStack,IngestStack,IngestBatcherStack,PartitionSplittingStack,QueryStack,AthenaStack,EmrServerlessBulkImportStack,EmrStudioStack,DashboardStack,TableMetricsStack +sleeper.optional.stacks=IngestStack,IngestBatcherStack,EmrServerlessBulkImportStack,EmrStudioStack,QueryStack,AthenaStack,CompactionStack,GarbageCollectorStack,PartitionSplittingStack,DashboardStack,TableMetricsStack # The AWS account number. This is the AWS account that the instance will be deployed to. sleeper.account=1234567890 diff --git a/java/cdk/src/main/java/sleeper/cdk/SleeperCdkApp.java b/java/cdk/src/main/java/sleeper/cdk/SleeperCdkApp.java index bdaff13d01..29360f4a5c 100644 --- a/java/cdk/src/main/java/sleeper/cdk/SleeperCdkApp.java +++ b/java/cdk/src/main/java/sleeper/cdk/SleeperCdkApp.java @@ -42,6 +42,7 @@ import sleeper.cdk.stack.IngestStatusStoreStack; import sleeper.cdk.stack.InstanceRolesStack; import sleeper.cdk.stack.KeepLambdaWarmStack; +import sleeper.cdk.stack.LoggingStack; import sleeper.cdk.stack.ManagedPoliciesStack; import sleeper.cdk.stack.PartitionSplittingStack; import sleeper.cdk.stack.PropertiesStack; @@ -117,15 +118,18 @@ public void create() { .collect(toUnmodifiableSet()); List errorMetrics = new ArrayList<>(); + + LoggingStack loggingStack = new LoggingStack(this, "Logging", instanceProperties); + // Stack for Checking VPC configuration - new VpcStack(this, "Vpc", instanceProperties, jars); + new VpcStack(this, "Vpc", instanceProperties, jars, loggingStack); // Topic stack TopicStack topicStack = new TopicStack(this, "Topic", instanceProperties); // Stacks for tables ManagedPoliciesStack policiesStack = new ManagedPoliciesStack(this, "Policies", instanceProperties); - TableDataStack dataStack = new TableDataStack(this, "TableData", instanceProperties, policiesStack, jars); + TableDataStack dataStack = new TableDataStack(this, "TableData", instanceProperties, loggingStack, policiesStack, jars); TransactionLogStateStoreStack transactionLogStateStoreStack = new TransactionLogStateStoreStack( this, "TransactionLogStateStore", instanceProperties, dataStack); StateStoreStacks stateStoreStacks = new StateStoreStacks( @@ -136,15 +140,15 @@ public void create() { instanceProperties, policiesStack).getResources(); CompactionStatusStoreResources compactionStatusStore = new CompactionStatusStoreStack(this, "CompactionStatusStore", instanceProperties, policiesStack).getResources(); - ConfigBucketStack configBucketStack = new ConfigBucketStack(this, "Configuration", instanceProperties, policiesStack, jars); + ConfigBucketStack configBucketStack = new ConfigBucketStack(this, "Configuration", instanceProperties, loggingStack, policiesStack, jars); TableIndexStack tableIndexStack = new TableIndexStack(this, "TableIndex", instanceProperties, policiesStack); StateStoreCommitterStack stateStoreCommitterStack = new StateStoreCommitterStack(this, "StateStoreCommitter", instanceProperties, jars, - configBucketStack, tableIndexStack, + loggingStack, configBucketStack, tableIndexStack, stateStoreStacks, ingestStatusStore, compactionStatusStore, policiesStack, topicStack.getTopic(), errorMetrics); coreStacks = new CoreStacks( - configBucketStack, tableIndexStack, policiesStack, stateStoreStacks, dataStack, + loggingStack, configBucketStack, tableIndexStack, policiesStack, stateStoreStacks, dataStack, stateStoreCommitterStack, ingestStatusStore, compactionStatusStore); new TransactionLogSnapshotStack(this, "TransactionLogSnapshot", diff --git a/java/cdk/src/main/java/sleeper/cdk/stack/AthenaStack.java b/java/cdk/src/main/java/sleeper/cdk/stack/AthenaStack.java index e4600b040a..451b363b83 100644 --- a/java/cdk/src/main/java/sleeper/cdk/stack/AthenaStack.java +++ b/java/cdk/src/main/java/sleeper/cdk/stack/AthenaStack.java @@ -46,7 +46,6 @@ import java.util.Map; import java.util.Objects; -import static sleeper.cdk.util.Utils.createLambdaLogGroup; import static sleeper.core.properties.instance.AthenaProperty.ATHENA_COMPOSITE_HANDLER_CLASSES; import static sleeper.core.properties.instance.AthenaProperty.ATHENA_COMPOSITE_HANDLER_MEMORY; import static sleeper.core.properties.instance.AthenaProperty.ATHENA_COMPOSITE_HANDLER_TIMEOUT_IN_SECONDS; @@ -78,7 +77,7 @@ public AthenaStack( .removalPolicy(RemovalPolicy.DESTROY) .build(); - AutoDeleteS3Objects.autoDeleteForBucket(this, customResourcesJar, instanceProperties, spillBucket); + AutoDeleteS3Objects.autoDeleteForBucket(this, instanceProperties, coreStacks, customResourcesJar, spillBucket, bucketName); IKey spillMasterKey = createSpillMasterKey(this, instanceProperties); @@ -113,7 +112,7 @@ public AthenaStack( .build(); for (String className : handlerClasses) { - IFunction handler = createConnector(className, instanceProperties, jarCode, env, memory, timeout); + IFunction handler = createConnector(className, instanceProperties, coreStacks, jarCode, env, memory, timeout); jarsBucket.grantRead(handler); @@ -151,7 +150,9 @@ private static IKey createSpillMasterKey(Construct scope, InstanceProperties ins } } - private IFunction createConnector(String className, InstanceProperties instanceProperties, LambdaCode jar, Map env, Integer memory, Integer timeout) { + private IFunction createConnector( + String className, InstanceProperties instanceProperties, CoreStacks coreStacks, + LambdaCode jar, Map env, Integer memory, Integer timeout) { String instanceId = Utils.cleanInstanceId(instanceProperties); String simpleClassName = getSimpleClassName(className); @@ -162,7 +163,7 @@ private IFunction createConnector(String className, InstanceProperties instanceP .memorySize(memory) .timeout(Duration.seconds(timeout)) .runtime(Runtime.JAVA_11) - .logGroup(createLambdaLogGroup(this, simpleClassName + "AthenaCompositeHandlerLogGroup", functionName, instanceProperties)) + .logGroup(coreStacks.getLogGroupByFunctionName(functionName)) .handler(className) .environment(env)); diff --git a/java/cdk/src/main/java/sleeper/cdk/stack/CompactionStack.java b/java/cdk/src/main/java/sleeper/cdk/stack/CompactionStack.java index 170b0a9efb..5a41a2b64f 100644 --- a/java/cdk/src/main/java/sleeper/cdk/stack/CompactionStack.java +++ b/java/cdk/src/main/java/sleeper/cdk/stack/CompactionStack.java @@ -95,7 +95,6 @@ import java.util.stream.Collectors; import static sleeper.cdk.util.Utils.createAlarmForDlq; -import static sleeper.cdk.util.Utils.createLambdaLogGroup; import static sleeper.cdk.util.Utils.shouldDeployPaused; import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.COMPACTION_AUTO_SCALING_GROUP; import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.COMPACTION_CLUSTER; @@ -278,7 +277,7 @@ private void lambdaToCreateCompactionJobsBatchedViaSQS( .handler("sleeper.compaction.job.creation.lambda.CreateCompactionJobsTriggerLambda::handleRequest") .environment(environmentVariables) .reservedConcurrentExecutions(1) - .logGroup(createLambdaLogGroup(this, "CompactionJobsCreationTriggerLogGroup", triggerFunctionName, instanceProperties))); + .logGroup(coreStacks.getLogGroupByFunctionName(triggerFunctionName))); IFunction handlerFunction = jobCreatorJar.buildFunction(this, "CompactionJobsCreationHandler", builder -> builder .functionName(functionName) @@ -289,7 +288,7 @@ private void lambdaToCreateCompactionJobsBatchedViaSQS( .handler("sleeper.compaction.job.creation.lambda.CreateCompactionJobsLambda::handleRequest") .environment(environmentVariables) .reservedConcurrentExecutions(instanceProperties.getInt(COMPACTION_JOB_CREATION_LAMBDA_CONCURRENCY_RESERVED)) - .logGroup(createLambdaLogGroup(this, "CompactionJobsCreationHandlerLogGroup", functionName, instanceProperties))); + .logGroup(coreStacks.getLogGroupByFunctionName(functionName))); // Send messages from the trigger function to the handler function Queue jobCreationQueue = sqsQueueForCompactionJobCreation(coreStacks, topic, errorMetrics); @@ -396,8 +395,8 @@ private void ecsClusterForCompactionTasks( FargateTaskDefinition fargateTaskDefinition = compactionFargateTaskDefinition(); String fargateTaskDefinitionFamily = fargateTaskDefinition.getFamily(); instanceProperties.set(COMPACTION_TASK_FARGATE_DEFINITION_FAMILY, fargateTaskDefinitionFamily); - ContainerDefinitionOptions fargateContainerDefinitionOptions = createFargateContainerDefinition(containerImage, - environmentVariables, instanceProperties); + ContainerDefinitionOptions fargateContainerDefinitionOptions = createFargateContainerDefinition( + coreStacks, containerImage, environmentVariables, instanceProperties); fargateTaskDefinition.addContainer(ContainerConstants.COMPACTION_CONTAINER_NAME, fargateContainerDefinitionOptions); grantPermissions.accept(fargateTaskDefinition); @@ -405,8 +404,8 @@ private void ecsClusterForCompactionTasks( Ec2TaskDefinition ec2TaskDefinition = compactionEC2TaskDefinition(); String ec2TaskDefinitionFamily = ec2TaskDefinition.getFamily(); instanceProperties.set(COMPACTION_TASK_EC2_DEFINITION_FAMILY, ec2TaskDefinitionFamily); - ContainerDefinitionOptions ec2ContainerDefinitionOptions = createEC2ContainerDefinition(containerImage, - environmentVariables, instanceProperties); + ContainerDefinitionOptions ec2ContainerDefinitionOptions = createEC2ContainerDefinition( + coreStacks, containerImage, environmentVariables, instanceProperties); ec2TaskDefinition.addContainer(ContainerConstants.COMPACTION_CONTAINER_NAME, ec2ContainerDefinitionOptions); grantPermissions.accept(ec2TaskDefinition); addEC2CapacityProvider(cluster, vpc, coreStacks, taskCreatorJar); @@ -551,7 +550,7 @@ private Ec2TaskDefinition compactionEC2TaskDefinition() { } private ContainerDefinitionOptions createFargateContainerDefinition( - ContainerImage image, Map environment, InstanceProperties instanceProperties) { + CoreStacks coreStacks, ContainerImage image, Map environment, InstanceProperties instanceProperties) { String architecture = instanceProperties.get(COMPACTION_TASK_CPU_ARCHITECTURE).toUpperCase(Locale.ROOT); CompactionTaskRequirements requirements = CompactionTaskRequirements.getArchRequirements(architecture, instanceProperties); return ContainerDefinitionOptions.builder() @@ -559,12 +558,12 @@ private ContainerDefinitionOptions createFargateContainerDefinition( .environment(environment) .cpu(requirements.getCpu()) .memoryLimitMiB(requirements.getMemoryLimitMiB()) - .logging(Utils.createECSContainerLogDriver(this, instanceProperties, "FargateCompactionTasks")) + .logging(Utils.createECSContainerLogDriver(coreStacks, "FargateCompactionTasks")) .build(); } private ContainerDefinitionOptions createEC2ContainerDefinition( - ContainerImage image, Map environment, InstanceProperties instanceProperties) { + CoreStacks coreStacks, ContainerImage image, Map environment, InstanceProperties instanceProperties) { String architecture = instanceProperties.get(COMPACTION_TASK_CPU_ARCHITECTURE).toUpperCase(Locale.ROOT); CompactionTaskRequirements requirements = CompactionTaskRequirements.getArchRequirements(architecture, instanceProperties); return ContainerDefinitionOptions.builder() @@ -575,7 +574,7 @@ private ContainerDefinitionOptions createEC2ContainerDefinition( // container allocation failing when we need almost entire resources // of machine .memoryLimitMiB((int) (requirements.getMemoryLimitMiB() * 0.95)) - .logging(Utils.createECSContainerLogDriver(this, instanceProperties, "EC2CompactionTasks")) + .logging(Utils.createECSContainerLogDriver(coreStacks, "EC2CompactionTasks")) .build(); } @@ -593,7 +592,7 @@ private IFunction lambdaForCustomTerminationPolicy(CoreStacks coreStacks, Lambda .description("Custom termination policy for ECS auto scaling group. Only terminate empty instances.") .environment(environmentVariables) .handler("sleeper.compaction.task.creation.SafeTerminationLambda::handleRequest") - .logGroup(createLambdaLogGroup(this, "CompactionTerminatorLogGroup", functionName, instanceProperties)) + .logGroup(coreStacks.getLogGroupByFunctionName(functionName)) .memorySize(512) .runtime(software.amazon.awscdk.services.lambda.Runtime.JAVA_11) .timeout(Duration.seconds(10))); @@ -626,7 +625,7 @@ private void lambdaToCreateCompactionTasks( .handler("sleeper.compaction.task.creation.RunCompactionTasksLambda::eventHandler") .environment(Utils.createDefaultEnvironment(instanceProperties)) .reservedConcurrentExecutions(1) - .logGroup(createLambdaLogGroup(this, "CompactionTasksCreatorLogGroup", functionName, instanceProperties))); + .logGroup(coreStacks.getLogGroupByFunctionName(functionName))); // Grant this function permission to read from the S3 bucket coreStacks.grantReadInstanceConfig(handler); diff --git a/java/cdk/src/main/java/sleeper/cdk/stack/ConfigBucketStack.java b/java/cdk/src/main/java/sleeper/cdk/stack/ConfigBucketStack.java index e1a5a4a558..5633a9f9b5 100644 --- a/java/cdk/src/main/java/sleeper/cdk/stack/ConfigBucketStack.java +++ b/java/cdk/src/main/java/sleeper/cdk/stack/ConfigBucketStack.java @@ -40,12 +40,13 @@ public class ConfigBucketStack extends NestedStack { private final IBucket configBucket; public ConfigBucketStack( - Construct scope, String id, InstanceProperties instanceProperties, ManagedPoliciesStack policiesStack, BuiltJars jars) { + Construct scope, String id, InstanceProperties instanceProperties, + LoggingStack loggingStack, ManagedPoliciesStack policiesStack, BuiltJars jars) { super(scope, id); - + String bucketName = String.join("-", "sleeper", + Utils.cleanInstanceId(instanceProperties), "config"); configBucket = Bucket.Builder.create(this, "ConfigBucket") - .bucketName(String.join("-", "sleeper", - Utils.cleanInstanceId(instanceProperties), "config")) + .bucketName(bucketName) .versioned(false) .encryption(BucketEncryption.S3_MANAGED) .blockPublicAccess(BlockPublicAccess.BLOCK_ALL) @@ -54,7 +55,7 @@ public ConfigBucketStack( instanceProperties.set(CONFIG_BUCKET, configBucket.getBucketName()); - AutoDeleteS3Objects.autoDeleteForBucket(this, jars, instanceProperties, configBucket); + AutoDeleteS3Objects.autoDeleteForBucket(this, instanceProperties, loggingStack, jars, configBucket, bucketName); configBucket.grantRead(policiesStack.getDirectIngestPolicyForGrants()); configBucket.grantRead(policiesStack.getIngestByQueuePolicyForGrants()); diff --git a/java/cdk/src/main/java/sleeper/cdk/stack/CoreStacks.java b/java/cdk/src/main/java/sleeper/cdk/stack/CoreStacks.java index 4a0b552472..54d393731f 100644 --- a/java/cdk/src/main/java/sleeper/cdk/stack/CoreStacks.java +++ b/java/cdk/src/main/java/sleeper/cdk/stack/CoreStacks.java @@ -21,6 +21,7 @@ import software.amazon.awscdk.services.iam.IRole; import software.amazon.awscdk.services.iam.ManagedPolicy; import software.amazon.awscdk.services.lambda.IFunction; +import software.amazon.awscdk.services.logs.ILogGroup; import software.amazon.awscdk.services.sqs.IQueue; import javax.annotation.Nullable; @@ -29,6 +30,7 @@ public class CoreStacks { + private final LoggingStack loggingStack; private final ConfigBucketStack configBucketStack; private final TableIndexStack tableIndexStack; private final ManagedPoliciesStack policiesStack; @@ -38,11 +40,12 @@ public class CoreStacks { private final IngestStatusStoreResources ingestStatusStore; private final CompactionStatusStoreResources compactionStatusStore; - public CoreStacks(ConfigBucketStack configBucketStack, TableIndexStack tableIndexStack, + public CoreStacks(LoggingStack loggingStack, ConfigBucketStack configBucketStack, TableIndexStack tableIndexStack, ManagedPoliciesStack policiesStack, StateStoreStacks stateStoreStacks, TableDataStack dataStack, StateStoreCommitterStack stateStoreCommitterStack, IngestStatusStoreResources ingestStatusStore, CompactionStatusStoreResources compactionStatusStore) { + this.loggingStack = loggingStack; this.configBucketStack = configBucketStack; this.tableIndexStack = tableIndexStack; this.policiesStack = policiesStack; @@ -53,6 +56,22 @@ public CoreStacks(ConfigBucketStack configBucketStack, TableIndexStack tableInde this.compactionStatusStore = compactionStatusStore; } + public ILogGroup getLogGroupByFunctionName(String functionName) { + return loggingStack.getLogGroupByFunctionName(functionName); + } + + public ILogGroup getProviderLogGroupByFunctionName(String functionName) { + return loggingStack.getProviderLogGroupByFunctionName(functionName); + } + + public ILogGroup getLogGroupByECSLogDriverId(String id) { + return loggingStack.getLogGroupByECSLogDriverId(id); + } + + public ILogGroup getLogGroupByStateMachineId(String id) { + return loggingStack.getLogGroupByStateMachineId(id); + } + public void grantReadInstanceConfig(IGrantable grantee) { configBucketStack.grantRead(grantee); } diff --git a/java/cdk/src/main/java/sleeper/cdk/stack/GarbageCollectorStack.java b/java/cdk/src/main/java/sleeper/cdk/stack/GarbageCollectorStack.java index 25bdd5cdbf..96e474d9e4 100644 --- a/java/cdk/src/main/java/sleeper/cdk/stack/GarbageCollectorStack.java +++ b/java/cdk/src/main/java/sleeper/cdk/stack/GarbageCollectorStack.java @@ -41,7 +41,6 @@ import java.util.List; import static sleeper.cdk.util.Utils.createAlarmForDlq; -import static sleeper.cdk.util.Utils.createLambdaLogGroup; import static sleeper.cdk.util.Utils.shouldDeployPaused; import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.GARBAGE_COLLECTOR_CLOUDWATCH_RULE; import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.GARBAGE_COLLECTOR_DLQ_ARN; @@ -92,7 +91,7 @@ public GarbageCollectorStack( .reservedConcurrentExecutions(1) .memorySize(instanceProperties.getInt(TABLE_BATCHING_LAMBDAS_MEMORY_IN_MB)) .timeout(Duration.seconds(instanceProperties.getInt(TABLE_BATCHING_LAMBDAS_TIMEOUT_IN_SECONDS))) - .logGroup(createLambdaLogGroup(this, "GarbageCollectorTriggerLogGroup", triggerFunctionName, instanceProperties))); + .logGroup(coreStacks.getLogGroupByFunctionName(triggerFunctionName))); IFunction handlerFunction = gcJar.buildFunction(this, "GarbageCollectorLambda", builder -> builder .functionName(functionName) .description("Scan the state store looking for files that need deleting and delete them") @@ -102,7 +101,7 @@ public GarbageCollectorStack( .handler("sleeper.garbagecollector.GarbageCollectorLambda::handleRequest") .environment(Utils.createDefaultEnvironment(instanceProperties)) .reservedConcurrentExecutions(instanceProperties.getInt(GARBAGE_COLLECTOR_LAMBDA_CONCURRENCY_RESERVED)) - .logGroup(createLambdaLogGroup(this, "GarbageCollectorLambdaLogGroup", functionName, instanceProperties))); + .logGroup(coreStacks.getLogGroupByFunctionName(functionName))); instanceProperties.set(GARBAGE_COLLECTOR_LAMBDA_FUNCTION, triggerFunction.getFunctionName()); // Grant this function permission delete files from the data bucket and diff --git a/java/cdk/src/main/java/sleeper/cdk/stack/IngestBatcherStack.java b/java/cdk/src/main/java/sleeper/cdk/stack/IngestBatcherStack.java index 03915e2715..96d6565ba2 100644 --- a/java/cdk/src/main/java/sleeper/cdk/stack/IngestBatcherStack.java +++ b/java/cdk/src/main/java/sleeper/cdk/stack/IngestBatcherStack.java @@ -50,7 +50,6 @@ import java.util.Map; import static sleeper.cdk.util.Utils.createAlarmForDlq; -import static sleeper.cdk.util.Utils.createLambdaLogGroup; import static sleeper.cdk.util.Utils.removalPolicy; import static sleeper.cdk.util.Utils.shouldDeployPaused; import static sleeper.core.properties.instance.BatcherProperty.INGEST_BATCHER_JOB_CREATION_LAMBDA_PERIOD_IN_MINUTES; @@ -145,7 +144,7 @@ public IngestBatcherStack( .timeout(Duration.seconds(instanceProperties.getInt(INGEST_BATCHER_SUBMITTER_TIMEOUT_IN_SECONDS))) .handler("sleeper.ingest.batcher.submitter.IngestBatcherSubmitterLambda::handleRequest") .environment(environmentVariables) - .logGroup(createLambdaLogGroup(this, "SubmitToIngestBatcherLogGroup", submitterName, instanceProperties)) + .logGroup(coreStacks.getLogGroupByFunctionName(submitterName)) .events(List.of(new SqsEventSource(submitQueue)))); instanceProperties.set(INGEST_BATCHER_SUBMIT_REQUEST_FUNCTION, submitterLambda.getFunctionName()); @@ -163,7 +162,7 @@ public IngestBatcherStack( .handler("sleeper.ingest.batcher.job.creator.IngestBatcherJobCreatorLambda::eventHandler") .environment(environmentVariables) .reservedConcurrentExecutions(1) - .logGroup(createLambdaLogGroup(this, "IngestBatcherJobCreationLogGroup", jobCreatorName, instanceProperties))); + .logGroup(coreStacks.getLogGroupByFunctionName(jobCreatorName))); instanceProperties.set(INGEST_BATCHER_JOB_CREATION_FUNCTION, jobCreatorLambda.getFunctionName()); ingestRequestsTable.grantReadWriteData(jobCreatorLambda); diff --git a/java/cdk/src/main/java/sleeper/cdk/stack/IngestStack.java b/java/cdk/src/main/java/sleeper/cdk/stack/IngestStack.java index 3c9004f28d..ad15796d94 100644 --- a/java/cdk/src/main/java/sleeper/cdk/stack/IngestStack.java +++ b/java/cdk/src/main/java/sleeper/cdk/stack/IngestStack.java @@ -58,7 +58,6 @@ import java.util.Objects; import static sleeper.cdk.util.Utils.createAlarmForDlq; -import static sleeper.cdk.util.Utils.createLambdaLogGroup; import static sleeper.cdk.util.Utils.shouldDeployPaused; import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.INGEST_CLOUDWATCH_RULE; import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.INGEST_CLUSTER; @@ -213,7 +212,7 @@ private Cluster ecsClusterForIngestTasks( ContainerDefinitionOptions containerDefinitionOptions = ContainerDefinitionOptions.builder() .image(containerImage) - .logging(Utils.createECSContainerLogDriver(this, instanceProperties, "IngestTasks")) + .logging(Utils.createECSContainerLogDriver(coreStacks, "IngestTasks")) .environment(Utils.createDefaultEnvironment(instanceProperties)) .build(); taskDefinition.addContainer("IngestContainer", containerDefinitionOptions); @@ -257,7 +256,7 @@ private void lambdaToCreateIngestTasks(CoreStacks coreStacks, Queue ingestJobQue .handler("sleeper.ingest.starter.RunIngestTasksLambda::eventHandler") .environment(Utils.createDefaultEnvironment(instanceProperties)) .reservedConcurrentExecutions(1) - .logGroup(createLambdaLogGroup(this, "IngestTasksCreatorLogGroup", functionName, instanceProperties))); + .logGroup(coreStacks.getLogGroupByFunctionName(functionName))); // Grant this function permission to read from the S3 bucket coreStacks.grantReadInstanceConfig(handler); diff --git a/java/cdk/src/main/java/sleeper/cdk/stack/KeepLambdaWarmStack.java b/java/cdk/src/main/java/sleeper/cdk/stack/KeepLambdaWarmStack.java index eafd31076f..e5ed38b44f 100644 --- a/java/cdk/src/main/java/sleeper/cdk/stack/KeepLambdaWarmStack.java +++ b/java/cdk/src/main/java/sleeper/cdk/stack/KeepLambdaWarmStack.java @@ -37,7 +37,6 @@ import java.util.Collections; -import static sleeper.cdk.util.Utils.createLambdaLogGroup; import static sleeper.cdk.util.Utils.shouldDeployPaused; import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.QUERY_WARM_LAMBDA_CLOUDWATCH_RULE; import static sleeper.core.properties.instance.CommonProperty.ID; @@ -76,7 +75,7 @@ public KeepLambdaWarmStack(Construct scope, .handler("sleeper.query.lambda.WarmQueryExecutorLambda::handleRequest") .environment(Utils.createDefaultEnvironment(instanceProperties)) .reservedConcurrentExecutions(1) - .logGroup(createLambdaLogGroup(this, id + "LogGroup", functionName, instanceProperties))); + .logGroup(coreStacks.getLogGroupByFunctionName(functionName))); // Cloudwatch rule to trigger this lambda Rule rule = Rule.Builder diff --git a/java/cdk/src/main/java/sleeper/cdk/stack/LoggingStack.java b/java/cdk/src/main/java/sleeper/cdk/stack/LoggingStack.java new file mode 100644 index 0000000000..d8209f9759 --- /dev/null +++ b/java/cdk/src/main/java/sleeper/cdk/stack/LoggingStack.java @@ -0,0 +1,137 @@ +/* + * Copyright 2022-2024 Crown Copyright + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package sleeper.cdk.stack; + +import software.amazon.awscdk.NestedStack; +import software.amazon.awscdk.services.logs.ILogGroup; +import software.amazon.awscdk.services.logs.LogGroup; +import software.constructs.Construct; + +import sleeper.cdk.util.Utils; +import sleeper.core.properties.instance.InstanceProperties; + +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +import static sleeper.core.properties.instance.CommonProperty.LOG_RETENTION_IN_DAYS; + +public class LoggingStack extends NestedStack { + + private final Map logGroupByName = new HashMap<>(); + private final InstanceProperties instanceProperties; + + public LoggingStack(Construct scope, String id, InstanceProperties instanceProperties) { + super(scope, id); + this.instanceProperties = instanceProperties; + + // Accessed directly by getter on this class + createLogGroup("vpc-check"); + createLogGroup("vpc-check-provider"); + createLogGroup("config-autodelete"); + createLogGroup("config-autodelete-provider"); + createLogGroup("table-data-autodelete"); + createLogGroup("table-data-autodelete-provider"); + createLogGroup("statestore-committer"); + + // Accessed via CoreStacks getters + createLogGroup("properties-writer"); + createLogGroup("properties-writer-provider"); + createLogGroup("state-snapshot-creation-trigger"); + createLogGroup("state-snapshot-creation"); + createLogGroup("state-snapshot-deletion-trigger"); + createLogGroup("state-snapshot-deletion"); + createLogGroup("state-transaction-deletion-trigger"); + createLogGroup("state-transaction-deletion"); + createLogGroup("metrics-trigger"); + createLogGroup("metrics-publisher"); + createLogGroup("bulk-import-EMRServerless-start"); + createLogGroup("bulk-import-NonPersistentEMR-start"); + createLogGroup("bulk-import-PersistentEMR-start"); + createLogGroup("bulk-import-eks-starter"); + createStateMachineLogGroup("EksBulkImportStateMachine"); + createLogGroup("bulk-import-autodelete"); + createLogGroup("bulk-import-autodelete-provider"); + createLogGroup("IngestTasks"); + createLogGroup("ingest-create-tasks"); + createLogGroup("ingest-batcher-submit-files"); + createLogGroup("ingest-batcher-create-jobs"); + createLogGroup("partition-splitting-trigger"); + createLogGroup("partition-splitting-find-to-split"); + createLogGroup("partition-splitting-handler"); + createLogGroup("FargateCompactionTasks"); + createLogGroup("EC2CompactionTasks"); + createLogGroup("compaction-job-creation-trigger"); + createLogGroup("compaction-job-creation-handler"); + createLogGroup("compaction-tasks-creator"); + createLogGroup("compaction-custom-termination"); + createLogGroup("garbage-collector-trigger"); + createLogGroup("garbage-collector"); + createLogGroup("query-executor"); + createLogGroup("query-leaf-partition"); + createLogGroup("query-websocket-handler"); + createLogGroup("query-results-autodelete"); + createLogGroup("query-results-autodelete-provider"); + createLogGroup("query-keep-warm"); + createLogGroup("Simple-athena-handler"); + createLogGroup("IteratorApplying-athena-handler"); + createLogGroup("spill-bucket-autodelete"); + createLogGroup("spill-bucket-autodelete-provider"); + } + + public ILogGroup getLogGroupByFunctionName(String functionName) { + return getLogGroupByNameWithPrefixes(functionName); + } + + public ILogGroup getProviderLogGroupByFunctionName(String functionName) { + return getLogGroupByNameWithPrefixes(functionName + "-provider"); + } + + public ILogGroup getLogGroupByECSLogDriverId(String id) { + return getLogGroupByNameWithPrefixes(addNamePrefixes(id)); + } + + public ILogGroup getLogGroupByStateMachineId(String id) { + return getLogGroupByNameWithPrefixes(addStateMachineNamePrefixes(id)); + } + + private ILogGroup getLogGroupByNameWithPrefixes(String nameWithPrefixes) { + return Objects.requireNonNull(logGroupByName.get(nameWithPrefixes), "No log group found: " + nameWithPrefixes); + } + + private void createLogGroup(String shortName) { + createLogGroup(shortName, addNamePrefixes(shortName)); + } + + private void createStateMachineLogGroup(String shortName) { + createLogGroup(shortName, addStateMachineNamePrefixes(shortName)); + } + + private void createLogGroup(String shortName, String nameWithPrefixes) { + logGroupByName.put(nameWithPrefixes, LogGroup.Builder.create(this, shortName) + .logGroupName(nameWithPrefixes) + .retention(Utils.getRetentionDays(instanceProperties.getInt(LOG_RETENTION_IN_DAYS))) + .build()); + } + + private String addStateMachineNamePrefixes(String shortName) { + return "/aws/vendedlogs/states/" + addNamePrefixes(shortName); + } + + private String addNamePrefixes(String shortName) { + return String.join("-", "sleeper", Utils.cleanInstanceId(instanceProperties), shortName); + } +} diff --git a/java/cdk/src/main/java/sleeper/cdk/stack/PartitionSplittingStack.java b/java/cdk/src/main/java/sleeper/cdk/stack/PartitionSplittingStack.java index eaa990a76a..03628dc8e5 100644 --- a/java/cdk/src/main/java/sleeper/cdk/stack/PartitionSplittingStack.java +++ b/java/cdk/src/main/java/sleeper/cdk/stack/PartitionSplittingStack.java @@ -44,7 +44,6 @@ import java.util.Map; import static sleeper.cdk.util.Utils.createAlarmForDlq; -import static sleeper.cdk.util.Utils.createLambdaLogGroup; import static sleeper.cdk.util.Utils.shouldDeployPaused; import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.FIND_PARTITIONS_TO_SPLIT_DLQ_ARN; import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.FIND_PARTITIONS_TO_SPLIT_DLQ_URL; @@ -195,7 +194,7 @@ private void createTriggerFunction(InstanceProperties instanceProperties, Lambda .handler("sleeper.splitter.lambda.FindPartitionsToSplitTriggerLambda::handleRequest") .environment(environmentVariables) .reservedConcurrentExecutions(1) - .logGroup(createLambdaLogGroup(this, "FindPartitionsToSplitTriggerLogGroup", triggerFunctionName, instanceProperties))); + .logGroup(coreStacks.getLogGroupByFunctionName(triggerFunctionName))); // Cloudwatch rule to trigger this lambda Rule rule = Rule.Builder .create(this, "FindPartitionsToSplitPeriodicTrigger") @@ -225,7 +224,7 @@ private void createFindPartitionsToSplitFunction(InstanceProperties instanceProp .handler("sleeper.splitter.lambda.FindPartitionsToSplitLambda::handleRequest") .environment(environmentVariables) .reservedConcurrentExecutions(instanceProperties.getInt(FIND_PARTITIONS_TO_SPLIT_LAMBDA_CONCURRENCY_RESERVED)) - .logGroup(createLambdaLogGroup(this, "FindPartitionsToSplitLogGroup", functionName, instanceProperties))); + .logGroup(coreStacks.getLogGroupByFunctionName(functionName))); coreStacks.grantReadTablesMetadata(findPartitionsToSplitLambda); partitionSplittingJobQueue.grantSendMessages(findPartitionsToSplitLambda); @@ -251,7 +250,7 @@ private void createSplitPartitionFunction(InstanceProperties instanceProperties, .reservedConcurrentExecutions(concurrency) .handler("sleeper.splitter.lambda.SplitPartitionLambda::handleRequest") .environment(environmentVariables) - .logGroup(createLambdaLogGroup(this, "SplitPartitionLogGroup", splitFunctionName, instanceProperties))); + .logGroup(coreStacks.getLogGroupByFunctionName(splitFunctionName))); coreStacks.grantSplitPartitions(splitPartitionLambda); splitPartitionLambda.addEventSource(SqsEventSource.Builder.create(partitionSplittingJobQueue) diff --git a/java/cdk/src/main/java/sleeper/cdk/stack/PropertiesStack.java b/java/cdk/src/main/java/sleeper/cdk/stack/PropertiesStack.java index b03190b93d..ec4855d26a 100644 --- a/java/cdk/src/main/java/sleeper/cdk/stack/PropertiesStack.java +++ b/java/cdk/src/main/java/sleeper/cdk/stack/PropertiesStack.java @@ -32,8 +32,6 @@ import java.util.HashMap; -import static sleeper.cdk.util.Utils.createCustomResourceProviderLogGroup; -import static sleeper.cdk.util.Utils.createLambdaLogGroup; import static sleeper.core.properties.instance.CommonProperty.JARS_BUCKET; /** @@ -61,14 +59,14 @@ public PropertiesStack( .memorySize(2048) .environment(Utils.createDefaultEnvironment(instanceProperties)) .description("Lambda for writing instance properties to S3 upon initialisation and teardown") - .logGroup(createLambdaLogGroup(this, "PropertiesWriterLambdaLogGroup", functionName, instanceProperties)) + .logGroup(coreStacks.getLogGroupByFunctionName(functionName)) .runtime(Runtime.JAVA_11)); coreStacks.grantWriteInstanceConfig(propertiesWriterLambda); Provider propertiesWriterProvider = Provider.Builder.create(this, "PropertiesWriterProvider") .onEventHandler(propertiesWriterLambda) - .logGroup(createCustomResourceProviderLogGroup(this, "PropertiesWriterProviderLogGroup", functionName, instanceProperties)) + .logGroup(coreStacks.getProviderLogGroupByFunctionName(functionName)) .build(); CustomResource.Builder.create(this, "InstanceProperties") diff --git a/java/cdk/src/main/java/sleeper/cdk/stack/QueryStack.java b/java/cdk/src/main/java/sleeper/cdk/stack/QueryStack.java index 943f0a9d40..38f449e1a2 100644 --- a/java/cdk/src/main/java/sleeper/cdk/stack/QueryStack.java +++ b/java/cdk/src/main/java/sleeper/cdk/stack/QueryStack.java @@ -63,7 +63,6 @@ import java.util.Objects; import static sleeper.cdk.util.Utils.createAlarmForDlq; -import static sleeper.cdk.util.Utils.createLambdaLogGroup; import static sleeper.cdk.util.Utils.removalPolicy; import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.QUERY_TRACKER_TABLE_NAME; import static sleeper.core.properties.instance.CommonProperty.ID; @@ -131,14 +130,16 @@ public QueryStack(Construct scope, * Creates a Lambda Function. * * @param id of the function to be created - * @param queryJar the jar containing the code for the Lambda + * @param coreStacks the core stacks * @param instanceProperties containing configuration details + * @param queryJar the jar containing the code for the Lambda * @param functionName the name of the function * @param handler the path for the method be be used as the entry point for the Lambda * @param description a description for the function * @return an IFunction */ - private IFunction createFunction(String id, LambdaCode queryJar, InstanceProperties instanceProperties, + private IFunction createFunction( + String id, CoreStacks coreStacks, InstanceProperties instanceProperties, LambdaCode queryJar, String functionName, String handler, String description) { return queryJar.buildFunction(this, id, builder -> builder .functionName(functionName) @@ -148,7 +149,7 @@ private IFunction createFunction(String id, LambdaCode queryJar, InstancePropert .timeout(Duration.seconds(instanceProperties.getInt(QUERY_PROCESSOR_LAMBDA_TIMEOUT_IN_SECONDS))) .handler(handler) .environment(Utils.createDefaultEnvironment(instanceProperties)) - .logGroup(createLambdaLogGroup(this, id + "LogGroup", functionName, instanceProperties))); + .logGroup(coreStacks.getLogGroupByFunctionName(functionName))); } /*** @@ -165,7 +166,7 @@ private IFunction setupQueryExecutorLambda(CoreStacks coreStacks, QueryQueueStac IBucket jarsBucket, ITable queryTrackingTable) { String functionName = String.join("-", "sleeper", Utils.cleanInstanceId(instanceProperties), "query-executor"); - IFunction lambda = createFunction("QueryExecutorLambda", queryJar, instanceProperties, functionName, + IFunction lambda = createFunction("QueryExecutorLambda", coreStacks, instanceProperties, queryJar, functionName, "sleeper.query.lambda.SqsQueryProcessorLambda::handleRequest", "When a query arrives on the query SQS queue, this lambda is invoked to look for leaf partition queries"); @@ -212,10 +213,10 @@ private IFunction setupLeafPartitionQueryQueueAndLambda( IBucket jarsBucket, ITable queryTrackingTable, List errorMetrics) { Queue leafPartitionQueryQueue = setupLeafPartitionQueryQueue(instanceProperties, topic, errorMetrics); Queue queryResultsQueue = setupResultsQueue(instanceProperties); - IBucket queryResultsBucket = setupResultsBucket(instanceProperties, customResourcesJar); + IBucket queryResultsBucket = setupResultsBucket(instanceProperties, coreStacks, customResourcesJar); String leafQueryFunctionName = String.join("-", "sleeper", Utils.cleanInstanceId(instanceProperties), "query-leaf-partition"); - IFunction lambda = createFunction("QueryLeafPartitionExecutorLambda", queryJar, instanceProperties, leafQueryFunctionName, + IFunction lambda = createFunction("QueryLeafPartitionExecutorLambda", coreStacks, instanceProperties, queryJar, leafQueryFunctionName, "sleeper.query.lambda.SqsLeafPartitionQueryLambda::handleRequest", "When a query arrives on the query SQS queue, this lambda is invoked to execute the query"); @@ -346,12 +347,13 @@ private Queue setupResultsQueue(InstanceProperties instanceProperties) { * @param customResourcesJar the jar for deploying custom CDK resources * @return the bucket created */ - private IBucket setupResultsBucket(InstanceProperties instanceProperties, LambdaCode customResourcesJar) { + private IBucket setupResultsBucket(InstanceProperties instanceProperties, CoreStacks coreStacks, LambdaCode customResourcesJar) { RemovalPolicy removalPolicy = removalPolicy(instanceProperties); + String bucketName = String.join("-", "sleeper", + Utils.cleanInstanceId(instanceProperties), "query-results"); Bucket resultsBucket = Bucket.Builder .create(this, "QueryResultsBucket") - .bucketName(String.join("-", "sleeper", - Utils.cleanInstanceId(instanceProperties), "query-results")) + .bucketName(bucketName) .versioned(false) .blockPublicAccess(BlockPublicAccess.BLOCK_ALL) .encryption(BucketEncryption.S3_MANAGED) @@ -362,7 +364,7 @@ private IBucket setupResultsBucket(InstanceProperties instanceProperties, Lambda instanceProperties.set(CdkDefinedInstanceProperty.QUERY_RESULTS_BUCKET, resultsBucket.getBucketName()); if (removalPolicy == RemovalPolicy.DESTROY) { - AutoDeleteS3Objects.autoDeleteForBucket(this, customResourcesJar, instanceProperties, resultsBucket); + AutoDeleteS3Objects.autoDeleteForBucket(this, instanceProperties, coreStacks, customResourcesJar, resultsBucket, bucketName); } return resultsBucket; diff --git a/java/cdk/src/main/java/sleeper/cdk/stack/StateStoreCommitterStack.java b/java/cdk/src/main/java/sleeper/cdk/stack/StateStoreCommitterStack.java index 7b5db6e584..d6b5285da4 100644 --- a/java/cdk/src/main/java/sleeper/cdk/stack/StateStoreCommitterStack.java +++ b/java/cdk/src/main/java/sleeper/cdk/stack/StateStoreCommitterStack.java @@ -23,7 +23,7 @@ import software.amazon.awscdk.services.iam.PolicyStatement; import software.amazon.awscdk.services.lambda.IFunction; import software.amazon.awscdk.services.lambda.eventsources.SqsEventSource; -import software.amazon.awscdk.services.logs.LogGroup; +import software.amazon.awscdk.services.logs.ILogGroup; import software.amazon.awscdk.services.s3.Bucket; import software.amazon.awscdk.services.s3.IBucket; import software.amazon.awscdk.services.sns.Topic; @@ -43,7 +43,6 @@ import java.util.Map; import static sleeper.cdk.util.Utils.createAlarmForDlq; -import static sleeper.cdk.util.Utils.createLambdaLogGroup; import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.STATESTORE_COMMITTER_DLQ_ARN; import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.STATESTORE_COMMITTER_DLQ_URL; import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.STATESTORE_COMMITTER_EVENT_SOURCE_ID; @@ -61,11 +60,13 @@ public class StateStoreCommitterStack extends NestedStack { private final InstanceProperties instanceProperties; private final Queue commitQueue; + @SuppressWarnings("checkstyle:ParameterNumberCheck") public StateStoreCommitterStack( Construct scope, String id, InstanceProperties instanceProperties, BuiltJars jars, + LoggingStack loggingStack, ConfigBucketStack configBucketStack, TableIndexStack tableIndexStack, StateStoreStacks stateStoreStacks, @@ -80,7 +81,8 @@ public StateStoreCommitterStack( LambdaCode committerJar = jars.lambdaCode(BuiltJar.STATESTORE, jarsBucket); commitQueue = sqsQueueForStateStoreCommitter(policiesStack, topic, errorMetrics); - lambdaToCommitStateStoreUpdates(policiesStack, committerJar, + lambdaToCommitStateStoreUpdates( + loggingStack, policiesStack, committerJar, configBucketStack, tableIndexStack, stateStoreStacks, compactionStatusStore, ingestStatusStore); } @@ -119,7 +121,7 @@ private Queue sqsQueueForStateStoreCommitter(ManagedPoliciesStack policiesStack, } private void lambdaToCommitStateStoreUpdates( - ManagedPoliciesStack policiesStack, LambdaCode committerJar, + LoggingStack loggingStack, ManagedPoliciesStack policiesStack, LambdaCode committerJar, ConfigBucketStack configBucketStack, TableIndexStack tableIndexStack, StateStoreStacks stateStoreStacks, CompactionStatusStoreResources compactionStatusStore, IngestStatusStoreResources ingestStatusStore) { @@ -127,7 +129,7 @@ private void lambdaToCommitStateStoreUpdates( String functionName = String.join("-", "sleeper", Utils.cleanInstanceId(instanceProperties), "statestore-committer"); - LogGroup logGroup = createLambdaLogGroup(this, "StateStoreCommitterLogGroup", functionName, instanceProperties); + ILogGroup logGroup = loggingStack.getLogGroupByFunctionName(functionName); instanceProperties.set(STATESTORE_COMMITTER_LOG_GROUP, logGroup.getLogGroupName()); IFunction handlerFunction = committerJar.buildFunction(this, "StateStoreCommitter", builder -> builder diff --git a/java/cdk/src/main/java/sleeper/cdk/stack/TableDataStack.java b/java/cdk/src/main/java/sleeper/cdk/stack/TableDataStack.java index 4d34c04462..c6219f6a1f 100644 --- a/java/cdk/src/main/java/sleeper/cdk/stack/TableDataStack.java +++ b/java/cdk/src/main/java/sleeper/cdk/stack/TableDataStack.java @@ -38,15 +38,17 @@ public class TableDataStack extends NestedStack { private final IBucket dataBucket; public TableDataStack( - Construct scope, String id, InstanceProperties instanceProperties, ManagedPoliciesStack policiesStack, BuiltJars jars) { + Construct scope, String id, InstanceProperties instanceProperties, + LoggingStack loggingStack, ManagedPoliciesStack policiesStack, BuiltJars jars) { super(scope, id); RemovalPolicy removalPolicy = removalPolicy(instanceProperties); + String bucketName = String.join("-", "sleeper", + Utils.cleanInstanceId(instanceProperties), "table-data"); dataBucket = Bucket.Builder .create(this, "TableDataBucket") - .bucketName(String.join("-", "sleeper", - Utils.cleanInstanceId(instanceProperties), "table-data")) + .bucketName(bucketName) .versioned(false) .blockPublicAccess(BlockPublicAccess.BLOCK_ALL) .encryption(BucketEncryption.S3_MANAGED) @@ -54,7 +56,7 @@ public TableDataStack( .build(); if (removalPolicy == RemovalPolicy.DESTROY) { - AutoDeleteS3Objects.autoDeleteForBucket(this, jars, instanceProperties, dataBucket); + AutoDeleteS3Objects.autoDeleteForBucket(this, instanceProperties, loggingStack, jars, dataBucket, bucketName); } instanceProperties.set(DATA_BUCKET, dataBucket.getBucketName()); diff --git a/java/cdk/src/main/java/sleeper/cdk/stack/TableMetricsStack.java b/java/cdk/src/main/java/sleeper/cdk/stack/TableMetricsStack.java index 84e78f1053..6db39354a8 100644 --- a/java/cdk/src/main/java/sleeper/cdk/stack/TableMetricsStack.java +++ b/java/cdk/src/main/java/sleeper/cdk/stack/TableMetricsStack.java @@ -45,7 +45,6 @@ import java.util.List; import static sleeper.cdk.util.Utils.createAlarmForDlq; -import static sleeper.cdk.util.Utils.createLambdaLogGroup; import static sleeper.cdk.util.Utils.shouldDeployPaused; import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.TABLE_METRICS_DLQ_ARN; import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.TABLE_METRICS_DLQ_URL; @@ -80,7 +79,7 @@ public TableMetricsStack( .reservedConcurrentExecutions(1) .memorySize(instanceProperties.getInt(TABLE_BATCHING_LAMBDAS_MEMORY_IN_MB)) .timeout(Duration.seconds(instanceProperties.getInt(TABLE_BATCHING_LAMBDAS_TIMEOUT_IN_SECONDS))) - .logGroup(createLambdaLogGroup(this, "MetricsTriggerLogGroup", triggerFunctionName, instanceProperties))); + .logGroup(coreStacks.getLogGroupByFunctionName(triggerFunctionName))); IFunction tableMetricsPublisher = metricsJar.buildFunction(this, "MetricsPublisher", builder -> builder .functionName(publishFunctionName) .description("Generates metrics for a Sleeper table based on info in its state store, and publishes them to CloudWatch") @@ -90,7 +89,7 @@ public TableMetricsStack( .reservedConcurrentExecutions(instanceProperties.getInt(METRICS_LAMBDA_CONCURRENCY_RESERVED)) .memorySize(1024) .timeout(Duration.minutes(1)) - .logGroup(createLambdaLogGroup(this, "MetricsPublisherLogGroup", publishFunctionName, instanceProperties))); + .logGroup(coreStacks.getLogGroupByFunctionName(publishFunctionName))); instanceProperties.set(TABLE_METRICS_LAMBDA_FUNCTION, tableMetricsTrigger.getFunctionName()); diff --git a/java/cdk/src/main/java/sleeper/cdk/stack/TransactionLogSnapshotStack.java b/java/cdk/src/main/java/sleeper/cdk/stack/TransactionLogSnapshotStack.java index add6925423..3fa0e2b305 100644 --- a/java/cdk/src/main/java/sleeper/cdk/stack/TransactionLogSnapshotStack.java +++ b/java/cdk/src/main/java/sleeper/cdk/stack/TransactionLogSnapshotStack.java @@ -41,7 +41,6 @@ import java.util.List; import static sleeper.cdk.util.Utils.createAlarmForDlq; -import static sleeper.cdk.util.Utils.createLambdaLogGroup; import static sleeper.cdk.util.Utils.shouldDeployPaused; import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.TRANSACTION_LOG_SNAPSHOT_CREATION_DLQ_ARN; import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.TRANSACTION_LOG_SNAPSHOT_CREATION_DLQ_URL; @@ -94,7 +93,7 @@ private void createSnapshotCreationLambda(InstanceProperties instanceProperties, .reservedConcurrentExecutions(1) .memorySize(instanceProperties.getInt(TABLE_BATCHING_LAMBDAS_MEMORY_IN_MB)) .timeout(Duration.seconds(instanceProperties.getInt(TABLE_BATCHING_LAMBDAS_TIMEOUT_IN_SECONDS))) - .logGroup(createLambdaLogGroup(this, "TransactionLogSnapshotCreationTriggerLogGroup", triggerFunctionName, instanceProperties))); + .logGroup(coreStacks.getLogGroupByFunctionName(triggerFunctionName))); IFunction snapshotCreationLambda = statestoreJar.buildFunction(this, "TransactionLogSnapshotCreation", builder -> builder .functionName(creationFunctionName) .description("Creates transaction log snapshots for tables") @@ -104,7 +103,7 @@ private void createSnapshotCreationLambda(InstanceProperties instanceProperties, .reservedConcurrentExecutions(instanceProperties.getInt(TRANSACTION_LOG_SNAPSHOT_CREATION_LAMBDA_CONCURRENCY_RESERVED)) .memorySize(1024) .timeout(Duration.minutes(1)) - .logGroup(createLambdaLogGroup(this, "TransactionLogSnapshotCreationLogGroup", creationFunctionName, instanceProperties))); + .logGroup(coreStacks.getLogGroupByFunctionName(creationFunctionName))); Rule rule = Rule.Builder.create(this, "TransactionLogSnapshotCreationSchedule") .ruleName(SleeperScheduleRule.TRANSACTION_LOG_SNAPSHOT_CREATION.buildRuleName(instanceProperties)) @@ -164,7 +163,7 @@ private void createSnapshotDeletionLambda(InstanceProperties instanceProperties, .reservedConcurrentExecutions(1) .memorySize(instanceProperties.getInt(TABLE_BATCHING_LAMBDAS_MEMORY_IN_MB)) .timeout(Duration.seconds(instanceProperties.getInt(TABLE_BATCHING_LAMBDAS_TIMEOUT_IN_SECONDS))) - .logGroup(createLambdaLogGroup(this, "TransactionLogSnapshotDeletionTriggerLogGroup", triggerFunctionName, instanceProperties))); + .logGroup(coreStacks.getLogGroupByFunctionName(triggerFunctionName))); IFunction snapshotDeletionLambda = statestoreJar.buildFunction(this, "TransactionLogSnapshotDeletion", builder -> builder .functionName(deletionFunctionName) .description("Deletes old transaction log snapshots for tables") @@ -174,7 +173,7 @@ private void createSnapshotDeletionLambda(InstanceProperties instanceProperties, .reservedConcurrentExecutions(instanceProperties.getInt(TRANSACTION_LOG_SNAPSHOT_DELETION_LAMBDA_CONCURRENCY_RESERVED)) .memorySize(1024) .timeout(Duration.minutes(1)) - .logGroup(createLambdaLogGroup(this, "TransactionLogSnapshotDeletionLogGroup", deletionFunctionName, instanceProperties))); + .logGroup(coreStacks.getLogGroupByFunctionName(deletionFunctionName))); Rule rule = Rule.Builder.create(this, "TransactionLogSnapshotDeletionSchedule") .ruleName(SleeperScheduleRule.TRANSACTION_LOG_SNAPSHOT_DELETION.buildRuleName(instanceProperties)) diff --git a/java/cdk/src/main/java/sleeper/cdk/stack/TransactionLogTransactionStack.java b/java/cdk/src/main/java/sleeper/cdk/stack/TransactionLogTransactionStack.java index cd696e33ee..8be9944d68 100644 --- a/java/cdk/src/main/java/sleeper/cdk/stack/TransactionLogTransactionStack.java +++ b/java/cdk/src/main/java/sleeper/cdk/stack/TransactionLogTransactionStack.java @@ -41,7 +41,6 @@ import java.util.List; import static sleeper.cdk.util.Utils.createAlarmForDlq; -import static sleeper.cdk.util.Utils.createLambdaLogGroup; import static sleeper.cdk.util.Utils.shouldDeployPaused; import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.TRANSACTION_LOG_TRANSACTION_DELETION_DLQ_ARN; import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.TRANSACTION_LOG_TRANSACTION_DELETION_DLQ_URL; @@ -84,7 +83,7 @@ private void createTransactionDeletionLambda(InstanceProperties instanceProperti .reservedConcurrentExecutions(1) .memorySize(instanceProperties.getInt(TABLE_BATCHING_LAMBDAS_MEMORY_IN_MB)) .timeout(Duration.seconds(instanceProperties.getInt(TABLE_BATCHING_LAMBDAS_TIMEOUT_IN_SECONDS))) - .logGroup(createLambdaLogGroup(this, "TransactionLogTransactionDeletionTriggerLogGroup", triggerFunctionName, instanceProperties))); + .logGroup(coreStacks.getLogGroupByFunctionName(triggerFunctionName))); IFunction transactionDeletionLambda = statestoreJar.buildFunction(this, "TransactionLogTransactionDeletion", builder -> builder .functionName(deletionFunctionName) .description("Deletes old transaction log transactions for tables") @@ -94,7 +93,7 @@ private void createTransactionDeletionLambda(InstanceProperties instanceProperti .reservedConcurrentExecutions(instanceProperties.getInt(TRANSACTION_LOG_TRANSACTION_DELETION_LAMBDA_CONCURRENCY_RESERVED)) .memorySize(1024) .timeout(Duration.minutes(1)) - .logGroup(createLambdaLogGroup(this, "TransactionLogTransactionDeletionLogGroup", deletionFunctionName, instanceProperties))); + .logGroup(coreStacks.getLogGroupByFunctionName(deletionFunctionName))); Rule rule = Rule.Builder.create(this, "TransactionLogTransactionDeletionSchedule") .ruleName(SleeperScheduleRule.TRANSACTION_LOG_TRANSACTION_DELETION.buildRuleName(instanceProperties)) diff --git a/java/cdk/src/main/java/sleeper/cdk/stack/VpcStack.java b/java/cdk/src/main/java/sleeper/cdk/stack/VpcStack.java index 19fd2e4fa9..f7dc0f66fe 100644 --- a/java/cdk/src/main/java/sleeper/cdk/stack/VpcStack.java +++ b/java/cdk/src/main/java/sleeper/cdk/stack/VpcStack.java @@ -41,8 +41,6 @@ import java.util.HashMap; import java.util.Map; -import static sleeper.cdk.util.Utils.createCustomResourceProviderLogGroup; -import static sleeper.cdk.util.Utils.createLambdaLogGroup; import static sleeper.core.properties.instance.CommonProperty.REGION; import static sleeper.core.properties.instance.CommonProperty.VPC_ENDPOINT_CHECK; import static sleeper.core.properties.instance.CommonProperty.VPC_ID; @@ -50,7 +48,7 @@ public class VpcStack extends NestedStack { private static final Logger LOGGER = LoggerFactory.getLogger(VpcStack.class); - public VpcStack(Construct scope, String id, InstanceProperties instanceProperties, BuiltJars jars) { + public VpcStack(Construct scope, String id, InstanceProperties instanceProperties, BuiltJars jars, LoggingStack logging) { super(scope, id); if (!instanceProperties.getBoolean(VPC_ENDPOINT_CHECK)) { @@ -71,7 +69,7 @@ public VpcStack(Construct scope, String id, InstanceProperties instancePropertie .handler("sleeper.cdk.custom.VpcCheckLambda::handleEvent") .memorySize(2048) .description("Lambda for checking the VPC has an associated S3 endpoint") - .logGroup(createLambdaLogGroup(this, "VpcCheckLambdaLogGroup", functionName, instanceProperties)) + .logGroup(logging.getLogGroupByFunctionName(functionName)) .runtime(Runtime.JAVA_11)); vpcCheckLambda.addToRolePolicy(new PolicyStatement(new PolicyStatementProps.Builder() @@ -84,7 +82,7 @@ public VpcStack(Construct scope, String id, InstanceProperties instancePropertie Provider provider = new Provider(this, "VpcCustomResourceProvider", ProviderProps.builder() .onEventHandler(vpcCheckLambda) - .logGroup(createCustomResourceProviderLogGroup(this, "VpcCustomResourceProviderLogGroup", functionName, instanceProperties)) + .logGroup(logging.getProviderLogGroupByFunctionName(functionName)) .build()); // Custom resource to check whether VPC is valid diff --git a/java/cdk/src/main/java/sleeper/cdk/stack/WebSocketQueryStack.java b/java/cdk/src/main/java/sleeper/cdk/stack/WebSocketQueryStack.java index 4d078d9576..85c2020c59 100644 --- a/java/cdk/src/main/java/sleeper/cdk/stack/WebSocketQueryStack.java +++ b/java/cdk/src/main/java/sleeper/cdk/stack/WebSocketQueryStack.java @@ -48,8 +48,6 @@ import java.util.Collections; import java.util.Map; -import static sleeper.cdk.util.Utils.createLambdaLogGroup; - public final class WebSocketQueryStack extends NestedStack { private CfnApi webSocketApi; @@ -86,7 +84,7 @@ protected void setupWebSocketApi(InstanceProperties instanceProperties, LambdaCo .handler("sleeper.query.lambda.WebSocketQueryProcessorLambda::handleRequest") .environment(env) .memorySize(256) - .logGroup(createLambdaLogGroup(this, "WebSocketApiHandlerLogGroup", functionName, instanceProperties)) + .logGroup(coreStacks.getLogGroupByFunctionName(functionName)) .timeout(Duration.seconds(29)) .runtime(software.amazon.awscdk.services.lambda.Runtime.JAVA_11)); diff --git a/java/cdk/src/main/java/sleeper/cdk/stack/bulkimport/BulkImportBucketStack.java b/java/cdk/src/main/java/sleeper/cdk/stack/bulkimport/BulkImportBucketStack.java index a090314a4a..f3e4233d51 100644 --- a/java/cdk/src/main/java/sleeper/cdk/stack/bulkimport/BulkImportBucketStack.java +++ b/java/cdk/src/main/java/sleeper/cdk/stack/bulkimport/BulkImportBucketStack.java @@ -36,9 +36,10 @@ public class BulkImportBucketStack extends NestedStack { public BulkImportBucketStack(Construct scope, String id, InstanceProperties instanceProperties, CoreStacks coreStacks, BuiltJars jars) { super(scope, id); + String bucketName = String.join("-", "sleeper", + Utils.cleanInstanceId(instanceProperties), "bulk-import"); importBucket = Bucket.Builder.create(this, "BulkImportBucket") - .bucketName(String.join("-", "sleeper", - Utils.cleanInstanceId(instanceProperties), "bulk-import")) + .bucketName(bucketName) .blockPublicAccess(BlockPublicAccess.BLOCK_ALL) .versioned(false) .removalPolicy(RemovalPolicy.DESTROY) @@ -46,7 +47,7 @@ public BulkImportBucketStack(Construct scope, String id, InstanceProperties inst .build(); importBucket.grantWrite(coreStacks.getIngestByQueuePolicyForGrants()); instanceProperties.set(BULK_IMPORT_BUCKET, importBucket.getBucketName()); - AutoDeleteS3Objects.autoDeleteForBucket(this, jars, instanceProperties, importBucket); + AutoDeleteS3Objects.autoDeleteForBucket(this, instanceProperties, coreStacks, jars, importBucket, bucketName); } public IBucket getImportBucket() { diff --git a/java/cdk/src/main/java/sleeper/cdk/stack/bulkimport/CommonEmrBulkImportHelper.java b/java/cdk/src/main/java/sleeper/cdk/stack/bulkimport/CommonEmrBulkImportHelper.java index 43ac491e9f..18b72c9511 100644 --- a/java/cdk/src/main/java/sleeper/cdk/stack/bulkimport/CommonEmrBulkImportHelper.java +++ b/java/cdk/src/main/java/sleeper/cdk/stack/bulkimport/CommonEmrBulkImportHelper.java @@ -43,7 +43,6 @@ import java.util.stream.Collectors; import static sleeper.cdk.util.Utils.createAlarmForDlq; -import static sleeper.cdk.util.Utils.createLambdaLogGroup; import static sleeper.core.properties.instance.CommonProperty.JARS_BUCKET; public class CommonEmrBulkImportHelper { @@ -127,7 +126,7 @@ public IFunction createJobStarterFunction( .environment(env) .runtime(software.amazon.awscdk.services.lambda.Runtime.JAVA_11) .handler("sleeper.bulkimport.starter.BulkImportStarterLambda") - .logGroup(createLambdaLogGroup(scope, "BulkImport" + platform + "JobStarterLogGroup", functionName, instanceProperties)) + .logGroup(coreStacks.getLogGroupByFunctionName(functionName)) .events(Lists.newArrayList(SqsEventSource.Builder.create(jobQueue).batchSize(1).build()))); coreStacks.grantValidateBulkImport(function.getRole()); diff --git a/java/cdk/src/main/java/sleeper/cdk/stack/bulkimport/EksBulkImportStack.java b/java/cdk/src/main/java/sleeper/cdk/stack/bulkimport/EksBulkImportStack.java index 2895327f6a..05226062f3 100644 --- a/java/cdk/src/main/java/sleeper/cdk/stack/bulkimport/EksBulkImportStack.java +++ b/java/cdk/src/main/java/sleeper/cdk/stack/bulkimport/EksBulkImportStack.java @@ -77,7 +77,6 @@ import java.util.function.Function; import static sleeper.cdk.util.Utils.createAlarmForDlq; -import static sleeper.cdk.util.Utils.createLambdaLogGroup; import static sleeper.cdk.util.Utils.createStateMachineLogOptions; import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.BULK_IMPORT_EKS_JOB_QUEUE_ARN; import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.BULK_IMPORT_EKS_JOB_QUEUE_URL; @@ -146,7 +145,7 @@ public EksBulkImportStack( .environment(env) .runtime(software.amazon.awscdk.services.lambda.Runtime.JAVA_11) .handler("sleeper.bulkimport.starter.BulkImportStarterLambda") - .logGroup(createLambdaLogGroup(this, "BulkImportEKSJobStarterLogGroup", functionName, instanceProperties)) + .logGroup(coreStacks.getLogGroupByFunctionName(functionName)) .events(Lists.newArrayList(SqsEventSource.Builder.create(bulkImportJobQueue).batchSize(1).build()))); configureJobStarterFunction(bulkImportJobStarter); @@ -198,7 +197,7 @@ public EksBulkImportStack( .forEach(sa -> sa.getNode().addDependency(namespace)); coreStacks.grantIngest(sparkServiceAccount.getRole()); - StateMachine stateMachine = createStateMachine(bulkImportCluster, instanceProperties, errorsTopic); + StateMachine stateMachine = createStateMachine(bulkImportCluster, instanceProperties, coreStacks, errorsTopic); instanceProperties.set(CdkDefinedInstanceProperty.BULK_IMPORT_EKS_STATE_MACHINE_ARN, stateMachine.getStateMachineArn()); bulkImportCluster.getAwsAuth().addRoleMapping(stateMachine.getRole(), AwsAuthMapping.builder() @@ -223,7 +222,7 @@ private static void configureJobStarterFunction(IFunction bulkImportJobStarter) .build()); } - private StateMachine createStateMachine(Cluster cluster, InstanceProperties instanceProperties, Topic errorsTopic) { + private StateMachine createStateMachine(Cluster cluster, InstanceProperties instanceProperties, CoreStacks coreStacks, Topic errorsTopic) { String imageName = instanceProperties.get(ACCOUNT) + ".dkr.ecr." + instanceProperties.get(REGION) + @@ -268,7 +267,7 @@ private StateMachine createStateMachine(Cluster cluster, InstanceProperties inst .stateJson(deleteJobState).build())) .otherwise(createErrorMessage.next(publishError).next(Fail.Builder .create(this, "FailedJobState").cause("Spark job failed").build()))))) - .logs(createStateMachineLogOptions(this, "EksBulkImportStateMachineLogs", instanceProperties)) + .logs(createStateMachineLogOptions(coreStacks, "EksBulkImportStateMachine")) .build(); } diff --git a/java/cdk/src/main/java/sleeper/cdk/util/AutoDeleteS3Objects.java b/java/cdk/src/main/java/sleeper/cdk/util/AutoDeleteS3Objects.java index 79869e5f6f..38266020f5 100644 --- a/java/cdk/src/main/java/sleeper/cdk/util/AutoDeleteS3Objects.java +++ b/java/cdk/src/main/java/sleeper/cdk/util/AutoDeleteS3Objects.java @@ -20,6 +20,7 @@ import software.amazon.awscdk.customresources.Provider; import software.amazon.awscdk.services.lambda.IFunction; import software.amazon.awscdk.services.lambda.Runtime; +import software.amazon.awscdk.services.logs.ILogGroup; import software.amazon.awscdk.services.s3.Bucket; import software.amazon.awscdk.services.s3.IBucket; import software.constructs.Construct; @@ -27,28 +28,59 @@ import sleeper.cdk.jars.BuiltJar; import sleeper.cdk.jars.BuiltJars; import sleeper.cdk.jars.LambdaCode; +import sleeper.cdk.stack.CoreStacks; +import sleeper.cdk.stack.LoggingStack; import sleeper.core.properties.instance.InstanceProperties; import java.util.Map; - -import static sleeper.cdk.util.Utils.createCustomResourceProviderLogGroup; -import static sleeper.cdk.util.Utils.createLambdaLogGroup; +import java.util.function.Function; public class AutoDeleteS3Objects { private AutoDeleteS3Objects() { } - public static void autoDeleteForBucket(Construct scope, BuiltJars jars, InstanceProperties instanceProperties, IBucket bucket) { + public static void autoDeleteForBucket( + Construct scope, InstanceProperties instanceProperties, CoreStacks coreStacks, BuiltJars jars, + IBucket bucket, String bucketName) { + autoDeleteForBucket(scope, instanceProperties, jars, bucket, bucketName, coreStacks::getLogGroupByFunctionName, coreStacks::getProviderLogGroupByFunctionName); + } + + public static void autoDeleteForBucket( + Construct scope, InstanceProperties instanceProperties, LoggingStack logging, BuiltJars jars, + IBucket bucket, String bucketName) { + autoDeleteForBucket(scope, instanceProperties, jars, bucket, bucketName, logging::getLogGroupByFunctionName, logging::getProviderLogGroupByFunctionName); + } + + public static void autoDeleteForBucket( + Construct scope, InstanceProperties instanceProperties, LoggingStack logging, LambdaCode customResourcesJar, + IBucket bucket, String bucketName) { + autoDeleteForBucket(scope, instanceProperties, customResourcesJar, bucket, bucketName, logging::getLogGroupByFunctionName, logging::getProviderLogGroupByFunctionName); + } + + public static void autoDeleteForBucket( + Construct scope, InstanceProperties instanceProperties, CoreStacks coreStacks, LambdaCode customResourcesJar, + IBucket bucket, String bucketName) { + autoDeleteForBucket(scope, instanceProperties, customResourcesJar, bucket, bucketName, coreStacks::getLogGroupByFunctionName, coreStacks::getProviderLogGroupByFunctionName); + } + + public static void autoDeleteForBucket( + Construct scope, InstanceProperties instanceProperties, BuiltJars jars, IBucket bucket, String bucketName, + Function getLogGroupByFunctionName, + Function getProviderLogGroupByFunctionName) { IBucket jarsBucket = Bucket.fromBucketName(scope, "JarsBucket", jars.bucketName()); LambdaCode jar = jars.lambdaCode(BuiltJar.CUSTOM_RESOURCES, jarsBucket); - autoDeleteForBucket(scope, jar, instanceProperties, bucket); + autoDeleteForBucket(scope, instanceProperties, jar, bucket, bucketName, getLogGroupByFunctionName, getProviderLogGroupByFunctionName); } - public static void autoDeleteForBucket(Construct scope, LambdaCode customResourcesJar, InstanceProperties instanceProperties, IBucket bucket) { + public static void autoDeleteForBucket( + Construct scope, InstanceProperties instanceProperties, LambdaCode customResourcesJar, + IBucket bucket, String bucketName, + Function getLogGroupByFunctionName, + Function getProviderLogGroupByFunctionName) { String id = bucket.getNode().getId() + "-AutoDelete"; - String functionName = bucket.getBucketName() + "-autodelete"; + String functionName = bucketName + "-autodelete"; IFunction lambda = customResourcesJar.buildFunction(scope, id + "Lambda", builder -> builder .functionName(functionName) @@ -56,7 +88,7 @@ public static void autoDeleteForBucket(Construct scope, LambdaCode customResourc .memorySize(2048) .environment(Utils.createDefaultEnvironmentNoConfigBucket(instanceProperties)) .description("Lambda for auto-deleting S3 objects") - .logGroup(createLambdaLogGroup(scope, id + "LambdaLogGroup", functionName, instanceProperties)) + .logGroup(getLogGroupByFunctionName.apply(functionName)) .runtime(Runtime.JAVA_11) .timeout(Duration.minutes(10))); @@ -65,12 +97,12 @@ public static void autoDeleteForBucket(Construct scope, LambdaCode customResourc Provider propertiesWriterProvider = Provider.Builder.create(scope, id + "Provider") .onEventHandler(lambda) - .logGroup(createCustomResourceProviderLogGroup(scope, id + "ProviderLogGroup", functionName, instanceProperties)) + .logGroup(getProviderLogGroupByFunctionName.apply(functionName)) .build(); CustomResource.Builder.create(scope, id) .resourceType("Custom::AutoDeleteS3Objects") - .properties(Map.of("bucket", bucket.getBucketName())) + .properties(Map.of("bucket", bucketName)) .serviceToken(propertiesWriterProvider.getServiceToken()) .build(); } diff --git a/java/cdk/src/main/java/sleeper/cdk/util/Utils.java b/java/cdk/src/main/java/sleeper/cdk/util/Utils.java index fe59031d45..8575c903c0 100644 --- a/java/cdk/src/main/java/sleeper/cdk/util/Utils.java +++ b/java/cdk/src/main/java/sleeper/cdk/util/Utils.java @@ -32,7 +32,7 @@ import software.amazon.awscdk.services.iam.ManagedPolicy; import software.amazon.awscdk.services.iam.PolicyStatement; import software.amazon.awscdk.services.lambda.IFunction; -import software.amazon.awscdk.services.logs.LogGroup; +import software.amazon.awscdk.services.logs.ILogGroup; import software.amazon.awscdk.services.logs.RetentionDays; import software.amazon.awscdk.services.sns.Topic; import software.amazon.awscdk.services.sqs.Queue; @@ -42,6 +42,7 @@ import software.amazon.awssdk.services.s3.S3Client; import software.constructs.Construct; +import sleeper.cdk.stack.CoreStacks; import sleeper.core.SleeperVersion; import sleeper.core.properties.instance.CdkDefinedInstanceProperty; import sleeper.core.properties.instance.InstanceProperties; @@ -63,7 +64,6 @@ import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.CONFIG_BUCKET; import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.VERSION; import static sleeper.core.properties.instance.CommonProperty.ID; -import static sleeper.core.properties.instance.CommonProperty.LOG_RETENTION_IN_DAYS; import static sleeper.core.properties.instance.CommonProperty.RETAIN_INFRA_AFTER_DESTROY; import static sleeper.core.properties.instance.CommonProperty.STACK_TAG_NAME; import static sleeper.core.properties.instance.DashboardProperty.DASHBOARD_TIME_WINDOW_MINUTES; @@ -125,67 +125,31 @@ private static String createToolOptions(InstanceProperties instanceProperties) { * @return the cleaned up instance ID */ public static String cleanInstanceId(InstanceProperties properties) { - return properties.get(ID) - .toLowerCase(Locale.ROOT) - .replace(".", "-"); - } - - /** - * Configures a log group with the specified number of days. Valid values are taken from - * here. - * A value of -1 represents an infinite number of days. - * - * @param numberOfDays number of days you want to retain the logs - * @return The RetentionDays equivalent - */ - public static LogGroup createLogGroupWithRetentionDays(Construct scope, String id, int numberOfDays) { - return LogGroup.Builder.create(scope, id) - .retention(getRetentionDays(numberOfDays)) - .build(); + return cleanInstanceId(properties.get(ID)); } - public static LogGroup createLambdaLogGroup( - Construct scope, String id, String functionName, InstanceProperties instanceProperties) { - return LogGroup.Builder.create(scope, id) - .logGroupName(functionName) - .retention(getRetentionDays(instanceProperties.getInt(LOG_RETENTION_IN_DAYS))) - .build(); - } - - public static LogGroup createCustomResourceProviderLogGroup( - Construct scope, String id, String functionName, InstanceProperties instanceProperties) { - return LogGroup.Builder.create(scope, id) - .logGroupName(functionName + "-provider") - .retention(getRetentionDays(instanceProperties.getInt(LOG_RETENTION_IN_DAYS))) - .build(); + public static String cleanInstanceId(String instanceId) { + return instanceId.toLowerCase(Locale.ROOT) + .replace(".", "-"); } - public static LogDriver createECSContainerLogDriver(Construct scope, InstanceProperties instanceProperties, String id) { - String logGroupName = String.join("-", "sleeper", cleanInstanceId(instanceProperties), id); - AwsLogDriverProps logDriverProps = AwsLogDriverProps.builder() - .streamPrefix(logGroupName) - .logGroup(LogGroup.Builder.create(scope, id) - .logGroupName(logGroupName) - .retention(getRetentionDays(instanceProperties.getInt(LOG_RETENTION_IN_DAYS))) - .build()) - .build(); - return LogDriver.awsLogs(logDriverProps); + public static LogDriver createECSContainerLogDriver(CoreStacks coreStacks, String id) { + ILogGroup logGroup = coreStacks.getLogGroupByECSLogDriverId(id); + return LogDriver.awsLogs(AwsLogDriverProps.builder() + .streamPrefix(logGroup.getLogGroupName()) + .logGroup(logGroup) + .build()); } - public static LogOptions createStateMachineLogOptions(Construct scope, String id, InstanceProperties instanceProperties) { - String logGroupName = String.join("-", "sleeper", cleanInstanceId(instanceProperties), id); + public static LogOptions createStateMachineLogOptions(CoreStacks coreStacks, String id) { return LogOptions.builder() - .destination(LogGroup.Builder.create(scope, id) - .logGroupName(logGroupName) - .retention(getRetentionDays(instanceProperties.getInt(LOG_RETENTION_IN_DAYS))) - .build()) + .destination(coreStacks.getLogGroupByStateMachineId(id)) .level(LogLevel.ALL) .includeExecutionData(true) .build(); } - private static RetentionDays getRetentionDays(int numberOfDays) { + public static RetentionDays getRetentionDays(int numberOfDays) { switch (numberOfDays) { case -1: return RetentionDays.INFINITE; diff --git a/java/core/src/main/java/sleeper/core/properties/validation/OptionalStack.java b/java/core/src/main/java/sleeper/core/properties/validation/OptionalStack.java index 61d730457a..06fb32548d 100644 --- a/java/core/src/main/java/sleeper/core/properties/validation/OptionalStack.java +++ b/java/core/src/main/java/sleeper/core/properties/validation/OptionalStack.java @@ -18,8 +18,8 @@ import org.apache.commons.lang3.EnumUtils; import java.util.List; -import java.util.stream.Collectors; -import java.util.stream.Stream; + +import static java.util.stream.Collectors.joining; /** * Valid values for optional deployment stacks. Determines which components of Sleeper will be deployed. @@ -93,6 +93,11 @@ public enum OptionalStack { PartitionSplittingStack, QueryStack); + public static final List DEFAULT_STACKS = List.of( + IngestStack, IngestBatcherStack, EmrServerlessBulkImportStack, EmrStudioStack, + QueryStack, AthenaStack, CompactionStack, GarbageCollectorStack, PartitionSplittingStack, + DashboardStack, TableMetricsStack); + /** * Checks if the value is a valid optional deployment stack. * @@ -104,10 +109,24 @@ public static boolean isValid(String value) { .allMatch(item -> EnumUtils.isValidEnumIgnoreCase(OptionalStack.class, item)); } + /** + * Returns the default value for the property to set optional stacks for an instance. This value is a + * comma-separated string. + * + * @return the default value + */ public static String getDefaultValue() { - return Stream.of(CompactionStack, GarbageCollectorStack, IngestStack, IngestBatcherStack, PartitionSplittingStack, - QueryStack, AthenaStack, EmrServerlessBulkImportStack, EmrStudioStack, DashboardStack, TableMetricsStack) - .map(a -> a.toString()) - .collect(Collectors.joining(",")); + return DEFAULT_STACKS.stream() + .map(OptionalStack::toString) + .collect(joining(",")); + } + + /** + * Returns a list of all optional stacks. + * + * @return all optional stacks + */ + public static List all() { + return List.of(values()); } } diff --git a/java/core/src/test/java/sleeper/core/properties/validation/OptionalStackTest.java b/java/core/src/test/java/sleeper/core/properties/validation/OptionalStackTest.java index c2c9002292..5957147bc9 100644 --- a/java/core/src/test/java/sleeper/core/properties/validation/OptionalStackTest.java +++ b/java/core/src/test/java/sleeper/core/properties/validation/OptionalStackTest.java @@ -39,9 +39,9 @@ public class OptionalStackTest { void shouldGenerateListOfDefaultValueForOptionalStack() { InstanceProperties properties = new InstanceProperties(); assertThat(properties.get(OPTIONAL_STACKS)) - .isEqualTo("CompactionStack,GarbageCollectorStack,IngestStack,IngestBatcherStack," + - "PartitionSplittingStack,QueryStack,AthenaStack,EmrServerlessBulkImportStack," + - "EmrStudioStack,DashboardStack,TableMetricsStack"); + .isEqualTo("IngestStack,IngestBatcherStack,EmrServerlessBulkImportStack,EmrStudioStack," + + "QueryStack,AthenaStack,CompactionStack,GarbageCollectorStack,PartitionSplittingStack," + + "DashboardStack,TableMetricsStack"); } @Test diff --git a/java/system-test/system-test-cdk/src/main/java/sleeper/systemtest/cdk/SystemTestBucketStack.java b/java/system-test/system-test-cdk/src/main/java/sleeper/systemtest/cdk/SystemTestBucketStack.java index 09789afb1e..2e5feb1c95 100644 --- a/java/system-test/system-test-cdk/src/main/java/sleeper/systemtest/cdk/SystemTestBucketStack.java +++ b/java/system-test/system-test-cdk/src/main/java/sleeper/systemtest/cdk/SystemTestBucketStack.java @@ -19,6 +19,7 @@ import software.amazon.awscdk.NestedStack; import software.amazon.awscdk.RemovalPolicy; import software.amazon.awscdk.Tags; +import software.amazon.awscdk.services.logs.LogGroup; import software.amazon.awscdk.services.s3.BlockPublicAccess; import software.amazon.awscdk.services.s3.Bucket; import software.amazon.awscdk.services.s3.BucketEncryption; @@ -30,6 +31,7 @@ import sleeper.cdk.util.Utils; import sleeper.core.properties.instance.InstanceProperties; import sleeper.systemtest.configuration.SystemTestProperties; +import sleeper.systemtest.configuration.SystemTestPropertyValues; import sleeper.systemtest.configuration.SystemTestStandaloneProperties; import java.util.List; @@ -39,6 +41,7 @@ import static sleeper.core.properties.instance.IngestProperty.INGEST_SOURCE_BUCKET; import static sleeper.systemtest.configuration.SystemTestProperty.SYSTEM_TEST_BUCKET_NAME; import static sleeper.systemtest.configuration.SystemTestProperty.SYSTEM_TEST_ID; +import static sleeper.systemtest.configuration.SystemTestProperty.SYSTEM_TEST_LOG_RETENTION_DAYS; public class SystemTestBucketStack extends NestedStack { @@ -48,7 +51,7 @@ public SystemTestBucketStack(Construct scope, String id, SystemTestStandalonePro super(scope, id); String bucketName = SystemTestStandaloneProperties.buildSystemTestBucketName(properties.get(SYSTEM_TEST_ID)); properties.set(SYSTEM_TEST_BUCKET_NAME, bucketName); - bucket = createBucket("SystemTestBucket", bucketName, properties.toInstancePropertiesForCdkUtils(), jars); + bucket = createBucket("SystemTestBucket", bucketName, properties, properties.toInstancePropertiesForCdkUtils(), jars); Tags.of(this).add("DeploymentStack", id); } @@ -58,11 +61,11 @@ public SystemTestBucketStack(Construct scope, String id, SystemTestProperties pr "system", "test", "ingest").toLowerCase(Locale.ROOT); properties.set(SYSTEM_TEST_BUCKET_NAME, bucketName); properties.addToListIfMissing(INGEST_SOURCE_BUCKET, List.of(bucketName)); - bucket = createBucket("SystemTestIngestBucket", bucketName, properties, jars); + bucket = createBucket("SystemTestIngestBucket", bucketName, properties.testPropertiesOnly(), properties, jars); Utils.addStackTagIfSet(this, properties); } - private IBucket createBucket(String id, String bucketName, InstanceProperties instanceProperties, BuiltJars jars) { + private IBucket createBucket(String id, String bucketName, SystemTestPropertyValues properties, InstanceProperties instanceProperties, BuiltJars jars) { IBucket bucket = Bucket.Builder.create(this, id) .bucketName(bucketName) .versioned(false) @@ -70,7 +73,15 @@ private IBucket createBucket(String id, String bucketName, InstanceProperties in .blockPublicAccess(BlockPublicAccess.BLOCK_ALL) .removalPolicy(RemovalPolicy.DESTROY) .build(); - AutoDeleteS3Objects.autoDeleteForBucket(this, jars, instanceProperties, bucket); + AutoDeleteS3Objects.autoDeleteForBucket(this, instanceProperties, jars, bucket, bucketName, + functionName -> LogGroup.Builder.create(this, id + "-AutoDeleteLambdaLogGroup") + .logGroupName(functionName) + .retention(Utils.getRetentionDays(properties.getInt(SYSTEM_TEST_LOG_RETENTION_DAYS))) + .build(), + functionName -> LogGroup.Builder.create(this, id + "-AutoDeleteProviderLogGroup") + .logGroupName(functionName + "-provider") + .retention(Utils.getRetentionDays(properties.getInt(SYSTEM_TEST_LOG_RETENTION_DAYS))) + .build()); return bucket; } diff --git a/java/system-test/system-test-cdk/src/main/java/sleeper/systemtest/cdk/SystemTestClusterStack.java b/java/system-test/system-test-cdk/src/main/java/sleeper/systemtest/cdk/SystemTestClusterStack.java index 6404bbdb56..6bfb2d3085 100644 --- a/java/system-test/system-test-cdk/src/main/java/sleeper/systemtest/cdk/SystemTestClusterStack.java +++ b/java/system-test/system-test-cdk/src/main/java/sleeper/systemtest/cdk/SystemTestClusterStack.java @@ -25,13 +25,16 @@ import software.amazon.awscdk.services.ec2.VpcLookupOptions; import software.amazon.awscdk.services.ecr.IRepository; import software.amazon.awscdk.services.ecr.Repository; +import software.amazon.awscdk.services.ecs.AwsLogDriverProps; import software.amazon.awscdk.services.ecs.Cluster; import software.amazon.awscdk.services.ecs.ContainerDefinitionOptions; import software.amazon.awscdk.services.ecs.ContainerImage; import software.amazon.awscdk.services.ecs.FargateTaskDefinition; +import software.amazon.awscdk.services.ecs.LogDriver; import software.amazon.awscdk.services.iam.Effect; import software.amazon.awscdk.services.iam.IRole; import software.amazon.awscdk.services.iam.PolicyStatement; +import software.amazon.awscdk.services.logs.LogGroup; import software.amazon.awscdk.services.s3.Bucket; import software.constructs.Construct; @@ -49,19 +52,14 @@ import java.util.List; -import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.CONFIG_BUCKET; import static sleeper.core.properties.instance.CommonProperty.ID; import static sleeper.core.properties.instance.CommonProperty.JARS_BUCKET; import static sleeper.core.properties.instance.CommonProperty.VPC_ID; -import static sleeper.core.properties.instance.LoggingLevelsProperty.LOGGING_LEVEL; -import static sleeper.systemtest.configuration.SystemTestProperty.SYSTEM_TEST_BUCKET_NAME; import static sleeper.systemtest.configuration.SystemTestProperty.SYSTEM_TEST_CLUSTER_NAME; -import static sleeper.systemtest.configuration.SystemTestProperty.SYSTEM_TEST_ID; -import static sleeper.systemtest.configuration.SystemTestProperty.SYSTEM_TEST_JARS_BUCKET; +import static sleeper.systemtest.configuration.SystemTestProperty.SYSTEM_TEST_LOG_RETENTION_DAYS; import static sleeper.systemtest.configuration.SystemTestProperty.SYSTEM_TEST_REPO; import static sleeper.systemtest.configuration.SystemTestProperty.SYSTEM_TEST_TASK_CPU; import static sleeper.systemtest.configuration.SystemTestProperty.SYSTEM_TEST_TASK_MEMORY; -import static sleeper.systemtest.configuration.SystemTestProperty.SYSTEM_TEST_VPC_ID; import static sleeper.systemtest.configuration.SystemTestProperty.WRITE_DATA_TASK_DEFINITION_FAMILY; public class SystemTestClusterStack extends NestedStack { @@ -69,13 +67,7 @@ public class SystemTestClusterStack extends NestedStack { public SystemTestClusterStack( Construct scope, String id, SystemTestStandaloneProperties properties, SystemTestBucketStack bucketStack) { super(scope, id); - InstanceProperties instanceProperties = new InstanceProperties(); - instanceProperties.set(ID, properties.get(SYSTEM_TEST_ID)); - instanceProperties.set(VPC_ID, properties.get(SYSTEM_TEST_VPC_ID)); - instanceProperties.set(JARS_BUCKET, properties.get(SYSTEM_TEST_JARS_BUCKET)); - instanceProperties.set(CONFIG_BUCKET, properties.get(SYSTEM_TEST_BUCKET_NAME)); - instanceProperties.set(LOGGING_LEVEL, "debug"); - createSystemTestCluster(properties, properties, instanceProperties, bucketStack); + createSystemTestCluster(properties, properties, properties.toInstancePropertiesForCdkUtils(), bucketStack); Tags.of(this).add("DeploymentStack", id); } @@ -94,10 +86,10 @@ private void createSystemTestCluster( .vpcId(instanceProperties.get(VPC_ID)) .build(); IVpc vpc = Vpc.fromLookup(this, "SystemTestVPC", vpcLookupOptions); + String instanceId = Utils.cleanInstanceId(instanceProperties); // ECS cluster for tasks to write data - String clusterName = String.join("-", "sleeper", - Utils.cleanInstanceId(instanceProperties), "system-test-cluster"); + String clusterName = String.join("-", "sleeper", instanceId, "system-test-cluster"); Cluster cluster = Cluster.Builder .create(this, "SystemTestCluster") .clusterName(clusterName) @@ -126,9 +118,16 @@ private void createSystemTestCluster( IRepository repository = Repository.fromRepositoryName(this, "SystemTestECR", properties.get(SYSTEM_TEST_REPO)); ContainerImage containerImage = ContainerImage.fromEcrRepository(repository, SleeperVersion.getVersion()); + String logGroupName = String.join("-", "sleeper", instanceId, "SystemTestTasks"); ContainerDefinitionOptions containerDefinitionOptions = ContainerDefinitionOptions.builder() .image(containerImage) - .logging(Utils.createECSContainerLogDriver(this, instanceProperties, "SystemTestTasks")) + .logging(LogDriver.awsLogs(AwsLogDriverProps.builder() + .streamPrefix(logGroupName) + .logGroup(LogGroup.Builder.create(this, "SystemTestTasks") + .logGroupName(logGroupName) + .retention(Utils.getRetentionDays(properties.getInt(SYSTEM_TEST_LOG_RETENTION_DAYS))) + .build()) + .build())) .environment(Utils.createDefaultEnvironment(instanceProperties)) .build(); taskDefinition.addContainer(SystemTestConstants.SYSTEM_TEST_CONTAINER, containerDefinitionOptions); diff --git a/java/system-test/system-test-cdk/src/main/java/sleeper/systemtest/cdk/SystemTestPropertiesStack.java b/java/system-test/system-test-cdk/src/main/java/sleeper/systemtest/cdk/SystemTestPropertiesStack.java index a007bd55ed..556e22f922 100644 --- a/java/system-test/system-test-cdk/src/main/java/sleeper/systemtest/cdk/SystemTestPropertiesStack.java +++ b/java/system-test/system-test-cdk/src/main/java/sleeper/systemtest/cdk/SystemTestPropertiesStack.java @@ -22,6 +22,7 @@ import software.amazon.awscdk.customresources.Provider; import software.amazon.awscdk.services.lambda.IFunction; import software.amazon.awscdk.services.lambda.Runtime; +import software.amazon.awscdk.services.logs.LogGroup; import software.amazon.awscdk.services.s3.Bucket; import software.amazon.awscdk.services.s3.IBucket; import software.constructs.Construct; @@ -29,16 +30,16 @@ import sleeper.cdk.jars.BuiltJar; import sleeper.cdk.jars.BuiltJars; import sleeper.cdk.jars.LambdaCode; +import sleeper.cdk.util.Utils; import sleeper.systemtest.configuration.SystemTestStandaloneProperties; import java.util.HashMap; -import java.util.Locale; import java.util.Map; -import static sleeper.cdk.util.Utils.createLogGroupWithRetentionDays; import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.CONFIG_BUCKET; import static sleeper.systemtest.configuration.SystemTestProperty.SYSTEM_TEST_ID; import static sleeper.systemtest.configuration.SystemTestProperty.SYSTEM_TEST_JARS_BUCKET; +import static sleeper.systemtest.configuration.SystemTestProperty.SYSTEM_TEST_LOG_RETENTION_DAYS; public class SystemTestPropertiesStack extends NestedStack { @@ -54,8 +55,7 @@ public SystemTestPropertiesStack( HashMap properties = new HashMap<>(); properties.put("properties", systemTestProperties.saveAsString()); - String functionName = String.join("-", "sleeper", - systemTestProperties.get(SYSTEM_TEST_ID).toLowerCase(Locale.ROOT), "properties-writer"); + String functionName = String.join("-", "sleeper", Utils.cleanInstanceId(systemTestProperties.get(SYSTEM_TEST_ID)), "properties-writer"); IFunction propertiesWriterLambda = jar.buildFunction(this, "PropertiesWriterLambda", builder -> builder .functionName(functionName) @@ -63,14 +63,20 @@ public SystemTestPropertiesStack( .memorySize(2048) .environment(Map.of(CONFIG_BUCKET.toEnvironmentVariable(), bucketStack.getBucket().getBucketName())) .description("Lambda for writing system test properties to S3 upon initialisation and teardown") - .logGroup(createLogGroupWithRetentionDays(this, "PropertiesWriterLambdaLogGroup", 30)) + .logGroup(LogGroup.Builder.create(this, "PropertiesWriterLambdaLogGroup") + .logGroupName(functionName) + .retention(Utils.getRetentionDays(systemTestProperties.getInt(SYSTEM_TEST_LOG_RETENTION_DAYS))) + .build()) .runtime(Runtime.JAVA_11)); bucketStack.getBucket().grantWrite(propertiesWriterLambda); Provider propertiesWriterProvider = Provider.Builder.create(this, "PropertiesWriterProvider") .onEventHandler(propertiesWriterLambda) - .logGroup(createLogGroupWithRetentionDays(this, "PropertiesWriterProviderLogGroup", 30)) + .logGroup(LogGroup.Builder.create(this, "PropertiesWriterProviderLogGroup") + .logGroupName(functionName + "-provider") + .retention(Utils.getRetentionDays(systemTestProperties.getInt(SYSTEM_TEST_LOG_RETENTION_DAYS))) + .build()) .build(); CustomResource.Builder.create(this, "SystemTestProperties") diff --git a/java/system-test/system-test-configuration/src/main/java/sleeper/systemtest/configuration/SystemTestStandaloneProperties.java b/java/system-test/system-test-configuration/src/main/java/sleeper/systemtest/configuration/SystemTestStandaloneProperties.java index 66a18b524b..0b12b72c73 100644 --- a/java/system-test/system-test-configuration/src/main/java/sleeper/systemtest/configuration/SystemTestStandaloneProperties.java +++ b/java/system-test/system-test-configuration/src/main/java/sleeper/systemtest/configuration/SystemTestStandaloneProperties.java @@ -34,14 +34,21 @@ import java.util.Properties; import static sleeper.core.properties.PropertiesUtils.loadProperties; +import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.CONFIG_BUCKET; +import static sleeper.core.properties.instance.CommonProperty.ID; +import static sleeper.core.properties.instance.CommonProperty.JARS_BUCKET; import static sleeper.core.properties.instance.CommonProperty.LOG_RETENTION_IN_DAYS; +import static sleeper.core.properties.instance.CommonProperty.VPC_ID; import static sleeper.core.properties.instance.LoggingLevelsProperty.APACHE_LOGGING_LEVEL; import static sleeper.core.properties.instance.LoggingLevelsProperty.AWS_LOGGING_LEVEL; import static sleeper.core.properties.instance.LoggingLevelsProperty.LOGGING_LEVEL; import static sleeper.core.properties.instance.LoggingLevelsProperty.PARQUET_LOGGING_LEVEL; import static sleeper.core.properties.instance.LoggingLevelsProperty.ROOT_LOGGING_LEVEL; import static sleeper.systemtest.configuration.SystemTestProperty.SYSTEM_TEST_BUCKET_NAME; +import static sleeper.systemtest.configuration.SystemTestProperty.SYSTEM_TEST_ID; +import static sleeper.systemtest.configuration.SystemTestProperty.SYSTEM_TEST_JARS_BUCKET; import static sleeper.systemtest.configuration.SystemTestProperty.SYSTEM_TEST_LOG_RETENTION_DAYS; +import static sleeper.systemtest.configuration.SystemTestProperty.SYSTEM_TEST_VPC_ID; public class SystemTestStandaloneProperties extends SleeperProperties @@ -107,6 +114,10 @@ protected SleeperPropertiesPrettyPrinter getPrettyPrinter(Pr public InstanceProperties toInstancePropertiesForCdkUtils() { InstanceProperties instanceProperties = new InstanceProperties(); + instanceProperties.set(ID, get(SYSTEM_TEST_ID)); + instanceProperties.set(VPC_ID, get(SYSTEM_TEST_VPC_ID)); + instanceProperties.set(JARS_BUCKET, get(SYSTEM_TEST_JARS_BUCKET)); + instanceProperties.set(CONFIG_BUCKET, get(SYSTEM_TEST_BUCKET_NAME)); instanceProperties.set(LOG_RETENTION_IN_DAYS, get(SYSTEM_TEST_LOG_RETENTION_DAYS)); instanceProperties.set(LOGGING_LEVEL, "DEBUG"); instanceProperties.set(ROOT_LOGGING_LEVEL, "INFO"); diff --git a/java/system-test/system-test-dsl/src/main/java/sleeper/systemtest/dsl/SleeperSystemTest.java b/java/system-test/system-test-dsl/src/main/java/sleeper/systemtest/dsl/SleeperSystemTest.java index d97e642739..c824756465 100644 --- a/java/system-test/system-test-dsl/src/main/java/sleeper/systemtest/dsl/SleeperSystemTest.java +++ b/java/system-test/system-test-dsl/src/main/java/sleeper/systemtest/dsl/SleeperSystemTest.java @@ -43,6 +43,7 @@ import sleeper.systemtest.dsl.statestore.SystemTestStateStore; import java.nio.file.Path; +import java.util.Collection; import java.util.Map; import java.util.stream.LongStream; @@ -175,10 +176,18 @@ public void enableOptionalStack(OptionalStack stack) { new SystemTestOptionalStacks(context.instance()).addOptionalStack(stack); } + public void enableOptionalStacks(Collection stacks) { + new SystemTestOptionalStacks(context.instance()).addOptionalStacks(stacks); + } + public void disableOptionalStack(OptionalStack stack) { new SystemTestOptionalStacks(context.instance()).removeOptionalStack(stack); } + public void disableOptionalStacks(Collection stacks) { + new SystemTestOptionalStacks(context.instance()).removeOptionalStacks(stacks); + } + public SystemTestStateStore stateStore() { return new SystemTestStateStore(context); } diff --git a/java/system-test/system-test-dsl/src/main/java/sleeper/systemtest/dsl/instance/SystemTestOptionalStacks.java b/java/system-test/system-test-dsl/src/main/java/sleeper/systemtest/dsl/instance/SystemTestOptionalStacks.java index f83fbf7360..47359814f6 100644 --- a/java/system-test/system-test-dsl/src/main/java/sleeper/systemtest/dsl/instance/SystemTestOptionalStacks.java +++ b/java/system-test/system-test-dsl/src/main/java/sleeper/systemtest/dsl/instance/SystemTestOptionalStacks.java @@ -24,6 +24,7 @@ import sleeper.core.properties.validation.OptionalStack; import java.util.ArrayList; +import java.util.Collection; import java.util.LinkedHashSet; import java.util.Set; import java.util.function.Consumer; @@ -57,6 +58,16 @@ public void removeOptionalStack(OptionalStack stack) { updateOptionalStacks(stacks -> stacks.remove(stack)); } + public void addOptionalStacks(Collection stacks) { + LOGGER.info("Adding optional stacks: {}", stacks); + updateOptionalStacks(stacksSet -> stacksSet.addAll(stacks)); + } + + public void removeOptionalStacks(Collection stacks) { + LOGGER.info("Removing optional stacks: {}", stacks); + updateOptionalStacks(stacksSet -> stacksSet.removeAll(stacks)); + } + private OptionalStack stack(Class stackClass) { return EnumUtils.getEnumIgnoreCase(OptionalStack.class, stackClass.getSimpleName()); } diff --git a/java/system-test/system-test-suite/src/main/java/sleeper/systemtest/suite/fixtures/SystemTestInstance.java b/java/system-test/system-test-suite/src/main/java/sleeper/systemtest/suite/fixtures/SystemTestInstance.java index e36df5910e..4cc5eff45b 100644 --- a/java/system-test/system-test-suite/src/main/java/sleeper/systemtest/suite/fixtures/SystemTestInstance.java +++ b/java/system-test/system-test-suite/src/main/java/sleeper/systemtest/suite/fixtures/SystemTestInstance.java @@ -73,6 +73,7 @@ private SystemTestInstance() { public static final SystemTestInstanceConfiguration INGEST_PERFORMANCE = usingSystemTestDefaults("ingest", SystemTestInstance::buildIngestPerformanceConfiguration); public static final SystemTestInstanceConfiguration COMPACTION_PERFORMANCE = usingSystemTestDefaults("compact", SystemTestInstance::buildCompactionPerformanceConfiguration); public static final SystemTestInstanceConfiguration BULK_IMPORT_PERFORMANCE = usingSystemTestDefaults("emr", SystemTestInstance::buildBulkImportPerformanceConfiguration); + public static final SystemTestInstanceConfiguration REENABLE_OPTIONAL_STACKS = usingSystemTestDefaults("optstck", SystemTestInstance::buildMainConfiguration); public static final SystemTestInstanceConfiguration INGEST_NO_SOURCE_BUCKET = noSourceBucket("no-src", SystemTestInstance::buildMainConfiguration); public static final SystemTestInstanceConfiguration PARALLEL_COMPACTIONS = usingSystemTestDefaults("cpt-pll", SystemTestInstance::buildCompactionInParallelConfiguration); public static final SystemTestInstanceConfiguration COMPACTION_ON_EC2 = usingSystemTestDefaults("cpt-ec2", SystemTestInstance::buildCompactionOnEC2Configuration); diff --git a/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/RedeployOptionalStacksST.java b/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/RedeployOptionalStacksST.java new file mode 100644 index 0000000000..7d2ce951b0 --- /dev/null +++ b/java/system-test/system-test-suite/src/test/java/sleeper/systemtest/suite/RedeployOptionalStacksST.java @@ -0,0 +1,62 @@ +/* + * Copyright 2022-2024 Crown Copyright + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package sleeper.systemtest.suite; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import sleeper.core.properties.validation.OptionalStack; +import sleeper.systemtest.dsl.SleeperSystemTest; +import sleeper.systemtest.suite.testutil.Slow; +import sleeper.systemtest.suite.testutil.SystemTest; + +import java.util.LinkedHashSet; +import java.util.Set; + +import static sleeper.systemtest.suite.fixtures.SystemTestInstance.REENABLE_OPTIONAL_STACKS; + +@SystemTest +// Slow because it needs to do many CDK deployments +@Slow +public class RedeployOptionalStacksST { + + private static final Set REDEPLOYABLE_STACKS = new LinkedHashSet<>(OptionalStack.all()); + static { + // We're currently unable to configure some of the log groups related to an EKS cluster, so it fails to redeploy + // because those log groups are retained and already exist. Here's the issue for this problem: + // https://github.com/gchq/sleeper/issues/3480 (Can't redeploy EKS bulk import optional stack) + REDEPLOYABLE_STACKS.remove(OptionalStack.EksBulkImportStack); + } + + @BeforeEach + void setUp(SleeperSystemTest sleeper) { + sleeper.connectToInstance(REENABLE_OPTIONAL_STACKS); + } + + @AfterEach + void tearDown(SleeperSystemTest sleeper) { + sleeper.disableOptionalStacks(OptionalStack.all()); + } + + @Test + void shouldDisableAndReenableAllOptionalStacks(SleeperSystemTest sleeper) { + sleeper.enableOptionalStacks(REDEPLOYABLE_STACKS); + sleeper.disableOptionalStacks(OptionalStack.all()); + sleeper.enableOptionalStacks(REDEPLOYABLE_STACKS); + } + +} diff --git a/scripts/templates/instanceproperties.template b/scripts/templates/instanceproperties.template index c6cdbc627b..caa45d28ba 100644 --- a/scripts/templates/instanceproperties.template +++ b/scripts/templates/instanceproperties.template @@ -82,7 +82,7 @@ sleeper.retain.infra.after.destroy=true # PersistentEmrBulkImportStack, EksBulkImportStack, EmrStudioStack, QueryStack, WebSocketQueryStack, # AthenaStack, KeepLambdaWarmStack, CompactionStack, GarbageCollectorStack, PartitionSplittingStack, # DashboardStack, TableMetricsStack] -sleeper.optional.stacks=CompactionStack,GarbageCollectorStack,IngestStack,IngestBatcherStack,PartitionSplittingStack,QueryStack,AthenaStack,EmrServerlessBulkImportStack,EmrStudioStack,DashboardStack,TableMetricsStack +sleeper.optional.stacks=IngestStack,IngestBatcherStack,EmrServerlessBulkImportStack,EmrStudioStack,QueryStack,AthenaStack,CompactionStack,GarbageCollectorStack,PartitionSplittingStack,DashboardStack,TableMetricsStack # Whether to check that the VPC that the instance is deployed to has an S3 endpoint. If there is no S3 # endpoint then the NAT costs can be very significant.