Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue 1790 - System test for compactions in contention #2009

Merged
merged 21 commits into from
Mar 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
b1b330a
Set up ParallelCompactionsTest
patchwork01 Mar 12, 2024
198170a
Merge branch '1918-update-multiple-tables-system-tests' into 1790-com…
patchwork01 Mar 13, 2024
c3fc294
Reformat GenerateNumberedValue
patchwork01 Mar 13, 2024
15395db
Make ParallelCompactionsTest work
patchwork01 Mar 13, 2024
cdd3ee3
Merge branch 'develop' into 1790-compaction-contention-system-test
patchwork01 Mar 14, 2024
f8564eb
Merge remote-tracking branch 'origin/2026-split-task-creation-lambda-…
patchwork01 Mar 14, 2024
1274c9d
Force starting compaction tasks
patchwork01 Mar 14, 2024
144e457
Add ParallelCompactionsIT
patchwork01 Mar 14, 2024
b3a1a4a
Remove unused constructor
patchwork01 Mar 14, 2024
a0196c7
Rearrange given comments
patchwork01 Mar 14, 2024
058194b
Merge branch '2026-split-task-creation-lambda-modules' into 1790-comp…
patchwork01 Mar 14, 2024
a494dcc
Merge branch 'develop' into 1790-compaction-contention-system-test
patchwork01 Mar 14, 2024
8fc0d8f
Merge branch 'develop' into 1790-compaction-contention-system-test
patchwork01 Mar 14, 2024
db76524
Fix record assertions in parallel compactions tests
patchwork01 Mar 14, 2024
878c4d6
Merge branch 'develop' into 1790-compaction-contention-system-test
patchwork01 Mar 15, 2024
9c52fe0
Add temporary directory for ParallelCompactionsIT
patchwork01 Mar 15, 2024
2734f24
Merge branch '2062-python-properties-files' into 1790-compaction-cont…
patchwork01 Mar 18, 2024
f62b02c
Set timeout for starting compaction tasks
patchwork01 Mar 18, 2024
5482bad
Increase compaction task max idle time in system tests
patchwork01 Mar 18, 2024
ccfd1e2
Merge branch 'develop' into 1790-compaction-contention-system-test
patchwork01 Mar 18, 2024
70f1adc
Allow forcing more than maximum compaction tasks
patchwork01 Mar 18, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -67,29 +67,21 @@ public class RunCompactionTasks {
private final String clusterName;
private final String launchType;
private final int maximumRunningTasks;
private final QueueMessageCount.Client queueMessageCount;
private final TaskCounts taskCounts;
private final Scaler scaler;
private final TaskLauncher launchTasks;

public RunCompactionTasks(AmazonSQS sqsClient,
AmazonECS ecsClient,
AmazonS3 s3Client,
AmazonAutoScaling asClient,
InstanceProperties instanceProperties) {
this(instanceProperties, QueueMessageCount.withSqsClient(sqsClient),
public RunCompactionTasks(
InstanceProperties instanceProperties, AmazonECS ecsClient, AmazonAutoScaling asClient) {
this(instanceProperties,
(clusterName) -> ECSTaskCount.getNumPendingAndRunningTasks(clusterName, ecsClient),
createEC2Scaler(instanceProperties, asClient, ecsClient),
(startTime, numberOfTasksToCreate) -> launchTasks(ecsClient, instanceProperties, startTime, numberOfTasksToCreate));
}

public RunCompactionTasks(InstanceProperties instanceProperties,
QueueMessageCount.Client queueMessageCount,
TaskCounts taskCounts,
Scaler scaler,
TaskLauncher launchTasks) {
public RunCompactionTasks(
InstanceProperties instanceProperties, TaskCounts taskCounts, Scaler scaler, TaskLauncher launchTasks) {
this.instanceProperties = instanceProperties;
this.queueMessageCount = queueMessageCount;
this.taskCounts = taskCounts;
this.scaler = scaler;
this.launchTasks = launchTasks;
Expand All @@ -111,44 +103,42 @@ public interface Scaler {
void scaleTo(String asGroupName, int numberContainers);
}

public void run() {
public void run(QueueMessageCount.Client queueMessageCount) {
long startTime = System.currentTimeMillis();
LOGGER.info("Queue URL is {}", sqsJobQueueUrl);
// Find out number of messages in queue that are not being processed
int queueSize = queueMessageCount.getQueueMessageCount(sqsJobQueueUrl)
.getApproximateNumberOfMessages();
LOGGER.info("Queue size is {}", queueSize);
// Request 1 task for each item on the queue
run(startTime, queueSize);
LOGGER.info("Maximum concurrent tasks is {}", maximumRunningTasks);
runToMeetTargetTasks(startTime, Math.min(queueSize, maximumRunningTasks));
}

public void run(int requestedTasks) {
run(System.currentTimeMillis(), requestedTasks);
public void runToMeetTargetTasks(int requestedTasks) {
runToMeetTargetTasks(System.currentTimeMillis(), requestedTasks);
}

private void run(long startTime, int requestedTasks) {
if (requestedTasks == 0) {
LOGGER.info("Finishing as number of tasks requested was 0");
private void runToMeetTargetTasks(long startTime, int targetTasks) {
if (targetTasks == 0) {
LOGGER.info("Finishing as target tasks was 0");
return;
}
// Find out number of pending and running tasks
LOGGER.info("Target concurrent tasks is {}", targetTasks);

int numRunningAndPendingTasks = taskCounts.getRunningAndPending(clusterName);
LOGGER.info("Number of running and pending tasks is {}", numRunningAndPendingTasks);

if (numRunningAndPendingTasks >= maximumRunningTasks) {
LOGGER.info("Finishing as maximum running tasks of {} has been reached", maximumRunningTasks);
if (numRunningAndPendingTasks >= targetTasks) {
LOGGER.info("Finishing as target has been reached");
return;
}
int maxNumTasksToCreate = maximumRunningTasks - numRunningAndPendingTasks;
LOGGER.info("Maximum concurrent tasks is {}", maximumRunningTasks);
LOGGER.info("Maximum number of tasks that can be created is {}", maxNumTasksToCreate);
int numberOfTasksToCreate = Math.min(requestedTasks, maxNumTasksToCreate);

if (launchType.equalsIgnoreCase("EC2")) {
int totalTasks = numberOfTasksToCreate + numRunningAndPendingTasks;
LOGGER.info("Total number of tasks if all launches succeed {}", totalTasks);
scaler.scaleTo(instanceProperties.get(COMPACTION_AUTO_SCALING_GROUP), totalTasks);
scaler.scaleTo(instanceProperties.get(COMPACTION_AUTO_SCALING_GROUP), targetTasks);
}

int numberOfTasksToCreate = targetTasks - numRunningAndPendingTasks;
LOGGER.info("Tasks to create is {}", numberOfTasksToCreate);
launchTasks.launchTasks(startTime, numberOfTasksToCreate);
}

Expand Down Expand Up @@ -282,8 +272,8 @@ public static void main(String[] args) {
try {
InstanceProperties instanceProperties = new InstanceProperties();
instanceProperties.loadFromS3(s3Client, s3Bucket);
new RunCompactionTasks(sqsClient, ecsClient, s3Client, asClient, instanceProperties)
.run(numberOfTasks);
new RunCompactionTasks(instanceProperties, ecsClient, asClient)
.runToMeetTargetTasks(numberOfTasks);
} finally {
sqsClient.shutdown();
ecsClient.shutdown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

import static org.assertj.core.api.Assertions.assertThat;
import static sleeper.configuration.properties.InstancePropertiesTestHelper.createTestInstanceProperties;
Expand Down Expand Up @@ -62,15 +63,6 @@ void shouldCreateOneTasksWhenQueueHasOneMessages() {
// Then
assertThat(tasksCreated).isEqualTo(5);
}

private int runTasks(QueueMessageCount.Client queueMessageClient, TaskCounts taskCounts) {
AtomicInteger tasksLaunched = new AtomicInteger();
RunCompactionTasks runTasks = new RunCompactionTasks(instanceProperties, queueMessageClient, taskCounts, scaler, (startTime, numberOfTasksToCreate) -> {
tasksLaunched.set(numberOfTasksToCreate);
});
runTasks.run();
return tasksLaunched.get();
}
}

@DisplayName("Launch tasks with tasks already running")
Expand All @@ -89,7 +81,7 @@ void shouldCreateTasksUnderMaximumConcurrentLimit() {
}

@Test
void shouldCreateTasksWhenMaximumConcurrentTasksHasBeenMet() {
void shouldCreateTasksUpToMaximumConcurrentTasks() {
// Given
instanceProperties.setNumber(MAXIMUM_CONCURRENT_COMPACTION_TASKS, 1);

Expand Down Expand Up @@ -135,6 +127,18 @@ void shouldNotCreateTasksWhenExistingRunningOrPendingTaskCountIsMoreThanMaxConcu
// Then
assertThat(tasksCreated).isZero();
}

@Test
void shouldForceCreateTasksOverMaximumConcurrentTasks() {
    // Given the maximum concurrent compaction tasks is set to one, with nothing running or pending
    instanceProperties.setNumber(MAXIMUM_CONCURRENT_COMPACTION_TASKS, 1);
    TaskCounts noExistingTasks = noRunningOrPendingTasks();

    // When a target of two tasks is requested directly, bypassing the queue size check
    int launched = runToMeetTargetTasks(2, noExistingTasks);

    // Then both tasks are launched despite the configured maximum
    assertThat(launched).isEqualTo(2);
}
}

@DisplayName("Auto scale if needed")
Expand Down Expand Up @@ -172,20 +176,32 @@ void shouldAutoScaleWithExistingRunningOrPendingTasks() {
instanceProperties.set(COMPACTION_ECS_LAUNCHTYPE, "EC2");

// When
runTasks(2, runningOrPendingTasks(3));
runTasks(5, runningOrPendingTasks(3));

// Then
assertThat(numContainersByScalingGroup).isEqualTo(Map.of(
TEST_AUTO_SCALING_GROUP, 5));
}
}

/**
 * Runs task creation driven by the given queue message counter.
 *
 * @return the number of tasks passed to the task launcher, as reported by {@code run}
 */
private int runTasks(QueueMessageCount.Client queueMessageClient, TaskCounts taskCounts) {
    Consumer<RunCompactionTasks> runFromQueue = tasks -> tasks.run(queueMessageClient);
    return run(taskCounts, runFromQueue);
}

/**
 * Runs task creation against a queue built by {@code messagesOnQueue(requestedTasks)}.
 *
 * @return the number of tasks passed to the task launcher
 */
private int runTasks(int requestedTasks, TaskCounts taskCounts) {
    QueueMessageCount.Client queue = messagesOnQueue(requestedTasks);
    return runTasks(queue, taskCounts);
}

/**
 * Runs task creation with an explicit target number of tasks, without consulting a queue.
 *
 * @return the number of tasks passed to the task launcher, as reported by {@code run}
 */
private int runToMeetTargetTasks(int requestedTasks, TaskCounts taskCounts) {
    Consumer<RunCompactionTasks> runToTarget = tasks -> tasks.runToMeetTargetTasks(requestedTasks);
    return run(taskCounts, runToTarget);
}

private int run(TaskCounts taskCounts, Consumer<RunCompactionTasks> run) {
AtomicInteger tasksLaunched = new AtomicInteger();
RunCompactionTasks runTasks = new RunCompactionTasks(instanceProperties, noMessagesOnQueue(), taskCounts, scaler, (startTime, numberOfTasksToCreate) -> {
RunCompactionTasks runTasks = new RunCompactionTasks(instanceProperties, taskCounts, scaler, (startTime, numberOfTasksToCreate) -> {
tasksLaunched.set(numberOfTasksToCreate);
});
runTasks.run(requestedTasks);
run.accept(runTasks);
return tasksLaunched.get();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;

import sleeper.configuration.properties.instance.InstanceProperties;
import sleeper.task.common.QueueMessageCount;
import sleeper.task.common.RunCompactionTasks;

import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.CONFIG_BUCKET;
Expand All @@ -37,6 +38,7 @@
@SuppressWarnings("unused")
public class RunCompactionTasksLambda {
private final RunCompactionTasks runTasks;
private final QueueMessageCount.Client queueMessageCount;

public RunCompactionTasksLambda() {
String s3Bucket = validateParameter(CONFIG_BUCKET.toEnvironmentVariable());
Expand All @@ -46,11 +48,12 @@ public RunCompactionTasksLambda() {
AmazonAutoScaling asClient = AmazonAutoScalingClientBuilder.defaultClient();
InstanceProperties instanceProperties = new InstanceProperties();
instanceProperties.loadFromS3(s3Client, s3Bucket);
this.runTasks = new RunCompactionTasks(sqsClient, ecsClient, s3Client, asClient, instanceProperties);
this.runTasks = new RunCompactionTasks(instanceProperties, ecsClient, asClient);
this.queueMessageCount = QueueMessageCount.withSqsClient(sqsClient);
}

public void eventHandler(ScheduledEvent event, Context context) {
runTasks.run();
runTasks.run(queueMessageCount);
}

private static String validateParameter(String parameterName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@

package sleeper.systemtest.drivers.compaction;

import com.amazonaws.services.autoscaling.AmazonAutoScaling;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.ecs.AmazonECS;
import com.amazonaws.services.sqs.AmazonSQS;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -38,6 +40,7 @@
import sleeper.systemtest.drivers.util.SystemTestClients;
import sleeper.systemtest.dsl.compaction.CompactionDriver;
import sleeper.systemtest.dsl.instance.SystemTestInstanceContext;
import sleeper.task.common.RunCompactionTasks;

import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.COMPACTION_JOB_CREATION_TRIGGER_LAMBDA_FUNCTION;
import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.COMPACTION_TASK_CREATION_LAMBDA_FUNCTION;
Expand All @@ -50,12 +53,16 @@ public class AwsCompactionDriver implements CompactionDriver {
private final LambdaClient lambdaClient;
private final AmazonDynamoDB dynamoDBClient;
private final AmazonSQS sqsClient;
private final AmazonECS ecsClient;
private final AmazonAutoScaling asClient;

/**
 * Creates a driver for the given instance, taking the Lambda, DynamoDB, SQS, ECS and
 * auto scaling clients from the supplied system test clients.
 */
public AwsCompactionDriver(SystemTestInstanceContext instance, SystemTestClients clients) {
    this.instance = instance;
    lambdaClient = clients.getLambda();
    dynamoDBClient = clients.getDynamoDB();
    sqsClient = clients.getSqs();
    ecsClient = clients.getEcs();
    asClient = clients.getAutoScaling();
}

@Override
Expand Down Expand Up @@ -99,4 +106,22 @@ public void invokeTasks(int expectedTasks, PollWithRetries poll) {
throw new RuntimeException(e);
}
}

/**
 * Launches compaction tasks directly to meet the given target, then waits for them to appear
 * in the task status store.
 *
 * <p>The number of finished tasks is recorded before launching. Polling succeeds once the total
 * number of tasks in the store, minus that earlier finished count, reaches {@code numberOfTasks}.
 *
 * @param numberOfTasks the target passed to {@code RunCompactionTasks.runToMeetTargetTasks}
 * @param poll          the polling settings used while waiting
 * @throws RuntimeException wrapping the {@link InterruptedException} if polling is interrupted;
 *                          the thread's interrupt flag is restored first
 */
@Override
public void forceStartTasks(int numberOfTasks, PollWithRetries poll) {
    CompactionTaskStatusStore statusStore = CompactionTaskStatusStoreFactory.getStatusStore(
            dynamoDBClient, instance.getInstanceProperties());
    // Baseline taken before launching, so tasks that had already finished are not counted below.
    long finishedBeforeLaunch = statusStore.getAllTasks().stream()
            .filter(task -> task.isFinished())
            .count();

    RunCompactionTasks taskRunner = new RunCompactionTasks(instance.getInstanceProperties(), ecsClient, asClient);
    taskRunner.runToMeetTargetTasks(numberOfTasks);

    try {
        poll.pollUntil("tasks are started", () -> {
            long startedCount = statusStore.getAllTasks().size() - finishedBeforeLaunch;
            LOGGER.info("Found {} running compaction tasks", startedCount);
            return startedCount >= numberOfTasks;
        });
    } catch (InterruptedException interrupted) {
        Thread.currentThread().interrupt();
        throw new RuntimeException(interrupted);
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package sleeper.systemtest.drivers.util;

import com.amazonaws.services.autoscaling.AmazonAutoScaling;
import com.amazonaws.services.autoscaling.AmazonAutoScalingClientBuilder;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
import com.amazonaws.services.ecr.AmazonECR;
Expand Down Expand Up @@ -53,6 +55,7 @@ public class SystemTestClients {
private final EmrServerlessClient emrServerless = EmrServerlessClient.create();
private final AmazonElasticMapReduce emr = AmazonElasticMapReduceClientBuilder.defaultClient();
private final AmazonECS ecs = AmazonECSClientBuilder.defaultClient();
private final AmazonAutoScaling autoScaling = AmazonAutoScalingClientBuilder.defaultClient();
private final AmazonECR ecr = AmazonECRClientBuilder.defaultClient();
private final CloudWatchClient cloudWatch = CloudWatchClient.create();

Expand Down Expand Up @@ -100,6 +103,10 @@ public AmazonECS getEcs() {
return ecs;
}

/**
 * @return the auto scaling client held by this object
 */
public AmazonAutoScaling getAutoScaling() {
    return this.autoScaling;
}

/**
 * @return the ECR client held by this object
 */
public AmazonECR getEcr() {
    return this.ecr;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,10 @@ public interface CompactionDriver {
/**
 * Invokes tasks using default polling settings, built with a 10 second interval and a
 * 3 minute polling timeout.
 *
 * @param expectedTasks passed through to {@link #invokeTasks(int, PollWithRetries)}
 */
default void invokeTasks(int expectedTasks) {
    PollWithRetries defaultPoll = PollWithRetries.intervalAndPollingTimeout(
            Duration.ofSeconds(10), Duration.ofMinutes(3));
    invokeTasks(expectedTasks, defaultPoll);
}

/**
 * Starts compaction tasks directly and waits for them using the given polling settings.
 *
 * <p>NOTE(review): the exact contract is implementation-defined. AwsCompactionDriver runs
 * RunCompactionTasks to meet the target and polls the task status store until enough tasks
 * are found; confirm that other implementations match.
 *
 * @param numberOfTasks the number of tasks to start
 * @param poll          the polling settings to use while waiting
 */
void forceStartTasks(int numberOfTasks, PollWithRetries poll);

/**
 * Force starts tasks using default polling settings, built with a 10 second interval and a
 * 3 minute polling timeout.
 *
 * @param expectedTasks passed through to {@link #forceStartTasks(int, PollWithRetries)}
 */
default void forceStartTasks(int expectedTasks) {
    PollWithRetries defaultPoll = PollWithRetries.intervalAndPollingTimeout(
            Duration.ofSeconds(10), Duration.ofMinutes(3));
    forceStartTasks(expectedTasks, defaultPoll);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ public SystemTestCompaction invokeTasks(int expectedTasks) {
return this;
}

/**
 * Delegates to the compaction driver to force start tasks with the given polling settings.
 *
 * @param expectedTasks the number of tasks, passed through to the driver
 * @param poll          the polling settings, passed through to the driver
 * @return this object, so that calls can be chained
 */
public SystemTestCompaction forceStartTasks(int expectedTasks, PollWithRetries poll) {
driver.forceStartTasks(expectedTasks, poll);
return this;
}

public SystemTestCompaction waitForJobs() {
waitForJobs.waitForJobs(lastJobIds);
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,11 @@ static GenerateNumberedValue forField(KeyType keyType, Field field) {
}

static GenerateNumberedValue numberStringAndZeroPadTo(int size) {
return num -> StringUtils.leftPad(num + "", size, "0");
return num -> numberStringAndZeroPadTo(size, num);
}

/**
 * Converts a number to its decimal string and left-pads it with {@code "0"} via
 * {@code StringUtils.leftPad}.
 *
 * @param size   the size passed to {@code StringUtils.leftPad}
 * @param number the number to render
 * @return the padded string
 */
static String numberStringAndZeroPadTo(int size, long number) {
    String digits = Long.toString(number);
    return StringUtils.leftPad(digits, size, "0");
}

static UnaryOperator<Object> addPrefix(String prefix) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,15 @@
import static sleeper.systemtest.dsl.sourcedata.GenerateNumberedValue.numberStringAndZeroPadTo;
import static sleeper.systemtest.dsl.sourcedata.GenerateNumberedValueOverrides.overrideField;
import static sleeper.systemtest.dsl.testutil.InMemoryTestInstance.DEFAULT_SCHEMA;
import static sleeper.systemtest.dsl.testutil.InMemoryTestInstance.MAIN;
import static sleeper.systemtest.dsl.testutil.InMemoryTestInstance.ROW_KEY_FIELD_NAME;
import static sleeper.systemtest.dsl.testutil.InMemoryTestInstance.withDefaultProperties;

@InMemoryDslTest
public class CompactionTest {

@BeforeEach
void setUp(SleeperSystemTest sleeper) throws Exception {
sleeper.connectToInstance(withDefaultProperties("main"));
sleeper.connectToInstance(MAIN);
}

@Nested
Expand Down
Loading
Loading