Skip to content

Commit

Permalink
Read logs in AwsStateStoreCommitterDriver
Browse files Browse the repository at this point in the history
  • Loading branch information
patchwork01 committed Aug 13, 2024
1 parent 6206a31 commit 8bf2804
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import software.amazon.awscdk.services.iam.IGrantable;
import software.amazon.awscdk.services.lambda.IFunction;
import software.amazon.awscdk.services.lambda.eventsources.SqsEventSource;
import software.amazon.awscdk.services.logs.LogGroup;
import software.amazon.awscdk.services.s3.Bucket;
import software.amazon.awscdk.services.s3.IBucket;
import software.amazon.awscdk.services.sns.Topic;
Expand All @@ -41,6 +42,7 @@
import static sleeper.cdk.Utils.createLambdaLogGroup;
import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.STATESTORE_COMMITTER_DLQ_ARN;
import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.STATESTORE_COMMITTER_DLQ_URL;
import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.STATESTORE_COMMITTER_LOG_GROUP;
import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.STATESTORE_COMMITTER_QUEUE_ARN;
import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.STATESTORE_COMMITTER_QUEUE_URL;
import static sleeper.configuration.properties.instance.CommonProperty.STATESTORE_COMMITTER_BATCH_SIZE;
Expand Down Expand Up @@ -116,6 +118,8 @@ private void lambdaToCommitStateStoreUpdates(

String functionName = String.join("-", "sleeper",
Utils.cleanInstanceId(instanceProperties), "statestore-committer");
LogGroup logGroup = createLambdaLogGroup(this, "StateStoreCommitterLogGroup", functionName, instanceProperties);
instanceProperties.set(STATESTORE_COMMITTER_LOG_GROUP, logGroup.getLogGroupName());

IFunction handlerFunction = committerJar.buildFunction(this, "StateStoreCommitter", builder -> builder
.functionName(functionName)
Expand All @@ -125,7 +129,7 @@ private void lambdaToCommitStateStoreUpdates(
.timeout(Duration.seconds(instanceProperties.getInt(STATESTORE_COMMITTER_LAMBDA_TIMEOUT_IN_SECONDS)))
.handler("sleeper.statestore.committer.lambda.StateStoreCommitterLambda::handleRequest")
.environment(environmentVariables)
.logGroup(createLambdaLogGroup(this, "StateStoreCommitterLogGroup", functionName, instanceProperties)));
.logGroup(logGroup));

handlerFunction.addEventSource(SqsEventSource.Builder.create(commitQueue)
.batchSize(instanceProperties.getInt(STATESTORE_COMMITTER_BATCH_SIZE))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,10 @@ public interface CdkDefinedInstanceProperty extends InstanceProperty {
.description("The ARN of the dead letter queue for statestore commit requests.")
.propertyGroup(InstancePropertyGroup.COMMON)
.build();
// Name of the log group used by the state store committer lambda.
CdkDefinedInstanceProperty STATESTORE_COMMITTER_LOG_GROUP = Index.propertyBuilder("sleeper.statestore.committer.log.group")
        .description("The name of the log group for the state store committer.")
        .propertyGroup(InstancePropertyGroup.COMMON)
        .build();

// Table metrics
CdkDefinedInstanceProperty TABLE_METRICS_LAMBDA_FUNCTION = Index.propertyBuilder("sleeper.table.metrics.lambda.function")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,29 +18,41 @@
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.model.SendMessageBatchRequest;
import com.amazonaws.services.sqs.model.SendMessageBatchRequestEntry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient;
import software.amazon.awssdk.services.cloudwatchlogs.model.GetQueryResultsResponse;
import software.amazon.awssdk.services.cloudwatchlogs.model.QueryStatus;

import sleeper.core.util.PollWithRetries;
import sleeper.core.util.SplitIntoBatches;
import sleeper.systemtest.dsl.instance.SystemTestInstanceContext;
import sleeper.systemtest.dsl.statestore.StateStoreCommitMessage;
import sleeper.systemtest.dsl.statestore.StateStoreCommitterDriver;
import sleeper.systemtest.dsl.statestore.StateStoreCommitterRun;

import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Stream;

import static java.util.stream.Collectors.toUnmodifiableList;
import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.STATESTORE_COMMITTER_LOG_GROUP;
import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.STATESTORE_COMMITTER_QUEUE_URL;

public class AwsStateStoreCommitterDriver implements StateStoreCommitterDriver {
public static final Logger LOGGER = LoggerFactory.getLogger(AwsStateStoreCommitterDriver.class);

private final SystemTestInstanceContext instance;
private final AmazonSQS sqs;
private final CloudWatchLogsClient cloudWatch;

public AwsStateStoreCommitterDriver(SystemTestInstanceContext instance, AmazonSQS sqs) {
public AwsStateStoreCommitterDriver(SystemTestInstanceContext instance, AmazonSQS sqs, CloudWatchLogsClient cloudWatch) {
this.instance = instance;
this.sqs = sqs;
this.cloudWatch = cloudWatch;
}

@Override
Expand All @@ -50,8 +62,21 @@ public void sendCommitMessages(Stream<StateStoreCommitMessage> messages) {

/**
 * Finds runs of the state store committer by querying its log group, from the given start time.
 * Submits a logs query, waits for it to complete, then passes each result row to a
 * StateStoreCommitterRunsBuilder to build the runs.
 *
 * @param  startTime the start of the time range to query
 * @return           the runs built from the query results
 */
@Override
public List<StateStoreCommitterRun> getRunsAfter(Instant startTime) {
// TODO Auto-generated method stub
return null;
// The log group name is read from the instance property STATESTORE_COMMITTER_LOG_GROUP
String logGroupName = instance.getInstanceProperties().get(STATESTORE_COMMITTER_LOG_GROUP);
LOGGER.info("Submitting logs query for log group {} starting at time {}", logGroupName, startTime);
// The query time range is passed as epoch seconds.
// NOTE(review): the end time is one minute in the future, presumably so that messages
// logged up to now are not cut off by the range - confirm.
String queryId = cloudWatch.startQuery(builder -> builder
.logGroupName(logGroupName)
.startTime(startTime.getEpochSecond())
.endTime(Instant.now().plus(Duration.ofMinutes(1)).getEpochSecond())
// NOTE(review): at most 10000 result rows are requested; presumably any further
// matching messages would be missing from the runs - confirm.
.limit(10000)
// Keep only messages for a lambda starting or finishing, or a request being applied
// to a table, sorted by ascending timestamp
.queryString("fields @timestamp, @message, @logStream " +
"| filter @message like /Lambda (started|finished) at|Applied request to table/ " +
"| sort @timestamp asc"))
.queryId();
// Blocks until the query is reported as complete
GetQueryResultsResponse response = waitForQuery(queryId);
StateStoreCommitterRunsBuilder builder = new StateStoreCommitterRunsBuilder();
response.results().forEach(builder::add);
return builder.buildRuns();
}

private void sendMessageBatch(List<StateStoreCommitMessage> batch) {
Expand All @@ -66,4 +91,29 @@ private void sendMessageBatch(List<StateStoreCommitMessage> batch) {
.collect(toUnmodifiableList())));
}

/**
 * Repeatedly retrieves the results of a logs query until it is reported as completed. Polls with
 * an interval of 1 second and a polling timeout of 1 minute.
 *
 * @param  queryId          the ID of the logs query to wait for
 * @return                  the response retrieved once the query is completed
 * @throws RuntimeException if the thread is interrupted while waiting
 */
private GetQueryResultsResponse waitForQuery(String queryId) {
    try {
        PollWithRetries poll = PollWithRetries.intervalAndPollingTimeout(
                Duration.ofSeconds(1), Duration.ofMinutes(1));
        return poll.queryUntil("query is completed",
                () -> cloudWatch.getQueryResults(request -> request.queryId(queryId)),
                response -> isQueryCompleted(response));
    } catch (InterruptedException interrupted) {
        // Restore the interrupt flag before rethrowing as an unchecked exception
        Thread.currentThread().interrupt();
        throw new RuntimeException(interrupted);
    }
}

/**
 * Checks whether a logs query has finished, logging its status and statistics.
 *
 * @param  response         the latest response retrieved for the query
 * @return                  true if the query is complete, false if it is still scheduled or running
 * @throws RuntimeException if the query has any other status
 */
private static boolean isQueryCompleted(GetQueryResultsResponse response) {
    LOGGER.info("Logs query response status {}, statistics: {}",
            response.statusAsString(), response.statistics());
    switch (response.status()) {
        case COMPLETE:
            return true;
        case SCHEDULED:
        case RUNNING:
            // Still in progress, keep polling
            return false;
        default:
            throw new RuntimeException("Logs query failed with status " + response.statusAsString());
    }
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public SleeperTablesDriver tables(SystemTestParameters parameters) {

// Creates the AWS implementation of the state store committer driver, built from the
// instance held by the test context and the clients held by this object.
@Override
public StateStoreCommitterDriver stateStoreCommitter(SystemTestContext context) {
return new AwsStateStoreCommitterDriver(context.instance(), clients.getSqs());
return new AwsStateStoreCommitterDriver(context.instance(), clients.getSqs(), clients.getCloudWatchLogs());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import software.amazon.awssdk.regions.providers.DefaultAwsRegionProviderChain;
import software.amazon.awssdk.services.cloudformation.CloudFormationClient;
import software.amazon.awssdk.services.cloudwatch.CloudWatchClient;
import software.amazon.awssdk.services.cloudwatchlogs.CloudWatchLogsClient;
import software.amazon.awssdk.services.emrserverless.EmrServerlessClient;
import software.amazon.awssdk.services.lambda.LambdaClient;
import software.amazon.awssdk.services.lambda.LambdaClientBuilder;
Expand Down Expand Up @@ -78,6 +79,7 @@ public class SystemTestClients {
private final AmazonAutoScaling autoScaling;
private final AmazonECR ecr;
private final CloudWatchClient cloudWatch;
private final CloudWatchLogsClient cloudWatchLogs;
private final AmazonCloudWatchEvents cloudWatchEvents;
private final Supplier<Map<String, String>> getAuthEnvVars;
private final UnaryOperator<Configuration> configureHadoop;
Expand All @@ -100,6 +102,7 @@ private SystemTestClients(Builder builder) {
autoScaling = builder.autoScaling;
ecr = builder.ecr;
cloudWatch = builder.cloudWatch;
cloudWatchLogs = builder.cloudWatchLogs;
cloudWatchEvents = builder.cloudWatchEvents;
getAuthEnvVars = builder.getAuthEnvVars;
configureHadoop = builder.configureHadoop;
Expand Down Expand Up @@ -128,6 +131,7 @@ public static SystemTestClients fromDefaults() {
.autoScaling(AmazonAutoScalingClientBuilder.defaultClient())
.ecr(AmazonECRClientBuilder.defaultClient())
.cloudWatch(CloudWatchClient.create())
.cloudWatchLogs(CloudWatchLogsClient.create())
.cloudWatchEvents(AmazonCloudWatchEventsClientBuilder.defaultClient())
.build();
}
Expand Down Expand Up @@ -156,6 +160,7 @@ public SystemTestClients assumeRole(AssumeSleeperRole assumeRole) {
.autoScaling(v1.buildClient(AmazonAutoScalingClientBuilder.standard()))
.ecr(v1.buildClient(AmazonECRClientBuilder.standard()))
.cloudWatch(v2.buildClient(CloudWatchClient.builder()))
.cloudWatchLogs(v2.buildClient(CloudWatchLogsClient.builder()))
.cloudWatchEvents(v1.buildClient(AmazonCloudWatchEventsClientBuilder.standard()))
.getAuthEnvVars(v1::authEnvVars)
.configureHadoop(hadoop::setS3ACredentials)
Expand Down Expand Up @@ -222,6 +227,10 @@ public CloudWatchClient getCloudWatch() {
return cloudWatch;
}

/**
 * Retrieves the CloudWatch Logs client held by this object.
 *
 * @return the CloudWatch Logs client
 */
public CloudWatchLogsClient getCloudWatchLogs() {
    return this.cloudWatchLogs;
}

/**
 * Retrieves the CloudWatch Events client held by this object.
 *
 * @return the CloudWatch Events client
 */
public AmazonCloudWatchEvents getCloudWatchEvents() {
    return this.cloudWatchEvents;
}
Expand Down Expand Up @@ -264,6 +273,7 @@ public static class Builder {
private AmazonAutoScaling autoScaling;
private AmazonECR ecr;
private CloudWatchClient cloudWatch;
private CloudWatchLogsClient cloudWatchLogs;
private AmazonCloudWatchEvents cloudWatchEvents;
private Supplier<Map<String, String>> getAuthEnvVars = Map::of;
private UnaryOperator<Configuration> configureHadoop = conf -> conf;
Expand Down Expand Up @@ -352,6 +362,11 @@ public Builder cloudWatch(CloudWatchClient cloudWatch) {
return this;
}

/**
 * Sets the CloudWatch Logs client.
 *
 * @param  cloudWatchLogs the client
 * @return                this builder, for chaining
 */
public Builder cloudWatchLogs(CloudWatchLogsClient cloudWatchLogs) {
this.cloudWatchLogs = cloudWatchLogs;
return this;
}

public Builder cloudWatchEvents(AmazonCloudWatchEvents cloudWatchEvents) {
this.cloudWatchEvents = cloudWatchEvents;
return this;
Expand Down

0 comments on commit 8bf2804

Please sign in to comment.