Skip to content

Commit

Permalink
Merge pull request #3063 from gchq/3060-measure-deployed-state-store-…
Browse files Browse the repository at this point in the history
…committer-throughput

Issue 3070 - Wait for state store commits to finish based on logs
  • Loading branch information
patchwork01 authored Aug 14, 2024
2 parents 6e4fa6b + 48958ad commit 14706e0
Show file tree
Hide file tree
Showing 30 changed files with 1,146 additions and 133 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import software.amazon.awscdk.services.iam.IGrantable;
import software.amazon.awscdk.services.lambda.IFunction;
import software.amazon.awscdk.services.lambda.eventsources.SqsEventSource;
import software.amazon.awscdk.services.logs.LogGroup;
import software.amazon.awscdk.services.s3.Bucket;
import software.amazon.awscdk.services.s3.IBucket;
import software.amazon.awscdk.services.sns.Topic;
Expand All @@ -41,6 +42,7 @@
import static sleeper.cdk.Utils.createLambdaLogGroup;
import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.STATESTORE_COMMITTER_DLQ_ARN;
import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.STATESTORE_COMMITTER_DLQ_URL;
import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.STATESTORE_COMMITTER_LOG_GROUP;
import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.STATESTORE_COMMITTER_QUEUE_ARN;
import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.STATESTORE_COMMITTER_QUEUE_URL;
import static sleeper.configuration.properties.instance.CommonProperty.STATESTORE_COMMITTER_BATCH_SIZE;
Expand Down Expand Up @@ -71,7 +73,7 @@ public StateStoreCommitterStack(
LambdaCode committerJar = jars.lambdaCode(BuiltJar.STATESTORE, jarsBucket);

commitQueue = sqsQueueForStateStoreCommitter(policiesStack, topic, errorMetrics);
lambdaToCommitStateStoreUpdates(committerJar,
lambdaToCommitStateStoreUpdates(policiesStack, committerJar,
configBucketStack, tableIndexStack, stateStoreStacks,
compactionStatusStore, ingestStatusStore);
}
Expand Down Expand Up @@ -108,14 +110,16 @@ private Queue sqsQueueForStateStoreCommitter(ManagedPoliciesStack policiesStack,
}

private void lambdaToCommitStateStoreUpdates(
LambdaCode committerJar, ConfigBucketStack configBucketStack, TableIndexStack tableIndexStack,
StateStoreStacks stateStoreStacks,
ManagedPoliciesStack policiesStack, LambdaCode committerJar,
ConfigBucketStack configBucketStack, TableIndexStack tableIndexStack, StateStoreStacks stateStoreStacks,
CompactionStatusStoreResources compactionStatusStore,
IngestStatusStoreResources ingestStatusStore) {
Map<String, String> environmentVariables = Utils.createDefaultEnvironment(instanceProperties);

String functionName = String.join("-", "sleeper",
Utils.cleanInstanceId(instanceProperties), "statestore-committer");
LogGroup logGroup = createLambdaLogGroup(this, "StateStoreCommitterLogGroup", functionName, instanceProperties);
instanceProperties.set(STATESTORE_COMMITTER_LOG_GROUP, logGroup.getLogGroupName());

IFunction handlerFunction = committerJar.buildFunction(this, "StateStoreCommitter", builder -> builder
.functionName(functionName)
Expand All @@ -125,12 +129,14 @@ private void lambdaToCommitStateStoreUpdates(
.timeout(Duration.seconds(instanceProperties.getInt(STATESTORE_COMMITTER_LAMBDA_TIMEOUT_IN_SECONDS)))
.handler("sleeper.statestore.committer.lambda.StateStoreCommitterLambda::handleRequest")
.environment(environmentVariables)
.logGroup(createLambdaLogGroup(this, "StateStoreCommitterLogGroup", functionName, instanceProperties)));
.logGroup(logGroup));

handlerFunction.addEventSource(SqsEventSource.Builder.create(commitQueue)
.batchSize(instanceProperties.getInt(STATESTORE_COMMITTER_BATCH_SIZE))
.build());

logGroup.grantRead(policiesStack.getReportingPolicyForGrants());
logGroup.grant(policiesStack.getReportingPolicyForGrants(), "logs:StartQuery", "logs:GetQueryResults");
configBucketStack.grantRead(handlerFunction);
tableIndexStack.grantRead(handlerFunction);
stateStoreStacks.grantReadWriteAllFilesAndPartitions(handlerFunction);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,10 @@ public interface CdkDefinedInstanceProperty extends InstanceProperty {
.description("The ARN of the dead letter queue for statestore commit requests.")
.propertyGroup(InstancePropertyGroup.COMMON)
.build();
// CDK-defined property publishing the committer's log group name, so that clients
// can run CloudWatch Logs queries against it (see StateStoreCommitterStack).
CdkDefinedInstanceProperty STATESTORE_COMMITTER_LOG_GROUP = Index.propertyBuilder("sleeper.statestore.committer.log.group")
        .description("The name of the log group for the state store committer.")
        .propertyGroup(InstancePropertyGroup.COMMON)
        .build();

// Table metrics
CdkDefinedInstanceProperty TABLE_METRICS_LAMBDA_FUNCTION = Index.propertyBuilder("sleeper.table.metrics.lambda.function")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@

import sleeper.compaction.job.commit.CompactionJobCommitRequest;
import sleeper.compaction.job.commit.CompactionJobIdAssignmentCommitRequest;
import sleeper.core.statestore.StateStoreException;
import sleeper.core.statestore.commit.SplitPartitionCommitRequest;
import sleeper.core.statestore.commit.StateStoreCommitRequestInS3;
import sleeper.ingest.job.commit.IngestAddFilesCommitRequest;

import java.util.Objects;
Expand All @@ -29,6 +29,8 @@
public class StateStoreCommitRequest {

private final Object request;
private final String tableId;
private final ApplyRequest applyRequest;

/**
* Creates a request to commit the results of a compaction job.
Expand All @@ -37,7 +39,7 @@ public class StateStoreCommitRequest {
* @return a state store commit request
*/
/**
 * Creates a request to commit the results of a compaction job.
 *
 * @param  request the compaction job commit request
 * @return a state store commit request
 */
public static StateStoreCommitRequest forCompactionJob(CompactionJobCommitRequest request) {
    // Table ID comes from the job; the lambda defers dispatch to the committer until apply time.
    return new StateStoreCommitRequest(request, request.getJob().getTableId(), committer -> committer.commitCompaction(request));
}

/**
Expand All @@ -47,7 +49,7 @@ public static StateStoreCommitRequest forCompactionJob(CompactionJobCommitReques
* @return a state store commit request
*/
/**
 * Creates a request to assign compaction job IDs to input files.
 *
 * @param  request the job ID assignment commit request
 * @return a state store commit request
 */
public static StateStoreCommitRequest forCompactionJobIdAssignment(CompactionJobIdAssignmentCommitRequest request) {
    return new StateStoreCommitRequest(request, request.getTableId(), committer -> committer.assignCompactionInputFiles(request));
}

/**
Expand All @@ -57,7 +59,7 @@ public static StateStoreCommitRequest forCompactionJobIdAssignment(CompactionJob
* @return a state store commit request
*/
/**
 * Creates a request to add files written by an ingest.
 *
 * @param  request the ingest add-files commit request
 * @return a state store commit request
 */
public static StateStoreCommitRequest forIngestAddFiles(IngestAddFilesCommitRequest request) {
    return new StateStoreCommitRequest(request, request.getTableId(), committer -> committer.addFiles(request));
}

/**
Expand All @@ -67,27 +69,27 @@ public static StateStoreCommitRequest forIngestAddFiles(IngestAddFilesCommitRequ
* @return a state store commit request
*/
/**
 * Creates a request to commit a partition split.
 *
 * @param  request the split partition commit request
 * @return a state store commit request
 */
public static StateStoreCommitRequest forSplitPartition(SplitPartitionCommitRequest request) {
    return new StateStoreCommitRequest(request, request.getTableId(), committer -> committer.splitPartition(request));
}

/**
* Creates a request which is stored in S3.
*
* @param request the commit request
* @return a state store commit request
*/
public static StateStoreCommitRequest storedInS3(StateStoreCommitRequestInS3 request) {
return new StateStoreCommitRequest(request);
}

/**
 * Wraps a typed commit request together with its table ID and the action that applies it.
 *
 * @param request      the typed commit request object
 * @param tableId      the ID of the Sleeper table the request applies to
 * @param applyRequest the action that applies the request with a committer
 */
private StateStoreCommitRequest(Object request, String tableId, ApplyRequest applyRequest) {
    this.request = request;
    this.tableId = tableId;
    this.applyRequest = applyRequest;
}

/** @return the wrapped commit request object */
public Object getRequest() {
    return request;
}

/** @return the ID of the Sleeper table this request applies to */
public String getTableId() {
    return tableId;
}

/**
 * Applies this request against the state store via the given committer.
 *
 * @param  committer           the committer to apply the request with
 * @throws StateStoreException if the state store update fails
 */
void apply(StateStoreCommitter committer) throws StateStoreException {
    applyRequest.apply(committer);
}

@Override
public int hashCode() {
return Objects.hash(request);
Expand All @@ -110,4 +112,11 @@ public String toString() {
return "StateStoreCommitRequest{request=" + request + "}";
}

/**
 * Applies the current request with a given committer. Implementations are lambdas created in the
 * static factory methods, each capturing the typed request and dispatching to the matching
 * committer method.
 */
@FunctionalInterface
private interface ApplyRequest {
    void apply(StateStoreCommitter committer) throws StateStoreException;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import sleeper.commit.StateStoreCommitter.LoadS3ObjectFromDataBucket;
import sleeper.compaction.job.CompactionJob;
import sleeper.compaction.job.CompactionJobJsonSerDe;
import sleeper.compaction.job.commit.CompactionJobCommitRequest;
Expand All @@ -46,11 +47,20 @@ public class StateStoreCommitRequestDeserialiser {
public static final Logger LOGGER = LoggerFactory.getLogger(StateStoreCommitRequestDeserialiser.class);

private final Gson gson;
private final Gson gsonFromDataBucket;
private final LoadS3ObjectFromDataBucket loadFromDataBucket;

/**
 * Creates a deserialiser for state store commit requests.
 *
 * @param tablePropertiesProvider provider of table properties, used when deserialising partition splits
 * @param loadFromDataBucket      loads the body of a commit request held in the data bucket
 */
public StateStoreCommitRequestDeserialiser(
        TablePropertiesProvider tablePropertiesProvider, LoadS3ObjectFromDataBucket loadFromDataBucket) {
    this.loadFromDataBucket = loadFromDataBucket;
    this.gson = gson(tablePropertiesProvider, this::fromDataBucket);
    // While already reading a request out of the bucket, refuse to follow a further S3 pointer.
    this.gsonFromDataBucket = gson(tablePropertiesProvider, DeserialiseFromDataBucket.refuseWhileReadingFromBucket());
}

public StateStoreCommitRequestDeserialiser(TablePropertiesProvider tablePropertiesProvider) {
gson = GsonConfig.standardBuilder()
private static Gson gson(TablePropertiesProvider tablePropertiesProvider, DeserialiseFromDataBucket readFromDataBucket) {
return GsonConfig.standardBuilder()
.registerTypeAdapter(CompactionJob.class, new CompactionJobJsonSerDe())
.registerTypeAdapter(StateStoreCommitRequest.class, new WrapperDeserialiser())
.registerTypeAdapter(StateStoreCommitRequest.class, new WrapperDeserialiser(readFromDataBucket))
.registerTypeAdapter(SplitPartitionCommitRequest.class, new SplitPartitionDeserialiser(tablePropertiesProvider))
.serializeNulls()
.create();
Expand All @@ -66,11 +76,22 @@ public StateStoreCommitRequest fromJson(String jsonString) {
return gson.fromJson(jsonString, StateStoreCommitRequest.class);
}

/**
 * Follows a pointer to a commit request held in the data bucket, and deserialises the request
 * found there.
 *
 * @param  request the pointer to the S3 object holding the request
 * @return the deserialised commit request
 */
private StateStoreCommitRequest fromDataBucket(StateStoreCommitRequestInS3 request) {
    return gsonFromDataBucket.fromJson(
            loadFromDataBucket.loadFromDataBucket(request.getKeyInS3()),
            StateStoreCommitRequest.class);
}

/**
* Deserialises the commit request by reading the type.
*/
private static class WrapperDeserialiser implements JsonDeserializer<StateStoreCommitRequest> {

private final DeserialiseFromDataBucket fromDataBucket;

private WrapperDeserialiser(DeserialiseFromDataBucket fromDataBucket) {
this.fromDataBucket = fromDataBucket;
}

@Override
public StateStoreCommitRequest deserialize(JsonElement json, Type wrapperType, JsonDeserializationContext context) throws JsonParseException {

Expand All @@ -88,15 +109,15 @@ public StateStoreCommitRequest deserialize(JsonElement json, Type wrapperType, J
case INGEST_ADD_FILES:
return StateStoreCommitRequest.forIngestAddFiles(
context.deserialize(requestObj, IngestAddFilesCommitRequest.class));
case STORED_IN_S3:
return StateStoreCommitRequest.storedInS3(
context.deserialize(requestObj, StateStoreCommitRequestInS3.class));
case COMPACTION_JOB_ID_ASSIGNMENT:
return StateStoreCommitRequest.forCompactionJobIdAssignment(
context.deserialize(requestObj, CompactionJobIdAssignmentCommitRequest.class));
case SPLIT_PARTITION:
return StateStoreCommitRequest.forSplitPartition(
context.deserialize(requestObj, SplitPartitionCommitRequest.class));
case STORED_IN_S3:
return fromDataBucket.read(
context.deserialize(requestObj, StateStoreCommitRequestInS3.class));
default:
throw new CommitRequestValidationException("Unrecognised request type");
}
Expand Down Expand Up @@ -138,4 +159,19 @@ public SplitPartitionCommitRequest deserialize(JsonElement jsonElement, Type typ
return new SplitPartitionCommitRequest(tableId, parentPartition, leftChildPartition, rightChildPartition);
}
}

/**
 * Reads and deserialises a commit request held in the data bucket, given the pointer to it. The
 * implementation used while already reading from the bucket refuses to follow a further pointer.
 */
@FunctionalInterface
private interface DeserialiseFromDataBucket {
    StateStoreCommitRequest read(StateStoreCommitRequestInS3 pointer);

    /**
     * Creates an implementation that rejects nested pointers. A request loaded from the data
     * bucket must hold the commit request directly, never another S3 reference.
     *
     * @return a deserialiser that always throws on a nested pointer
     */
    static DeserialiseFromDataBucket refuseWhileReadingFromBucket() {
        return pointer -> {
            throw new IllegalArgumentException(
                    "Found a request stored in S3 pointing to another S3 object: " + pointer.getKeyInS3());
        };
    }
}
}
Loading

0 comments on commit 14706e0

Please sign in to comment.