diff --git a/java/cdk/src/main/java/sleeper/cdk/stack/StateStoreCommitterStack.java b/java/cdk/src/main/java/sleeper/cdk/stack/StateStoreCommitterStack.java index f40651856c..d94bcdefd3 100644 --- a/java/cdk/src/main/java/sleeper/cdk/stack/StateStoreCommitterStack.java +++ b/java/cdk/src/main/java/sleeper/cdk/stack/StateStoreCommitterStack.java @@ -21,6 +21,7 @@ import software.amazon.awscdk.services.iam.IGrantable; import software.amazon.awscdk.services.lambda.IFunction; import software.amazon.awscdk.services.lambda.eventsources.SqsEventSource; +import software.amazon.awscdk.services.logs.LogGroup; import software.amazon.awscdk.services.s3.Bucket; import software.amazon.awscdk.services.s3.IBucket; import software.amazon.awscdk.services.sns.Topic; @@ -41,6 +42,7 @@ import static sleeper.cdk.Utils.createLambdaLogGroup; import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.STATESTORE_COMMITTER_DLQ_ARN; import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.STATESTORE_COMMITTER_DLQ_URL; +import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.STATESTORE_COMMITTER_LOG_GROUP; import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.STATESTORE_COMMITTER_QUEUE_ARN; import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.STATESTORE_COMMITTER_QUEUE_URL; import static sleeper.configuration.properties.instance.CommonProperty.STATESTORE_COMMITTER_BATCH_SIZE; @@ -116,6 +118,8 @@ private void lambdaToCommitStateStoreUpdates( String functionName = String.join("-", "sleeper", Utils.cleanInstanceId(instanceProperties), "statestore-committer"); + LogGroup logGroup = createLambdaLogGroup(this, "StateStoreCommitterLogGroup", functionName, instanceProperties); + instanceProperties.set(STATESTORE_COMMITTER_LOG_GROUP, logGroup.getLogGroupName()); IFunction handlerFunction = committerJar.buildFunction(this, "StateStoreCommitter", builder -> builder .functionName(functionName) @@ -125,7 +129,7 @@ private void lambdaToCommitStateStoreUpdates( .timeout(Duration.seconds(instanceProperties.getInt(STATESTORE_COMMITTER_LAMBDA_TIMEOUT_IN_SECONDS))) .handler("sleeper.statestore.committer.lambda.StateStoreCommitterLambda::handleRequest") .environment(environmentVariables) - .logGroup(createLambdaLogGroup(this, "StateStoreCommitterLogGroup", functionName, instanceProperties))); + .logGroup(logGroup)); handlerFunction.addEventSource(SqsEventSource.Builder.create(commitQueue) .batchSize(instanceProperties.getInt(STATESTORE_COMMITTER_BATCH_SIZE)) diff --git a/java/configuration/src/main/java/sleeper/configuration/properties/instance/CdkDefinedInstanceProperty.java b/java/configuration/src/main/java/sleeper/configuration/properties/instance/CdkDefinedInstanceProperty.java index da2aaf74ef..24947ccfda 100644 --- a/java/configuration/src/main/java/sleeper/configuration/properties/instance/CdkDefinedInstanceProperty.java +++ b/java/configuration/src/main/java/sleeper/configuration/properties/instance/CdkDefinedInstanceProperty.java @@ -179,6 +179,10 @@ public interface CdkDefinedInstanceProperty extends InstanceProperty { .description("The ARN of the dead letter queue for statestore commit requests.") .propertyGroup(InstancePropertyGroup.COMMON) .build(); + CdkDefinedInstanceProperty STATESTORE_COMMITTER_LOG_GROUP = Index.propertyBuilder("sleeper.statestore.committer.log.group") + .description("The nane of the log group for the state store committer.") + .propertyGroup(InstancePropertyGroup.COMMON) + .build(); // Table metrics CdkDefinedInstanceProperty TABLE_METRICS_LAMBDA_FUNCTION = Index.propertyBuilder("sleeper.table.metrics.lambda.function") diff --git a/java/system-test/system-test-drivers/src/main/java/sleeper/systemtest/drivers/statestore/AwsStateStoreCommitterDriver.java b/java/system-test/system-test-drivers/src/main/java/sleeper/systemtest/drivers/statestore/AwsStateStoreCommitterDriver.java index 759ac83be7..2c8f621270 100644 --- a/java/system-test/system-test-drivers/src/main/java/sleeper/systemtest/drivers/statestore/AwsStateStoreCommitterDriver.java +++ b/java/system-test/system-test-drivers/src/main/java/sleeper/systemtest/drivers/statestore/AwsStateStoreCommitterDriver.java @@ -18,29 +18,41 @@ import com.amazonaws.services.sqs.AmazonSQS; import com.amazonaws.services.sqs.model.SendMessageBatchRequest; import com.amazonaws.services.sqs.model.SendMessageBatchRequestEntry; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient; +import software.amazon.awssdk.services.cloudwatchlogs.model.GetQueryResultsResponse; +import software.amazon.awssdk.services.cloudwatchlogs.model.QueryStatus; +import sleeper.core.util.PollWithRetries; import sleeper.core.util.SplitIntoBatches; import sleeper.systemtest.dsl.instance.SystemTestInstanceContext; import sleeper.systemtest.dsl.statestore.StateStoreCommitMessage; import sleeper.systemtest.dsl.statestore.StateStoreCommitterDriver; import sleeper.systemtest.dsl.statestore.StateStoreCommitterRun; +import java.time.Duration; import java.time.Instant; import java.util.List; +import java.util.Set; import java.util.UUID; import java.util.stream.Stream; import static java.util.stream.Collectors.toUnmodifiableList; +import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.STATESTORE_COMMITTER_LOG_GROUP; import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.STATESTORE_COMMITTER_QUEUE_URL; public class AwsStateStoreCommitterDriver implements StateStoreCommitterDriver { + public static final Logger LOGGER = LoggerFactory.getLogger(AwsStateStoreCommitterDriver.class); private final SystemTestInstanceContext instance; private final AmazonSQS sqs; + private final CloudWatchLogsClient cloudWatch; - public AwsStateStoreCommitterDriver(SystemTestInstanceContext instance, AmazonSQS sqs) { + public AwsStateStoreCommitterDriver(SystemTestInstanceContext instance, AmazonSQS sqs, CloudWatchLogsClient cloudWatch) { this.instance = instance; this.sqs = sqs; + this.cloudWatch = cloudWatch; } @Override @@ -50,8 +62,21 @@ public void sendCommitMessages(Stream messages) { @Override public List getRunsAfter(Instant startTime) { - // TODO Auto-generated method stub - return null; + String logGroupName = instance.getInstanceProperties().get(STATESTORE_COMMITTER_LOG_GROUP); + LOGGER.info("Submitting logs query for log group {} starting at time {}", logGroupName, startTime); + String queryId = cloudWatch.startQuery(builder -> builder + .logGroupName(logGroupName) + .startTime(startTime.getEpochSecond()) + .endTime(Instant.now().plus(Duration.ofMinutes(1)).getEpochSecond()) + .limit(10000) + .queryString("fields @timestamp, @message, @logStream " + + "| filter @message like /Lambda (started|finished) at|Applied request to table/ " + + "| sort @timestamp asc")) + .queryId(); + GetQueryResultsResponse response = waitForQuery(queryId); + StateStoreCommitterRunsBuilder builder = new StateStoreCommitterRunsBuilder(); + response.results().forEach(builder::add); + return builder.buildRuns(); } private void sendMessageBatch(List batch) { @@ -66,4 +91,29 @@ private void sendMessageBatch(List batch) { .collect(toUnmodifiableList()))); } + private GetQueryResultsResponse waitForQuery(String queryId) { + try { + return PollWithRetries.intervalAndPollingTimeout(Duration.ofSeconds(1), Duration.ofMinutes(1)) + .queryUntil("query is completed", + () -> cloudWatch.getQueryResults(builder -> builder.queryId(queryId)), + results -> isQueryCompleted(results)); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + } + + private static boolean isQueryCompleted(GetQueryResultsResponse response) { + LOGGER.info("Logs query response status {}, statistics: {}", + response.statusAsString(), response.statistics()); + QueryStatus status = response.status(); + if (status == QueryStatus.COMPLETE) { + return true; + } else if (Set.of(QueryStatus.SCHEDULED, QueryStatus.RUNNING).contains(status)) { + return false; + } else { + throw new RuntimeException("Logs query failed with status " + response.statusAsString()); + } + } + } diff --git a/java/system-test/system-test-drivers/src/main/java/sleeper/systemtest/drivers/util/AwsSystemTestDrivers.java b/java/system-test/system-test-drivers/src/main/java/sleeper/systemtest/drivers/util/AwsSystemTestDrivers.java index 99af68d260..bb24ff4c8c 100644 --- a/java/system-test/system-test-drivers/src/main/java/sleeper/systemtest/drivers/util/AwsSystemTestDrivers.java +++ b/java/system-test/system-test-drivers/src/main/java/sleeper/systemtest/drivers/util/AwsSystemTestDrivers.java @@ -112,7 +112,7 @@ public SleeperTablesDriver tables(SystemTestParameters parameters) { @Override public StateStoreCommitterDriver stateStoreCommitter(SystemTestContext context) { - return new AwsStateStoreCommitterDriver(context.instance(), clients.getSqs()); + return new AwsStateStoreCommitterDriver(context.instance(), clients.getSqs(), clients.getCloudWatchLogs()); } @Override diff --git a/java/system-test/system-test-drivers/src/main/java/sleeper/systemtest/drivers/util/SystemTestClients.java b/java/system-test/system-test-drivers/src/main/java/sleeper/systemtest/drivers/util/SystemTestClients.java index 81bfcdc92f..777930a4ed 100644 --- a/java/system-test/system-test-drivers/src/main/java/sleeper/systemtest/drivers/util/SystemTestClients.java +++ b/java/system-test/system-test-drivers/src/main/java/sleeper/systemtest/drivers/util/SystemTestClients.java @@ -40,6 +40,7 @@ import software.amazon.awssdk.regions.providers.DefaultAwsRegionProviderChain; import software.amazon.awssdk.services.cloudformation.CloudFormationClient; import software.amazon.awssdk.services.cloudwatch.CloudWatchClient; +import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient; import software.amazon.awssdk.services.emrserverless.EmrServerlessClient; import software.amazon.awssdk.services.lambda.LambdaClient; import software.amazon.awssdk.services.lambda.LambdaClientBuilder; @@ -78,6 +79,7 @@ public class SystemTestClients { private final AmazonAutoScaling autoScaling; private final AmazonECR ecr; private final CloudWatchClient cloudWatch; + private final CloudWatchLogsClient cloudWatchLogs; private final AmazonCloudWatchEvents cloudWatchEvents; private final Supplier> getAuthEnvVars; private final UnaryOperator configureHadoop; @@ -100,6 +102,7 @@ private SystemTestClients(Builder builder) { autoScaling = builder.autoScaling; ecr = builder.ecr; cloudWatch = builder.cloudWatch; + cloudWatchLogs = builder.cloudWatchLogs; cloudWatchEvents = builder.cloudWatchEvents; getAuthEnvVars = builder.getAuthEnvVars; configureHadoop = builder.configureHadoop; @@ -128,6 +131,7 @@ public static SystemTestClients fromDefaults() { .autoScaling(AmazonAutoScalingClientBuilder.defaultClient()) .ecr(AmazonECRClientBuilder.defaultClient()) .cloudWatch(CloudWatchClient.create()) + .cloudWatchLogs(CloudWatchLogsClient.create()) .cloudWatchEvents(AmazonCloudWatchEventsClientBuilder.defaultClient()) .build(); } @@ -156,6 +160,7 @@ public SystemTestClients assumeRole(AssumeSleeperRole assumeRole) { .autoScaling(v1.buildClient(AmazonAutoScalingClientBuilder.standard())) .ecr(v1.buildClient(AmazonECRClientBuilder.standard())) .cloudWatch(v2.buildClient(CloudWatchClient.builder())) + .cloudWatchLogs(v2.buildClient(CloudWatchLogsClient.builder())) .cloudWatchEvents(v1.buildClient(AmazonCloudWatchEventsClientBuilder.standard())) .getAuthEnvVars(v1::authEnvVars) .configureHadoop(hadoop::setS3ACredentials) @@ -222,6 +227,10 @@ public CloudWatchClient getCloudWatch() { return cloudWatch; } + public CloudWatchLogsClient getCloudWatchLogs() { + return cloudWatchLogs; + } + public AmazonCloudWatchEvents getCloudWatchEvents() { return cloudWatchEvents; } @@ -264,6 +273,7 @@ public static class Builder { private AmazonAutoScaling autoScaling; private AmazonECR ecr; private CloudWatchClient cloudWatch; + private CloudWatchLogsClient cloudWatchLogs; private AmazonCloudWatchEvents cloudWatchEvents; private Supplier> getAuthEnvVars = Map::of; private UnaryOperator configureHadoop = conf -> conf; @@ -352,6 +362,11 @@ public Builder cloudWatch(CloudWatchClient cloudWatch) { return this; } + public Builder cloudWatchLogs(CloudWatchLogsClient cloudWatchLogs) { + this.cloudWatchLogs = cloudWatchLogs; + return this; + } + public Builder cloudWatchEvents(AmazonCloudWatchEvents cloudWatchEvents) { this.cloudWatchEvents = cloudWatchEvents; return this;