
Commit

Merge pull request #3573 from gchq/3554-optional-file-assignment-wait
Issue 3554 - Make compaction task file assignment wait optional
patchwork01 authored Oct 29, 2024
2 parents 74b8d4b + 697c1aa commit 9f02b77
Showing 6 changed files with 121 additions and 3 deletions.
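
The new behaviour is opt-in: the property defaults to false, so existing deployments keep running compaction jobs without the wait. As a rough illustration of enabling it programmatically, the sketch below uses the property constant and the set/getBoolean calls that appear in this commit; the InstanceProperties import path and no-arg constructor are assumptions for the example and may differ from the real API.

    import sleeper.core.properties.instance.InstanceProperties; // assumed location, not shown in this diff
    import static sleeper.core.properties.instance.CompactionProperty.COMPACTION_TASK_WAIT_FOR_INPUT_FILE_ASSIGNMENT;

    public class EnableFileAssignmentWaitSketch {
        public static void main(String[] args) {
            InstanceProperties instanceProperties = new InstanceProperties(); // assumed constructor
            // Opt in to waiting for input files to be assigned before each compaction job starts.
            instanceProperties.set(COMPACTION_TASK_WAIT_FOR_INPUT_FILE_ASSIGNMENT, "true");
            // The compaction task reads the flag as a boolean when handling job messages.
            boolean waitForFileAssignment = instanceProperties.getBoolean(COMPACTION_TASK_WAIT_FOR_INPUT_FILE_ASSIGNMENT);
            System.out.println("waitForFileAssignment = " + waitForFileAssignment);
        }
    }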
11 changes: 11 additions & 0 deletions example/full/instance.properties
@@ -918,6 +918,17 @@ sleeper.compaction.job.failed.visibility.timeout.seconds=60
# messages in the time defined by this property, it will try to wait for a message again.
sleeper.compaction.task.wait.time.seconds=20

# Set to true if compaction tasks should wait for input files to be assigned to a compaction job
# before starting it. The compaction task will poll the state store for whether the input files have
# been assigned to the job, and will only start once this has occurred.
# This prevents invalid compaction jobs from being run, particularly in the case where the compaction
# job creator runs again before the input files are assigned.
# This also causes compaction tasks to wait idle while input files are assigned, and puts extra load
# on the state store when there are many compaction tasks.
# If this is false, any created job will be executed, and will only be validated when committed to the
# state store.
sleeper.compaction.task.wait.for.input.file.assignment=false

# The time in seconds for a compaction task to wait after receiving no compaction jobs before
# attempting to receive a message again.
# When a compaction task waits for compaction jobs to appear on the SQS queue, if the task receives no
@@ -51,6 +51,7 @@
import static sleeper.core.properties.instance.CompactionProperty.COMPACTION_TASK_DELAY_BEFORE_RETRY_IN_SECONDS;
import static sleeper.core.properties.instance.CompactionProperty.COMPACTION_TASK_MAX_CONSECUTIVE_FAILURES;
import static sleeper.core.properties.instance.CompactionProperty.COMPACTION_TASK_MAX_IDLE_TIME_IN_SECONDS;
import static sleeper.core.properties.instance.CompactionProperty.COMPACTION_TASK_WAIT_FOR_INPUT_FILE_ASSIGNMENT;

/**
* Runs a compaction task. Executes jobs from a queue, updating the status stores with progress of the task.
@@ -129,6 +130,7 @@ private Instant handleMessages(Instant startTime, CompactionTaskFinishedStatus.B
Duration maxIdleTime = Duration.ofSeconds(instanceProperties.getInt(COMPACTION_TASK_MAX_IDLE_TIME_IN_SECONDS));
int maxConsecutiveFailures = instanceProperties.getInt(COMPACTION_TASK_MAX_CONSECUTIVE_FAILURES);
Duration delayBeforeRetry = Duration.ofSeconds(instanceProperties.getInt(COMPACTION_TASK_DELAY_BEFORE_RETRY_IN_SECONDS));
boolean waitForFileAssignment = instanceProperties.getBoolean(COMPACTION_TASK_WAIT_FOR_INPUT_FILE_ASSIGNMENT);
int numConsecutiveFailures = 0;
while (numConsecutiveFailures < maxConsecutiveFailures) {
Optional<MessageHandle> messageOpt = messageReceiver.receiveMessage();
@@ -154,7 +156,9 @@
String jobRunId = jobRunIdSupplier.get();
try {
propertiesReloader.reloadIfNeeded();
waitForFiles.wait(job, taskId, jobRunId);
if (waitForFileAssignment) {
waitForFiles.wait(job, taskId, jobRunId);
}
RecordsProcessedSummary summary = compact(job, jobRunId);
taskFinishedBuilder.addJobSummary(summary);
message.completed();
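
The behavioural change in the task itself is the guard shown in the hunk above. A simplified, self-contained sketch of the same decision follows; the class, method and parameter names here are illustrative only and are not the real Sleeper classes.

    import java.util.function.Consumer;

    public class FileAssignmentGuardSketch {

        // Runs a compaction job, optionally waiting for its input files to be assigned first.
        static void runJob(String jobId, boolean waitForFileAssignment,
                Consumer<String> waitForFiles, Consumer<String> compact) {
            if (waitForFileAssignment) {
                // Poll the state store until the input files are assigned to this job.
                waitForFiles.accept(jobId);
            }
            // When the wait is disabled, the job runs immediately and is only validated
            // when its results are committed to the state store.
            compact.accept(jobId);
        }

        public static void main(String[] args) {
            runJob("test-job", false,
                    jobId -> System.out.println("waiting for file assignment for " + jobId),
                    jobId -> System.out.println("compacting " + jobId));
        }
    }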
@@ -15,11 +15,16 @@
*/
package sleeper.compaction.task;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import sleeper.compaction.job.CompactionJob;
import sleeper.compaction.job.commit.CompactionJobCommitRequest;
import sleeper.core.record.process.ProcessRunTime;
import sleeper.core.record.process.RecordsProcessed;
import sleeper.core.record.process.RecordsProcessedSummary;

import java.time.Duration;
import java.time.Instant;
import java.util.LinkedList;
import java.util.List;
@@ -28,9 +33,18 @@
import static org.assertj.core.api.Assertions.assertThat;
import static sleeper.compaction.job.CompactionJobStatusTestData.failedCompactionRun;
import static sleeper.compaction.job.CompactionJobStatusTestData.jobCreated;
import static sleeper.compaction.job.CompactionJobStatusTestData.uncommittedCompactionRun;
import static sleeper.core.properties.instance.CompactionProperty.COMPACTION_TASK_WAIT_FOR_INPUT_FILE_ASSIGNMENT;
import static sleeper.core.properties.table.TableProperty.STATESTORE_ASYNC_COMMITS_ENABLED;
import static sleeper.core.record.process.RecordsProcessedSummaryTestHelper.summary;

public class CompactionTaskAssignFilesTest extends CompactionTaskTestBase {

@BeforeEach
void setUp() {
instanceProperties.set(COMPACTION_TASK_WAIT_FOR_INPUT_FILE_ASSIGNMENT, "true");
}

@Test
void shouldRetryOnceWaitingForFilesToBeAssignedToJob() throws Exception {
// Given
@@ -111,4 +125,61 @@ void shouldFailWhenFileDeletedBeforeJob() throws Exception {
failedCompactionRun(DEFAULT_TASK_ID, new ProcessRunTime(waitForFilesTime, failTime), List.of(
"File reference not found in partition root, filename " + job.getInputFiles().get(0)))));
}

@Test
void shouldFailAtEndWhenFileAssignmentCheckDisabledWithDirectCommit() throws Exception {
// Given
instanceProperties.set(COMPACTION_TASK_WAIT_FOR_INPUT_FILE_ASSIGNMENT, "false");
tableProperties.set(STATESTORE_ASYNC_COMMITS_ENABLED, "false");
CompactionJob job = createJob("test-job");
send(job);
stateStore.clearFileData();

// When
runTaskCheckingFiles(
waitForFileAssignment(timePassesAMinuteAtATimeFrom(Instant.parse("2024-10-28T11:45:00Z"))).withAttempts(10),
processJobs(jobSucceeds()),
timePassesAMinuteAtATimeFrom(Instant.parse("2024-10-28T11:50:00Z")));

// Then
assertThat(failedJobs).containsExactly(job);
assertThat(jobsOnQueue).isEmpty();
assertThat(foundWaitsForFileAssignment).isEmpty();
assertThat(jobStore.getAllJobs(DEFAULT_TABLE_ID)).containsExactly(
jobCreated(job, DEFAULT_CREATED_TIME,
failedCompactionRun(DEFAULT_TASK_ID,
Instant.parse("2024-10-28T11:51:00Z"),
Instant.parse("2024-10-28T11:52:00Z"),
Instant.parse("2024-10-28T11:53:00Z"),
List.of("1 replace file reference requests failed to update the state store",
"File not found: " + job.getInputFiles().get(0)))));
}

@Test
void shouldSendInvalidCommitToQueueWhenFileAssignmentCheckDisabledWithAsyncCommit() throws Exception {
// Given
instanceProperties.set(COMPACTION_TASK_WAIT_FOR_INPUT_FILE_ASSIGNMENT, "false");
tableProperties.set(STATESTORE_ASYNC_COMMITS_ENABLED, "true");
CompactionJob job = createJob("test-job");
send(job);
stateStore.clearFileData();

// When
runTaskCheckingFiles(
waitForFileAssignment(timePassesAMinuteAtATimeFrom(Instant.parse("2024-10-28T11:45:00Z"))).withAttempts(10),
processJobs(jobSucceeds(new RecordsProcessed(10L, 5L))),
timePassesAMinuteAtATimeFrom(Instant.parse("2024-10-28T11:50:00Z")));

// Then
RecordsProcessedSummary expectedSummary = summary(
Instant.parse("2024-10-28T11:51:00Z"), Duration.ofMinutes(1), 10, 5);
assertThat(failedJobs).isEmpty();
assertThat(jobsOnQueue).isEmpty();
assertThat(foundWaitsForFileAssignment).isEmpty();
assertThat(jobStore.getAllJobs(DEFAULT_TABLE_ID)).containsExactly(
jobCreated(job, DEFAULT_CREATED_TIME,
uncommittedCompactionRun(DEFAULT_TASK_ID, expectedSummary)));
assertThat(commitRequestsOnQueue).containsExactly(
new CompactionJobCommitRequest(job, DEFAULT_TASK_ID, "test-job-run-1", expectedSummary));
}
}
@@ -125,7 +125,11 @@ protected void runTask(String taskId, CompactionRunner compactor, Supplier<Strin
}

protected void runTaskCheckingFiles(StateStoreWaitForFiles fileAssignmentCheck, CompactionRunner compactor) throws Exception {
runTask(pollQueue(), fileAssignmentCheck, compactor, timePassesAMinuteAtATime(), DEFAULT_TASK_ID, jobRunIdsInSequence());
runTaskCheckingFiles(fileAssignmentCheck, compactor, timePassesAMinuteAtATime());
}

protected void runTaskCheckingFiles(StateStoreWaitForFiles fileAssignmentCheck, CompactionRunner compactor, Supplier<Instant> timeSupplier) throws Exception {
runTask(pollQueue(), fileAssignmentCheck, compactor, timeSupplier, DEFAULT_TASK_ID, jobRunIdsInSequence());
}

protected void runTask(
@@ -327,7 +331,11 @@ protected void setAsyncCommit(boolean enabled, TableProperties... tablePropertie
}

private Supplier<Instant> timePassesAMinuteAtATime() {
return Stream.iterate(Instant.parse("2024-09-04T09:50:00Z"),
return timePassesAMinuteAtATimeFrom(Instant.parse("2024-09-04T09:50:00Z"));
}

protected Supplier<Instant> timePassesAMinuteAtATimeFrom(Instant firstTime) {
return Stream.iterate(firstTime,
time -> time.plus(Duration.ofMinutes(1)))
.iterator()::next;
}
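
The refactored helper above builds a test clock that advances one minute on each call. The same Stream.iterate pattern works standalone; a minimal sketch, with a class name chosen purely for the example:

    import java.time.Duration;
    import java.time.Instant;
    import java.util.function.Supplier;
    import java.util.stream.Stream;

    public class TickingClockSketch {

        // Each call to get() returns the next minute, starting from firstTime.
        static Supplier<Instant> timePassesAMinuteAtATimeFrom(Instant firstTime) {
            return Stream.iterate(firstTime, time -> time.plus(Duration.ofMinutes(1)))
                    .iterator()::next;
        }

        public static void main(String[] args) {
            Supplier<Instant> clock = timePassesAMinuteAtATimeFrom(Instant.parse("2024-10-28T11:45:00Z"));
            System.out.println(clock.get()); // 2024-10-28T11:45:00Z
            System.out.println(clock.get()); // 2024-10-28T11:46:00Z
        }
    }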
@@ -76,6 +76,19 @@ public interface CompactionProperty {
.defaultValue("20")
.validationPredicate(val -> SleeperPropertyValueUtils.isNonNegativeIntLtEqValue(val, 20))
.propertyGroup(InstancePropertyGroup.COMPACTION).build();
UserDefinedInstanceProperty COMPACTION_TASK_WAIT_FOR_INPUT_FILE_ASSIGNMENT = Index.propertyBuilder("sleeper.compaction.task.wait.for.input.file.assignment")
.description("Set to true if compaction tasks should wait for input files to be assigned to a compaction " +
"job before starting it. The compaction task will poll the state store for whether the input " +
"files have been assigned to the job, and will only start once this has occurred.\n" +
"This prevents invalid compaction jobs from being run, particularly in the case where the " +
"compaction job creator runs again before the input files are assigned.\n" +
"This also causes compaction tasks to wait idle while input files are assigned, and puts extra " +
"load on the state store when there are many compaction tasks.\n" +
"If this is false, any created job will be executed, and will only be validated when committed " +
"to the state store.")
.defaultValue("false")
.validationPredicate(SleeperPropertyValueUtils::isTrueOrFalse)
.propertyGroup(InstancePropertyGroup.COMPACTION).build();
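
The definition above follows the property-builder pattern already used throughout this interface. Purely as an illustration, another boolean property would be declared inside the same interface like this; the property name and description below are hypothetical:

    UserDefinedInstanceProperty EXAMPLE_HYPOTHETICAL_FLAG = Index.propertyBuilder("sleeper.example.hypothetical.flag")
            .description("Hypothetical example of a boolean instance property declared with the same builder pattern.")
            .defaultValue("false")
            .validationPredicate(SleeperPropertyValueUtils::isTrueOrFalse)
            .propertyGroup(InstancePropertyGroup.COMPACTION).build();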
UserDefinedInstanceProperty COMPACTION_TASK_DELAY_BEFORE_RETRY_IN_SECONDS = Index.propertyBuilder("sleeper.compaction.task.delay.before.retry.seconds")
.description("The time in seconds for a compaction task to wait after receiving no compaction jobs " +
"before attempting to receive a message again.\n" +
11 changes: 11 additions & 0 deletions scripts/templates/instanceproperties.template
@@ -942,6 +942,17 @@ sleeper.compaction.job.failed.visibility.timeout.seconds=60
# messages in the time defined by this property, it will try to wait for a message again.
sleeper.compaction.task.wait.time.seconds=20

# Set to true if compaction tasks should wait for input files to be assigned to a compaction job
# before starting it. The compaction task will poll the state store for whether the input files have
# been assigned to the job, and will only start once this has occurred.
# This prevents invalid compaction jobs from being run, particularly in the case where the compaction
# job creator runs again before the input files are assigned.
# This also causes compaction tasks to wait idle while input files are assigned, and puts extra load
# on the state store when there are many compaction tasks.
# If this is false, any created job will be executed, and will only be validated when committed to the
# state store.
sleeper.compaction.task.wait.for.input.file.assignment=false

# The time in seconds for a compaction task to wait after receiving no compaction jobs before
# attempting to receive a message again.
# When a compaction task waits for compaction jobs to appear on the SQS queue, if the task receives no