Skip to content

Commit

Permalink
Merge pull request #2009 from gchq/1790-compaction-contention-system-test
Browse files Browse the repository at this point in the history

Issue 1790 - System test for compactions in contention
  • Loading branch information
patchwork01 authored Mar 18, 2024
2 parents 6344e8c + 70f1adc commit 06d0c60
Show file tree
Hide file tree
Showing 15 changed files with 357 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,29 +67,21 @@ public class RunCompactionTasks {
private final String clusterName;
private final String launchType;
private final int maximumRunningTasks;
private final QueueMessageCount.Client queueMessageCount;
private final TaskCounts taskCounts;
private final Scaler scaler;
private final TaskLauncher launchTasks;

public RunCompactionTasks(AmazonSQS sqsClient,
AmazonECS ecsClient,
AmazonS3 s3Client,
AmazonAutoScaling asClient,
InstanceProperties instanceProperties) {
this(instanceProperties, QueueMessageCount.withSqsClient(sqsClient),
public RunCompactionTasks(
InstanceProperties instanceProperties, AmazonECS ecsClient, AmazonAutoScaling asClient) {
this(instanceProperties,
(clusterName) -> ECSTaskCount.getNumPendingAndRunningTasks(clusterName, ecsClient),
createEC2Scaler(instanceProperties, asClient, ecsClient),
(startTime, numberOfTasksToCreate) -> launchTasks(ecsClient, instanceProperties, startTime, numberOfTasksToCreate));
}

public RunCompactionTasks(InstanceProperties instanceProperties,
QueueMessageCount.Client queueMessageCount,
TaskCounts taskCounts,
Scaler scaler,
TaskLauncher launchTasks) {
public RunCompactionTasks(
InstanceProperties instanceProperties, TaskCounts taskCounts, Scaler scaler, TaskLauncher launchTasks) {
this.instanceProperties = instanceProperties;
this.queueMessageCount = queueMessageCount;
this.taskCounts = taskCounts;
this.scaler = scaler;
this.launchTasks = launchTasks;
Expand All @@ -111,44 +103,42 @@ public interface Scaler {
void scaleTo(String asGroupName, int numberContainers);
}

public void run() {
public void run(QueueMessageCount.Client queueMessageCount) {
long startTime = System.currentTimeMillis();
LOGGER.info("Queue URL is {}", sqsJobQueueUrl);
// Find out number of messages in queue that are not being processed
int queueSize = queueMessageCount.getQueueMessageCount(sqsJobQueueUrl)
.getApproximateNumberOfMessages();
LOGGER.info("Queue size is {}", queueSize);
// Request 1 task for each item on the queue
run(startTime, queueSize);
LOGGER.info("Maximum concurrent tasks is {}", maximumRunningTasks);
runToMeetTargetTasks(startTime, Math.min(queueSize, maximumRunningTasks));
}

public void run(int requestedTasks) {
run(System.currentTimeMillis(), requestedTasks);
public void runToMeetTargetTasks(int requestedTasks) {
runToMeetTargetTasks(System.currentTimeMillis(), requestedTasks);
}

private void run(long startTime, int requestedTasks) {
if (requestedTasks == 0) {
LOGGER.info("Finishing as number of tasks requested was 0");
private void runToMeetTargetTasks(long startTime, int targetTasks) {
if (targetTasks == 0) {
LOGGER.info("Finishing as target tasks was 0");
return;
}
// Find out number of pending and running tasks
LOGGER.info("Target concurrent tasks is {}", targetTasks);

int numRunningAndPendingTasks = taskCounts.getRunningAndPending(clusterName);
LOGGER.info("Number of running and pending tasks is {}", numRunningAndPendingTasks);

if (numRunningAndPendingTasks >= maximumRunningTasks) {
LOGGER.info("Finishing as maximum running tasks of {} has been reached", maximumRunningTasks);
if (numRunningAndPendingTasks >= targetTasks) {
LOGGER.info("Finishing as target has been reached");
return;
}
int maxNumTasksToCreate = maximumRunningTasks - numRunningAndPendingTasks;
LOGGER.info("Maximum concurrent tasks is {}", maximumRunningTasks);
LOGGER.info("Maximum number of tasks that can be created is {}", maxNumTasksToCreate);
int numberOfTasksToCreate = Math.min(requestedTasks, maxNumTasksToCreate);

if (launchType.equalsIgnoreCase("EC2")) {
int totalTasks = numberOfTasksToCreate + numRunningAndPendingTasks;
LOGGER.info("Total number of tasks if all launches succeed {}", totalTasks);
scaler.scaleTo(instanceProperties.get(COMPACTION_AUTO_SCALING_GROUP), totalTasks);
scaler.scaleTo(instanceProperties.get(COMPACTION_AUTO_SCALING_GROUP), targetTasks);
}

int numberOfTasksToCreate = targetTasks - numRunningAndPendingTasks;
LOGGER.info("Tasks to create is {}", numberOfTasksToCreate);
launchTasks.launchTasks(startTime, numberOfTasksToCreate);
}

Expand Down Expand Up @@ -282,8 +272,8 @@ public static void main(String[] args) {
try {
InstanceProperties instanceProperties = new InstanceProperties();
instanceProperties.loadFromS3(s3Client, s3Bucket);
new RunCompactionTasks(sqsClient, ecsClient, s3Client, asClient, instanceProperties)
.run(numberOfTasks);
new RunCompactionTasks(instanceProperties, ecsClient, asClient)
.runToMeetTargetTasks(numberOfTasks);
} finally {
sqsClient.shutdown();
ecsClient.shutdown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

import static org.assertj.core.api.Assertions.assertThat;
import static sleeper.configuration.properties.InstancePropertiesTestHelper.createTestInstanceProperties;
Expand Down Expand Up @@ -62,15 +63,6 @@ void shouldCreateOneTasksWhenQueueHasOneMessages() {
// Then
assertThat(tasksCreated).isEqualTo(5);
}

private int runTasks(QueueMessageCount.Client queueMessageClient, TaskCounts taskCounts) {
AtomicInteger tasksLaunched = new AtomicInteger();
RunCompactionTasks runTasks = new RunCompactionTasks(instanceProperties, queueMessageClient, taskCounts, scaler, (startTime, numberOfTasksToCreate) -> {
tasksLaunched.set(numberOfTasksToCreate);
});
runTasks.run();
return tasksLaunched.get();
}
}

@DisplayName("Launch tasks with tasks already running")
Expand All @@ -89,7 +81,7 @@ void shouldCreateTasksUnderMaximumConcurrentLimit() {
}

@Test
void shouldCreateTasksWhenMaximumConcurrentTasksHasBeenMet() {
void shouldCreateTasksUpToMaximumConcurrentTasks() {
// Given
instanceProperties.setNumber(MAXIMUM_CONCURRENT_COMPACTION_TASKS, 1);

Expand Down Expand Up @@ -135,6 +127,18 @@ void shouldNotCreateTasksWhenExistingRunningOrPendingTaskCountIsMoreThanMaxConcu
// Then
assertThat(tasksCreated).isZero();
}

@Test
void shouldForceCreateTasksOverMaximumConcurrentTasks() {
    // Given the instance is configured with a maximum of one concurrent compaction task
    instanceProperties.setNumber(MAXIMUM_CONCURRENT_COMPACTION_TASKS, 1);
    TaskCounts existingTasks = noRunningOrPendingTasks();

    // When a target of two tasks is requested directly, rather than derived from the queue size
    int tasksCreated = runToMeetTargetTasks(2, existingTasks);

    // Then two tasks are launched even though that exceeds the configured maximum
    assertThat(tasksCreated).isEqualTo(2);
}
}

@DisplayName("Auto scale if needed")
Expand Down Expand Up @@ -172,20 +176,32 @@ void shouldAutoScaleWithExistingRunningOrPendingTasks() {
instanceProperties.set(COMPACTION_ECS_LAUNCHTYPE, "EC2");

// When
runTasks(2, runningOrPendingTasks(3));
runTasks(5, runningOrPendingTasks(3));

// Then
assertThat(numContainersByScalingGroup).isEqualTo(Map.of(
TEST_AUTO_SCALING_GROUP, 5));
}
}

/**
 * Runs task creation driven by the size of the job queue, as reported by the given client.
 *
 * @param  queueMessageClient source of the queue message count passed to {@code run}
 * @param  taskCounts         source of the number of running and pending tasks
 * @return                    the value produced by the shared {@code run} helper
 */
private int runTasks(QueueMessageCount.Client queueMessageClient, TaskCounts taskCounts) {
    return run(taskCounts, taskRunner -> taskRunner.run(queueMessageClient));
}

/**
 * Runs task creation against a fake queue built from the requested number of messages.
 *
 * @param  requestedTasks number passed to {@code messagesOnQueue} to build the queue client
 * @param  taskCounts     source of the number of running and pending tasks
 * @return                the result of the queue-driven {@code runTasks} overload
 */
private int runTasks(int requestedTasks, TaskCounts taskCounts) {
    QueueMessageCount.Client queueWithMessages = messagesOnQueue(requestedTasks);
    return runTasks(queueWithMessages, taskCounts);
}

/**
 * Runs task creation with an explicit target number of tasks, without consulting a queue.
 *
 * @param  requestedTasks target passed to {@code RunCompactionTasks.runToMeetTargetTasks}
 * @param  taskCounts     source of the number of running and pending tasks
 * @return                the value produced by the shared {@code run} helper
 */
private int runToMeetTargetTasks(int requestedTasks, TaskCounts taskCounts) {
    return run(taskCounts, taskRunner -> taskRunner.runToMeetTargetTasks(requestedTasks));
}

private int run(TaskCounts taskCounts, Consumer<RunCompactionTasks> run) {
AtomicInteger tasksLaunched = new AtomicInteger();
RunCompactionTasks runTasks = new RunCompactionTasks(instanceProperties, noMessagesOnQueue(), taskCounts, scaler, (startTime, numberOfTasksToCreate) -> {
RunCompactionTasks runTasks = new RunCompactionTasks(instanceProperties, taskCounts, scaler, (startTime, numberOfTasksToCreate) -> {
tasksLaunched.set(numberOfTasksToCreate);
});
runTasks.run(requestedTasks);
run.accept(runTasks);
return tasksLaunched.get();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;

import sleeper.configuration.properties.instance.InstanceProperties;
import sleeper.task.common.QueueMessageCount;
import sleeper.task.common.RunCompactionTasks;

import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.CONFIG_BUCKET;
Expand All @@ -37,6 +38,7 @@
@SuppressWarnings("unused")
public class RunCompactionTasksLambda {
private final RunCompactionTasks runTasks;
private final QueueMessageCount.Client queueMessageCount;

public RunCompactionTasksLambda() {
String s3Bucket = validateParameter(CONFIG_BUCKET.toEnvironmentVariable());
Expand All @@ -46,11 +48,12 @@ public RunCompactionTasksLambda() {
AmazonAutoScaling asClient = AmazonAutoScalingClientBuilder.defaultClient();
InstanceProperties instanceProperties = new InstanceProperties();
instanceProperties.loadFromS3(s3Client, s3Bucket);
this.runTasks = new RunCompactionTasks(sqsClient, ecsClient, s3Client, asClient, instanceProperties);
this.runTasks = new RunCompactionTasks(instanceProperties, ecsClient, asClient);
this.queueMessageCount = QueueMessageCount.withSqsClient(sqsClient);
}

public void eventHandler(ScheduledEvent event, Context context) {
runTasks.run();
runTasks.run(queueMessageCount);
}

private static String validateParameter(String parameterName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@

package sleeper.systemtest.drivers.compaction;

import com.amazonaws.services.autoscaling.AmazonAutoScaling;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.ecs.AmazonECS;
import com.amazonaws.services.sqs.AmazonSQS;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -38,6 +40,7 @@
import sleeper.systemtest.drivers.util.SystemTestClients;
import sleeper.systemtest.dsl.compaction.CompactionDriver;
import sleeper.systemtest.dsl.instance.SystemTestInstanceContext;
import sleeper.task.common.RunCompactionTasks;

import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.COMPACTION_JOB_CREATION_TRIGGER_LAMBDA_FUNCTION;
import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.COMPACTION_TASK_CREATION_LAMBDA_FUNCTION;
Expand All @@ -50,12 +53,16 @@ public class AwsCompactionDriver implements CompactionDriver {
private final LambdaClient lambdaClient;
private final AmazonDynamoDB dynamoDBClient;
private final AmazonSQS sqsClient;
private final AmazonECS ecsClient;
private final AmazonAutoScaling asClient;

/**
 * Creates a driver for the given instance, taking every AWS client it needs from the
 * supplied system test clients.
 *
 * @param instance the system test instance this driver operates on
 * @param clients  provider of the Lambda, DynamoDB, SQS, ECS and Auto Scaling clients
 */
public AwsCompactionDriver(SystemTestInstanceContext instance, SystemTestClients clients) {
    this.instance = instance;

    // Clients used to invoke lambdas, read the status store and inspect queues
    this.lambdaClient = clients.getLambda();
    this.dynamoDBClient = clients.getDynamoDB();
    this.sqsClient = clients.getSqs();

    // Clients used when constructing RunCompactionTasks directly
    this.ecsClient = clients.getEcs();
    this.asClient = clients.getAutoScaling();
}

@Override
Expand Down Expand Up @@ -99,4 +106,22 @@ public void invokeTasks(int expectedTasks, PollWithRetries poll) {
throw new RuntimeException(e);
}
}

/**
 * Starts compaction tasks by calling {@code RunCompactionTasks.runToMeetTargetTasks} directly,
 * then polls the task status store until enough tasks have been reported.
 *
 * <p>The number of tasks already finished is recorded before launching. Each poll subtracts
 * that baseline from the total number of tasks in the store and succeeds once the difference
 * reaches {@code numberOfTasks}.
 *
 * @param  numberOfTasks    the target passed to the task runner, and the count to wait for
 * @param  poll             the polling configuration used while waiting
 * @throws RuntimeException if the thread is interrupted while polling (the interrupt flag is restored)
 */
@Override
public void forceStartTasks(int numberOfTasks, PollWithRetries poll) {
    CompactionTaskStatusStore statusStore = CompactionTaskStatusStoreFactory.getStatusStore(
            dynamoDBClient, instance.getInstanceProperties());

    // Baseline must be taken before any new tasks are launched
    long finishedBeforeLaunch = statusStore.getAllTasks().stream()
            .filter(CompactionTaskStatus::isFinished)
            .count();

    RunCompactionTasks taskRunner = new RunCompactionTasks(
            instance.getInstanceProperties(), ecsClient, asClient);
    taskRunner.runToMeetTargetTasks(numberOfTasks);

    try {
        poll.pollUntil("tasks are started", () -> {
            long tasksStarted = statusStore.getAllTasks().size() - finishedBeforeLaunch;
            LOGGER.info("Found {} running compaction tasks", tasksStarted);
            return tasksStarted >= numberOfTasks;
        });
    } catch (InterruptedException e) {
        // Restore the interrupt flag so callers further up can still observe it
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package sleeper.systemtest.drivers.util;

import com.amazonaws.services.autoscaling.AmazonAutoScaling;
import com.amazonaws.services.autoscaling.AmazonAutoScalingClientBuilder;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
import com.amazonaws.services.ecr.AmazonECR;
Expand Down Expand Up @@ -53,6 +55,7 @@ public class SystemTestClients {
private final EmrServerlessClient emrServerless = EmrServerlessClient.create();
private final AmazonElasticMapReduce emr = AmazonElasticMapReduceClientBuilder.defaultClient();
private final AmazonECS ecs = AmazonECSClientBuilder.defaultClient();
private final AmazonAutoScaling autoScaling = AmazonAutoScalingClientBuilder.defaultClient();
private final AmazonECR ecr = AmazonECRClientBuilder.defaultClient();
private final CloudWatchClient cloudWatch = CloudWatchClient.create();

Expand Down Expand Up @@ -100,6 +103,10 @@ public AmazonECS getEcs() {
return ecs;
}

/**
 * Returns the Auto Scaling client held by this object.
 *
 * @return the Auto Scaling client
 */
public AmazonAutoScaling getAutoScaling() {
    return this.autoScaling;
}

public AmazonECR getEcr() {
return ecr;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,10 @@ public interface CompactionDriver {
/**
 * Invokes compaction tasks using a default poll, built with a 10 second interval and a
 * 3 minute polling timeout.
 *
 * @param expectedTasks the number of tasks passed on to the two-argument overload
 */
default void invokeTasks(int expectedTasks) {
    PollWithRetries defaultPoll = PollWithRetries.intervalAndPollingTimeout(
            Duration.ofSeconds(10), Duration.ofMinutes(3));
    invokeTasks(expectedTasks, defaultPoll);
}

/**
 * Force-starts compaction tasks and waits for them using the supplied poll.
 * NOTE(review): exact semantics are defined by each implementation; presumably
 * {@code numberOfTasks} is a target number of tasks to reach - confirm against implementers.
 *
 * @param numberOfTasks the number of tasks to start
 * @param poll          the polling configuration to use while waiting
 */
void forceStartTasks(int numberOfTasks, PollWithRetries poll);

/**
 * Force-starts compaction tasks using a default poll, built with a 10 second interval and a
 * 3 minute polling timeout.
 *
 * @param expectedTasks the number of tasks passed on to the two-argument overload
 */
default void forceStartTasks(int expectedTasks) {
    PollWithRetries defaultPoll = PollWithRetries.intervalAndPollingTimeout(
            Duration.ofSeconds(10), Duration.ofMinutes(3));
    forceStartTasks(expectedTasks, defaultPoll);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ public SystemTestCompaction invokeTasks(int expectedTasks) {
return this;
}

/**
 * Delegates to the compaction driver to force-start tasks with the given poll.
 *
 * @param  expectedTasks the number of tasks passed to the driver
 * @param  poll          the polling configuration passed to the driver
 * @return               this object, so that calls can be chained
 */
public SystemTestCompaction forceStartTasks(int expectedTasks, PollWithRetries poll) {
    this.driver.forceStartTasks(expectedTasks, poll);
    return this;
}

public SystemTestCompaction waitForJobs() {
waitForJobs.waitForJobs(lastJobIds);
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,11 @@ static GenerateNumberedValue forField(KeyType keyType, Field field) {
}

static GenerateNumberedValue numberStringAndZeroPadTo(int size) {
return num -> StringUtils.leftPad(num + "", size, "0");
return num -> numberStringAndZeroPadTo(size, num);
}

/**
 * Converts a number to its decimal string and left-pads it with {@code "0"} up to the given size.
 *
 * @param  size   the size passed to {@code StringUtils.leftPad}
 * @param  number the number to format
 * @return        the padded string
 */
static String numberStringAndZeroPadTo(int size, long number) {
    String digits = String.valueOf(number);
    return StringUtils.leftPad(digits, size, "0");
}

static UnaryOperator<Object> addPrefix(String prefix) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,15 @@
import static sleeper.systemtest.dsl.sourcedata.GenerateNumberedValue.numberStringAndZeroPadTo;
import static sleeper.systemtest.dsl.sourcedata.GenerateNumberedValueOverrides.overrideField;
import static sleeper.systemtest.dsl.testutil.InMemoryTestInstance.DEFAULT_SCHEMA;
import static sleeper.systemtest.dsl.testutil.InMemoryTestInstance.MAIN;
import static sleeper.systemtest.dsl.testutil.InMemoryTestInstance.ROW_KEY_FIELD_NAME;
import static sleeper.systemtest.dsl.testutil.InMemoryTestInstance.withDefaultProperties;

@InMemoryDslTest
public class CompactionTest {

@BeforeEach
void setUp(SleeperSystemTest sleeper) throws Exception {
sleeper.connectToInstance(withDefaultProperties("main"));
sleeper.connectToInstance(MAIN);
}

@Nested
Expand Down
Loading

0 comments on commit 06d0c60

Please sign in to comment.