From 4880fcd1ced4d35486b6a0663fca26eb2c25ef22 Mon Sep 17 00:00:00 2001 From: kr565370 <112558283+kr565370@users.noreply.github.com> Date: Mon, 11 Mar 2024 11:39:04 +0000 Subject: [PATCH 01/17] Extract finishedCompactionTask in test --- .../job/execution/CompactionTask.java | 8 ++- .../job/execution/CompactionTaskTest.java | 71 +++++++++---------- 2 files changed, 41 insertions(+), 38 deletions(-) diff --git a/java/compaction/compaction-job-execution/src/main/java/sleeper/compaction/job/execution/CompactionTask.java b/java/compaction/compaction-job-execution/src/main/java/sleeper/compaction/job/execution/CompactionTask.java index 1397eb3ff4..02ee2bf746 100644 --- a/java/compaction/compaction-job-execution/src/main/java/sleeper/compaction/job/execution/CompactionTask.java +++ b/java/compaction/compaction-job-execution/src/main/java/sleeper/compaction/job/execution/CompactionTask.java @@ -20,6 +20,7 @@ import org.slf4j.LoggerFactory; import sleeper.compaction.job.CompactionJob; +import sleeper.compaction.job.CompactionJobStatusStore; import sleeper.compaction.task.CompactionTaskFinishedStatus; import sleeper.compaction.task.CompactionTaskStatus; import sleeper.compaction.task.CompactionTaskStatusStore; @@ -49,20 +50,23 @@ public class CompactionTask { private final Duration maxIdleTime; private final MessageReceiver messageReceiver; private final CompactionRunner compactor; + private final CompactionJobStatusStore jobStatusStore; private final CompactionTaskStatusStore taskStatusStore; private final String taskId; private final PropertiesReloader propertiesReloader; private int numConsecutiveFailures = 0; private int totalNumberOfMessagesProcessed = 0; - public CompactionTask(InstanceProperties instanceProperties, PropertiesReloader propertiesReloader, Supplier timeSupplier, - MessageReceiver messageReceiver, CompactionRunner compactor, CompactionTaskStatusStore taskStore, String taskId) { + public CompactionTask(InstanceProperties instanceProperties, PropertiesReloader propertiesReloader, + Supplier timeSupplier, MessageReceiver messageReceiver, CompactionRunner compactor, + CompactionJobStatusStore jobStatusStore, CompactionTaskStatusStore taskStore, String taskId) { maxIdleTime = Duration.ofSeconds(instanceProperties.getInt(COMPACTION_TASK_MAX_IDLE_TIME_IN_SECONDS)); maxConsecutiveFailures = instanceProperties.getInt(COMPACTION_TASK_MAX_CONSECUTIVE_FAILURES); this.propertiesReloader = propertiesReloader; this.timeSupplier = timeSupplier; this.messageReceiver = messageReceiver; this.compactor = compactor; + this.jobStatusStore = jobStatusStore; this.taskStatusStore = taskStore; this.taskId = taskId; } diff --git a/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/CompactionTaskTest.java b/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/CompactionTaskTest.java index 911562f9b7..b69fb66993 100644 --- a/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/CompactionTaskTest.java +++ b/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/CompactionTaskTest.java @@ -22,12 +22,14 @@ import org.junit.jupiter.api.Test; import sleeper.compaction.job.CompactionJob; +import sleeper.compaction.job.CompactionJobStatusStore; import sleeper.compaction.job.execution.CompactionTask.CompactionRunner; import sleeper.compaction.job.execution.CompactionTask.MessageHandle; import sleeper.compaction.job.execution.CompactionTask.MessageReceiver; import sleeper.compaction.task.CompactionTaskFinishedStatus; import sleeper.compaction.task.CompactionTaskStatus; import sleeper.compaction.task.CompactionTaskStatusStore; +import sleeper.compaction.testutils.InMemoryCompactionJobStatusStore; import sleeper.compaction.testutils.InMemoryCompactionTaskStatusStore; import sleeper.configuration.properties.PropertiesReloader; import sleeper.configuration.properties.instance.InstanceProperties; @@ -61,6 +63,7 @@ public class CompactionTaskTest { private final Queue jobsOnQueue = new LinkedList<>(); private final List successfulJobs = new ArrayList<>(); private final List failedJobs = new ArrayList<>(); + private final CompactionJobStatusStore jobStore = new InMemoryCompactionJobStatusStore(); private final CompactionTaskStatusStore taskStore = new InMemoryCompactionTaskStatusStore(); @BeforeEach @@ -269,10 +272,10 @@ void shouldResetConsecutiveFailureCountIfJobProcessedSuccessfully() throws Excep } @Nested - @DisplayName("Update task status store") - class UpdateTaskStatusStore { + @DisplayName("Update status stores") + class UpdateStatusStores { @Test - void shouldSaveTaskWhenOneJobSucceeds() throws Exception { + void shouldSaveTaskAndJobWhenOneJobSucceeds() throws Exception { // Given Queue times = new LinkedList<>(List.of( Instant.parse("2024-02-22T13:50:00Z"), // Start @@ -289,17 +292,15 @@ void shouldSaveTaskWhenOneJobSucceeds() throws Exception { times::poll); // Then - assertThat(taskStore.getAllTasks()) - .containsExactly(CompactionTaskStatus.builder() - .startTime(Instant.parse("2024-02-22T13:50:00Z")) - .taskId("test-task-1") - .finished(Instant.parse("2024-02-22T13:50:05Z"), - withJobSummaries(jobSummary)) - .build()); + assertThat(taskStore.getAllTasks()).containsExactly( + finishedCompactionTask("test-task-1", + Instant.parse("2024-02-22T13:50:00Z"), + Instant.parse("2024-02-22T13:50:05Z"), + jobSummary)); } @Test - void shouldSaveTaskWhenMultipleJobsSucceed() throws Exception { + void shouldSaveTaskAndJobsWhenMultipleJobsSucceed() throws Exception { // Given Queue times = new LinkedList<>(List.of( Instant.parse("2024-02-22T13:50:00Z"), // Start @@ -322,17 +323,15 @@ void shouldSaveTaskWhenMultipleJobsSucceed() throws Exception { times::poll); // Then - assertThat(taskStore.getAllTasks()) - .containsExactly(CompactionTaskStatus.builder() - .startTime(Instant.parse("2024-02-22T13:50:00Z")) - .taskId("test-task-1") - .finished(Instant.parse("2024-02-22T13:50:05Z"), - withJobSummaries(job1Summary, job2Summary)) - .build()); + assertThat(taskStore.getAllTasks()).containsExactly( + finishedCompactionTask("test-task-1", + Instant.parse("2024-02-22T13:50:00Z"), + Instant.parse("2024-02-22T13:50:05Z"), + job1Summary, job2Summary)); } @Test - void shouldSaveTaskWhenOneJobFails() throws Exception { + void shouldSaveTaskAndJobWhenOneJobFails() throws Exception { // Given Queue times = new LinkedList<>(List.of( Instant.parse("2024-02-22T13:50:00Z"), // Start @@ -343,12 +342,10 @@ void shouldSaveTaskWhenOneJobFails() throws Exception { runTask("test-task-1", processJobs(jobFails()), times::poll); // Then - assertThat(taskStore.getAllTasks()) - .containsExactly(CompactionTaskStatus.builder() - .startTime(Instant.parse("2024-02-22T13:50:00Z")) - .taskId("test-task-1") - .finished(Instant.parse("2024-02-22T13:50:05Z"), noJobSummaries()) - .build()); + assertThat(taskStore.getAllTasks()).containsExactly( + finishedCompactionTask("test-task-1", + Instant.parse("2024-02-22T13:50:00Z"), + Instant.parse("2024-02-22T13:50:05Z"))); } @Test @@ -362,16 +359,10 @@ void shouldSaveTaskWhenNoJobsFound() throws Exception { runTask("test-task-1", processNoJobs(), times::poll); // Then - assertThat(taskStore.getAllTasks()) - .containsExactly(CompactionTaskStatus.builder() - .startTime(Instant.parse("2024-02-22T13:50:00Z")) - .taskId("test-task-1") - .finished(Instant.parse("2024-02-22T13:50:05Z"), noJobSummaries()) - .build()); - } - - private CompactionTaskFinishedStatus.Builder noJobSummaries() { - return withJobSummaries(); + assertThat(taskStore.getAllTasks()).containsExactly( + finishedCompactionTask("test-task-1", + Instant.parse("2024-02-22T13:50:00Z"), + Instant.parse("2024-02-22T13:50:05Z"))); } private CompactionTaskFinishedStatus.Builder withJobSummaries(RecordsProcessedSummary... summaries) { @@ -379,6 +370,14 @@ private CompactionTaskFinishedStatus.Builder withJobSummaries(RecordsProcessedSu Stream.of(summaries).forEach(taskFinishedBuilder::addJobSummary); return taskFinishedBuilder; } + + private CompactionTaskStatus finishedCompactionTask(String taskId, Instant startTime, Instant finishTime, RecordsProcessedSummary... summaries) { + return CompactionTaskStatus.builder() + .startTime(startTime) + .taskId("test-task-1") + .finished(finishTime, withJobSummaries(summaries)) + .build(); + } } private void runTask(CompactionRunner compactor) throws Exception { @@ -406,7 +405,7 @@ private void runTask( Supplier timeSupplier, String taskId) throws Exception { new CompactionTask(instanceProperties, PropertiesReloader.neverReload(), timeSupplier, messageReceiver, - compactor, taskStore, taskId) + compactor, jobStore, taskStore, taskId) .run(); } From f903894a62815843dc1b251d32d724b7d36590ae Mon Sep 17 00:00:00 2001 From: kr565370 <112558283+kr565370@users.noreply.github.com> Date: Mon, 11 Mar 2024 11:58:59 +0000 Subject: [PATCH 02/17] Fix compilation failures --- .../compaction/job/execution/ECSCompactionTaskRunner.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/java/compaction/compaction-job-execution/src/main/java/sleeper/compaction/job/execution/ECSCompactionTaskRunner.java b/java/compaction/compaction-job-execution/src/main/java/sleeper/compaction/job/execution/ECSCompactionTaskRunner.java index fd4d43b203..86991d44e8 100644 --- a/java/compaction/compaction-job-execution/src/main/java/sleeper/compaction/job/execution/ECSCompactionTaskRunner.java +++ b/java/compaction/compaction-job-execution/src/main/java/sleeper/compaction/job/execution/ECSCompactionTaskRunner.java @@ -91,7 +91,7 @@ public static void main(String[] args) throws InterruptedException, IOException, objectFactory, jobStatusStore, taskId); CompactionTask task = new CompactionTask(instanceProperties, propertiesReloader, Instant::now, new SqsCompactionQueueHandler(sqsClient, instanceProperties)::receiveFromSqs, - job -> compactSortedFiles.run(job), taskStatusStore, taskId); + job -> compactSortedFiles.run(job), null, taskStatusStore, taskId); task.run(); sqsClient.shutdown(); From 47c44c5f3d8ba5c8ce3c4486349e49b9455073a8 Mon Sep 17 00:00:00 2001 From: kr565370 <112558283+kr565370@users.noreply.github.com> Date: Mon, 11 Mar 2024 12:08:08 +0000 Subject: [PATCH 03/17] Start updating tests in CompactionTaskTest to assert on job store --- .../job/execution/CompactionTask.java | 16 ++++++++++++++-- .../job/execution/ECSCompactionTaskRunner.java | 2 +- .../job/execution/CompactionTaskTest.java | 18 +++++++++++++----- .../ECSCompactionTaskRunnerLocalStackIT.java | 2 +- 4 files changed, 29 insertions(+), 9 deletions(-) diff --git a/java/compaction/compaction-job-execution/src/main/java/sleeper/compaction/job/execution/CompactionTask.java b/java/compaction/compaction-job-execution/src/main/java/sleeper/compaction/job/execution/CompactionTask.java index b5b6b9a721..22e5c85d88 100644 --- a/java/compaction/compaction-job-execution/src/main/java/sleeper/compaction/job/execution/CompactionTask.java +++ b/java/compaction/compaction-job-execution/src/main/java/sleeper/compaction/job/execution/CompactionTask.java @@ -38,6 +38,7 @@ import static sleeper.configuration.properties.instance.CompactionProperty.COMPACTION_TASK_MAX_CONSECUTIVE_FAILURES; import static sleeper.configuration.properties.instance.CompactionProperty.COMPACTION_TASK_MAX_IDLE_TIME_IN_SECONDS; +import static sleeper.core.metrics.MetricsLogger.METRICS_LOGGER; /** * Runs a compaction task, updating the {@link CompactionTaskStatusStore} with progress of the task. @@ -107,14 +108,25 @@ public Instant handleMessages(Instant startTime, Consumer compactSortedFiles.run(job), null, taskStatusStore, taskId); + job -> compactSortedFiles.run(job), jobStatusStore, taskStatusStore, taskId); task.run(); sqsClient.shutdown(); diff --git a/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/CompactionTaskTest.java b/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/CompactionTaskTest.java index 1843fb7f76..bcc49c15ec 100644 --- a/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/CompactionTaskTest.java +++ b/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/CompactionTaskTest.java @@ -22,7 +22,6 @@ import org.junit.jupiter.api.Test; import sleeper.compaction.job.CompactionJob; -import sleeper.compaction.job.CompactionJobStatusStore; import sleeper.compaction.job.execution.CompactionTask.CompactionRunner; import sleeper.compaction.job.execution.CompactionTask.MessageHandle; import sleeper.compaction.job.execution.CompactionTask.MessageReceiver; @@ -49,13 +48,17 @@ import java.util.stream.Stream; import static org.assertj.core.api.Assertions.assertThat; +import static sleeper.compaction.job.CompactionJobStatusTestData.finishedCompactionRun; +import static sleeper.compaction.job.CompactionJobStatusTestData.jobCreated; import static sleeper.configuration.properties.InstancePropertiesTestHelper.createTestInstanceProperties; import static sleeper.configuration.properties.instance.CompactionProperty.COMPACTION_TASK_MAX_CONSECUTIVE_FAILURES; import static sleeper.configuration.properties.instance.CompactionProperty.COMPACTION_TASK_MAX_IDLE_TIME_IN_SECONDS; import static sleeper.core.record.process.RecordsProcessedSummaryTestData.summary; public class CompactionTaskTest { + private static final String DEFAULT_TABLE_ID = "test-table-id"; private static final String DEFAULT_TASK_ID = "test-task-id"; + private static final Instant DEFAULT_CREATED_TIME = Instant.parse("2024-03-04T10:50:00Z"); private static final Instant DEFAULT_START_TIME = Instant.parse("2024-03-04T11:00:00Z"); private static final Duration DEFAULT_DURATION = Duration.ofSeconds(5); @@ -63,7 +66,7 @@ public class CompactionTaskTest { private final Queue jobsOnQueue = new LinkedList<>(); private final List successfulJobs = new ArrayList<>(); private final List failedJobs = new ArrayList<>(); - private final CompactionJobStatusStore jobStore = new InMemoryCompactionJobStatusStore(); + private final InMemoryCompactionJobStatusStore jobStore = new InMemoryCompactionJobStatusStore(); private final CompactionTaskStatusStore taskStore = new InMemoryCompactionTaskStatusStore(); @BeforeEach @@ -279,9 +282,10 @@ void shouldSaveTaskAndJobWhenOneJobSucceeds() throws Exception { // Given Queue times = new LinkedList<>(List.of( Instant.parse("2024-02-22T13:50:00Z"), // Start + Instant.parse("2024-02-22T13:50:01Z"), // Job started Instant.parse("2024-02-22T13:50:02Z"), // Job completed Instant.parse("2024-02-22T13:50:05Z"))); // Finish - createJobOnQueue("job1"); + CompactionJob job = createJobOnQueue("job1"); // When RecordsProcessedSummary jobSummary = summary( @@ -297,6 +301,9 @@ void shouldSaveTaskAndJobWhenOneJobSucceeds() throws Exception { Instant.parse("2024-02-22T13:50:00Z"), Instant.parse("2024-02-22T13:50:05Z"), jobSummary)); + assertThat(jobStore.getAllJobs(DEFAULT_TABLE_ID)).containsExactly( + jobCreated(job, DEFAULT_CREATED_TIME, + finishedCompactionRun("test-task-1", jobSummary))); } @Test @@ -374,7 +381,7 @@ private CompactionTaskFinishedStatus.Builder withJobSummaries(RecordsProcessedSu private CompactionTaskStatus finishedCompactionTask(String taskId, Instant startTime, Instant finishTime, RecordsProcessedSummary... summaries) { return CompactionTaskStatus.builder() .startTime(startTime) - .taskId("test-task-1") + .taskId(taskId) .finished(finishTime, withJobSummaries(summaries)) .build(); } @@ -412,12 +419,13 @@ private void runTask( private CompactionJob createJobOnQueue(String jobId) { CompactionJob job = createJob(jobId); jobsOnQueue.add(job); + jobStore.jobCreated(job, DEFAULT_CREATED_TIME); return job; } private CompactionJob createJob(String jobId) { return CompactionJob.builder() - .tableId("test-table-id") + .tableId(DEFAULT_TABLE_ID) .jobId(jobId) .partitionId("root") .inputFiles(List.of(UUID.randomUUID().toString())) diff --git a/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/ECSCompactionTaskRunnerLocalStackIT.java b/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/ECSCompactionTaskRunnerLocalStackIT.java index 029aaf73b8..806a10b112 100644 --- a/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/ECSCompactionTaskRunnerLocalStackIT.java +++ b/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/ECSCompactionTaskRunnerLocalStackIT.java @@ -344,7 +344,7 @@ private CompactionTask createTask(String taskId, StateStoreProvider stateStorePr ObjectFactory.noUserJars(), jobStatusStore, taskId); CompactionTask task = new CompactionTask(instanceProperties, PropertiesReloader.neverReload(), Instant::now, new SqsCompactionQueueHandler(sqs, instanceProperties)::receiveFromSqs, - job -> compactSortedFiles.run(job), taskStatusStore, taskId); + job -> compactSortedFiles.run(job), jobStatusStore, taskStatusStore, taskId); return task; } From 4dea26f41522925f65d98406ede685551646c7a6 Mon Sep 17 00:00:00 2001 From: kr565370 <112558283+kr565370@users.noreply.github.com> Date: Mon, 11 Mar 2024 12:10:36 +0000 Subject: [PATCH 04/17] Fix failing tests --- .../sleeper/compaction/job/execution/CompactionTaskTest.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/CompactionTaskTest.java b/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/CompactionTaskTest.java index bcc49c15ec..1bb5c6a238 100644 --- a/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/CompactionTaskTest.java +++ b/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/CompactionTaskTest.java @@ -311,7 +311,9 @@ void shouldSaveTaskAndJobsWhenMultipleJobsSucceed() throws Exception { // Given Queue times = new LinkedList<>(List.of( Instant.parse("2024-02-22T13:50:00Z"), // Start + Instant.parse("2024-02-22T13:50:01Z"), // Job 1 started Instant.parse("2024-02-22T13:50:02Z"), // Job 1 completed + Instant.parse("2024-02-22T13:50:03Z"), // Job 2 started Instant.parse("2024-02-22T13:50:04Z"), // Job 2 completed Instant.parse("2024-02-22T13:50:05Z"))); // Finish createJobOnQueue("job1"); @@ -342,6 +344,7 @@ void shouldSaveTaskAndJobWhenOneJobFails() throws Exception { // Given Queue times = new LinkedList<>(List.of( Instant.parse("2024-02-22T13:50:00Z"), // Start + Instant.parse("2024-02-22T13:50:01Z"), // Job started Instant.parse("2024-02-22T13:50:05Z"))); // Finish createJobOnQueue("job1"); From 42677032e8f82c75ec488936e9b50d69db49b661 Mon Sep 17 00:00:00 2001 From: kr565370 <112558283+kr565370@users.noreply.github.com> Date: Mon, 11 Mar 2024 12:13:24 +0000 Subject: [PATCH 05/17] Update remaining tests to assert on job store --- .../job/execution/CompactionTaskTest.java | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/CompactionTaskTest.java b/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/CompactionTaskTest.java index 1bb5c6a238..37b2e7b671 100644 --- a/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/CompactionTaskTest.java +++ b/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/CompactionTaskTest.java @@ -50,6 +50,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static sleeper.compaction.job.CompactionJobStatusTestData.finishedCompactionRun; import static sleeper.compaction.job.CompactionJobStatusTestData.jobCreated; +import static sleeper.compaction.job.CompactionJobStatusTestData.startedCompactionRun; import static sleeper.configuration.properties.InstancePropertiesTestHelper.createTestInstanceProperties; import static sleeper.configuration.properties.instance.CompactionProperty.COMPACTION_TASK_MAX_CONSECUTIVE_FAILURES; import static sleeper.configuration.properties.instance.CompactionProperty.COMPACTION_TASK_MAX_IDLE_TIME_IN_SECONDS; @@ -316,8 +317,8 @@ void shouldSaveTaskAndJobsWhenMultipleJobsSucceed() throws Exception { Instant.parse("2024-02-22T13:50:03Z"), // Job 2 started Instant.parse("2024-02-22T13:50:04Z"), // Job 2 completed Instant.parse("2024-02-22T13:50:05Z"))); // Finish - createJobOnQueue("job1"); - createJobOnQueue("job2"); + CompactionJob job1 = createJobOnQueue("job1"); + CompactionJob job2 = createJobOnQueue("job2"); // When RecordsProcessedSummary job1Summary = summary( @@ -337,6 +338,11 @@ void shouldSaveTaskAndJobsWhenMultipleJobsSucceed() throws Exception { Instant.parse("2024-02-22T13:50:00Z"), Instant.parse("2024-02-22T13:50:05Z"), job1Summary, job2Summary)); + assertThat(jobStore.getAllJobs(DEFAULT_TABLE_ID)).containsExactlyInAnyOrder( + jobCreated(job1, DEFAULT_CREATED_TIME, + finishedCompactionRun("test-task-1", job1Summary)), + jobCreated(job2, DEFAULT_CREATED_TIME, + finishedCompactionRun("test-task-1", job2Summary))); } @Test @@ -346,7 +352,7 @@ void shouldSaveTaskAndJobWhenOneJobFails() throws Exception { Instant.parse("2024-02-22T13:50:00Z"), // Start Instant.parse("2024-02-22T13:50:01Z"), // Job started Instant.parse("2024-02-22T13:50:05Z"))); // Finish - createJobOnQueue("job1"); + CompactionJob job = createJobOnQueue("job1"); // When runTask("test-task-1", processJobs(jobFails()), times::poll); @@ -356,6 +362,9 @@ void shouldSaveTaskAndJobWhenOneJobFails() throws Exception { finishedCompactionTask("test-task-1", Instant.parse("2024-02-22T13:50:00Z"), Instant.parse("2024-02-22T13:50:05Z"))); + assertThat(jobStore.getAllJobs(DEFAULT_TABLE_ID)).containsExactly( + jobCreated(job, DEFAULT_CREATED_TIME, + startedCompactionRun("test-task-1", Instant.parse("2024-02-22T13:50:01Z")))); } @Test @@ -373,6 +382,7 @@ void shouldSaveTaskWhenNoJobsFound() throws Exception { finishedCompactionTask("test-task-1", Instant.parse("2024-02-22T13:50:00Z"), Instant.parse("2024-02-22T13:50:05Z"))); + assertThat(jobStore.getAllJobs(DEFAULT_TABLE_ID)).isEmpty(); } private CompactionTaskFinishedStatus.Builder withJobSummaries(RecordsProcessedSummary... summaries) { From a607bf7e0cdef3a2148a45c7003f09703bf42cbe Mon Sep 17 00:00:00 2001 From: kr565370 <112558283+kr565370@users.noreply.github.com> Date: Mon, 11 Mar 2024 12:28:59 +0000 Subject: [PATCH 06/17] Remove job status store from CompactSortedFiles --- .../job/execution/CompactSortedFiles.java | 30 +------ .../job/execution/CompactionTask.java | 8 +- .../execution/ECSCompactionTaskRunner.java | 3 +- .../CompactSortedFilesEmptyOutputIT.java | 10 +-- .../job/execution/CompactSortedFilesIT.java | 14 ++-- .../CompactSortedFilesIteratorIT.java | 6 +- .../CompactSortedFilesLocalStackIT.java | 14 ++-- .../CompactSortedFilesReportingIT.java | 78 ------------------- .../job/execution/CompactionTaskTest.java | 27 ++++--- .../ECSCompactionTaskRunnerLocalStackIT.java | 2 +- .../testutils/CompactSortedFilesTestBase.java | 11 +-- 11 files changed, 47 insertions(+), 156 deletions(-) delete mode 100644 java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/CompactSortedFilesReportingIT.java diff --git a/java/compaction/compaction-job-execution/src/main/java/sleeper/compaction/job/execution/CompactSortedFiles.java b/java/compaction/compaction-job-execution/src/main/java/sleeper/compaction/job/execution/CompactSortedFiles.java index 3591e5af19..8742477ede 100644 --- a/java/compaction/compaction-job-execution/src/main/java/sleeper/compaction/job/execution/CompactSortedFiles.java +++ b/java/compaction/compaction-job-execution/src/main/java/sleeper/compaction/job/execution/CompactSortedFiles.java @@ -25,7 +25,6 @@ import org.slf4j.LoggerFactory; import sleeper.compaction.job.CompactionJob; -import sleeper.compaction.job.CompactionJobStatusStore; import sleeper.configuration.jars.ObjectFactory; import sleeper.configuration.jars.ObjectFactoryException; import sleeper.configuration.properties.instance.InstanceProperties; @@ -38,7 +37,6 @@ import sleeper.core.partition.Partition; import sleeper.core.record.Record; import sleeper.core.record.process.RecordsProcessed; -import sleeper.core.record.process.RecordsProcessedSummary; import sleeper.core.schema.Schema; import sleeper.core.statestore.FileReference; import sleeper.core.statestore.StateStore; @@ -53,14 +51,12 @@ import sleeper.statestore.StateStoreProvider; import java.io.IOException; -import java.time.Instant; import java.time.LocalDateTime; import java.util.ArrayList; import java.util.List; import java.util.NoSuchElementException; import java.util.Objects; -import static sleeper.core.metrics.MetricsLogger.METRICS_LOGGER; import static sleeper.sketches.s3.SketchesSerDeToS3.sketchesPathForDataFile; /** @@ -72,41 +68,21 @@ public class CompactSortedFiles { private final TablePropertiesProvider tablePropertiesProvider; private final ObjectFactory objectFactory; private final StateStoreProvider stateStoreProvider; - private final CompactionJobStatusStore jobStatusStore; - private final String taskId; private static final Logger LOGGER = LoggerFactory.getLogger(CompactSortedFiles.class); public CompactSortedFiles( InstanceProperties instanceProperties, TablePropertiesProvider tablePropertiesProvider, - StateStoreProvider stateStoreProvider, ObjectFactory objectFactory, CompactionJobStatusStore jobStatusStore, - String taskId) { + StateStoreProvider stateStoreProvider, ObjectFactory objectFactory) { this.instanceProperties = instanceProperties; this.tablePropertiesProvider = tablePropertiesProvider; this.objectFactory = objectFactory; this.stateStoreProvider = stateStoreProvider; - this.jobStatusStore = jobStatusStore; - this.taskId = taskId; } - public RecordsProcessedSummary run(CompactionJob compactionJob) throws IOException, IteratorException, StateStoreException { - Instant startTime = Instant.now(); - String id = compactionJob.getId(); - LOGGER.info("Compaction job {}: compaction called at {}", id, startTime); - jobStatusStore.jobStarted(compactionJob, startTime, taskId); - + public RecordsProcessed run(CompactionJob compactionJob) throws IOException, IteratorException, StateStoreException { RecordsProcessed recordsProcessed = compact(compactionJob); - - Instant finishTime = Instant.now(); - // Print summary - LOGGER.info("Compaction job {}: finished at {}", id, finishTime); - - RecordsProcessedSummary summary = new RecordsProcessedSummary(recordsProcessed, startTime, finishTime); - METRICS_LOGGER.info("Compaction job {}: compaction run time = {}", id, summary.getDurationInSeconds()); - METRICS_LOGGER.info("Compaction job {}: compaction read {} records at {} per second", id, summary.getRecordsRead(), String.format("%.1f", summary.getRecordsReadPerSecond())); - METRICS_LOGGER.info("Compaction job {}: compaction wrote {} records at {} per second", id, summary.getRecordsWritten(), String.format("%.1f", summary.getRecordsWrittenPerSecond())); - jobStatusStore.jobFinished(compactionJob, summary, taskId); - return summary; + return recordsProcessed; } private RecordsProcessed compact(CompactionJob compactionJob) throws IOException, IteratorException, StateStoreException { diff --git a/java/compaction/compaction-job-execution/src/main/java/sleeper/compaction/job/execution/CompactionTask.java b/java/compaction/compaction-job-execution/src/main/java/sleeper/compaction/job/execution/CompactionTask.java index 22e5c85d88..f3e592562e 100644 --- a/java/compaction/compaction-job-execution/src/main/java/sleeper/compaction/job/execution/CompactionTask.java +++ b/java/compaction/compaction-job-execution/src/main/java/sleeper/compaction/job/execution/CompactionTask.java @@ -26,6 +26,7 @@ import sleeper.compaction.task.CompactionTaskStatusStore; import sleeper.configuration.properties.PropertiesReloader; import sleeper.configuration.properties.instance.InstanceProperties; +import sleeper.core.record.process.RecordsProcessed; import sleeper.core.record.process.RecordsProcessedSummary; import sleeper.core.util.LoggedDuration; @@ -114,12 +115,13 @@ public Instant handleMessages(Instant startTime, Consumer compactSortedFiles.run(job), jobStatusStore, taskStatusStore, taskId); diff --git a/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/CompactSortedFilesEmptyOutputIT.java b/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/CompactSortedFilesEmptyOutputIT.java index 89ed5881e8..6ba84be9fd 100644 --- a/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/CompactSortedFilesEmptyOutputIT.java +++ b/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/CompactSortedFilesEmptyOutputIT.java @@ -21,7 +21,7 @@ import sleeper.compaction.job.execution.testutils.CompactSortedFilesTestBase; import sleeper.core.partition.PartitionsBuilder; import sleeper.core.record.Record; -import sleeper.core.record.process.RecordsProcessedSummary; +import sleeper.core.record.process.RecordsProcessed; import sleeper.core.schema.Schema; import sleeper.core.schema.type.LongType; import sleeper.core.statestore.FileReference; @@ -54,8 +54,8 @@ void shouldMergeFilesCorrectlyWhenSomeAreEmpty() throws Exception { assignJobIdToInputFiles(stateStore, compactionJob); // When - CompactSortedFiles compactSortedFiles = createCompactSortedFiles(schema, compactionJob); - RecordsProcessedSummary summary = compactSortedFiles.run(compactionJob); + CompactSortedFiles compactSortedFiles = createCompactSortedFiles(schema); + RecordsProcessed summary = compactSortedFiles.run(compactionJob); // Then // - Read output file and check that it contains the right results @@ -88,8 +88,8 @@ void shouldMergeFilesCorrectlyWhenAllAreEmpty() throws Exception { assignJobIdToInputFiles(stateStore, compactionJob); // When - CompactSortedFiles compactSortedFiles = createCompactSortedFiles(schema, compactionJob); - RecordsProcessedSummary summary = compactSortedFiles.run(compactionJob); + CompactSortedFiles compactSortedFiles = createCompactSortedFiles(schema); + RecordsProcessed summary = compactSortedFiles.run(compactionJob); // Then // - Read output file and check that it contains the right results diff --git a/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/CompactSortedFilesIT.java b/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/CompactSortedFilesIT.java index 3235e0ea36..744e29f98a 100644 --- a/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/CompactSortedFilesIT.java +++ b/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/CompactSortedFilesIT.java @@ -25,7 +25,7 @@ import sleeper.compaction.job.execution.testutils.CompactSortedFilesTestData; import sleeper.core.partition.PartitionsBuilder; import sleeper.core.record.Record; -import sleeper.core.record.process.RecordsProcessedSummary; +import sleeper.core.record.process.RecordsProcessed; import sleeper.core.schema.Schema; import sleeper.core.schema.type.ByteArrayType; import sleeper.core.schema.type.LongType; @@ -58,8 +58,8 @@ void shouldMergeFilesCorrectlyAndUpdateStateStoreWithLongKey() throws Exception assignJobIdToInputFiles(stateStore, compactionJob); // When - CompactSortedFiles compactSortedFiles = createCompactSortedFiles(schema, compactionJob); - RecordsProcessedSummary summary = compactSortedFiles.run(compactionJob); + CompactSortedFiles compactSortedFiles = createCompactSortedFiles(schema); + RecordsProcessed summary = compactSortedFiles.run(compactionJob); // Then // - Read output file and check that it contains the right results @@ -115,8 +115,8 @@ void shouldMergeFilesCorrectlyAndUpdateStateStoreWithStringKey() throws Exceptio assignJobIdToInputFiles(stateStore, compactionJob); // When - CompactSortedFiles compactSortedFiles = createCompactSortedFiles(schema, compactionJob); - RecordsProcessedSummary summary = compactSortedFiles.run(compactionJob); + CompactSortedFiles compactSortedFiles = createCompactSortedFiles(schema); + RecordsProcessed summary = compactSortedFiles.run(compactionJob); // Then // - Read output file and check that it contains the right results @@ -179,8 +179,8 @@ void shouldMergeFilesCorrectlyAndUpdateStateStoreWithByteArrayKey() throws Excep assignJobIdToInputFiles(stateStore, compactionJob); // When - CompactSortedFiles compactSortedFiles = createCompactSortedFiles(schema, compactionJob); - RecordsProcessedSummary summary = compactSortedFiles.run(compactionJob); + CompactSortedFiles compactSortedFiles = createCompactSortedFiles(schema); + RecordsProcessed summary = compactSortedFiles.run(compactionJob); // Then // - Read output file and check that it contains the right results diff --git a/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/CompactSortedFilesIteratorIT.java b/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/CompactSortedFilesIteratorIT.java index c7ed29d639..a2af88b083 100644 --- a/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/CompactSortedFilesIteratorIT.java +++ b/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/CompactSortedFilesIteratorIT.java @@ -25,7 +25,7 @@ import sleeper.core.iterator.impl.AgeOffIterator; import sleeper.core.partition.PartitionsBuilder; import sleeper.core.record.Record; -import sleeper.core.record.process.RecordsProcessedSummary; +import sleeper.core.record.process.RecordsProcessed; import sleeper.core.schema.Schema; import sleeper.core.statestore.FileReference; import sleeper.core.statestore.FileReferenceFactory; @@ -67,8 +67,8 @@ void shouldApplyIteratorDuringCompaction() throws Exception { assignJobIdToInputFiles(stateStore, compactionJob); // When - CompactSortedFiles compactSortedFiles = createCompactSortedFiles(schema, compactionJob); - RecordsProcessedSummary summary = compactSortedFiles.run(compactionJob); + CompactSortedFiles compactSortedFiles = createCompactSortedFiles(schema); + RecordsProcessed summary = compactSortedFiles.run(compactionJob); // Then // - Read output files and check that they contain the right results diff --git a/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/CompactSortedFilesLocalStackIT.java b/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/CompactSortedFilesLocalStackIT.java index df02bef783..3ab4e8d4fe 100644 --- a/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/CompactSortedFilesLocalStackIT.java +++ b/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/CompactSortedFilesLocalStackIT.java @@ -31,10 +31,8 @@ import software.amazon.awssdk.services.s3.S3AsyncClient; import sleeper.compaction.job.CompactionJob; -import sleeper.compaction.job.CompactionJobStatusStore; import sleeper.compaction.job.execution.testutils.CompactSortedFilesTestBase; import sleeper.compaction.job.execution.testutils.CompactSortedFilesTestData; -import sleeper.compaction.status.store.job.CompactionJobStatusStoreFactory; import sleeper.compaction.status.store.job.DynamoDBCompactionJobStatusStoreCreator; import sleeper.configuration.jars.ObjectFactory; import sleeper.configuration.properties.table.FixedTablePropertiesProvider; @@ -43,7 +41,7 @@ import sleeper.core.partition.PartitionTree; import sleeper.core.partition.PartitionsBuilder; import sleeper.core.record.Record; -import sleeper.core.record.process.RecordsProcessedSummary; +import sleeper.core.record.process.RecordsProcessed; import sleeper.core.schema.Schema; import sleeper.core.schema.type.LongType; import sleeper.core.statestore.FileReference; @@ -78,7 +76,6 @@ public class CompactSortedFilesLocalStackIT extends CompactSortedFilesTestBase { private static AmazonDynamoDB dynamoDBClient; private static AmazonS3 s3Client; private static S3AsyncClient s3AsyncClient; - private CompactionJobStatusStore jobStatusStore; @BeforeAll public static void beforeAll() { @@ -100,7 +97,6 @@ void setUp() { s3Client.createBucket(instanceProperties.get(DATA_BUCKET)); new S3StateStoreCreator(instanceProperties, dynamoDBClient).create(); DynamoDBCompactionJobStatusStoreCreator.create(instanceProperties, dynamoDBClient); - jobStatusStore = CompactionJobStatusStoreFactory.getStatusStore(dynamoDBClient, instanceProperties); } protected FileReference ingestRecordsGetFile(StateStore stateStore, List records) throws Exception { @@ -117,12 +113,12 @@ private StateStore createStateStore(Schema schema) { .getStateStore(tableProperties); } - private CompactSortedFiles createCompactSortedFiles(Schema schema, CompactionJob compactionJob, StateStore stateStore) throws Exception { + private CompactSortedFiles createCompactSortedFiles(Schema schema, StateStore stateStore) throws Exception { tableProperties.setSchema(schema); return new CompactSortedFiles(instanceProperties, new FixedTablePropertiesProvider(tableProperties), new FixedStateStoreProvider(tableProperties, stateStore), - ObjectFactory.noUserJars(), jobStatusStore, DEFAULT_TASK_ID); + ObjectFactory.noUserJars()); } @Test @@ -143,8 +139,8 @@ public void shouldUpdateStateStoreAfterRunningCompactionJob() throws Exception { assignJobIdToInputFiles(stateStore, compactionJob); // When - CompactSortedFiles compactSortedFiles = createCompactSortedFiles(schema, compactionJob, stateStore); - RecordsProcessedSummary summary = compactSortedFiles.run(compactionJob); + CompactSortedFiles compactSortedFiles = createCompactSortedFiles(schema, stateStore); + RecordsProcessed summary = compactSortedFiles.run(compactionJob); // Then // - Read output file and check that it contains the right results diff --git a/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/CompactSortedFilesReportingIT.java b/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/CompactSortedFilesReportingIT.java deleted file mode 100644 index 882c1bc72b..0000000000 --- a/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/CompactSortedFilesReportingIT.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Copyright 2022-2024 Crown Copyright - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package sleeper.compaction.job.execution; - -import org.junit.jupiter.api.Test; -import org.mockito.InOrder; -import org.mockito.Mockito; - -import sleeper.compaction.job.CompactionJob; -import sleeper.compaction.job.CompactionJobStatusStore; -import sleeper.compaction.job.execution.testutils.CompactSortedFilesTestBase; -import sleeper.compaction.job.execution.testutils.CompactSortedFilesTestData; -import sleeper.core.partition.PartitionsBuilder; -import sleeper.core.record.Record; -import sleeper.core.record.process.RecordsProcessedSummary; -import sleeper.core.schema.Schema; -import sleeper.core.schema.type.LongType; -import sleeper.core.statestore.FileReference; - -import java.time.Instant; -import java.util.List; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.mock; -import static sleeper.compaction.job.execution.testutils.CompactSortedFilesTestUtils.assignJobIdToInputFiles; -import static sleeper.compaction.job.execution.testutils.CompactSortedFilesTestUtils.createSchemaWithTypesForKeyAndTwoValues; - -class CompactSortedFilesReportingIT extends CompactSortedFilesTestBase { - - private final CompactionJobStatusStore jobStatusStore = mock(CompactionJobStatusStore.class); - - @Test - void shouldReportJobStatusUpdatesWhenCompacting() throws Exception { - // Given - Schema schema = createSchemaWithTypesForKeyAndTwoValues(new LongType(), new LongType(), new LongType()); - tableProperties.setSchema(schema); - stateStore.initialise(new PartitionsBuilder(schema).singlePartition("root").buildList()); - - List data1 = CompactSortedFilesTestData.keyAndTwoValuesSortedEvenLongs(); - List data2 = CompactSortedFilesTestData.keyAndTwoValuesSortedOddLongs(); - FileReference file1 = ingestRecordsGetFile(data1); - FileReference file2 = ingestRecordsGetFile(data2); - - CompactionJob compactionJob = compactionFactory().createCompactionJob(List.of(file1, file2), "root"); - assignJobIdToInputFiles(stateStore, compactionJob); - - // When - RecordsProcessedSummary summary = createCompactSortedFiles(schema, compactionJob, jobStatusStore).run(compactionJob); - - // Then - InOrder order = Mockito.inOrder(jobStatusStore); - order.verify(jobStatusStore).jobStarted(eq(compactionJob), any(Instant.class), eq(DEFAULT_TASK_ID)); - order.verify(jobStatusStore).jobFinished(compactionJob, summary, DEFAULT_TASK_ID); - order.verifyNoMoreInteractions(); - - assertThat(summary.getStartTime()).isNotNull(); - assertThat(summary.getFinishTime()).isNotNull(); - assertThat(summary.getDurationInSeconds()).isPositive(); - assertThat(summary.getRecordsReadPerSecond()).isPositive(); - assertThat(summary.getRecordsWrittenPerSecond()).isPositive(); - } - -} diff --git a/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/CompactionTaskTest.java b/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/CompactionTaskTest.java index 37b2e7b671..1f4843ebd6 100644 --- a/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/CompactionTaskTest.java +++ b/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/CompactionTaskTest.java @@ -165,7 +165,8 @@ void shouldTerminateAfterRunningJobAndWaitingForIdleTime() throws Exception { instanceProperties.setNumber(COMPACTION_TASK_MAX_IDLE_TIME_IN_SECONDS, 3); Queue times = new LinkedList<>(List.of( Instant.parse("2024-02-22T13:50:00Z"), // Start - Instant.parse("2024-02-22T13:50:01Z"), // Job completed + Instant.parse("2024-02-22T13:50:01Z"), // Job started + Instant.parse("2024-02-22T13:50:02Z"), // Job completed Instant.parse("2024-02-22T13:50:05Z"))); // Idle time check with empty queue and finish CompactionJob job = createJobOnQueue("job1"); @@ -186,6 +187,7 @@ void shouldTerminateWhenMaxIdleTimeNotMetOnFirstCheckThenIdleAfterProcessingJob( Queue times = new LinkedList<>(List.of( Instant.parse("2024-02-22T13:50:00Z"), // Start Instant.parse("2024-02-22T13:50:01Z"), // First check + Instant.parse("2024-02-22T13:50:02Z"), // Job started Instant.parse("2024-02-22T13:50:02Z"), // Job completed Instant.parse("2024-02-22T13:50:06Z"))); // Second check + finish CompactionJob job = createJob("job1"); @@ -213,7 +215,8 @@ void shouldTerminateWhenMaxIdleTimeNotMetOnFirstCheckThenNotMetAfterProcessingJo Queue times = new LinkedList<>(List.of( Instant.parse("2024-02-22T13:50:00Z"), // Start Instant.parse("2024-02-22T13:50:01Z"), // First check - Instant.parse("2024-02-22T13:50:02Z"), // Job completed + Instant.parse("2024-02-22T13:50:02Z"), // Job started + Instant.parse("2024-02-22T13:50:03Z"), // Job completed Instant.parse("2024-02-22T13:50:04Z"), // Second check Instant.parse("2024-02-22T13:50:06Z"))); // Third check + finish CompactionJob job = createJob("job1"); @@ -293,7 +296,7 @@ void shouldSaveTaskAndJobWhenOneJobSucceeds() throws Exception { Instant.parse("2024-02-22T13:50:01Z"), Instant.parse("2024-02-22T13:50:02Z"), 10L, 10L); runTask("test-task-1", processJobs( - jobSucceeds(jobSummary)), + jobSucceeds(jobSummary.getRecordsProcessed())), times::poll); // Then @@ -328,8 +331,8 @@ void shouldSaveTaskAndJobsWhenMultipleJobsSucceed() throws Exception { Instant.parse("2024-02-22T13:50:03Z"), Instant.parse("2024-02-22T13:50:04Z"), 5L, 5L); runTask("test-task-1", processJobs( - jobSucceeds(job1Summary), - jobSucceeds(job2Summary)), + jobSucceeds(job1Summary.getRecordsProcessed()), + jobSucceeds(job2Summary.getRecordsProcessed())), times::poll); // Then @@ -505,16 +508,16 @@ private CompactionRunner jobsSucceed(int numJobs) { .toArray(ProcessJob[]::new)); } - private ProcessJob jobSucceeds(RecordsProcessedSummary summary) { + private ProcessJob jobSucceeds(RecordsProcessed summary) { return new ProcessJob(true, summary); } private ProcessJob jobSucceeds() { - return new ProcessJob(true, 10L, DEFAULT_START_TIME, DEFAULT_DURATION); + return new ProcessJob(true, 10L); } private ProcessJob jobFails() { - return new ProcessJob(false, 0L, DEFAULT_START_TIME, DEFAULT_DURATION); + return new ProcessJob(false, 0L); } private CompactionRunner processNoJobs() { @@ -536,13 +539,13 @@ private CompactionRunner processJobs(ProcessJob... actions) { private class ProcessJob { private final boolean succeed; - private final RecordsProcessedSummary summary; + private final RecordsProcessed summary; - ProcessJob(boolean succeed, long records, Instant startTime, Duration duration) { - this(succeed, new RecordsProcessedSummary(new RecordsProcessed(records, records), startTime, duration)); + ProcessJob(boolean succeed, long records) { + this(succeed, new RecordsProcessed(records, records)); } - ProcessJob(boolean succeed, RecordsProcessedSummary summary) { + ProcessJob(boolean succeed, RecordsProcessed summary) { this.succeed = succeed; this.summary = summary; } diff --git a/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/ECSCompactionTaskRunnerLocalStackIT.java b/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/ECSCompactionTaskRunnerLocalStackIT.java index 806a10b112..c8562bd268 100644 --- a/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/ECSCompactionTaskRunnerLocalStackIT.java +++ b/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/ECSCompactionTaskRunnerLocalStackIT.java @@ -341,7 +341,7 @@ private CompactionTask createTask(String taskId) { private CompactionTask createTask(String taskId, StateStoreProvider stateStoreProvider) { CompactSortedFiles compactSortedFiles = new CompactSortedFiles(instanceProperties, tablePropertiesProvider, stateStoreProvider, - ObjectFactory.noUserJars(), jobStatusStore, taskId); + ObjectFactory.noUserJars()); CompactionTask task = new CompactionTask(instanceProperties, PropertiesReloader.neverReload(), Instant::now, new SqsCompactionQueueHandler(sqs, instanceProperties)::receiveFromSqs, job -> compactSortedFiles.run(job), jobStatusStore, taskStatusStore, taskId); diff --git a/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/testutils/CompactSortedFilesTestBase.java b/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/testutils/CompactSortedFilesTestBase.java index fe6c95c03f..46bc1a18f1 100644 --- a/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/testutils/CompactSortedFilesTestBase.java +++ b/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/testutils/CompactSortedFilesTestBase.java @@ -18,9 +18,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.io.TempDir; -import sleeper.compaction.job.CompactionJob; import sleeper.compaction.job.CompactionJobFactory; -import sleeper.compaction.job.CompactionJobStatusStore; import sleeper.compaction.job.execution.CompactSortedFiles; import sleeper.configuration.jars.ObjectFactory; import sleeper.configuration.properties.instance.InstanceProperties; @@ -68,17 +66,12 @@ protected CompactionJobFactory compactionFactory() { return new CompactionJobFactory(instanceProperties, tableProperties); } - protected CompactSortedFiles createCompactSortedFiles(Schema schema, CompactionJob compactionJob) throws Exception { - return createCompactSortedFiles(schema, compactionJob, CompactionJobStatusStore.NONE); - } - - protected CompactSortedFiles createCompactSortedFiles( - Schema schema, CompactionJob compactionJob, CompactionJobStatusStore statusStore) throws Exception { + protected CompactSortedFiles createCompactSortedFiles(Schema schema) throws Exception { tableProperties.setSchema(schema); return new CompactSortedFiles(instanceProperties, new FixedTablePropertiesProvider(tableProperties), new FixedStateStoreProvider(tableProperties, stateStore), - ObjectFactory.noUserJars(), statusStore, DEFAULT_TASK_ID); + ObjectFactory.noUserJars()); } protected FileReference ingestRecordsGetFile(List records) throws Exception { From 91c9c58c9d2a82973e31444afb6a15b19487319b Mon Sep 17 00:00:00 2001 From: kr565370 <112558283+kr565370@users.noreply.github.com> Date: Mon, 11 Mar 2024 12:30:10 +0000 Subject: [PATCH 07/17] Introduce variable for jobFinishTime --- .../sleeper/compaction/job/execution/CompactionTask.java | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/java/compaction/compaction-job-execution/src/main/java/sleeper/compaction/job/execution/CompactionTask.java b/java/compaction/compaction-job-execution/src/main/java/sleeper/compaction/job/execution/CompactionTask.java index f3e592562e..a9a3fead4b 100644 --- a/java/compaction/compaction-job-execution/src/main/java/sleeper/compaction/job/execution/CompactionTask.java +++ b/java/compaction/compaction-job-execution/src/main/java/sleeper/compaction/job/execution/CompactionTask.java @@ -116,8 +116,8 @@ public Instant handleMessages(Instant startTime, Consumer Date: Mon, 11 Mar 2024 12:32:31 +0000 Subject: [PATCH 08/17] Remove unnecessary delegation --- .../sleeper/compaction/job/execution/CompactSortedFiles.java | 5 ----- 1 file changed, 5 deletions(-) diff --git a/java/compaction/compaction-job-execution/src/main/java/sleeper/compaction/job/execution/CompactSortedFiles.java b/java/compaction/compaction-job-execution/src/main/java/sleeper/compaction/job/execution/CompactSortedFiles.java index 8742477ede..71c435bca5 100644 --- a/java/compaction/compaction-job-execution/src/main/java/sleeper/compaction/job/execution/CompactSortedFiles.java +++ b/java/compaction/compaction-job-execution/src/main/java/sleeper/compaction/job/execution/CompactSortedFiles.java @@ -81,11 +81,6 @@ public CompactSortedFiles( } public RecordsProcessed run(CompactionJob compactionJob) throws IOException, IteratorException, StateStoreException { - RecordsProcessed recordsProcessed = compact(compactionJob); - return recordsProcessed; - } - - private RecordsProcessed compact(CompactionJob compactionJob) throws IOException, IteratorException, StateStoreException { TableProperties tableProperties = tablePropertiesProvider.getById(compactionJob.getTableId()); Schema schema = tableProperties.getSchema(); StateStore stateStore = stateStoreProvider.getStateStore(tableProperties); From 0c41e3c2723df67b80f7af63ed4a029ffafd7408 Mon Sep 17 00:00:00 2001 From: kr565370 <112558283+kr565370@users.noreply.github.com> Date: Mon, 11 Mar 2024 12:37:31 +0000 Subject: [PATCH 09/17] Remove unused test variables --- .../sleeper/compaction/job/execution/CompactionTaskTest.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/CompactionTaskTest.java b/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/CompactionTaskTest.java index 1f4843ebd6..d19a805d1b 100644 --- a/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/CompactionTaskTest.java +++ b/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/CompactionTaskTest.java @@ -35,7 +35,6 @@ import sleeper.core.record.process.RecordsProcessed; import sleeper.core.record.process.RecordsProcessedSummary; -import java.time.Duration; import java.time.Instant; import java.util.ArrayList; import java.util.Iterator; @@ -60,8 +59,6 @@ public class CompactionTaskTest { private static final String DEFAULT_TABLE_ID = "test-table-id"; private static final String DEFAULT_TASK_ID = "test-task-id"; private static final Instant DEFAULT_CREATED_TIME = Instant.parse("2024-03-04T10:50:00Z"); - private static final Instant DEFAULT_START_TIME = Instant.parse("2024-03-04T11:00:00Z"); - private static final Duration DEFAULT_DURATION = Duration.ofSeconds(5); private final InstanceProperties instanceProperties = createTestInstanceProperties(); private final Queue jobsOnQueue = new LinkedList<>(); From b6ee9b562bc83d6a19215d5aeab6f8443600cc03 Mon Sep 17 00:00:00 2001 From: kr565370 <112558283+kr565370@users.noreply.github.com> Date: Mon, 11 Mar 2024 13:27:59 +0000 Subject: [PATCH 10/17] Move job started call into try/catch block --- .../job/execution/CompactionTask.java | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/java/compaction/compaction-job-execution/src/main/java/sleeper/compaction/job/execution/CompactionTask.java b/java/compaction/compaction-job-execution/src/main/java/sleeper/compaction/job/execution/CompactionTask.java index a9a3fead4b..f6bd2a46ac 100644 --- a/java/compaction/compaction-job-execution/src/main/java/sleeper/compaction/job/execution/CompactionTask.java +++ b/java/compaction/compaction-job-execution/src/main/java/sleeper/compaction/job/execution/CompactionTask.java @@ -109,11 +109,10 @@ public Instant handleMessages(Instant startTime, Consumer Date: Mon, 11 Mar 2024 13:32:31 +0000 Subject: [PATCH 11/17] Extract methods from handleMessages method --- .../job/execution/CompactionTask.java | 44 ++++++++++++------- 1 file changed, 28 insertions(+), 16 deletions(-) diff --git a/java/compaction/compaction-job-execution/src/main/java/sleeper/compaction/job/execution/CompactionTask.java b/java/compaction/compaction-job-execution/src/main/java/sleeper/compaction/job/execution/CompactionTask.java index f6bd2a46ac..5840de74eb 100644 --- a/java/compaction/compaction-job-execution/src/main/java/sleeper/compaction/job/execution/CompactionTask.java +++ b/java/compaction/compaction-job-execution/src/main/java/sleeper/compaction/job/execution/CompactionTask.java @@ -110,26 +110,12 @@ public Instant handleMessages(Instant startTime, Consumer receiveMessage() throws InterruptedException, IOException; From 0a6616f406d2561a6498da78e2dd99d095f8d44a Mon Sep 17 00:00:00 2001 From: kr565370 <112558283+kr565370@users.noreply.github.com> Date: Mon, 11 Mar 2024 13:33:11 +0000 Subject: [PATCH 12/17] Remove incorrect copied lines --- .../java/sleeper/compaction/job/execution/CompactionTask.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/java/compaction/compaction-job-execution/src/main/java/sleeper/compaction/job/execution/CompactionTask.java b/java/compaction/compaction-job-execution/src/main/java/sleeper/compaction/job/execution/CompactionTask.java index 5840de74eb..a59d75bea6 100644 --- a/java/compaction/compaction-job-execution/src/main/java/sleeper/compaction/job/execution/CompactionTask.java +++ b/java/compaction/compaction-job-execution/src/main/java/sleeper/compaction/job/execution/CompactionTask.java @@ -134,8 +134,7 @@ private RecordsProcessedSummary compact(CompactionJob job) throws Exception { RecordsProcessed recordsProcessed = compactor.compact(job); Instant jobFinishTime = timeSupplier.get(); RecordsProcessedSummary summary = new RecordsProcessedSummary(recordsProcessed, jobStartTime, jobFinishTime); - totalNumberOfMessagesProcessed++; - numConsecutiveFailures = 0; + ; jobStatusStore.jobFinished(job, summary, taskId); logMetrics(job, summary); return summary; From 61ab61601163e3c4d0abea538c2711db8b30712f Mon Sep 17 00:00:00 2001 From: kr565370 <112558283+kr565370@users.noreply.github.com> Date: Mon, 11 Mar 2024 13:33:35 +0000 Subject: [PATCH 13/17] Remove lone semicolon --- .../java/sleeper/compaction/job/execution/CompactionTask.java | 1 - 1 file changed, 1 deletion(-) diff --git a/java/compaction/compaction-job-execution/src/main/java/sleeper/compaction/job/execution/CompactionTask.java b/java/compaction/compaction-job-execution/src/main/java/sleeper/compaction/job/execution/CompactionTask.java index a59d75bea6..a0f9c9e283 100644 --- a/java/compaction/compaction-job-execution/src/main/java/sleeper/compaction/job/execution/CompactionTask.java +++ b/java/compaction/compaction-job-execution/src/main/java/sleeper/compaction/job/execution/CompactionTask.java @@ -134,7 +134,6 @@ private RecordsProcessedSummary compact(CompactionJob job) throws Exception { RecordsProcessed recordsProcessed = compactor.compact(job); Instant jobFinishTime = timeSupplier.get(); RecordsProcessedSummary summary = new RecordsProcessedSummary(recordsProcessed, jobStartTime, jobFinishTime); - ; jobStatusStore.jobFinished(job, summary, taskId); logMetrics(job, summary); return summary; From 090338849b1ad68d05d403b3f1f78c08baabecd7 Mon Sep 17 00:00:00 2001 From: kr565370 <112558283+kr565370@users.noreply.github.com> Date: Mon, 11 Mar 2024 13:33:50 +0000 Subject: [PATCH 14/17] Remove comment --- .../java/sleeper/compaction/job/execution/CompactionTask.java | 1 - 1 file changed, 1 deletion(-) diff --git a/java/compaction/compaction-job-execution/src/main/java/sleeper/compaction/job/execution/CompactionTask.java b/java/compaction/compaction-job-execution/src/main/java/sleeper/compaction/job/execution/CompactionTask.java index a0f9c9e283..c0bd25ed9b 100644 --- a/java/compaction/compaction-job-execution/src/main/java/sleeper/compaction/job/execution/CompactionTask.java +++ b/java/compaction/compaction-job-execution/src/main/java/sleeper/compaction/job/execution/CompactionTask.java @@ -140,7 +140,6 @@ private RecordsProcessedSummary compact(CompactionJob job) throws Exception { } private void logMetrics(CompactionJob job, RecordsProcessedSummary summary) { - // Print summary LOGGER.info("Compaction job {}: finished at {}", job.getId(), summary.getFinishTime()); METRICS_LOGGER.info("Compaction job {}: compaction run time = {}", job.getId(), summary.getDurationInSeconds()); METRICS_LOGGER.info("Compaction job {}: compaction read {} records at {} per second", job.getId(), From 9ae61e3e4746f77b38c3ff3e2fa9633db65bb96a Mon Sep 17 00:00:00 2001 From: kr565370 <112558283+kr565370@users.noreply.github.com> Date: Mon, 11 Mar 2024 13:35:27 +0000 Subject: [PATCH 15/17] Change type of parameter to CompactionTaskFinishedStatus.Builder --- .../sleeper/compaction/job/execution/CompactionTask.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/java/compaction/compaction-job-execution/src/main/java/sleeper/compaction/job/execution/CompactionTask.java b/java/compaction/compaction-job-execution/src/main/java/sleeper/compaction/job/execution/CompactionTask.java index c0bd25ed9b..d62ff266a3 100644 --- a/java/compaction/compaction-job-execution/src/main/java/sleeper/compaction/job/execution/CompactionTask.java +++ b/java/compaction/compaction-job-execution/src/main/java/sleeper/compaction/job/execution/CompactionTask.java @@ -34,7 +34,6 @@ import java.time.Duration; import java.time.Instant; import java.util.Optional; -import java.util.function.Consumer; import java.util.function.Supplier; import static sleeper.configuration.properties.instance.CompactionProperty.COMPACTION_TASK_MAX_CONSECUTIVE_FAILURES; @@ -79,7 +78,7 @@ public void run() throws InterruptedException, IOException { LOGGER.info("Starting task {}", taskId); taskStatusStore.taskStarted(taskStatusBuilder.build()); CompactionTaskFinishedStatus.Builder taskFinishedBuilder = CompactionTaskFinishedStatus.builder(); - Instant finishTime = handleMessages(startTime, taskFinishedBuilder::addJobSummary); + Instant finishTime = handleMessages(startTime, taskFinishedBuilder); if (numConsecutiveFailures >= maxConsecutiveFailures) { LOGGER.info("Terminating compaction task as {} consecutive failures exceeds maximum of {}", numConsecutiveFailures, maxConsecutiveFailures); @@ -91,7 +90,7 @@ public void run() throws InterruptedException, IOException { taskStatusStore.taskFinished(taskFinished); } - public Instant handleMessages(Instant startTime, Consumer summaryConsumer) throws InterruptedException, IOException { + public Instant handleMessages(Instant startTime, CompactionTaskFinishedStatus.Builder taskFinishedBuilder) throws InterruptedException, IOException { Instant lastActiveTime = startTime; while (numConsecutiveFailures < maxConsecutiveFailures) { Optional messageOpt = messageReceiver.receiveMessage(); @@ -111,7 +110,7 @@ public Instant handleMessages(Instant startTime, Consumer Date: Mon, 11 Mar 2024 13:42:57 +0000 Subject: [PATCH 16/17] Move definition of summaries to assertions --- .../job/execution/CompactionTaskTest.java | 28 ++++++++++--------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/CompactionTaskTest.java b/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/CompactionTaskTest.java index d19a805d1b..d551ebe898 100644 --- a/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/CompactionTaskTest.java +++ b/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/CompactionTaskTest.java @@ -53,7 +53,6 @@ import static sleeper.configuration.properties.InstancePropertiesTestHelper.createTestInstanceProperties; import static sleeper.configuration.properties.instance.CompactionProperty.COMPACTION_TASK_MAX_CONSECUTIVE_FAILURES; import static sleeper.configuration.properties.instance.CompactionProperty.COMPACTION_TASK_MAX_IDLE_TIME_IN_SECONDS; -import static sleeper.core.record.process.RecordsProcessedSummaryTestData.summary; public class CompactionTaskTest { private static final String DEFAULT_TABLE_ID = "test-table-id"; @@ -289,14 +288,15 @@ void shouldSaveTaskAndJobWhenOneJobSucceeds() throws Exception { CompactionJob job = createJobOnQueue("job1"); // When - RecordsProcessedSummary jobSummary = summary( - Instant.parse("2024-02-22T13:50:01Z"), - Instant.parse("2024-02-22T13:50:02Z"), 10L, 10L); + RecordsProcessed recordsProcessed = new RecordsProcessed(10L, 10L); runTask("test-task-1", processJobs( - jobSucceeds(jobSummary.getRecordsProcessed())), + jobSucceeds(recordsProcessed)), times::poll); // Then + RecordsProcessedSummary jobSummary = new RecordsProcessedSummary(recordsProcessed, + Instant.parse("2024-02-22T13:50:01Z"), + Instant.parse("2024-02-22T13:50:02Z")); assertThat(taskStore.getAllTasks()).containsExactly( finishedCompactionTask("test-task-1", Instant.parse("2024-02-22T13:50:00Z"), @@ -321,18 +321,20 @@ void shouldSaveTaskAndJobsWhenMultipleJobsSucceed() throws Exception { CompactionJob job2 = createJobOnQueue("job2"); // When - RecordsProcessedSummary job1Summary = summary( - Instant.parse("2024-02-22T13:50:01Z"), - Instant.parse("2024-02-22T13:50:02Z"), 10L, 10L); - RecordsProcessedSummary job2Summary = summary( - Instant.parse("2024-02-22T13:50:03Z"), - Instant.parse("2024-02-22T13:50:04Z"), 5L, 5L); + RecordsProcessed job1RecordsProcessed = new RecordsProcessed(10L, 10L); + RecordsProcessed job2RecordsProcessed = new RecordsProcessed(5L, 5L); runTask("test-task-1", processJobs( - jobSucceeds(job1Summary.getRecordsProcessed()), - jobSucceeds(job2Summary.getRecordsProcessed())), + jobSucceeds(job1RecordsProcessed), + jobSucceeds(job2RecordsProcessed)), times::poll); // Then + RecordsProcessedSummary job1Summary = new RecordsProcessedSummary(job1RecordsProcessed, + Instant.parse("2024-02-22T13:50:01Z"), + Instant.parse("2024-02-22T13:50:02Z")); + RecordsProcessedSummary job2Summary = new RecordsProcessedSummary(job2RecordsProcessed, + Instant.parse("2024-02-22T13:50:03Z"), + Instant.parse("2024-02-22T13:50:04Z")); assertThat(taskStore.getAllTasks()).containsExactly( finishedCompactionTask("test-task-1", Instant.parse("2024-02-22T13:50:00Z"), From 97a161df2920419ebaab4c220f3ac1bd57c81633 Mon Sep 17 00:00:00 2001 From: kr565370 <112558283+kr565370@users.noreply.github.com> Date: Mon, 11 Mar 2024 14:31:05 +0000 Subject: [PATCH 17/17] Remove duplicate call to jobFinished --- .../java/sleeper/compaction/job/execution/CompactionTask.java | 1 - 1 file changed, 1 deletion(-) diff --git a/java/compaction/compaction-job-execution/src/main/java/sleeper/compaction/job/execution/CompactionTask.java b/java/compaction/compaction-job-execution/src/main/java/sleeper/compaction/job/execution/CompactionTask.java index c657b97e5c..50ce8f3243 100644 --- a/java/compaction/compaction-job-execution/src/main/java/sleeper/compaction/job/execution/CompactionTask.java +++ b/java/compaction/compaction-job-execution/src/main/java/sleeper/compaction/job/execution/CompactionTask.java @@ -162,7 +162,6 @@ private void logMetrics(CompactionJob job, RecordsProcessedSummary summary) { summary.getRecordsRead(), String.format("%.1f", summary.getRecordsReadPerSecond())); METRICS_LOGGER.info("Compaction job {}: compaction wrote {} records at {} per second", job.getId(), summary.getRecordsWritten(), String.format("%.1f", summary.getRecordsWrittenPerSecond())); - jobStatusStore.jobFinished(job, summary, taskId); } @FunctionalInterface