
Issue 3070 - Wait for state store commits to finish based on logs #3063

Merged
Changes from 17 commits

52 commits
c11b404
Set up StateStoreCommitterST
patchwork01 Aug 12, 2024
3a27409
Refactor StateStoreCommitRequest to hold table ID
patchwork01 Aug 12, 2024
e8175a7
Move reading from S3 into StateStoreCommitRequestDeserialiser
patchwork01 Aug 12, 2024
b9df6d6
Assert on StateStoreCommitRequest.getTableId
patchwork01 Aug 12, 2024
47183ef
Merge branch '3061-send-commits-in-parallel' into 3060-measure-deploy…
patchwork01 Aug 12, 2024
409c426
Add missing Javadoc
patchwork01 Aug 12, 2024
1ad62d8
Merge branch 'develop' into 3060-measure-deployed-state-store-committ…
patchwork01 Aug 13, 2024
08b4b14
Merge branch 'develop' into 3060-measure-deployed-state-store-committ…
patchwork01 Aug 13, 2024
a2a0706
Rename methods to apply commit requests
patchwork01 Aug 13, 2024
71ee3aa
Adjust state store committer logs
patchwork01 Aug 13, 2024
0c9f094
Wait for state store commits in-memory
patchwork01 Aug 13, 2024
5422fdc
Add StateStoreCommitterRunsBuilder
patchwork01 Aug 13, 2024
72b487e
Read state store commit applied event
patchwork01 Aug 13, 2024
6206a31
Test reading unrecognised log
patchwork01 Aug 13, 2024
8bf2804
Read logs in AwsStateStoreCommitterDriver
patchwork01 Aug 13, 2024
adb98ec
Fix reading log with fields in reverse order
patchwork01 Aug 13, 2024
6a73294
Test ignoring extra log fields
patchwork01 Aug 13, 2024
d2b2b18
Add comments to StateStoreCommitterLogEntry
patchwork01 Aug 14, 2024
a263ba2
Use named log groups in StateStoreCommitterLogEntry
patchwork01 Aug 14, 2024
4b8eb46
Use constants for capture groups
patchwork01 Aug 14, 2024
35f7b52
Replace "capture group" with "capturing group"
patchwork01 Aug 14, 2024
44fe0a6
Load from S3 in AwsStateStoreCommitterDriverIT
patchwork01 Aug 14, 2024
eda8212
Test building unfinished runs
patchwork01 Aug 14, 2024
769bdd4
Fix wait for commit
patchwork01 Aug 14, 2024
24a4e67
Fix SystemTestStateStoreFakeCommitsTest
patchwork01 Aug 14, 2024
1a51d62
Unit test decrementWaitForNumCommits
patchwork01 Aug 14, 2024
1dbe546
Handle unknown run times
patchwork01 Aug 14, 2024
197f831
Add slack & logging in query for committer runs
patchwork01 Aug 14, 2024
f35b7cb
Grant reading state store committer logs
patchwork01 Aug 14, 2024
8c1cdc0
Apply query time slack to end time
patchwork01 Aug 14, 2024
59cfead
Reduce slack for quering committer runs
patchwork01 Aug 14, 2024
b3bdc33
Introduce StateStoreCommitterLogEntry
patchwork01 Aug 14, 2024
ded3048
Remove StateStoreCommitterRun from in-memory implementation
patchwork01 Aug 14, 2024
5ee106d
Rename ReadStateStoreCommitterLogs
patchwork01 Aug 14, 2024
1bdb0bf
Remove StateStoreCommitterRun
patchwork01 Aug 14, 2024
8f61246
Use StateStoreCommitterLogEntry.getLastTime
patchwork01 Aug 14, 2024
d9f53d0
Test logs processing in WaitForStateStoreCommits
patchwork01 Aug 14, 2024
0a69a1e
Refactor WaitForStateStoreCommits
patchwork01 Aug 14, 2024
93b900c
Remove unused logger
patchwork01 Aug 14, 2024
9e2232a
Remove unused code
patchwork01 Aug 14, 2024
ad89a13
Remove extra helper from ReadStateStoreCommitterLogsTest
patchwork01 Aug 14, 2024
8c6b58e
Remove unused method
patchwork01 Aug 14, 2024
c4b8b6f
Remove unused logger
patchwork01 Aug 14, 2024
74c49b7
Fix SystemTestStateStoreFakeCommitsTest
patchwork01 Aug 14, 2024
8807267
Set skipRust property in deployTest.sh
patchwork01 Aug 14, 2024
05d6366
Grant StartQuery/GetQueryResults for log stream
patchwork01 Aug 14, 2024
c508571
Handle new line at end of log message
patchwork01 Aug 14, 2024
c19975b
Adjust assertions in StateStoreCommitterST
patchwork01 Aug 14, 2024
b2795a8
Rename WaitForStateStoreCommitLogs
patchwork01 Aug 14, 2024
ca9b8f1
Ignore file update time in assertion
patchwork01 Aug 14, 2024
3064b36
Adjust assertion on files
patchwork01 Aug 14, 2024
48958ad
Use long for record count in assertion
patchwork01 Aug 14, 2024
@@ -21,6 +21,7 @@
import software.amazon.awscdk.services.iam.IGrantable;
import software.amazon.awscdk.services.lambda.IFunction;
import software.amazon.awscdk.services.lambda.eventsources.SqsEventSource;
import software.amazon.awscdk.services.logs.LogGroup;
import software.amazon.awscdk.services.s3.Bucket;
import software.amazon.awscdk.services.s3.IBucket;
import software.amazon.awscdk.services.sns.Topic;
@@ -41,6 +42,7 @@
import static sleeper.cdk.Utils.createLambdaLogGroup;
import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.STATESTORE_COMMITTER_DLQ_ARN;
import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.STATESTORE_COMMITTER_DLQ_URL;
import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.STATESTORE_COMMITTER_LOG_GROUP;
import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.STATESTORE_COMMITTER_QUEUE_ARN;
import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.STATESTORE_COMMITTER_QUEUE_URL;
import static sleeper.configuration.properties.instance.CommonProperty.STATESTORE_COMMITTER_BATCH_SIZE;
@@ -116,6 +118,8 @@ private void lambdaToCommitStateStoreUpdates(

String functionName = String.join("-", "sleeper",
Utils.cleanInstanceId(instanceProperties), "statestore-committer");
LogGroup logGroup = createLambdaLogGroup(this, "StateStoreCommitterLogGroup", functionName, instanceProperties);
instanceProperties.set(STATESTORE_COMMITTER_LOG_GROUP, logGroup.getLogGroupName());

IFunction handlerFunction = committerJar.buildFunction(this, "StateStoreCommitter", builder -> builder
.functionName(functionName)
@@ -125,7 +129,7 @@ private void lambdaToCommitStateStoreUpdates(
.timeout(Duration.seconds(instanceProperties.getInt(STATESTORE_COMMITTER_LAMBDA_TIMEOUT_IN_SECONDS)))
.handler("sleeper.statestore.committer.lambda.StateStoreCommitterLambda::handleRequest")
.environment(environmentVariables)
.logGroup(createLambdaLogGroup(this, "StateStoreCommitterLogGroup", functionName, instanceProperties)));
.logGroup(logGroup));

handlerFunction.addEventSource(SqsEventSource.Builder.create(commitQueue)
.batchSize(instanceProperties.getInt(STATESTORE_COMMITTER_BATCH_SIZE))
@@ -179,6 +179,10 @@ public interface CdkDefinedInstanceProperty extends InstanceProperty {
.description("The ARN of the dead letter queue for statestore commit requests.")
.propertyGroup(InstancePropertyGroup.COMMON)
.build();
CdkDefinedInstanceProperty STATESTORE_COMMITTER_LOG_GROUP = Index.propertyBuilder("sleeper.statestore.committer.log.group")
.description("The name of the log group for the state store committer.")
.propertyGroup(InstancePropertyGroup.COMMON)
.build();

// Table metrics
CdkDefinedInstanceProperty TABLE_METRICS_LAMBDA_FUNCTION = Index.propertyBuilder("sleeper.table.metrics.lambda.function")
@@ -17,8 +17,8 @@

import sleeper.compaction.job.commit.CompactionJobCommitRequest;
import sleeper.compaction.job.commit.CompactionJobIdAssignmentCommitRequest;
import sleeper.core.statestore.StateStoreException;
import sleeper.core.statestore.commit.SplitPartitionCommitRequest;
import sleeper.core.statestore.commit.StateStoreCommitRequestInS3;
import sleeper.ingest.job.commit.IngestAddFilesCommitRequest;

import java.util.Objects;
@@ -29,6 +29,8 @@
public class StateStoreCommitRequest {

private final Object request;
private final String tableId;
private final ApplyRequest applyRequest;

/**
* Creates a request to commit the results of a compaction job.
@@ -37,7 +39,7 @@ public class StateStoreCommitRequest {
* @return a state store commit request
*/
public static StateStoreCommitRequest forCompactionJob(CompactionJobCommitRequest request) {
return new StateStoreCommitRequest(request);
return new StateStoreCommitRequest(request, request.getJob().getTableId(), committer -> committer.commitCompaction(request));
}

/**
@@ -47,7 +49,7 @@ public static StateStoreCommitRequest forCompactionJob(CompactionJobCommitReques
* @return a state store commit request
*/
public static StateStoreCommitRequest forCompactionJobIdAssignment(CompactionJobIdAssignmentCommitRequest request) {
return new StateStoreCommitRequest(request);
return new StateStoreCommitRequest(request, request.getTableId(), committer -> committer.assignCompactionInputFiles(request));
}

/**
@@ -57,7 +59,7 @@ public static StateStoreCommitRequest forCompactionJobIdAssignment(CompactionJob
* @return a state store commit request
*/
public static StateStoreCommitRequest forIngestAddFiles(IngestAddFilesCommitRequest request) {
return new StateStoreCommitRequest(request);
return new StateStoreCommitRequest(request, request.getTableId(), committer -> committer.addFiles(request));
}

/**
@@ -67,27 +69,27 @@ public static StateStoreCommitRequest forIngestAddFiles(IngestAddFilesCommitRequ
* @return a state store commit request
*/
public static StateStoreCommitRequest forSplitPartition(SplitPartitionCommitRequest request) {
return new StateStoreCommitRequest(request);
return new StateStoreCommitRequest(request, request.getTableId(), committer -> committer.splitPartition(request));
}

/**
* Creates a request which is stored in S3.
*
* @param request the commit request
* @return a state store commit request
*/
public static StateStoreCommitRequest storedInS3(StateStoreCommitRequestInS3 request) {
return new StateStoreCommitRequest(request);
}

private StateStoreCommitRequest(Object request) {
private StateStoreCommitRequest(Object request, String tableId, ApplyRequest applyRequest) {
this.request = request;
this.tableId = tableId;
this.applyRequest = applyRequest;
}

public Object getRequest() {
return request;
}

public String getTableId() {
return tableId;
}

void apply(StateStoreCommitter committer) throws StateStoreException {
applyRequest.apply(committer);
}

@Override
public int hashCode() {
return Objects.hash(request);
@@ -110,4 +112,11 @@ public String toString() {
return "StateStoreCommitRequest{request=" + request + "}";
}

/**
* Applies the current request with a given committer.
*/
@FunctionalInterface
private interface ApplyRequest {
void apply(StateStoreCommitter committer) throws StateStoreException;
}
}
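The diff above replaces type-based dispatch with an `ApplyRequest` lambda captured by each factory method, so the request itself knows which committer method to invoke. A stripped-down sketch of the pattern — the types here are simplified stand-ins, not the PR's real classes:

```java
public class CommitRequest {

    /** Knows which committer method applies this request, captured at construction. */
    @FunctionalInterface
    interface ApplyRequest {
        void apply(Committer committer);
    }

    /** Simplified stand-in for the state store committer. */
    interface Committer {
        void addFiles(String files);
    }

    private final String tableId;
    private final ApplyRequest applyRequest;

    private CommitRequest(String tableId, ApplyRequest applyRequest) {
        this.tableId = tableId;
        this.applyRequest = applyRequest;
    }

    static CommitRequest forAddFiles(String tableId, String files) {
        // The lambda closes over the typed request, so apply() below needs
        // no instanceof chain to find the right committer method.
        return new CommitRequest(tableId, committer -> committer.addFiles(files));
    }

    void apply(Committer committer) {
        applyRequest.apply(committer);
    }

    String getTableId() {
        return tableId;
    }
}
```

Compared with the instanceof chain it replaces in `StateStoreCommitter.apply`, this moves the type knowledge to the one place each request type is already known: its factory method.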
@@ -24,6 +24,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import sleeper.commit.StateStoreCommitter.LoadS3ObjectFromDataBucket;
import sleeper.compaction.job.CompactionJob;
import sleeper.compaction.job.CompactionJobJsonSerDe;
import sleeper.compaction.job.commit.CompactionJobCommitRequest;
@@ -46,11 +47,20 @@ public class StateStoreCommitRequestDeserialiser {
public static final Logger LOGGER = LoggerFactory.getLogger(StateStoreCommitRequestDeserialiser.class);

private final Gson gson;
private final Gson gsonFromDataBucket;
private final LoadS3ObjectFromDataBucket loadFromDataBucket;

public StateStoreCommitRequestDeserialiser(
TablePropertiesProvider tablePropertiesProvider, LoadS3ObjectFromDataBucket loadFromDataBucket) {
gson = gson(tablePropertiesProvider, this::fromDataBucket);
gsonFromDataBucket = gson(tablePropertiesProvider, DeserialiseFromDataBucket.refuseWhileReadingFromBucket());
this.loadFromDataBucket = loadFromDataBucket;
}

public StateStoreCommitRequestDeserialiser(TablePropertiesProvider tablePropertiesProvider) {
gson = GsonConfig.standardBuilder()
private static Gson gson(TablePropertiesProvider tablePropertiesProvider, DeserialiseFromDataBucket readFromDataBucket) {
return GsonConfig.standardBuilder()
.registerTypeAdapter(CompactionJob.class, new CompactionJobJsonSerDe())
.registerTypeAdapter(StateStoreCommitRequest.class, new WrapperDeserialiser())
.registerTypeAdapter(StateStoreCommitRequest.class, new WrapperDeserialiser(readFromDataBucket))
.registerTypeAdapter(SplitPartitionCommitRequest.class, new SplitPartitionDeserialiser(tablePropertiesProvider))
.serializeNulls()
.create();
@@ -66,11 +76,22 @@ public StateStoreCommitRequest fromJson(String jsonString) {
return gson.fromJson(jsonString, StateStoreCommitRequest.class);
}

private StateStoreCommitRequest fromDataBucket(StateStoreCommitRequestInS3 request) {
String json = loadFromDataBucket.loadFromDataBucket(request.getKeyInS3());
return gsonFromDataBucket.fromJson(json, StateStoreCommitRequest.class);
}

/**
* Deserialises the commit request by reading the type.
*/
private static class WrapperDeserialiser implements JsonDeserializer<StateStoreCommitRequest> {

private final DeserialiseFromDataBucket fromDataBucket;

private WrapperDeserialiser(DeserialiseFromDataBucket fromDataBucket) {
this.fromDataBucket = fromDataBucket;
}

@Override
public StateStoreCommitRequest deserialize(JsonElement json, Type wrapperType, JsonDeserializationContext context) throws JsonParseException {

@@ -88,15 +109,15 @@ public StateStoreCommitRequest deserialize(JsonElement json, Type wrapperType, J
case INGEST_ADD_FILES:
return StateStoreCommitRequest.forIngestAddFiles(
context.deserialize(requestObj, IngestAddFilesCommitRequest.class));
case STORED_IN_S3:
return StateStoreCommitRequest.storedInS3(
context.deserialize(requestObj, StateStoreCommitRequestInS3.class));
case COMPACTION_JOB_ID_ASSIGNMENT:
return StateStoreCommitRequest.forCompactionJobIdAssignment(
context.deserialize(requestObj, CompactionJobIdAssignmentCommitRequest.class));
case SPLIT_PARTITION:
return StateStoreCommitRequest.forSplitPartition(
context.deserialize(requestObj, SplitPartitionCommitRequest.class));
case STORED_IN_S3:
[Review comment] Collaborator: Any reason for the re-ordering of the case statements?

[Review comment] Collaborator (author): I felt like the stored in S3 type should be separate from the others because it's handled differently. I'm not sure there's a good way to make this clear though.
return fromDataBucket.read(
context.deserialize(requestObj, StateStoreCommitRequestInS3.class));
default:
throw new CommitRequestValidationException("Unrecognised request type");
}
@@ -138,4 +159,19 @@ public SplitPartitionCommitRequest deserialize(JsonElement jsonElement, Type typ
return new SplitPartitionCommitRequest(tableId, parentPartition, leftChildPartition, rightChildPartition);
}
}

/**
* Reads and deserialises a commit request from the data bucket. An alternative implementation will refuse reading
* from the bucket because the pointer was already stored in S3.
*/
@FunctionalInterface
private interface DeserialiseFromDataBucket {
StateStoreCommitRequest read(StateStoreCommitRequestInS3 request);

static DeserialiseFromDataBucket refuseWhileReadingFromBucket() {
return request -> {
throw new IllegalArgumentException("Found a request stored in S3 pointing to another S3 object: " + request.getKeyInS3());
};
}
}
}
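The `refuseWhileReadingFromBucket` handler above means a request loaded from the data bucket can never itself be an S3 pointer: the top-level deserialiser follows pointers, while the one used for bucket contents throws. A toy sketch of that one-level-only guard — the string-based "format" (an `s3:` prefix marking a pointer) is invented purely for illustration:

```java
import java.util.function.Function;

public class CommitRequestReader {

    private final Function<String, String> loadFromBucket;

    public CommitRequestReader(Function<String, String> loadFromBucket) {
        this.loadFromBucket = loadFromBucket;
    }

    /** Reads a payload, following at most one level of S3 pointer. */
    public String read(String payload) {
        return read(payload, true);
    }

    private String read(String payload, boolean followPointers) {
        if (payload.startsWith("s3:")) {
            String key = payload.substring(3);
            if (!followPointers) {
                // Mirrors the guard in the diff: a request loaded from the
                // bucket must not point at yet another S3 object.
                throw new IllegalArgumentException(
                        "Found a request stored in S3 pointing to another S3 object: " + key);
            }
            return read(loadFromBucket.apply(key), false);
        }
        return payload;
    }
}
```

The PR achieves the same effect structurally, by building a second Gson instance whose stored-in-S3 handler always throws, rather than threading a boolean through the calls.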
@@ -30,7 +30,6 @@
import sleeper.core.statestore.StateStore;
import sleeper.core.statestore.StateStoreException;
import sleeper.core.statestore.commit.SplitPartitionCommitRequest;
import sleeper.core.statestore.commit.StateStoreCommitRequestInS3;
import sleeper.core.statestore.exception.FileAlreadyExistsException;
import sleeper.core.statestore.exception.FileNotFoundException;
import sleeper.core.statestore.exception.FileReferenceNotAssignedToJobException;
@@ -59,7 +58,6 @@ public class StateStoreCommitter {
private final CompactionJobStatusStore compactionJobStatusStore;
private final IngestJobStatusStore ingestJobStatusStore;
private final GetStateStoreByTableId stateStoreProvider;
private final LoadS3ObjectFromDataBucket loadFromDataBucket;
private final Supplier<Instant> timeSupplier;

public StateStoreCommitter(
@@ -69,21 +67,23 @@ public StateStoreCommitter(
GetStateStoreByTableId stateStoreProvider,
LoadS3ObjectFromDataBucket loadFromDataBucket,
Supplier<Instant> timeSupplier) {
this.deserialiser = new StateStoreCommitRequestDeserialiser(tablePropertiesProvider);
this.deserialiser = new StateStoreCommitRequestDeserialiser(tablePropertiesProvider, loadFromDataBucket);
this.compactionJobStatusStore = compactionJobStatusStore;
this.ingestJobStatusStore = ingestJobStatusStore;
this.stateStoreProvider = stateStoreProvider;
this.loadFromDataBucket = loadFromDataBucket;
this.timeSupplier = timeSupplier;
}

/**
* Applies a state store commit request.
*
* @param json the commit request JSON string
* @param json the commit request JSON string
* @return the commit request
*/
public void applyFromJson(String json) throws StateStoreException {
apply(deserialiser.fromJson(json));
public StateStoreCommitRequest applyFromJson(String json) throws StateStoreException {
StateStoreCommitRequest request = deserialiser.fromJson(json);
apply(request);
return request;
}

/**
@@ -92,30 +92,19 @@ public void applyFromJson(String json) throws StateStoreException {
* @param request the commit request
*/
public void apply(StateStoreCommitRequest request) throws StateStoreException {
Object requestObj = request.getRequest();
if (requestObj instanceof CompactionJobCommitRequest) {
apply((CompactionJobCommitRequest) requestObj);
} else if (requestObj instanceof IngestAddFilesCommitRequest) {
apply((IngestAddFilesCommitRequest) requestObj);
} else if (requestObj instanceof StateStoreCommitRequestInS3) {
apply((StateStoreCommitRequestInS3) requestObj);
} else if (requestObj instanceof CompactionJobIdAssignmentCommitRequest) {
apply((CompactionJobIdAssignmentCommitRequest) requestObj);
} else if (requestObj instanceof SplitPartitionCommitRequest) {
apply((SplitPartitionCommitRequest) requestObj);
} else {
throw new IllegalArgumentException("Unsupported commit request type: " + requestObj.getClass().getName());
}
request.apply(this);
LOGGER.info("Applied request to table ID {} with type {} at time {}",
request.getTableId(), request.getRequest().getClass().getSimpleName(), Instant.now());
}

private void apply(CompactionJobCommitRequest request) throws StateStoreException {
void commitCompaction(CompactionJobCommitRequest request) throws StateStoreException {
CompactionJob job = request.getJob();
try {
CompactionJobCommitter.updateStateStoreSuccess(job, request.getRecordsWritten(),
stateStoreProvider.getByTableId(job.getTableId()));
compactionJobStatusStore.jobCommitted(compactionJobCommitted(job, timeSupplier.get())
.taskId(request.getTaskId()).jobRunId(request.getJobRunId()).build());
LOGGER.info("Successfully committed compaction job {} to table with ID {}", job.getId(), job.getTableId());
LOGGER.debug("Successfully committed compaction job {}", job.getId());
} catch (ReplaceRequestsFailedException e) {
Exception failure = e.getFailures().get(0);
if (failure instanceof FileNotFoundException
@@ -133,32 +122,28 @@ private void apply(CompactionJobCommitRequest request) throws StateStoreExceptio
}
}

private void apply(IngestAddFilesCommitRequest request) throws StateStoreException {
void addFiles(IngestAddFilesCommitRequest request) throws StateStoreException {
StateStore stateStore = stateStoreProvider.getByTableId(request.getTableId());
List<AllReferencesToAFile> files = AllReferencesToAFile.newFilesWithReferences(request.getFileReferences());
stateStore.addFilesWithReferences(files);
IngestJob job = request.getJob();
if (job != null) {
ingestJobStatusStore.jobAddedFiles(IngestJobAddedFilesEvent.ingestJobAddedFiles(job, files, request.getWrittenTime())
.taskId(request.getTaskId()).jobRunId(request.getJobRunId()).build());
LOGGER.info("Successfully committed new files for ingest job {} to table with ID {}", job.getId(), request.getTableId());
LOGGER.debug("Successfully committed new files for ingest job {}", job.getId());
} else {
LOGGER.info("Successfully committed new files for ingest to table with ID {}", request.getTableId());
LOGGER.debug("Successfully committed new files for ingest with no job");
}
}

private void apply(SplitPartitionCommitRequest request) throws StateStoreException {
void splitPartition(SplitPartitionCommitRequest request) throws StateStoreException {
StateStore stateStore = stateStoreProvider.getByTableId(request.getTableId());
stateStore.atomicallyUpdatePartitionAndCreateNewOnes(request.getParentPartition(), request.getLeftChild(), request.getRightChild());
}

private void apply(StateStoreCommitRequestInS3 request) throws StateStoreException {
String json = loadFromDataBucket.loadFromDataBucket(request.getKeyInS3());
StateStoreCommitRequest requestFromS3 = deserialiser.fromJson(json);
if (requestFromS3.getRequest() instanceof StateStoreCommitRequestInS3) {
throw new IllegalArgumentException("Found a request stored in S3 pointing to another S3 object: " + request.getKeyInS3());
}
apply(requestFromS3);
void assignCompactionInputFiles(CompactionJobIdAssignmentCommitRequest request) throws StateStoreException {
StateStore stateStore = stateStoreProvider.getByTableId(request.getTableId());
stateStore.assignJobIds(request.getAssignJobIdRequests());
}

/**
Expand All @@ -173,9 +158,4 @@ public interface LoadS3ObjectFromDataBucket {
*/
String loadFromDataBucket(String key);
}

private void apply(CompactionJobIdAssignmentCommitRequest request) throws StateStoreException {
StateStore stateStore = stateStoreProvider.getByTableId(request.getTableId());
stateStore.assignJobIds(request.getAssignJobIdRequests());
}
}