Skip to content

Commit

Permalink
Merge branch 'develop' into 3640-update-changelog
Browse files Browse the repository at this point in the history
  • Loading branch information
patchwork01 authored Nov 12, 2024
2 parents 4fe18c7 + 9fe8c92 commit d48d87a
Show file tree
Hide file tree
Showing 33 changed files with 612 additions and 120 deletions.
4 changes: 2 additions & 2 deletions code-style/dependency-check-suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@
We're not using the HttpURI class directly, and the CVE states that Jetty is not vulnerable unless you use that
directly.
]]></notes>
<packageUrl regex="true">^pkg:maven/org\.eclipse\.jetty/jetty-http@.*$</packageUrl>
<packageUrl regex="true">^pkg:maven/org\.eclipse\.jetty(/.*$|\.http2/http2.*$|\.websocket/websocket.*$)</packageUrl>
<vulnerabilityName>CVE-2024-6763</vulnerabilityName>
</suppress>
<suppress>
Expand All @@ -298,7 +298,7 @@
we're using uses Jetty 9.
]]></notes>
<packageUrl regex="true">^pkg:javascript/DOMPurify@.*$</packageUrl>
<vulnerabilityName regex="true">CVE-2024-45801|CVE-2024-47875</vulnerabilityName>
<vulnerabilityName regex="true">CVE-2024-45801|CVE-2024-47875|CVE-2024-48910</vulnerabilityName>
</suppress>
<suppress>
<notes><![CDATA[
Expand Down
12 changes: 11 additions & 1 deletion java/cdk/src/main/java/sleeper/cdk/SleeperCdkApp.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
*/
package sleeper.cdk;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awscdk.App;
import software.amazon.awscdk.AppProps;
import software.amazon.awscdk.Environment;
Expand Down Expand Up @@ -80,11 +82,14 @@
import static sleeper.core.properties.instance.CommonProperty.ID;
import static sleeper.core.properties.instance.CommonProperty.OPTIONAL_STACKS;
import static sleeper.core.properties.instance.CommonProperty.REGION;
import static sleeper.core.properties.instance.CommonProperty.VPC_ENDPOINT_CHECK;

/**
* Deploys an instance of Sleeper, including any configured optional stacks.
*/
public class SleeperCdkApp extends Stack {
public static final Logger LOGGER = LoggerFactory.getLogger(SleeperCdkApp.class);

private final InstanceProperties instanceProperties;
private final BuiltJars jars;
private final App app;
Expand Down Expand Up @@ -121,7 +126,12 @@ public void create() {
LoggingStack loggingStack = new LoggingStack(this, "Logging", instanceProperties);

// Stack for Checking VPC configuration
new VpcStack(this, "Vpc", instanceProperties, jars, loggingStack);
if (instanceProperties.getBoolean(VPC_ENDPOINT_CHECK)) {
new VpcStack(this, "Vpc", instanceProperties, jars, loggingStack);
} else {
LOGGER.warn("Skipping VPC check as requested by the user. Be aware that VPCs that don't have an S3 endpoint can result "
+ "in very significant NAT charges.");
}

// Topic stack
TopicStack topicStack = new TopicStack(this, "Topic", instanceProperties);
Expand Down
10 changes: 0 additions & 10 deletions java/cdk/src/main/java/sleeper/cdk/stack/VpcStack.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
package sleeper.cdk.stack;

import com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awscdk.CustomResource;
import software.amazon.awscdk.CustomResourceProps;
import software.amazon.awscdk.NestedStack;
Expand All @@ -41,21 +39,13 @@
import java.util.Map;

import static sleeper.core.properties.instance.CommonProperty.REGION;
import static sleeper.core.properties.instance.CommonProperty.VPC_ENDPOINT_CHECK;
import static sleeper.core.properties.instance.CommonProperty.VPC_ID;

public class VpcStack extends NestedStack {
private static final Logger LOGGER = LoggerFactory.getLogger(VpcStack.class);

public VpcStack(Construct scope, String id, InstanceProperties instanceProperties, BuiltJars jars, LoggingStack logging) {
super(scope, id);

if (!instanceProperties.getBoolean(VPC_ENDPOINT_CHECK)) {
LOGGER.warn("Skipping VPC check as requested by the user. Be aware that VPCs that don't have an S3 endpoint can result "
+ "in very significant NAT charges.");
return;
}

// Jars bucket
IBucket jarsBucket = Bucket.fromBucketName(this, "JarsBucket", jars.bucketName());
LambdaCode lambdaCode = jars.lambdaCode(jarsBucket);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Copyright 2022-2024 Crown Copyright
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package sleeper.core.statestore.commit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import sleeper.core.properties.instance.InstanceProperties;

import java.util.UUID;
import java.util.function.Supplier;

import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.DATA_BUCKET;

/**
* Handles uploading commit requests to S3 if they are too big to fit in an SQS message.
*/
public class StateStoreCommitRequestInS3Uploader {
    public static final Logger LOGGER = LoggerFactory.getLogger(StateStoreCommitRequestInS3Uploader.class);
    public static final int MAX_JSON_LENGTH = 262144;

    private final StateStoreCommitRequestInS3SerDe referenceSerDe = new StateStoreCommitRequestInS3SerDe();
    private final InstanceProperties instanceProperties;
    private final Client s3;
    private final Supplier<String> fileNames;
    private final int lengthLimit;

    /**
     * Creates an uploader with the default SQS message size limit and random file names.
     *
     * @param instanceProperties the instance properties, used to find the data bucket
     * @param client             the client used to write objects to S3
     */
    public StateStoreCommitRequestInS3Uploader(InstanceProperties instanceProperties, Client client) {
        this(instanceProperties, client, MAX_JSON_LENGTH, () -> UUID.randomUUID().toString());
    }

    /**
     * Creates an uploader with a custom size limit and file name generator, e.g. for tests.
     *
     * @param instanceProperties the instance properties, used to find the data bucket
     * @param client             the client used to write objects to S3
     * @param maxJsonLength      the maximum JSON length that will fit in an SQS message
     * @param filenameSupplier   generates a file name for each uploaded request
     */
    public StateStoreCommitRequestInS3Uploader(InstanceProperties instanceProperties, Client client, int maxJsonLength, Supplier<String> filenameSupplier) {
        this.instanceProperties = instanceProperties;
        this.s3 = client;
        this.lengthLimit = maxJsonLength;
        this.fileNames = filenameSupplier;
    }

    /**
     * Checks whether a state store commit request JSON will fit in an SQS message. If not, uploads it to S3 and
     * creates a new commit request referencing the original JSON in S3.
     *
     * @param  tableId           the Sleeper table ID
     * @param  commitRequestJson the commit request JSON
     * @return                   the commit request if it fits in an SQS message, or a new commit request referencing S3
     */
    public String uploadAndWrapIfTooBig(String tableId, String commitRequestJson) {
        if (commitRequestJson.length() <= lengthLimit) {
            // Small enough to go on the queue directly.
            return commitRequestJson;
        }
        // Too big for SQS: store the request in the data bucket and send a reference to it instead.
        String s3Key = StateStoreCommitRequestInS3.createFileS3Key(tableId, fileNames.get());
        s3.putObject(instanceProperties.get(DATA_BUCKET), s3Key, commitRequestJson);
        LOGGER.info("Request was too big for an SQS message. Will submit a reference to file in data bucket: {}", s3Key);
        return referenceSerDe.toJson(new StateStoreCommitRequestInS3(s3Key));
    }

    /**
     * A client to upload an object to S3.
     */
    public interface Client {

        /**
         * Uploads an object to an S3 bucket.
         *
         * @param bucketName the bucket name
         * @param key        the key to upload to
         * @param content    the content of the file to upload
         */
        void putObject(String bucketName, String key, String content);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Copyright 2022-2024 Crown Copyright
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package sleeper.core.statestore.commit;

import org.junit.jupiter.api.Test;

import sleeper.core.properties.instance.InstanceProperties;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.assertj.core.api.Assertions.assertThat;
import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.DATA_BUCKET;

public class StateStoreCommitRequestInS3UploaderTest {

    private final InstanceProperties instanceProperties = new InstanceProperties();
    // Fake S3: maps "bucket/key" to the uploaded content.
    private final Map<String, String> objectsByBucketAndKey = new HashMap<>();

    @Test
    void shouldUploadFileWhenTooBig() {
        // Given a request longer than the maximum SQS message length
        instanceProperties.set(DATA_BUCKET, "test-bucket");
        String json = "{\"type\":\"test\",\"tableId\":\"test-table\"}";
        StateStoreCommitRequestInS3Uploader uploader = uploaderWithMaxLengthAndFilenames(10, "commit");

        // When
        String result = uploader.uploadAndWrapIfTooBig("test-table", json);

        // Then the original request is in the fake S3, and the result references it
        assertThat(new StateStoreCommitRequestInS3SerDe().fromJson(result))
                .isEqualTo(new StateStoreCommitRequestInS3("test-table/statestore/commitrequests/commit.json"));
        assertThat(objectsByBucketAndKey)
                .isEqualTo(Map.of("test-bucket/test-table/statestore/commitrequests/commit.json", json));
    }

    @Test
    void shouldNotUploadFileWhenMeetsMax() {
        // Given a request exactly at the maximum SQS message length
        instanceProperties.set(DATA_BUCKET, "test-bucket");
        String json = "{\"type\":\"test\",\"tableId\":\"test-table\"}";
        StateStoreCommitRequestInS3Uploader uploader = uploaderWithMaxLengthAndFilenames(json.length(), "commit");

        // When
        String result = uploader.uploadAndWrapIfTooBig("test-table", json);

        // Then the request passes through unchanged and nothing is uploaded
        assertThat(result).isEqualTo(json);
        assertThat(objectsByBucketAndKey).isEmpty();
    }

    // Builds an uploader that writes into the in-memory map, using the given fixed file names.
    private StateStoreCommitRequestInS3Uploader uploaderWithMaxLengthAndFilenames(int maxLength, String... filenames) {
        return new StateStoreCommitRequestInS3Uploader(
                instanceProperties,
                (bucket, key, content) -> objectsByBucketAndKey.put(bucket + "/" + key, content),
                maxLength,
                List.of(filenames).iterator()::next);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import sleeper.core.statestore.StateStoreProvider;
import sleeper.core.statestore.commit.GarbageCollectionCommitRequest;
import sleeper.core.statestore.commit.GarbageCollectionCommitRequestSerDe;
import sleeper.core.statestore.commit.StateStoreCommitRequestInS3Uploader;
import sleeper.core.table.TableStatus;
import sleeper.core.util.LoggedDuration;
import sleeper.garbagecollector.FailedGarbageCollectionException.TableFailures;
Expand Down Expand Up @@ -193,11 +194,11 @@ public interface SendAsyncCommit {
void sendCommit(GarbageCollectionCommitRequest commitRequest);
}

public static SendAsyncCommit sendAsyncCommit(InstanceProperties instanceProperties, AmazonSQS sqs) {
public static SendAsyncCommit sendAsyncCommit(InstanceProperties instanceProperties, AmazonSQS sqs, StateStoreCommitRequestInS3Uploader s3Uploader) {
GarbageCollectionCommitRequestSerDe serDe = new GarbageCollectionCommitRequestSerDe();
return request -> sqs.sendMessage(new SendMessageRequest()
.withQueueUrl(instanceProperties.get(STATESTORE_COMMITTER_QUEUE_URL))
.withMessageBody(serDe.toJson(request))
.withMessageBody(s3Uploader.uploadAndWrapIfTooBig(request.getTableId(), serDe.toJson(request)))
.withMessageGroupId(request.getTableId())
.withMessageDeduplicationId(UUID.randomUUID().toString()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import sleeper.core.properties.table.TableProperties;
import sleeper.core.properties.table.TablePropertiesProvider;
import sleeper.core.statestore.StateStoreProvider;
import sleeper.core.statestore.commit.StateStoreCommitRequestInS3Uploader;
import sleeper.core.table.TableStatus;
import sleeper.core.util.LoggedDuration;
import sleeper.garbagecollector.FailedGarbageCollectionException.TableFailures;
Expand Down Expand Up @@ -84,7 +85,7 @@ public GarbageCollectorLambda() {
StateStoreProvider stateStoreProvider = StateStoreFactory.createProvider(instanceProperties, s3Client, dynamoDBClient, conf);
garbageCollector = new GarbageCollector(deleteFileAndSketches(conf),
instanceProperties, stateStoreProvider,
sendAsyncCommit(instanceProperties, sqsClient));
sendAsyncCommit(instanceProperties, sqsClient, new StateStoreCommitRequestInS3Uploader(instanceProperties, s3Client::putObject)));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@
import sleeper.core.statestore.StateStore;
import sleeper.core.statestore.commit.GarbageCollectionCommitRequest;
import sleeper.core.statestore.commit.GarbageCollectionCommitRequestSerDe;
import sleeper.core.statestore.commit.StateStoreCommitRequestInS3;
import sleeper.core.statestore.commit.StateStoreCommitRequestInS3SerDe;
import sleeper.core.statestore.commit.StateStoreCommitRequestInS3Uploader;
import sleeper.core.statestore.testutils.FixedStateStoreProvider;
import sleeper.parquet.utils.HadoopConfigurationLocalStackUtils;

Expand Down Expand Up @@ -168,6 +171,40 @@ void shouldSendAsyncCommit() throws Exception {
.containsExactly(new GarbageCollectionCommitRequest(tableProperties.get(TABLE_ID), List.of(oldFile.getFilename())));
}

    // Forces a max commit length of 1 so the serialised commit request is always too
    // big for an SQS message, then checks the request body is uploaded to S3 with a
    // reference sent on the committer queue instead of the full request.
    @Test
    void shouldUploadCommitToS3IfTooBig() throws Exception {
        // Given a table with async commits enabled and a file old enough to garbage collect
        TableProperties tableProperties = createTableWithGCDelay(instanceProperties, 10);
        tableProperties.set(GARBAGE_COLLECTOR_ASYNC_COMMIT, "true");
        Instant currentTime = Instant.parse("2023-06-28T13:46:00Z");
        // 11-minute offset exceeds the GC delay of 10 (presumably minutes — see createTableWithGCDelay)
        Instant oldEnoughTime = currentTime.minus(Duration.ofMinutes(11));
        StateStore stateStore = setupStateStoreAndFixTime(oldEnoughTime);
        // Perform a compaction on an existing file to create a readyForGC file
        s3Client.putObject(testBucket, "old-file.parquet", "abc");
        s3Client.putObject(testBucket, "new-file.parquet", "def");
        FileReference oldFile = factory.rootFile("s3a://" + testBucket + "/old-file.parquet", 100L);
        FileReference newFile = factory.rootFile("s3a://" + testBucket + "/new-file.parquet", 100L);
        stateStore.addFile(oldFile);
        stateStore.assignJobIds(List.of(
                assignJobOnPartitionToFiles("test-job", "root", List.of(oldFile.getFilename()))));
        stateStore.atomicallyReplaceFileReferencesWithNewOnes(List.of(replaceJobFileReferences(
                "test-job", "root", List.of(oldFile.getFilename()), newFile)));

        // When garbage collection runs with a commit length limit the request cannot fit in
        createGarbageCollectorWithMaxCommitLength(1, instanceProperties, tableProperties, stateStore)
                .runAtTime(currentTime, List.of(tableProperties));

        // Then the old file is deleted and the queue holds an S3 reference; resolve the
        // reference to check the uploaded request's contents
        // NOTE(review): reads the reference from testBucket — assumes testBucket is also
        // configured as the instance data bucket; confirm against test setup
        assertThat(s3Client.doesObjectExist(testBucket, "old-file.parquet")).isFalse();
        assertThat(s3Client.doesObjectExist(testBucket, "new-file.parquet")).isTrue();
        assertThat(stateStore.getAllFilesWithMaxUnreferenced(10))
                .isEqualTo(activeAndReadyForGCFilesReport(oldEnoughTime, List.of(newFile), List.of(oldFile.getFilename())));
        assertThat(receiveS3CommitRequests())
                .map(request -> s3Client.getObjectAsString(testBucket, request.getKeyInS3()))
                .map(new GarbageCollectionCommitRequestSerDe()::fromJson)
                .containsExactly(new GarbageCollectionCommitRequest(tableProperties.get(TABLE_ID), List.of(oldFile.getFilename())));
    }

private InstanceProperties createInstanceProperties() {
InstanceProperties instanceProperties = createTestInstanceProperties();
instanceProperties.set(STATESTORE_COMMITTER_QUEUE_URL, createFifoQueueGetUrl());
Expand All @@ -183,8 +220,16 @@ private TableProperties createTableWithGCDelay(InstanceProperties instanceProper
}

private GarbageCollector createGarbageCollector(InstanceProperties instanceProperties, TableProperties tableProperties, StateStore stateStore) {
return createGarbageCollectorWithMaxCommitLength(
StateStoreCommitRequestInS3Uploader.MAX_JSON_LENGTH,
instanceProperties, tableProperties, stateStore);
}

private GarbageCollector createGarbageCollectorWithMaxCommitLength(int maxLength, InstanceProperties instanceProperties, TableProperties tableProperties, StateStore stateStore) {
return new GarbageCollector(deleteFileAndSketches(configuration), instanceProperties,
new FixedStateStoreProvider(tableProperties, stateStore), sendAsyncCommit(instanceProperties, sqsClient));
new FixedStateStoreProvider(tableProperties, stateStore),
sendAsyncCommit(instanceProperties, sqsClient,
new StateStoreCommitRequestInS3Uploader(instanceProperties, s3Client::putObject, maxLength, () -> UUID.randomUUID().toString())));
}

private static Schema getSchema() {
Expand All @@ -206,6 +251,12 @@ private List<GarbageCollectionCommitRequest> receiveGarbageCollectionCommitReque
.collect(Collectors.toList());
}

private List<StateStoreCommitRequestInS3> receiveS3CommitRequests() {
return receiveCommitMessages().stream()
.map(message -> new StateStoreCommitRequestInS3SerDe().fromJson(message.getBody()))
.collect(Collectors.toList());
}

private List<Message> receiveCommitMessages() {
ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest()
.withQueueUrl(instanceProperties.get(STATESTORE_COMMITTER_QUEUE_URL))
Expand Down
Loading

0 comments on commit d48d87a

Please sign in to comment.