Issue 1790 - System test for compactions in contention #2009

Merged Mar 18, 2024 (21 commits)

Commits
b1b330a  Set up ParallelCompactionsTest (patchwork01, Mar 12, 2024)
198170a  Merge branch '1918-update-multiple-tables-system-tests' into 1790-com… (patchwork01, Mar 13, 2024)
c3fc294  Reformat GenerateNumberedValue (patchwork01, Mar 13, 2024)
15395db  Make ParallelCompactionsTest work (patchwork01, Mar 13, 2024)
cdd3ee3  Merge branch 'develop' into 1790-compaction-contention-system-test (patchwork01, Mar 14, 2024)
f8564eb  Merge remote-tracking branch 'origin/2026-split-task-creation-lambda-… (patchwork01, Mar 14, 2024)
1274c9d  Force starting compaction tasks (patchwork01, Mar 14, 2024)
144e457  Add ParallelCompactionsIT (patchwork01, Mar 14, 2024)
b3a1a4a  Remove unused constructor (patchwork01, Mar 14, 2024)
a0196c7  Rearrange given comments (patchwork01, Mar 14, 2024)
058194b  Merge branch '2026-split-task-creation-lambda-modules' into 1790-comp… (patchwork01, Mar 14, 2024)
a494dcc  Merge branch 'develop' into 1790-compaction-contention-system-test (patchwork01, Mar 14, 2024)
8fc0d8f  Merge branch 'develop' into 1790-compaction-contention-system-test (patchwork01, Mar 14, 2024)
db76524  Fix record assertions in parallel compactions tests (patchwork01, Mar 14, 2024)
878c4d6  Merge branch 'develop' into 1790-compaction-contention-system-test (patchwork01, Mar 15, 2024)
9c52fe0  Add temporary directory for ParallelCompactionsIT (patchwork01, Mar 15, 2024)
2734f24  Merge branch '2062-python-properties-files' into 1790-compaction-cont… (patchwork01, Mar 18, 2024)
f62b02c  Set timeout for starting compaction tasks (patchwork01, Mar 18, 2024)
5482bad  Increase compaction task max idle time in system tests (patchwork01, Mar 18, 2024)
ccfd1e2  Merge branch 'develop' into 1790-compaction-contention-system-test (patchwork01, Mar 18, 2024)
70f1adc  Allow forcing more than maximum compaction tasks (patchwork01, Mar 18, 2024)
Files changed (showing changes from 11 of the 21 commits):

@@ -67,29 +67,21 @@ public class RunCompactionTasks {
private final String clusterName;
private final String launchType;
private final int maximumRunningTasks;
private final QueueMessageCount.Client queueMessageCount;
private final TaskCounts taskCounts;
private final Scaler scaler;
private final TaskLauncher launchTasks;

public RunCompactionTasks(AmazonSQS sqsClient,
AmazonECS ecsClient,
AmazonS3 s3Client,
AmazonAutoScaling asClient,
InstanceProperties instanceProperties) {
this(instanceProperties, QueueMessageCount.withSqsClient(sqsClient),
public RunCompactionTasks(
InstanceProperties instanceProperties, AmazonECS ecsClient, AmazonAutoScaling asClient) {
this(instanceProperties,
(clusterName) -> ECSTaskCount.getNumPendingAndRunningTasks(clusterName, ecsClient),
createEC2Scaler(instanceProperties, asClient, ecsClient),
(startTime, numberOfTasksToCreate) -> launchTasks(ecsClient, instanceProperties, startTime, numberOfTasksToCreate));
}

public RunCompactionTasks(InstanceProperties instanceProperties,
QueueMessageCount.Client queueMessageCount,
TaskCounts taskCounts,
Scaler scaler,
TaskLauncher launchTasks) {
public RunCompactionTasks(
InstanceProperties instanceProperties, TaskCounts taskCounts, Scaler scaler, TaskLauncher launchTasks) {
this.instanceProperties = instanceProperties;
this.queueMessageCount = queueMessageCount;
this.taskCounts = taskCounts;
this.scaler = scaler;
this.launchTasks = launchTasks;
@@ -111,7 +103,7 @@ public interface Scaler {
void scaleTo(String asGroupName, int numberContainers);
}

public void run() {
public void run(QueueMessageCount.Client queueMessageCount) {
long startTime = System.currentTimeMillis();
LOGGER.info("Queue URL is {}", sqsJobQueueUrl);
// Find out number of messages in queue that are not being processed
@@ -282,7 +274,7 @@ public static void main(String[] args) {
try {
InstanceProperties instanceProperties = new InstanceProperties();
instanceProperties.loadFromS3(s3Client, s3Bucket);
new RunCompactionTasks(sqsClient, ecsClient, s3Client, asClient, instanceProperties)
new RunCompactionTasks(instanceProperties, ecsClient, asClient)
.run(numberOfTasks);
} finally {
sqsClient.shutdown();
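To summarise the API change in this file: the SQS and S3 clients no longer go into the constructor, and run() now takes the queue-count client instead of it being held as a field. A minimal usage sketch under that reading follows; the wrapper class name, the args[0] config bucket and the task count of 2 are illustrative only, while the constructor and method calls mirror those visible in this diff.

import com.amazonaws.services.autoscaling.AmazonAutoScaling;
import com.amazonaws.services.autoscaling.AmazonAutoScalingClientBuilder;
import com.amazonaws.services.ecs.AmazonECS;
import com.amazonaws.services.ecs.AmazonECSClientBuilder;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;

import sleeper.configuration.properties.instance.InstanceProperties;
import sleeper.task.common.QueueMessageCount;
import sleeper.task.common.RunCompactionTasks;

public class RunCompactionTasksUsageSketch {
    public static void main(String[] args) {
        // Clients the reworked class still takes directly (ECS, Auto Scaling),
        // plus the ones the caller now owns (SQS for queue counts, S3 for properties).
        AmazonSQS sqsClient = AmazonSQSClientBuilder.defaultClient();
        AmazonECS ecsClient = AmazonECSClientBuilder.defaultClient();
        AmazonAutoScaling asClient = AmazonAutoScalingClientBuilder.defaultClient();
        AmazonS3 s3Client = AmazonS3ClientBuilder.defaultClient();

        InstanceProperties instanceProperties = new InstanceProperties();
        instanceProperties.loadFromS3(s3Client, args[0]); // config bucket name supplied by the caller

        RunCompactionTasks runTasks = new RunCompactionTasks(instanceProperties, ecsClient, asClient);

        // Scheduled path: the queue client is passed into run() rather than the constructor.
        runTasks.run(QueueMessageCount.withSqsClient(sqsClient));

        // Forced path: request an explicit number of tasks, as the system test driver below does.
        runTasks.run(2);
    }
}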
@@ -65,10 +65,10 @@ void shouldCreateOneTasksWhenQueueHasOneMessages() {

private int runTasks(QueueMessageCount.Client queueMessageClient, TaskCounts taskCounts) {
AtomicInteger tasksLaunched = new AtomicInteger();
RunCompactionTasks runTasks = new RunCompactionTasks(instanceProperties, queueMessageClient, taskCounts, scaler, (startTime, numberOfTasksToCreate) -> {
RunCompactionTasks runTasks = new RunCompactionTasks(instanceProperties, taskCounts, scaler, (startTime, numberOfTasksToCreate) -> {
tasksLaunched.set(numberOfTasksToCreate);
});
runTasks.run();
runTasks.run(queueMessageClient);
return tasksLaunched.get();
}
}
@@ -182,7 +182,7 @@ void shouldAutoScaleWithExistingRunningOrPendingTasks() {

private int runTasks(int requestedTasks, TaskCounts taskCounts) {
AtomicInteger tasksLaunched = new AtomicInteger();
RunCompactionTasks runTasks = new RunCompactionTasks(instanceProperties, noMessagesOnQueue(), taskCounts, scaler, (startTime, numberOfTasksToCreate) -> {
RunCompactionTasks runTasks = new RunCompactionTasks(instanceProperties, taskCounts, scaler, (startTime, numberOfTasksToCreate) -> {
tasksLaunched.set(numberOfTasksToCreate);
});
runTasks.run(requestedTasks);
@@ -27,6 +27,7 @@
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;

import sleeper.configuration.properties.instance.InstanceProperties;
import sleeper.task.common.QueueMessageCount;
import sleeper.task.common.RunCompactionTasks;

import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.CONFIG_BUCKET;
@@ -37,6 +38,7 @@
@SuppressWarnings("unused")
public class RunCompactionTasksLambda {
private final RunCompactionTasks runTasks;
private final QueueMessageCount.Client queueMessageCount;

public RunCompactionTasksLambda() {
String s3Bucket = validateParameter(CONFIG_BUCKET.toEnvironmentVariable());
@@ -46,11 +48,12 @@ public RunCompactionTasksLambda() {
AmazonAutoScaling asClient = AmazonAutoScalingClientBuilder.defaultClient();
InstanceProperties instanceProperties = new InstanceProperties();
instanceProperties.loadFromS3(s3Client, s3Bucket);
this.runTasks = new RunCompactionTasks(sqsClient, ecsClient, s3Client, asClient, instanceProperties);
this.runTasks = new RunCompactionTasks(instanceProperties, ecsClient, asClient);
this.queueMessageCount = QueueMessageCount.withSqsClient(sqsClient);
}

public void eventHandler(ScheduledEvent event, Context context) {
runTasks.run();
runTasks.run(queueMessageCount);
}

private static String validateParameter(String parameterName) {
@@ -16,7 +16,9 @@

package sleeper.systemtest.drivers.compaction;

import com.amazonaws.services.autoscaling.AmazonAutoScaling;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.ecs.AmazonECS;
import com.amazonaws.services.sqs.AmazonSQS;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,6 +40,7 @@
import sleeper.systemtest.drivers.util.SystemTestClients;
import sleeper.systemtest.dsl.compaction.CompactionDriver;
import sleeper.systemtest.dsl.instance.SystemTestInstanceContext;
import sleeper.task.common.RunCompactionTasks;

import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.COMPACTION_JOB_CREATION_TRIGGER_LAMBDA_FUNCTION;
import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.COMPACTION_TASK_CREATION_LAMBDA_FUNCTION;
@@ -50,12 +53,16 @@ public class AwsCompactionDriver implements CompactionDriver {
private final LambdaClient lambdaClient;
private final AmazonDynamoDB dynamoDBClient;
private final AmazonSQS sqsClient;
private final AmazonECS ecsClient;
private final AmazonAutoScaling asClient;

public AwsCompactionDriver(SystemTestInstanceContext instance, SystemTestClients clients) {
this.instance = instance;
this.lambdaClient = clients.getLambda();
this.dynamoDBClient = clients.getDynamoDB();
this.sqsClient = clients.getSqs();
this.ecsClient = clients.getEcs();
this.asClient = clients.getAutoScaling();
}

@Override
@@ -99,4 +106,22 @@ public void invokeTasks(int expectedTasks, PollWithRetries poll) {
throw new RuntimeException(e);
}
}

@Override
public void forceStartTasks(int numberOfTasks, PollWithRetries poll) {
CompactionTaskStatusStore store = CompactionTaskStatusStoreFactory.getStatusStore(dynamoDBClient, instance.getInstanceProperties());
long tasksFinishedBefore = store.getAllTasks().stream().filter(CompactionTaskStatus::isFinished).count();
new RunCompactionTasks(instance.getInstanceProperties(), ecsClient, asClient)
.run(numberOfTasks);
try {
poll.pollUntil("tasks are started", () -> {
long tasksStarted = store.getAllTasks().size() - tasksFinishedBefore;
LOGGER.info("Found {} running compaction tasks", tasksStarted);
return tasksStarted >= numberOfTasks;
});
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}
}
@@ -16,6 +16,8 @@

package sleeper.systemtest.drivers.util;

import com.amazonaws.services.autoscaling.AmazonAutoScaling;
import com.amazonaws.services.autoscaling.AmazonAutoScalingClientBuilder;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
import com.amazonaws.services.ecr.AmazonECR;
@@ -53,6 +55,7 @@ public class SystemTestClients {
private final EmrServerlessClient emrServerless = EmrServerlessClient.create();
private final AmazonElasticMapReduce emr = AmazonElasticMapReduceClientBuilder.defaultClient();
private final AmazonECS ecs = AmazonECSClientBuilder.defaultClient();
private final AmazonAutoScaling autoScaling = AmazonAutoScalingClientBuilder.defaultClient();
private final AmazonECR ecr = AmazonECRClientBuilder.defaultClient();
private final CloudWatchClient cloudWatch = CloudWatchClient.create();

@@ -100,6 +103,10 @@ public AmazonECS getEcs() {
return ecs;
}

public AmazonAutoScaling getAutoScaling() {
return autoScaling;
}

public AmazonECR getEcr() {
return ecr;
}
@@ -34,4 +34,10 @@
default void invokeTasks(int expectedTasks) {
invokeTasks(expectedTasks, PollWithRetries.intervalAndPollingTimeout(Duration.ofSeconds(10), Duration.ofMinutes(3)));
}

void forceStartTasks(int numberOfTasks, PollWithRetries poll);

default void forceStartTasks(int expectedTasks) {
forceStartTasks(expectedTasks, PollWithRetries.intervalAndPollingTimeout(Duration.ofSeconds(10), Duration.ofMinutes(3)));
}
}
@@ -60,6 +60,11 @@ public SystemTestCompaction invokeTasks(int expectedTasks) {
return this;
}

public SystemTestCompaction forceStartTasks(int expectedTasks) {
driver.forceStartTasks(expectedTasks);
return this;
}

public SystemTestCompaction waitForJobs() {
waitForJobs.waitForJobs(lastJobIds);
return this;
@@ -65,7 +65,11 @@ static GenerateNumberedValue forField(KeyType keyType, Field field) {
}

static GenerateNumberedValue numberStringAndZeroPadTo(int size) {
return num -> StringUtils.leftPad(num + "", size, "0");
return num -> numberStringAndZeroPadTo(size, num);
}

static String numberStringAndZeroPadTo(int size, long number) {
return StringUtils.leftPad(number + "", size, "0");
}

static UnaryOperator<Object> addPrefix(String prefix) {
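For reference, a small sketch of the refactor above: the lambda form now delegates to a static String overload, so callers can invoke the padding logic directly on a number. This assumes the systemtest DSL module is on the classpath; the wrapper class and sample values are illustrative.

import static sleeper.systemtest.dsl.sourcedata.GenerateNumberedValue.numberStringAndZeroPadTo;

public class ZeroPadSketch {
    public static void main(String[] args) {
        // Left-pads the number with zeros to the requested width.
        System.out.println(numberStringAndZeroPadTo(4, 27));   // prints "0027"
        System.out.println(numberStringAndZeroPadTo(4, 2000)); // prints "2000"
        // ParallelCompactionsTest (below) uses this to build split points such as "row-2000".
    }
}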
@@ -44,15 +44,15 @@
import static sleeper.systemtest.dsl.sourcedata.GenerateNumberedValue.numberStringAndZeroPadTo;
import static sleeper.systemtest.dsl.sourcedata.GenerateNumberedValueOverrides.overrideField;
import static sleeper.systemtest.dsl.testutil.InMemoryTestInstance.DEFAULT_SCHEMA;
import static sleeper.systemtest.dsl.testutil.InMemoryTestInstance.MAIN;
import static sleeper.systemtest.dsl.testutil.InMemoryTestInstance.ROW_KEY_FIELD_NAME;
import static sleeper.systemtest.dsl.testutil.InMemoryTestInstance.withDefaultProperties;

@InMemoryDslTest
public class CompactionTest {

@BeforeEach
void setUp(SleeperSystemTest sleeper) throws Exception {
sleeper.connectToInstance(withDefaultProperties("main"));
sleeper.connectToInstance(MAIN);
}

@Nested
@@ -0,0 +1,106 @@
/*
* Copyright 2022-2024 Crown Copyright
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package sleeper.systemtest.dsl.compaction;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import sleeper.compaction.strategy.impl.BasicCompactionStrategy;
import sleeper.configuration.properties.validation.IngestFileWritingStrategy;
import sleeper.core.partition.PartitionsBuilder;
import sleeper.core.record.Record;
import sleeper.core.schema.Schema;
import sleeper.systemtest.dsl.SleeperSystemTest;
import sleeper.systemtest.dsl.testutil.InMemoryDslTest;

import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.IntStream;
import java.util.stream.LongStream;

import static java.util.stream.Collectors.toUnmodifiableList;
import static org.assertj.core.api.Assertions.assertThat;
import static sleeper.configuration.properties.table.TableProperty.COMPACTION_FILES_BATCH_SIZE;
import static sleeper.configuration.properties.table.TableProperty.COMPACTION_STRATEGY_CLASS;
import static sleeper.configuration.properties.table.TableProperty.INGEST_FILE_WRITING_STRATEGY;
import static sleeper.systemtest.dsl.sourcedata.GenerateNumberedValue.addPrefix;
import static sleeper.systemtest.dsl.sourcedata.GenerateNumberedValue.numberStringAndZeroPadTo;
import static sleeper.systemtest.dsl.sourcedata.GenerateNumberedValueOverrides.overrideField;
import static sleeper.systemtest.dsl.testutil.InMemoryTestInstance.DEFAULT_SCHEMA;
import static sleeper.systemtest.dsl.testutil.InMemoryTestInstance.MAIN;
import static sleeper.systemtest.dsl.testutil.InMemoryTestInstance.ROW_KEY_FIELD_NAME;

@InMemoryDslTest
public class ParallelCompactionsTest {
private final Schema schema = DEFAULT_SCHEMA;
public static final int NUMBER_OF_COMPACTIONS = 5;

@BeforeEach
void setUp(SleeperSystemTest sleeper) throws Exception {
sleeper.connectToInstance(MAIN);
}

@Test
void shouldApplyOneCompactionPerPartition(SleeperSystemTest sleeper) {
// Given we have partitions split evenly across the intended range of records
sleeper.setGeneratorOverrides(overrideField(ROW_KEY_FIELD_NAME,
numberStringAndZeroPadTo(4).then(addPrefix("row-"))));
List<String> leafIds = IntStream.range(0, NUMBER_OF_COMPACTIONS)
.mapToObj(i -> "" + i)
.collect(toUnmodifiableList());
List<Object> splitPoints = IntStream.range(1, NUMBER_OF_COMPACTIONS)
.map(i -> 10000 * i / NUMBER_OF_COMPACTIONS)
.mapToObj(i -> "row-" + numberStringAndZeroPadTo(4, i))
.collect(toUnmodifiableList());
sleeper.partitioning().setPartitions(new PartitionsBuilder(schema)
.leavesWithSplits(leafIds, splitPoints)
.anyTreeJoiningAllLeaves()
.buildTree());
// And we have records spread across all partitions in two files per partition
// And we configure to compact every partition
sleeper.updateTableProperties(Map.of(
INGEST_FILE_WRITING_STRATEGY, IngestFileWritingStrategy.ONE_FILE_PER_LEAF.toString(),
COMPACTION_STRATEGY_CLASS, BasicCompactionStrategy.class.getName(),
COMPACTION_FILES_BATCH_SIZE, "2"));
sleeper.ingest().direct(null)
.numberedRecords(LongStream.range(0, 10000))
.numberedRecords(LongStream.range(0, 10000));

// When we run compaction
sleeper.compaction()
.forceStartTasks(NUMBER_OF_COMPACTIONS)
.createJobs(NUMBER_OF_COMPACTIONS)
.waitForJobs();

// Then we have one output file per compaction
assertThat(sleeper.tableFiles().references())
.hasSize(NUMBER_OF_COMPACTIONS);
// And we have the same records afterwards
assertThat(inAnyOrder(sleeper.directQuery().allRecordsInTable()))
.isEqualTo(inAnyOrder(sleeper.generateNumberedRecords(
LongStream.range(0, 10000)
.flatMap(i -> LongStream.of(i, i)))));
}

private static Set<Record> inAnyOrder(Iterable<Record> records) {
Set<Record> set = new HashSet<>();
records.forEach(set::add);
return set;
}

}
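To make the arithmetic in this new test concrete: with NUMBER_OF_COMPACTIONS = 5, the split points come out as row-2000, row-4000, row-6000 and row-8000 (10000 * i / 5 for i = 1 to 4, zero-padded to four digits), so the generated keys row-0000 to row-9999 should spread evenly across five leaf partitions. Two ingests with ONE_FILE_PER_LEAF leave two files in each partition, and with BasicCompactionStrategy and a files batch size of 2 that should produce exactly one compaction job per partition, matching the five file references asserted above.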
@@ -48,6 +48,7 @@
import static sleeper.systemtest.dsl.instance.SystemTestInstanceConfiguration.usingSystemTestDefaults;
import static sleeper.systemtest.dsl.testutil.InMemoryTestInstance.DEFAULT_SCHEMA;
import static sleeper.systemtest.dsl.testutil.InMemoryTestInstance.createDslInstanceProperties;
import static sleeper.systemtest.dsl.testutil.InMemoryTestInstance.createDslTableProperties;
import static sleeper.systemtest.dsl.testutil.InMemoryTestInstance.withDefaultProperties;

@InMemoryDslTest
@@ -99,8 +100,7 @@ void shouldSetPartitionsForMultipleTables(SleeperSystemTest sleeper) {
.buildTree();

// When
sleeper.tables().forEach(() ->
sleeper.partitioning().setPartitions(partitions));
sleeper.tables().forEach(() -> sleeper.partitioning().setPartitions(partitions));

// Then
assertThat(sleeper.partitioning().treeByTable())
@@ -199,7 +199,7 @@ void shouldGenerateNameForPredefinedTable(SleeperSystemTest sleeper) {
// When
sleeper.connectToInstance(usingSystemTestDefaults("predeftable", () -> {
InstanceProperties instanceProperties = createDslInstanceProperties();
TableProperties tableProperties = createTestTableProperties(instanceProperties, DEFAULT_SCHEMA);
TableProperties tableProperties = createDslTableProperties(instanceProperties);
tableProperties.set(TABLE_NAME, "predefined-test-table");
return new DeployInstanceConfiguration(instanceProperties, tableProperties);
}));
@@ -215,7 +215,7 @@ void shouldRefusePredefinedTableWithNoName(SleeperSystemTest sleeper) {
// Given
SystemTestInstanceConfiguration configuration = usingSystemTestDefaults("nonametable", () -> {
InstanceProperties instanceProperties = createDslInstanceProperties();
TableProperties tableProperties = createTestTableProperties(instanceProperties, DEFAULT_SCHEMA);
TableProperties tableProperties = createDslTableProperties(instanceProperties);
tableProperties.unset(TABLE_NAME);
return new DeployInstanceConfiguration(instanceProperties, tableProperties);
});