
Issue 3799 - System test creating many compaction jobs at once #3826

Merged

Changes from all commits · 51 commits
cf13825
Add SplitPointsTestHelperTest
patchwork01 Nov 28, 2024
5dd4fbc
Update method for NthRecordGenerator Interface
rtjd6554 Nov 28, 2024
8535d53
Refactor method signature to create partitions
patchwork01 Nov 28, 2024
ae821fa
Merge branch '3824-partitions-printer-leaves' into 3799-system-test-c…
patchwork01 Nov 28, 2024
2a16cab
Update assertion to show leaf partitions
patchwork01 Nov 28, 2024
4200bc0
Extract SplitPointsTestHelper
patchwork01 Nov 28, 2024
1048cfa
New functionality for CreateSplitPoints within SystemTestDsl
rtjd6554 Nov 28, 2024
aa572a2
Rename PartitionTreeTestHelper
patchwork01 Nov 28, 2024
a60d7e2
Rename SystemTestPartitionsTestHelper
patchwork01 Nov 28, 2024
c960c92
Write CreateManyCompactionsTest
patchwork01 Nov 28, 2024
2aecfcf
Use BasicCompactionStrategy in CreateManyCompactionsTest
patchwork01 Nov 28, 2024
d56de19
Typo correction
rtjd6554 Nov 28, 2024
e1513ba
New class within SystemTest for largeQuantity compaction
rtjd6554 Nov 28, 2024
c4c5730
Merge branch 'develop' into 3799-system-test-creating-many-compaction…
patchwork01 Nov 28, 2024
c820a12
Remove extra constructors for GenerateNumberedRecords
patchwork01 Nov 28, 2024
6135f53
Add GenerateNumberedRecords.iterableOf
patchwork01 Nov 28, 2024
8ab037b
Rename GenerateNumberedRecords.iterableFrom
patchwork01 Nov 28, 2024
a3daa82
Adjust method names in GenerateNumberedRecords
patchwork01 Nov 28, 2024
e164f2c
Add GenerateNumberedRecords.iteratorFrom
patchwork01 Nov 28, 2024
b780291
Merge branch 'develop' into 3799-system-test-creating-many-compaction…
patchwork01 Nov 29, 2024
b52538d
Fix imports
patchwork01 Nov 29, 2024
d24d18d
Move SystemTestSchema into DSL
patchwork01 Nov 29, 2024
b05fdf8
Use SystemTestSchema in in-memory tests
patchwork01 Nov 29, 2024
f94fed6
Rename in memory test instance constant
patchwork01 Nov 29, 2024
ed7f07f
Increase wait time creating jobs
patchwork01 Nov 29, 2024
9ce6fb8
Create static field for LocalStack system test context
patchwork01 Nov 29, 2024
17f0c66
Increase wait time creating jobs
patchwork01 Nov 29, 2024
e39f00f
Remove noisy logging in PartitionsFromSplitPoints
patchwork01 Nov 29, 2024
dca8741
Avoid toString call when debug logging is turned off
patchwork01 Nov 29, 2024
758356a
Poll state store instead of status store in CreateManyCompactionsST
patchwork01 Nov 29, 2024
624a83a
Merge branch '3840-handle-failures-in-compaction-dispatch' into 3799-…
patchwork01 Dec 2, 2024
6ae7618
Merge branch 'develop' into 3799-system-test-creating-many-compaction…
patchwork01 Dec 3, 2024
f8ba977
Compact with DataFusion in CreateManyCompactionsST
patchwork01 Dec 3, 2024
b89072a
Fix IOException in RustBridge
patchwork01 Dec 3, 2024
6c339a3
Don't run jobs in CreateManyCompactionsST
patchwork01 Dec 3, 2024
32dd7cf
Drain compaction jobs queue in CreateManyCompactionsST
patchwork01 Dec 3, 2024
8617e81
Use base driver to drain compaction queue
patchwork01 Dec 3, 2024
8a3e00a
Assert full compaction in CreateManyCompactionsST
patchwork01 Dec 3, 2024
1440f38
Remove unused field
patchwork01 Dec 3, 2024
3a3f578
Delete messages when draining compaction queue
patchwork01 Dec 3, 2024
d514d96
Set wait time when receiving compaction jobs
patchwork01 Dec 3, 2024
9243071
Fix draining compaction jobs from queue
patchwork01 Dec 3, 2024
663e49f
Remove extra test
patchwork01 Dec 3, 2024
485637f
Remove unnecessary queue creation, run drain compactions in a separat…
patchwork01 Dec 3, 2024
34809b4
Adjust LocalStackTestInstance to match InMemoryTestInstance
patchwork01 Dec 3, 2024
33965bf
Share LocalStack container between NightlyTestOutputS3IT and other te…
patchwork01 Dec 4, 2024
46e065f
Log queue sent/drained in AwsCompactionDriverDrainCompactionsIT
patchwork01 Dec 4, 2024
2ba1a90
Set short test ID for LocalStack
patchwork01 Dec 4, 2024
3c7b66b
Use v2 SQS client to drain compaction jobs queue
patchwork01 Dec 4, 2024
64fceff
Remove unused dependency
patchwork01 Dec 4, 2024
4076173
Merge branch 'develop' into 3799-system-test-creating-many-compaction…
patchwork01 Dec 4, 2024
4 changes: 3 additions & 1 deletion .vscode/settings.json
@@ -24,12 +24,14 @@
"org.mockito.Mockito.*",
"org.mockito.ArgumentMatchers.*",
"org.mockito.Answers.*",
+ "sleeper.core.partition.PartitionTreeTestHelper.*",
"sleeper.core.schema.SchemaTestHelper.*",
"sleeper.core.statestore.inmemory.StateStoreTestHelper.*",
"sleeper.configuration.properties.InstancePropertiesTestHelper.*",
"sleeper.configuration.properties.table.TablePropertiesTestHelper.*",
"sleeper.core.util.ExponentialBackoffWithJitterTestHelper.*",
- "sleeper.core.testutils.SupplierTestHelper.*"
+ "sleeper.core.testutils.SupplierTestHelper.*",
+ "sleeper.systemtest.dsl.testutil.SystemTestPartitionsTestHelper.*"
],
"comment.java.configuration.runtimes": [
{
RustBridge.java
@@ -74,7 +74,7 @@ public static synchronized Compaction getRustCompactor() throws IOException {
return nativeLib;

} catch (UnsatisfiedLinkError err) {
- throw (IOException) new IOException().initCause(err);
+ throw (IOException) new IOException("Could not initialise Rust bridge", err);
}
}

PartitionsFromSplitPoints.java
@@ -95,7 +95,7 @@ public List<Partition> construct() {
List<Partition> partitionsInLayer = List.of(tree.getRootPartition());
while (!partitionsInLayer.isEmpty()) {
LOGGER.debug("Layer {}", layer++);
- partitionsInLayer.forEach(partition -> LOGGER.debug(partition.toString()));
+ partitionsInLayer.forEach(partition -> LOGGER.debug("{}", partition));
partitionsInLayer = partitionsInLayer.stream()
.map(partition -> partition.getChildPartitionIds())
.flatMap(List::stream)
@@ -136,7 +136,6 @@ private List<Partition.Builder> addLayer(List<Partition.Builder> partitionsInLay
leftPartition.parentPartitionId(id);
rightPartition.parentPartitionId(id);
parents.add(parent);
- LOGGER.debug("Created parent partition {} joining partitions {}", id, childPartitionIds);
}
}
allPartitions.addAll(parents);
@@ -165,7 +164,6 @@ private List<Partition.Builder> createLeafPartitions() {
.childPartitionIds(new ArrayList<>())
.dimension(-1);
leafPartitions.add(partition);
- LOGGER.debug("Created leaf partition {} for region {}", id, region);
}
LOGGER.info("Created {} leaf partitions from {} split points", leafPartitions.size(), splitPoints.size());
return leafPartitions;
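The logging changes in this file (commits e39f00f and dca8741) remove per-partition debug lines and replace `LOGGER.debug(partition.toString())` with the parameterised form, so `toString()` only runs when debug logging is actually enabled. A minimal sketch of why the parameterised form matters, using a hypothetical stand-in logger rather than SLF4J itself:

```java
public class LazyLoggingSketch {
    static boolean debugEnabled = false;
    static int toStringCalls = 0;

    // Parameterised logging: the argument is only formatted when debug is on,
    // mirroring how SLF4J defers substitution of "{}" placeholders.
    static void debug(String format, Object arg) {
        if (debugEnabled) {
            System.out.println(format.replace("{}", arg.toString()));
        }
    }

    public static void main(String[] args) {
        Object partition = new Object() {
            @Override
            public String toString() {
                toStringCalls++;
                return "partition";
            }
        };
        // Eager form would be debug(partition.toString()): toString() runs even
        // when debug is off. The parameterised call below never formats it.
        debug("{}", partition);
        System.out.println(toStringCalls); // 0
    }
}
```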
PartitionTreeTestHelper.java
@@ -0,0 +1,70 @@
/*
* Copyright 2022-2024 Crown Copyright
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package sleeper.core.partition;

import sleeper.core.record.Record;
import sleeper.core.schema.Field;
import sleeper.core.schema.Schema;

import java.util.ArrayList;
import java.util.List;

/**
* A test helper for creating partitions based on split points.
*/
public class PartitionTreeTestHelper {

private PartitionTreeTestHelper() {
}

/**
* Creates a partition tree by finding records at the boundaries. This can be used when we are generating predefined
* test data, and we can generate the nth record in sort order at will. This will create a leaf partition for each
* group of records where we specify the number of records in a group. The boundaries of each partition will be the
* value of the split field for the record at the boundary. This will take the first row key as the field to split
* on. This may be used to avoid needing to hold all the test data in memory at once.
*
* @param recordsPerPartition the number of records to fit in each partition
* @param totalRecords the total number of records that will be in the table
* @param generator a method to find the nth record in sort order
* @param schema the schema
* @return the partition tree
*/
public static PartitionTree createPartitionTreeWithRecordsPerPartitionAndTotal(int recordsPerPartition, long totalRecords, NthRecordGenerator generator, Schema schema) {
List<Object> splitPoints = new ArrayList<>();
Field splitField = schema.getRowKeyFields().get(0);
for (long i = recordsPerPartition; i < totalRecords; i += recordsPerPartition) {
splitPoints.add(generator.getNthRecord(i).get(splitField.getName()));
}
return PartitionsFromSplitPoints.treeFrom(schema, splitPoints);
}

/**
* A generator for the nth record in sort order to generate test data on the fly. Used when the test data is defined
* deterministically to avoid needing to hold it all in memory at once.
*/
@FunctionalInterface
public interface NthRecordGenerator {

/**
* Gets the nth record in the test data in sort order.
*
* @param n the index of the record, starting from 0
* @return the record
*/
Record getNthRecord(long n);
}
}
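The split-point loop inside `createPartitionTreeWithRecordsPerPartitionAndTotal` can be illustrated in isolation: one split point is taken at every `recordsPerPartition`-th record, so a deterministic nth-record function means no test data is ever held in memory. This is a self-contained sketch of that loop (`SplitPointSketch` and the `n -> n * 10` key function are illustrative, not part of Sleeper):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongFunction;

public class SplitPointSketch {

    // Mirrors the loop in createPartitionTreeWithRecordsPerPartitionAndTotal:
    // split at every recordsPerPartition-th record, excluding record 0 and the end,
    // so totalRecords = 350 with 100 per partition yields 3 split points (4 leaves).
    static List<Object> splitPoints(int recordsPerPartition, long totalRecords, LongFunction<Object> nthKey) {
        List<Object> splitPoints = new ArrayList<>();
        for (long i = recordsPerPartition; i < totalRecords; i += recordsPerPartition) {
            splitPoints.add(nthKey.apply(i));
        }
        return splitPoints;
    }

    public static void main(String[] args) {
        // Deterministic test data: the nth record's key is n * 10,
        // standing in for NthRecordGenerator.getNthRecord(n).
        System.out.println(splitPoints(100, 350, n -> n * 10)); // [1000, 2000, 3000]
    }
}
```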
PartitionTreeTestHelperTest.java
@@ -0,0 +1,67 @@
/*
* Copyright 2022-2024 Crown Copyright
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package sleeper.core.partition;

import org.junit.jupiter.api.Test;

import sleeper.core.record.Record;
import sleeper.core.schema.Schema;
import sleeper.core.schema.type.IntType;
import sleeper.core.testutils.printers.PartitionsPrinter;

import java.util.List;
import java.util.Map;

import static org.assertj.core.api.Assertions.assertThat;
import static sleeper.core.partition.PartitionTreeTestHelper.createPartitionTreeWithRecordsPerPartitionAndTotal;
import static sleeper.core.schema.SchemaTestHelper.schemaWithKey;

public class PartitionTreeTestHelperTest {

@Test
void shouldCreateSplitPointsFromRecordRangeAndRecordsPerPartition() {
// Given
Schema schema = schemaWithKey("key", new IntType());
List<Record> records = List.of(
new Record(Map.of("key", 10)),
new Record(Map.of("key", 20)),
new Record(Map.of("key", 30)));

// When
PartitionTree tree = createPartitionTreeWithRecordsPerPartition(1, records, schema);

// Then
assertThat(PartitionsPrinter.printPartitions(schema, tree))
.isEqualTo("""
Leaf partition at LL:
{"key":{"min":-2147483648,"minInclusive":true,"max":20,"maxInclusive":false},"stringsBase64Encoded":true}
Leaf partition at LR:
{"key":{"min":20,"minInclusive":true,"max":30,"maxInclusive":false},"stringsBase64Encoded":true}
Leaf partition at R:
{"key":{"min":30,"minInclusive":true,"max":null,"maxInclusive":false},"stringsBase64Encoded":true}
Partition at L:
{"key":{"min":-2147483648,"minInclusive":true,"max":30,"maxInclusive":false},"stringsBase64Encoded":true}
Partition at root:
{"key":{"min":-2147483648,"minInclusive":true,"max":null,"maxInclusive":false},"stringsBase64Encoded":true}
""");
}

private PartitionTree createPartitionTreeWithRecordsPerPartition(int recordsPerPartition, List<Record> records, Schema schema) {
return createPartitionTreeWithRecordsPerPartitionAndTotal(recordsPerPartition, records.size(),
index -> records.get((int) index),
schema);
}
}
5 changes: 0 additions & 5 deletions java/system-test/system-test-drivers/pom.xml
@@ -90,11 +90,6 @@
<artifactId>testcontainers</artifactId>
<scope>test</scope>
</dependency>
- <dependency>
- <groupId>org.testcontainers</groupId>
- <artifactId>junit-jupiter</artifactId>
- <scope>test</scope>
- </dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>localstack</artifactId>
AwsCompactionDriver.java
@@ -25,8 +25,15 @@
import software.amazon.awssdk.services.ec2.Ec2Client;
import software.amazon.awssdk.services.ecs.EcsClient;
import software.amazon.awssdk.services.lambda.LambdaClient;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequestEntry;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchResponse;
import software.amazon.awssdk.services.sqs.model.Message;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse;

import sleeper.clients.deploy.InvokeLambda;
import sleeper.compaction.core.job.CompactionJob;
import sleeper.compaction.core.job.CompactionJobSerDe;
import sleeper.compaction.core.job.CompactionJobStatusStore;
import sleeper.compaction.core.job.creation.CreateCompactionJobs;
import sleeper.compaction.core.task.CompactionTaskStatus;
@@ -45,8 +52,12 @@
import sleeper.task.common.RunCompactionTasks;

import java.io.IOException;
import java.util.List;
import java.util.stream.Stream;

import static java.util.function.Predicate.not;
import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.COMPACTION_JOB_CREATION_TRIGGER_LAMBDA_FUNCTION;
import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.COMPACTION_JOB_QUEUE_URL;
import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.COMPACTION_TASK_CREATION_LAMBDA_FUNCTION;

public class AwsCompactionDriver implements CompactionDriver {
Expand All @@ -57,16 +68,19 @@ public class AwsCompactionDriver implements CompactionDriver {
private final AmazonDynamoDB dynamoDBClient;
private final AmazonS3 s3Client;
private final AmazonSQS sqsClient;
private final SqsClient sqsClientV2;
private final EcsClient ecsClient;
private final AutoScalingClient asClient;
private final Ec2Client ec2Client;
private final CompactionJobSerDe serDe = new CompactionJobSerDe();

public AwsCompactionDriver(SystemTestInstanceContext instance, SystemTestClients clients) {
this.instance = instance;
this.lambdaClient = clients.getLambda();
this.dynamoDBClient = clients.getDynamoDB();
this.s3Client = clients.getS3();
this.sqsClient = clients.getSqs();
this.sqsClientV2 = clients.getSqsV2();
this.ecsClient = clients.getEcs();
this.asClient = clients.getAutoScaling();
this.ec2Client = clients.getEc2();
@@ -138,4 +152,41 @@ public void forceStartTasks(int numberOfTasks, PollWithRetries poll) {
public void scaleToZero() {
EC2Scaler.create(instance.getInstanceProperties(), asClient, ec2Client).scaleTo(0);
}

@Override
public List<CompactionJob> drainJobsQueueForWholeInstance() {
String queueUrl = instance.getInstanceProperties().get(COMPACTION_JOB_QUEUE_URL);
LOGGER.info("Draining compaction jobs queue: {}", queueUrl);
List<CompactionJob> jobs = Stream.iterate(
receiveJobs(queueUrl), not(List::isEmpty), lastJobs -> receiveJobs(queueUrl))
.flatMap(List::stream).toList();
LOGGER.info("Found {} compaction jobs", jobs.size());
return jobs;
}

private List<CompactionJob> receiveJobs(String queueUrl) {
ReceiveMessageResponse receiveResult = sqsClientV2.receiveMessage(request -> request
.queueUrl(queueUrl)
.maxNumberOfMessages(10)
.waitTimeSeconds(5));
List<Message> messages = receiveResult.messages();
if (messages.isEmpty()) {
return List.of();
}
DeleteMessageBatchResponse deleteResult = sqsClientV2.deleteMessageBatch(request -> request
.queueUrl(queueUrl)
.entries(messages.stream()
.map(message -> DeleteMessageBatchRequestEntry.builder()
.id(message.messageId())
.receiptHandle(message.receiptHandle())
.build())
.toList()));
if (!deleteResult.failed().isEmpty()) {
throw new RuntimeException("Failed deleting compaction job messages: " + deleteResult.failed());
}
return messages.stream()
.map(Message::body)
.map(serDe::fromJson)
.toList();
}
}
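The drain loop in `drainJobsQueueForWholeInstance` uses the three-argument `Stream.iterate(seed, hasNext, next)` (Java 9+): it keeps calling the receive function until a batch comes back empty, then flattens all batches into one list. The pattern can be sketched without SQS, with a hypothetical `receive()` standing in for `sqsClientV2.receiveMessage` (batch size 2 here instead of SQS's maximum of 10):

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.stream.Stream;

import static java.util.function.Predicate.not;

public class DrainSketch {

    // Stand-in for one SQS ReceiveMessage call: returns up to 2 items,
    // or an empty list once the queue is drained.
    static List<String> receive(Deque<String> queue) {
        List<String> batch = new ArrayList<>();
        for (int i = 0; i < 2 && !queue.isEmpty(); i++) {
            batch.add(queue.poll());
        }
        return batch;
    }

    public static void main(String[] args) {
        Deque<String> queue = new ArrayDeque<>(List.of("job1", "job2", "job3", "job4", "job5"));
        // Seed with the first batch, keep receiving while batches are non-empty,
        // ignoring the previous batch when computing the next one.
        List<String> drained = Stream.iterate(receive(queue), not(List::isEmpty), last -> receive(queue))
                .flatMap(List::stream)
                .toList();
        System.out.println(drained); // [job1, job2, job3, job4, job5]
    }
}
```

Note that the production code also batch-deletes each received batch before parsing it, so a drained job cannot be redelivered after its visibility timeout expires.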
AwsCompactionDriverDrainCompactionsIT.java
@@ -0,0 +1,92 @@
/*
* Copyright 2022-2024 Crown Copyright
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package sleeper.systemtest.drivers.compaction;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;

import sleeper.compaction.core.job.CompactionJob;
import sleeper.compaction.core.job.CompactionJobFactory;
import sleeper.compaction.core.job.CompactionJobSerDe;
import sleeper.core.statestore.FileReferenceFactory;
import sleeper.core.util.SplitIntoBatches;
import sleeper.systemtest.drivers.testutil.LocalStackDslTest;
import sleeper.systemtest.drivers.testutil.LocalStackSystemTestDrivers;
import sleeper.systemtest.dsl.SleeperSystemTest;
import sleeper.systemtest.dsl.SystemTestContext;
import sleeper.systemtest.dsl.compaction.CompactionDriver;
import sleeper.systemtest.dsl.instance.SystemTestInstanceContext;

import java.util.List;
import java.util.stream.IntStream;

import static org.assertj.core.api.Assertions.assertThat;
import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.COMPACTION_JOB_QUEUE_URL;
import static sleeper.systemtest.drivers.testutil.LocalStackTestInstance.DRAIN_COMPACTIONS;

@LocalStackDslTest
public class AwsCompactionDriverDrainCompactionsIT {
public static final Logger LOGGER = LoggerFactory.getLogger(AwsCompactionDriverDrainCompactionsIT.class);

SqsClient sqs;
CompactionDriver driver;
SystemTestInstanceContext instance;

@BeforeEach
void setUp(SleeperSystemTest sleeper, SystemTestContext context, LocalStackSystemTestDrivers drivers) {
sleeper.connectToInstance(DRAIN_COMPACTIONS);
sqs = drivers.clients().getSqsV2();
driver = drivers.compaction(context);
instance = context.instance();
}

@Test
void shouldDrainCompactionJobsFromQueue() {
// Given
FileReferenceFactory fileFactory = FileReferenceFactory.from(instance.getStateStore());
CompactionJobFactory jobFactory = new CompactionJobFactory(instance.getInstanceProperties(), instance.getTableProperties());
List<CompactionJob> jobs = IntStream.rangeClosed(1, 20)
.mapToObj(i -> jobFactory.createCompactionJob(
List.of(fileFactory.rootFile("file" + i + ".parquet", i)), "root"))
.toList();
send(jobs);

// When / Then
assertThat(driver.drainJobsQueueForWholeInstance())
.containsExactlyElementsOf(jobs);
}

private void send(List<CompactionJob> jobs) {
String queueUrl = instance.getInstanceProperties().get(COMPACTION_JOB_QUEUE_URL);
LOGGER.info("Sending to compaction jobs queue: {}", queueUrl);
CompactionJobSerDe serDe = new CompactionJobSerDe();
for (List<CompactionJob> batch : SplitIntoBatches.splitListIntoBatchesOf(10, jobs)) {
sqs.sendMessageBatch(request -> request
.queueUrl(queueUrl)
.entries(batch.stream()
.map(job -> SendMessageBatchRequestEntry.builder()
.messageBody(serDe.toJson(job))
.id(job.getId())
.build())
.toList()));
}
}

}
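The `send` method above splits the 20 jobs into batches of 10 via `SplitIntoBatches.splitListIntoBatchesOf` because SQS `SendMessageBatch` (like `DeleteMessageBatch`) accepts at most 10 entries per request. A minimal sketch of that batching step, with a hypothetical `splitIntoBatchesOf` standing in for the Sleeper utility:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.IntStream;

public class BatchSketch {

    // Stand-in for SplitIntoBatches.splitListIntoBatchesOf: partitions a list
    // into consecutive sublists of at most batchSize elements.
    static <T> List<List<T>> splitIntoBatchesOf(int batchSize, List<T> items) {
        List<List<T>> batches = new ArrayList<>();
        for (int i = 0; i < items.size(); i += batchSize) {
            batches.add(items.subList(i, Math.min(i + batchSize, items.size())));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> jobs = IntStream.rangeClosed(1, 20).boxed().toList();
        // 20 jobs at an SQS batch limit of 10 means exactly 2 SendMessageBatch calls.
        List<List<Integer>> batches = splitIntoBatchesOf(10, jobs);
        System.out.println(batches.size()); // 2
    }
}
```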