Skip to content

Commit

Permalink
Merge pull request #3939 from gchq/3724-compaction-task-creation-in-t…
Browse files Browse the repository at this point in the history
…ests

Issue 3724 - Enable compaction task creation schedule in system tests
  • Loading branch information
patchwork01 authored Dec 18, 2024
2 parents 277f4e5 + f75f98f commit 5593130
Show file tree
Hide file tree
Showing 27 changed files with 346 additions and 331 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -276,8 +276,8 @@ private void lambdaToCreateIngestTasks(CoreStacks coreStacks, Queue ingestJobQue
// Cloudwatch rule to trigger this lambda
Rule rule = Rule.Builder
.create(this, "IngestTasksCreationPeriodicTrigger")
.ruleName(SleeperScheduleRule.INGEST.buildRuleName(instanceProperties))
.description(SleeperScheduleRule.INGEST.getDescription())
.ruleName(SleeperScheduleRule.INGEST_TASK_CREATION.buildRuleName(instanceProperties))
.description(SleeperScheduleRule.INGEST_TASK_CREATION.getDescription())
.enabled(!shouldDeployPaused(this))
.schedule(Schedule.rate(Duration.minutes(instanceProperties.getInt(INGEST_TASK_CREATION_PERIOD_IN_MINUTES))))
.targets(Collections.singletonList(new LambdaFunction(handler)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public class SleeperScheduleRule {
public static final SleeperScheduleRule GARBAGE_COLLECTOR = add(
GARBAGE_COLLECTOR_CLOUDWATCH_RULE, "%s-GarbageCollectorPeriodicTrigger",
"Triggers garbage collection to delete unused files");
public static final SleeperScheduleRule INGEST = add(
public static final SleeperScheduleRule INGEST_TASK_CREATION = add(
INGEST_CLOUDWATCH_RULE, "%s-IngestTasksCreationRule",
"Triggers scaling ingest tasks to run queued jobs");
public static final SleeperScheduleRule INGEST_BATCHER_JOB_CREATION = add(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.autoscaling.AutoScalingClient;
import software.amazon.awssdk.services.ec2.Ec2Client;
import software.amazon.awssdk.services.ecs.EcsClient;
import software.amazon.awssdk.services.lambda.LambdaClient;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.Message;
Expand All @@ -33,28 +32,22 @@
import sleeper.compaction.core.job.CompactionJobSerDe;
import sleeper.compaction.core.job.CompactionJobStatusStore;
import sleeper.compaction.core.job.creation.CreateCompactionJobs;
import sleeper.compaction.core.task.CompactionTaskStatus;
import sleeper.compaction.core.task.CompactionTaskStatusStore;
import sleeper.compaction.job.creation.AwsCreateCompactionJobs;
import sleeper.compaction.status.store.job.CompactionJobStatusStoreFactory;
import sleeper.compaction.status.store.task.CompactionTaskStatusStoreFactory;
import sleeper.core.statestore.StateStoreProvider;
import sleeper.core.util.ObjectFactory;
import sleeper.core.util.ObjectFactoryException;
import sleeper.core.util.PollWithRetries;
import sleeper.systemtest.drivers.util.AwsDrainSqsQueue;
import sleeper.systemtest.drivers.util.SystemTestClients;
import sleeper.systemtest.dsl.compaction.CompactionDriver;
import sleeper.systemtest.dsl.instance.SystemTestInstanceContext;
import sleeper.task.common.EC2Scaler;
import sleeper.task.common.RunCompactionTasks;

import java.io.IOException;
import java.util.List;

import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.COMPACTION_JOB_CREATION_TRIGGER_LAMBDA_FUNCTION;
import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.COMPACTION_JOB_QUEUE_URL;
import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.COMPACTION_TASK_CREATION_LAMBDA_FUNCTION;

public class AwsCompactionDriver implements CompactionDriver {
private static final Logger LOGGER = LoggerFactory.getLogger(AwsCompactionDriver.class);
Expand All @@ -65,7 +58,6 @@ public class AwsCompactionDriver implements CompactionDriver {
private final AmazonS3 s3Client;
private final AmazonSQS sqsClient;
private final SqsClient sqsClientV2;
private final EcsClient ecsClient;
private final AutoScalingClient asClient;
private final Ec2Client ec2Client;
private final CompactionJobSerDe serDe = new CompactionJobSerDe();
Expand All @@ -77,7 +69,6 @@ public AwsCompactionDriver(SystemTestInstanceContext instance, SystemTestClients
this.s3Client = clients.getS3();
this.sqsClient = clients.getSqs();
this.sqsClientV2 = clients.getSqsV2();
this.ecsClient = clients.getEcs();
this.asClient = clients.getAutoScaling();
this.ec2Client = clients.getEc2();
}
Expand Down Expand Up @@ -109,41 +100,6 @@ public void forceCreateJobs() {
});
}

@Override
public void invokeTasks(int expectedTasks, PollWithRetries poll) {
CompactionTaskStatusStore store = CompactionTaskStatusStoreFactory.getStatusStore(dynamoDBClient, instance.getInstanceProperties());
long tasksFinishedBefore = store.getAllTasks().stream().filter(CompactionTaskStatus::isFinished).count();
try {
poll.pollUntil("tasks are started", () -> {
InvokeLambda.invokeWith(lambdaClient, instance.getInstanceProperties().get(COMPACTION_TASK_CREATION_LAMBDA_FUNCTION));
long tasksStarted = store.getAllTasks().size() - tasksFinishedBefore;
LOGGER.info("Found {} running compaction tasks", tasksStarted);
return tasksStarted >= expectedTasks;
});
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}

/**
 * Directly scales the ECS/EC2 compaction fleet to run the given number of
 * tasks, then waits until that many newly started tasks are visible in the
 * task status store.
 * <p>
 * Unlike {@code invokeTasks}, this bypasses the task creation lambda and calls
 * {@code RunCompactionTasks} once up front; the poll loop only observes the store.
 *
 * @param numberOfTasks target number of tasks to start and wait for
 * @param poll          retry/interval policy for the wait
 * @throws RuntimeException wrapping {@link InterruptedException} if the wait is
 *                          interrupted (the interrupt flag is restored first),
 *                          or if polling times out
 */
@Override
public void forceStartTasks(int numberOfTasks, PollWithRetries poll) {
CompactionTaskStatusStore store = CompactionTaskStatusStoreFactory.getStatusStore(dynamoDBClient, instance.getInstanceProperties());
// Baseline of tasks already finished, so we only count tasks started from now on.
long tasksFinishedBefore = store.getAllTasks().stream().filter(CompactionTaskStatus::isFinished).count();
// One-shot scale request; polling below just waits for the store to reflect it.
new RunCompactionTasks(instance.getInstanceProperties(), ecsClient, asClient, ec2Client)
.runToMeetTargetTasks(numberOfTasks);
try {
poll.pollUntil("tasks are started", () -> {
long tasksStarted = store.getAllTasks().size() - tasksFinishedBefore;
LOGGER.info("Found {} running compaction tasks", tasksStarted);
return tasksStarted >= numberOfTasks;
});
} catch (InterruptedException e) {
// Restore the interrupt flag before rethrowing, per standard practice.
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
}

@Override
public void scaleToZero() {
EC2Scaler.create(instance.getInstanceProperties(), asClient, ec2Client).scaleTo(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
import sleeper.ingest.status.store.job.IngestJobStatusStoreFactory;
import sleeper.systemtest.drivers.util.SystemTestClients;
import sleeper.systemtest.dsl.ingest.IngestTasksDriver;
import sleeper.systemtest.dsl.ingest.WaitForIngestTasks;
import sleeper.systemtest.dsl.instance.SystemTestInstanceContext;
import sleeper.systemtest.dsl.util.WaitForTasks;

public class AwsIngestTasksDriver implements IngestTasksDriver {

Expand All @@ -37,9 +37,9 @@ public AwsIngestTasksDriver(SystemTestInstanceContext instance, SystemTestClient
}

@Override
public WaitForIngestTasks waitForTasksForCurrentInstance() {
public WaitForTasks waitForTasksForCurrentInstance() {
InstanceProperties instanceProperties = instance.getInstanceProperties();
IngestJobStatusStore statusStore = IngestJobStatusStoreFactory.getStatusStore(dynamoDBClient, instanceProperties);
return new WaitForIngestTasks(statusStore);
return new WaitForTasks(statusStore);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import sleeper.compaction.core.job.CompactionJob;
import sleeper.compaction.core.job.CompactionJobStatusStore;
import sleeper.core.util.PollWithRetries;

import java.util.List;

Expand All @@ -30,10 +29,6 @@ public interface CompactionDriver {

void forceCreateJobs();

void invokeTasks(int expectedTasks, PollWithRetries poll);

void forceStartTasks(int numberOfTasks, PollWithRetries poll);

void scaleToZero();

List<CompactionJob> drainJobsQueueForWholeInstance();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import sleeper.systemtest.dsl.sourcedata.IngestSourceFilesContext;
import sleeper.systemtest.dsl.util.PollWithRetriesDriver;
import sleeper.systemtest.dsl.util.WaitForJobs;
import sleeper.systemtest.dsl.util.WaitForTasks;

import java.time.Duration;
import java.util.List;
Expand Down Expand Up @@ -65,19 +66,16 @@ public SystemTestCompaction forceCreateJobs(int expectedJobs) {
}

public SystemTestCompaction splitFilesAndRunJobs(int expectedJobs) {
forceCreateJobs(expectedJobs).invokeTasks(1).waitForJobsToFinishThenCommit(
forceCreateJobs(expectedJobs).waitForTasks(1).waitForJobsToFinishThenCommit(
pollDriver.pollWithIntervalAndTimeout(Duration.ofSeconds(5), Duration.ofMinutes(30)),
pollDriver.pollWithIntervalAndTimeout(Duration.ofSeconds(5), Duration.ofMinutes(5)));
return this;
}

public SystemTestCompaction invokeTasks(int expectedTasks) {
driver.invokeTasks(expectedTasks, pollDriver.pollWithIntervalAndTimeout(Duration.ofSeconds(10), Duration.ofMinutes(3)));
return this;
}

public SystemTestCompaction forceStartTasks(int expectedTasks) {
driver.forceStartTasks(expectedTasks, pollDriver.pollWithIntervalAndTimeout(Duration.ofSeconds(10), Duration.ofMinutes(3)));
public SystemTestCompaction waitForTasks(int expectedTasks) {
new WaitForTasks(driver.getJobStatusStore())
.waitUntilNumTasksStartedAJob(expectedTasks, lastJobIds,
pollDriver.pollWithIntervalAndTimeout(Duration.ofSeconds(10), Duration.ofMinutes(3)));
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
*/
package sleeper.systemtest.dsl.ingest;

import sleeper.systemtest.dsl.util.WaitForTasks;

public interface IngestTasksDriver {

WaitForIngestTasks waitForTasksForCurrentInstance();
WaitForTasks waitForTasksForCurrentInstance();

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@
import java.util.stream.Stream;

import static java.util.stream.Collectors.toSet;
import static sleeper.core.deploy.SleeperScheduleRule.INGEST;
import static sleeper.core.deploy.SleeperScheduleRule.COMPACTION_TASK_CREATION;
import static sleeper.core.deploy.SleeperScheduleRule.INGEST_TASK_CREATION;
import static sleeper.core.deploy.SleeperScheduleRule.QUERY_WARM_LAMBDA;
import static sleeper.core.deploy.SleeperScheduleRule.TRANSACTION_LOG_SNAPSHOT_CREATION;
import static sleeper.core.deploy.SleeperScheduleRule.TRANSACTION_LOG_SNAPSHOT_DELETION;
Expand Down Expand Up @@ -101,7 +102,8 @@ public static final class Builder {
TRANSACTION_LOG_SNAPSHOT_CREATION,
TRANSACTION_LOG_SNAPSHOT_DELETION,
TRANSACTION_LOG_TRANSACTION_DELETION,
INGEST).collect(toSet());
INGEST_TASK_CREATION,
COMPACTION_TASK_CREATION).collect(toSet());
private String shortName;

private Builder() {
Expand All @@ -127,6 +129,11 @@ public Builder enableSchedules(List<SleeperScheduleRule> enableSchedules) {
return this;
}

/**
 * Removes the given rules from the set of schedules enabled for the instance.
 *
 * @param disableSchedules the schedule rules to disable
 * @return this builder, for chaining
 */
public Builder disableSchedules(List<SleeperScheduleRule> disableSchedules) {
disableSchedules.forEach(this.enableSchedules::remove);
return this;
}

public SystemTestInstanceConfiguration build() {
return new SystemTestInstanceConfiguration(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,29 +13,35 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package sleeper.systemtest.dsl.ingest;
package sleeper.systemtest.dsl.util;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import sleeper.compaction.core.job.CompactionJobStatusStore;
import sleeper.core.record.process.status.ProcessRun;
import sleeper.core.util.PollWithRetries;
import sleeper.ingest.core.job.status.IngestJobStatusStore;
import sleeper.systemtest.dsl.util.PollWithRetriesDriver;

import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.stream.Stream;

import static java.util.stream.Collectors.toSet;
import static java.util.stream.Collectors.toUnmodifiableSet;

public class WaitForIngestTasks {
public static final Logger LOGGER = LoggerFactory.getLogger(WaitForIngestTasks.class);
public class WaitForTasks {
public static final Logger LOGGER = LoggerFactory.getLogger(WaitForTasks.class);

private final IngestJobStatusStore jobStatusStore;
private final JobStatusStore jobStatusStore;

public WaitForIngestTasks(IngestJobStatusStore jobStatusStore) {
this.jobStatusStore = jobStatusStore;
public WaitForTasks(IngestJobStatusStore jobStatusStore) {
this.jobStatusStore = JobStatusStore.forIngest(jobStatusStore);
}

public WaitForTasks(CompactionJobStatusStore jobStatusStore) {
this.jobStatusStore = JobStatusStore.forCompaction(jobStatusStore);
}

public void waitUntilOneTaskStartedAJob(List<String> jobIds, PollWithRetriesDriver pollDriver) {
Expand All @@ -61,12 +67,27 @@ public void waitUntilNumTasksStartedAJob(int expectedTasks, List<String> jobIds,
}

private int numTasksStartedAJob(List<String> jobIds) {
Set<String> taskIds = jobIds.stream()
.flatMap(jobId -> jobStatusStore.getJob(jobId).stream())
.flatMap(status -> status.getJobRuns().stream())
Set<String> taskIds = jobStatusStore.findRunsOfJobs(jobIds)
.map(ProcessRun::getTaskId)
.collect(toSet());
.collect(toUnmodifiableSet());
LOGGER.info("Found {} tasks with runs for given jobs", taskIds.size());
return taskIds.size();
}

@FunctionalInterface
private interface JobStatusStore {
Stream<ProcessRun> findRunsOfJobs(Collection<String> jobIds);

static JobStatusStore forIngest(IngestJobStatusStore store) {
return jobIds -> jobIds.stream().parallel()
.flatMap(jobId -> store.getJob(jobId).stream())
.flatMap(job -> job.getJobRuns().stream());
}

static JobStatusStore forCompaction(CompactionJobStatusStore store) {
return jobIds -> jobIds.stream().parallel()
.flatMap(jobId -> store.getJob(jobId).stream())
.flatMap(job -> job.getJobRuns().stream());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ void shouldCompactFilesUsingDefaultCompactionStrategy(SleeperSystemTest sleeper)
.numberedRecords(numbers.range(36, 46));

// When
sleeper.compaction().createJobs(1).invokeTasks(1).waitForJobs();
sleeper.compaction().createJobs(1).waitForTasks(1).waitForJobs();

// Then
assertThat(sleeper.directQuery().allRecordsInTable())
Expand All @@ -95,7 +95,7 @@ void shouldCompactFilesUsingBasicCompactionStrategy(SleeperSystemTest sleeper) {
.numberedRecords(numbers.range(75, 100));

// When
sleeper.compaction().createJobs(2).invokeTasks(1).waitForJobs();
sleeper.compaction().createJobs(2).waitForTasks(1).waitForJobs();

// Then
assertThat(sleeper.directQuery().allRecordsInTable())
Expand Down Expand Up @@ -140,7 +140,7 @@ void shouldCompactOneFileIntoExistingFilesOnLeafPartitions(SleeperSystemTest sle
sleeper.ingest().direct(tempDir).numberedRecords(LongStream.range(0, 50).map(n -> n * 2 + 1));

// When we run compaction
sleeper.compaction().createJobs(4).invokeTasks(1).waitForJobs();
sleeper.compaction().createJobs(4).waitForTasks(1).waitForJobs();

// Then the same records should be present, in one file on each leaf partition
assertThat(sleeper.directQuery().allRecordsInTable())
Expand All @@ -166,7 +166,7 @@ void shouldCompactOneFileFromRootIntoExistingFilesOnLeafPartitions(SleeperSystem
// And we run compaction
sleeper.compaction()
.createJobs(0).createJobs(4) // Split down two levels of the tree
.invokeTasks(1).waitForJobs();
.waitForTasks(1).waitForJobs();

// Then the same records should be present, in one file on each leaf partition
assertThat(sleeper.directQuery().allRecordsInTable())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ void shouldCreateLargeQuantitiesOfCompactionJobsAtOnce(SleeperSystemTest sleeper
// When
sleeper.compaction()
.createJobs(1024)
.invokeTasks(1).waitForJobs();
.waitForTasks(1).waitForJobs();

// Then
AllReferencesToAllFiles files = sleeper.tableFiles().all();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ void shouldApplyOneCompactionPerPartition(SleeperSystemTest sleeper) {

// When we run compaction
sleeper.compaction()
.forceStartTasks(NUMBER_OF_COMPACTIONS)
.createJobs(NUMBER_OF_COMPACTIONS)
.waitForTasks(NUMBER_OF_COMPACTIONS)
.waitForJobs();

// Then we have one output file per compaction
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ void shouldGarbageCollectFilesAfterCompaction(SleeperSystemTest sleeper) {
IntStream.range(0, 1000)
.mapToObj(i -> numbers.range(i * 100, i * 100 + 100))
.forEach(range -> ingest.numberedRecords(range));
sleeper.compaction().createJobs(200).invokeTasks(1).waitForJobs();
sleeper.compaction().createJobs(200).waitForTasks(1).waitForJobs();

// When
sleeper.garbageCollection().invoke().waitFor(
Expand Down
Loading

0 comments on commit 5593130

Please sign in to comment.