diff --git a/java/compaction/compaction-job-execution/src/main/java/sleeper/compaction/job/execution/CompactSortedFiles.java b/java/compaction/compaction-job-execution/src/main/java/sleeper/compaction/job/execution/CompactSortedFiles.java index 685351bdc4..d1363d8760 100644 --- a/java/compaction/compaction-job-execution/src/main/java/sleeper/compaction/job/execution/CompactSortedFiles.java +++ b/java/compaction/compaction-job-execution/src/main/java/sleeper/compaction/job/execution/CompactSortedFiles.java @@ -25,7 +25,6 @@ import org.slf4j.LoggerFactory; import sleeper.compaction.job.CompactionJob; -import sleeper.compaction.job.CompactionJobStatusStore; import sleeper.configuration.jars.ObjectFactory; import sleeper.configuration.jars.ObjectFactoryException; import sleeper.configuration.properties.instance.InstanceProperties; @@ -38,7 +37,6 @@ import sleeper.core.partition.Partition; import sleeper.core.record.Record; import sleeper.core.record.process.RecordsProcessed; -import sleeper.core.record.process.RecordsProcessedSummary; import sleeper.core.schema.Schema; import sleeper.core.statestore.FileReference; import sleeper.core.statestore.StateStore; @@ -53,14 +51,12 @@ import sleeper.statestore.StateStoreProvider; import java.io.IOException; -import java.time.Instant; import java.time.LocalDateTime; import java.util.ArrayList; import java.util.List; import java.util.NoSuchElementException; import java.util.Objects; -import static sleeper.core.metrics.MetricsLogger.METRICS_LOGGER; import static sleeper.sketches.s3.SketchesSerDeToS3.sketchesPathForDataFile; /** @@ -72,44 +68,19 @@ public class CompactSortedFiles implements CompactionTask.CompactionRunner { private final TablePropertiesProvider tablePropertiesProvider; private final ObjectFactory objectFactory; private final StateStoreProvider stateStoreProvider; - private final CompactionJobStatusStore jobStatusStore; - private final String taskId; private static final Logger LOGGER = LoggerFactory.getLogger(CompactSortedFiles.class); public CompactSortedFiles( InstanceProperties instanceProperties, TablePropertiesProvider tablePropertiesProvider, - StateStoreProvider stateStoreProvider, ObjectFactory objectFactory, CompactionJobStatusStore jobStatusStore, - String taskId) { + StateStoreProvider stateStoreProvider, ObjectFactory objectFactory) { this.instanceProperties = instanceProperties; this.tablePropertiesProvider = tablePropertiesProvider; this.objectFactory = objectFactory; this.stateStoreProvider = stateStoreProvider; - this.jobStatusStore = jobStatusStore; - this.taskId = taskId; } - public RecordsProcessedSummary compact(CompactionJob compactionJob) throws IOException, IteratorException, StateStoreException { - Instant startTime = Instant.now(); - String id = compactionJob.getId(); - LOGGER.info("Compaction job {}: compaction called at {}", id, startTime); - jobStatusStore.jobStarted(compactionJob, startTime, taskId); - - RecordsProcessed recordsProcessed = compactUntimed(compactionJob); - - Instant finishTime = Instant.now(); - // Print summary - LOGGER.info("Compaction job {}: finished at {}", id, finishTime); - - RecordsProcessedSummary summary = new RecordsProcessedSummary(recordsProcessed, startTime, finishTime); - METRICS_LOGGER.info("Compaction job {}: compaction run time = {}", id, summary.getDurationInSeconds()); - METRICS_LOGGER.info("Compaction job {}: compaction read {} records at {} per second", id, summary.getRecordsRead(), String.format("%.1f", summary.getRecordsReadPerSecond())); - METRICS_LOGGER.info("Compaction job {}: compaction wrote {} records at {} per second", id, summary.getRecordsWritten(), String.format("%.1f", summary.getRecordsWrittenPerSecond())); - jobStatusStore.jobFinished(compactionJob, summary, taskId); - return summary; - } - - private RecordsProcessed compactUntimed(CompactionJob compactionJob) throws IOException, IteratorException, StateStoreException { + public RecordsProcessed compact(CompactionJob compactionJob) throws IOException, IteratorException, StateStoreException { TableProperties tableProperties = tablePropertiesProvider.getById(compactionJob.getTableId()); Schema schema = tableProperties.getSchema(); StateStore stateStore = stateStoreProvider.getStateStore(tableProperties); diff --git a/java/compaction/compaction-job-execution/src/main/java/sleeper/compaction/job/execution/CompactionTask.java b/java/compaction/compaction-job-execution/src/main/java/sleeper/compaction/job/execution/CompactionTask.java index 14c60185b3..50ce8f3243 100644 --- a/java/compaction/compaction-job-execution/src/main/java/sleeper/compaction/job/execution/CompactionTask.java +++ b/java/compaction/compaction-job-execution/src/main/java/sleeper/compaction/job/execution/CompactionTask.java @@ -20,11 +20,13 @@ import org.slf4j.LoggerFactory; import sleeper.compaction.job.CompactionJob; +import sleeper.compaction.job.CompactionJobStatusStore; import sleeper.compaction.task.CompactionTaskFinishedStatus; import sleeper.compaction.task.CompactionTaskStatus; import sleeper.compaction.task.CompactionTaskStatusStore; import sleeper.configuration.properties.PropertiesReloader; import sleeper.configuration.properties.instance.InstanceProperties; +import sleeper.core.record.process.RecordsProcessed; import sleeper.core.record.process.RecordsProcessedSummary; import sleeper.core.util.LoggedDuration; @@ -38,6 +40,7 @@ import static sleeper.configuration.properties.instance.CompactionProperty.COMPACTION_TASK_DELAY_BEFORE_RETRY_IN_SECONDS; import static sleeper.configuration.properties.instance.CompactionProperty.COMPACTION_TASK_MAX_CONSECUTIVE_FAILURES; import static sleeper.configuration.properties.instance.CompactionProperty.COMPACTION_TASK_MAX_IDLE_TIME_IN_SECONDS; +import static sleeper.core.metrics.MetricsLogger.METRICS_LOGGER; /** * Runs a compaction task, updating the {@link CompactionTaskStatusStore} with progress of the task. @@ -52,6 +55,7 @@ public class CompactionTask { private final Duration delayBeforeRetry; private final MessageReceiver messageReceiver; private final CompactionRunner compactor; + private final CompactionJobStatusStore jobStatusStore; private final CompactionTaskStatusStore taskStatusStore; private final String taskId; private final PropertiesReloader propertiesReloader; @@ -59,13 +63,14 @@ public class CompactionTask { private int totalNumberOfMessagesProcessed = 0; public CompactionTask(InstanceProperties instanceProperties, PropertiesReloader propertiesReloader, - MessageReceiver messageReceiver, CompactionRunner compactor, CompactionTaskStatusStore taskStore, String taskId) { - this(instanceProperties, propertiesReloader, messageReceiver, compactor, taskStore, taskId, Instant::now, threadSleep()); + MessageReceiver messageReceiver, CompactionRunner compactor, + CompactionJobStatusStore jobStore, CompactionTaskStatusStore taskStore, String taskId) { + this(instanceProperties, propertiesReloader, messageReceiver, compactor, jobStore, taskStore, taskId, Instant::now, threadSleep()); } public CompactionTask(InstanceProperties instanceProperties, PropertiesReloader propertiesReloader, - MessageReceiver messageReceiver, CompactionRunner compactor, CompactionTaskStatusStore taskStore, String taskId, - Supplier timeSupplier, Consumer sleepForTime) { + MessageReceiver messageReceiver, CompactionRunner compactor, CompactionJobStatusStore jobStore, + CompactionTaskStatusStore taskStore, String taskId, Supplier timeSupplier, Consumer sleepForTime) { maxIdleTime = Duration.ofSeconds(instanceProperties.getInt(COMPACTION_TASK_MAX_IDLE_TIME_IN_SECONDS)); maxConsecutiveFailures = instanceProperties.getInt(COMPACTION_TASK_MAX_CONSECUTIVE_FAILURES); delayBeforeRetry = Duration.ofSeconds(instanceProperties.getInt(COMPACTION_TASK_DELAY_BEFORE_RETRY_IN_SECONDS)); @@ -74,6 +79,7 @@ public CompactionTask(InstanceProperties instanceProperties, PropertiesReloader this.sleepForTime = sleepForTime; this.messageReceiver = messageReceiver; this.compactor = compactor; + this.jobStatusStore = jobStore; this.taskStatusStore = taskStore; this.taskId = taskId; } @@ -84,7 +90,7 @@ public void run() throws IOException { LOGGER.info("Starting task {}", taskId); taskStatusStore.taskStarted(taskStatusBuilder.build()); CompactionTaskFinishedStatus.Builder taskFinishedBuilder = CompactionTaskFinishedStatus.builder(); - Instant finishTime = handleMessages(startTime, taskFinishedBuilder::addJobSummary); + Instant finishTime = handleMessages(startTime, taskFinishedBuilder); if (numConsecutiveFailures >= maxConsecutiveFailures) { LOGGER.info("Terminating compaction task as {} consecutive failures exceeds maximum of {}", numConsecutiveFailures, maxConsecutiveFailures); @@ -96,7 +102,7 @@ public void run() throws IOException { taskStatusStore.taskFinished(taskFinished); } - public Instant handleMessages(Instant startTime, Consumer summaryConsumer) throws IOException { + public Instant handleMessages(Instant startTime, CompactionTaskFinishedStatus.Builder taskFinishedBuilder) throws IOException { Instant lastActiveTime = startTime; while (numConsecutiveFailures < maxConsecutiveFailures) { Optional messageOpt = messageReceiver.receiveMessage(); @@ -119,14 +125,13 @@ public Instant handleMessages(Instant startTime, Consumer receiveMessage() throws IOException; @@ -144,7 +171,7 @@ interface MessageReceiver { @FunctionalInterface interface CompactionRunner { - RecordsProcessedSummary compact(CompactionJob job) throws Exception; + RecordsProcessed compact(CompactionJob job) throws Exception; } interface MessageHandle extends AutoCloseable { diff --git a/java/compaction/compaction-job-execution/src/main/java/sleeper/compaction/job/execution/ECSCompactionTaskRunner.java b/java/compaction/compaction-job-execution/src/main/java/sleeper/compaction/job/execution/ECSCompactionTaskRunner.java index 034b4e0abb..5e72fefc22 100644 --- a/java/compaction/compaction-job-execution/src/main/java/sleeper/compaction/job/execution/ECSCompactionTaskRunner.java +++ b/java/compaction/compaction-job-execution/src/main/java/sleeper/compaction/job/execution/ECSCompactionTaskRunner.java @@ -86,11 +86,10 @@ public static void main(String[] args) throws IOException, ObjectFactoryExceptio ObjectFactory objectFactory = new ObjectFactory(instanceProperties, s3Client, "/tmp"); CompactSortedFiles compactSortedFiles = new CompactSortedFiles(instanceProperties, - tablePropertiesProvider, stateStoreProvider, - objectFactory, jobStatusStore, taskId); + tablePropertiesProvider, stateStoreProvider, objectFactory); CompactionTask task = new CompactionTask(instanceProperties, propertiesReloader, new SqsCompactionQueueHandler(sqsClient, instanceProperties), - compactSortedFiles, taskStatusStore, taskId); + compactSortedFiles, jobStatusStore, taskStatusStore, taskId); task.run(); sqsClient.shutdown(); diff --git a/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/CompactSortedFilesEmptyOutputIT.java b/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/CompactSortedFilesEmptyOutputIT.java index 0adc82efdb..41ad7152c6 100644 --- a/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/CompactSortedFilesEmptyOutputIT.java +++ b/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/CompactSortedFilesEmptyOutputIT.java @@ -21,7 +21,7 @@ import sleeper.compaction.job.execution.testutils.CompactSortedFilesTestBase; import sleeper.core.partition.PartitionsBuilder; import sleeper.core.record.Record; -import sleeper.core.record.process.RecordsProcessedSummary; +import sleeper.core.record.process.RecordsProcessed; import sleeper.core.schema.Schema; import sleeper.core.schema.type.LongType; import sleeper.core.statestore.FileReference; @@ -54,8 +54,8 @@ void shouldMergeFilesCorrectlyWhenSomeAreEmpty() throws Exception { assignJobIdToInputFiles(stateStore, compactionJob); // When - CompactSortedFiles compactSortedFiles = createCompactSortedFiles(schema, compactionJob); - RecordsProcessedSummary summary = compactSortedFiles.compact(compactionJob); + CompactSortedFiles compactSortedFiles = createCompactSortedFiles(schema); + RecordsProcessed summary = compactSortedFiles.compact(compactionJob); // Then // - Read output file and check that it contains the right results @@ -88,8 +88,8 @@ void shouldMergeFilesCorrectlyWhenAllAreEmpty() throws Exception { assignJobIdToInputFiles(stateStore, compactionJob); // When - CompactSortedFiles compactSortedFiles = createCompactSortedFiles(schema, compactionJob); - RecordsProcessedSummary summary = compactSortedFiles.compact(compactionJob); + CompactSortedFiles compactSortedFiles = createCompactSortedFiles(schema); + RecordsProcessed summary = compactSortedFiles.compact(compactionJob); // Then // - Read output file and check that it contains the right results diff --git a/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/CompactSortedFilesIT.java b/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/CompactSortedFilesIT.java index 424ee2cad5..f743aa3175 100644 --- a/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/CompactSortedFilesIT.java +++ b/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/CompactSortedFilesIT.java @@ -25,7 +25,7 @@ import sleeper.compaction.job.execution.testutils.CompactSortedFilesTestData; import sleeper.core.partition.PartitionsBuilder; import sleeper.core.record.Record; -import sleeper.core.record.process.RecordsProcessedSummary; +import sleeper.core.record.process.RecordsProcessed; import sleeper.core.schema.Schema; import sleeper.core.schema.type.ByteArrayType; import sleeper.core.schema.type.LongType; @@ -58,8 +58,8 @@ void shouldMergeFilesCorrectlyAndUpdateStateStoreWithLongKey() throws Exception assignJobIdToInputFiles(stateStore, compactionJob); // When - CompactSortedFiles compactSortedFiles = createCompactSortedFiles(schema, compactionJob); - RecordsProcessedSummary summary = compactSortedFiles.compact(compactionJob); + CompactSortedFiles compactSortedFiles = createCompactSortedFiles(schema); + RecordsProcessed summary = compactSortedFiles.compact(compactionJob); // Then // - Read output file and check that it contains the right results @@ -115,8 +115,8 @@ void shouldMergeFilesCorrectlyAndUpdateStateStoreWithStringKey() throws Exceptio assignJobIdToInputFiles(stateStore, compactionJob); // When - CompactSortedFiles compactSortedFiles = createCompactSortedFiles(schema, compactionJob); - RecordsProcessedSummary summary = compactSortedFiles.compact(compactionJob); + CompactSortedFiles compactSortedFiles = createCompactSortedFiles(schema); + RecordsProcessed summary = compactSortedFiles.compact(compactionJob); // Then // - Read output file and check that it contains the right results @@ -179,8 +179,8 @@ void shouldMergeFilesCorrectlyAndUpdateStateStoreWithByteArrayKey() throws Excep assignJobIdToInputFiles(stateStore, compactionJob); // When - CompactSortedFiles compactSortedFiles = createCompactSortedFiles(schema, compactionJob); - RecordsProcessedSummary summary = compactSortedFiles.compact(compactionJob); + CompactSortedFiles compactSortedFiles = createCompactSortedFiles(schema); + RecordsProcessed summary = compactSortedFiles.compact(compactionJob); // Then // - Read output file and check that it contains the right results diff --git a/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/CompactSortedFilesIteratorIT.java b/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/CompactSortedFilesIteratorIT.java index 23317ba77e..49c3df2a2d 100644 --- a/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/CompactSortedFilesIteratorIT.java +++ b/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/CompactSortedFilesIteratorIT.java @@ -25,7 +25,7 @@ import sleeper.core.iterator.impl.AgeOffIterator; import sleeper.core.partition.PartitionsBuilder; import sleeper.core.record.Record; -import sleeper.core.record.process.RecordsProcessedSummary; +import sleeper.core.record.process.RecordsProcessed; import sleeper.core.schema.Schema; import sleeper.core.statestore.FileReference; import sleeper.core.statestore.FileReferenceFactory; @@ -67,9 +67,8 @@ void shouldApplyIteratorDuringCompaction() throws Exception { assignJobIdToInputFiles(stateStore, compactionJob); // When - CompactSortedFiles compactSortedFiles = createCompactSortedFiles(schema, compactionJob); - RecordsProcessedSummary summary = compactSortedFiles.compact(compactionJob); - + CompactSortedFiles compactSortedFiles = createCompactSortedFiles(schema); + RecordsProcessed summary = compactSortedFiles.compact(compactionJob); // Then // - Read output files and check that they contain the right results assertThat(summary.getRecordsRead()).isEqualTo(200L); diff --git a/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/CompactSortedFilesLocalStackIT.java b/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/CompactSortedFilesLocalStackIT.java index 103c8195d1..5289e1cbe9 100644 --- a/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/CompactSortedFilesLocalStackIT.java +++ b/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/CompactSortedFilesLocalStackIT.java @@ -31,10 +31,8 @@ import software.amazon.awssdk.services.s3.S3AsyncClient; import sleeper.compaction.job.CompactionJob; -import sleeper.compaction.job.CompactionJobStatusStore; import sleeper.compaction.job.execution.testutils.CompactSortedFilesTestBase; import sleeper.compaction.job.execution.testutils.CompactSortedFilesTestData; -import sleeper.compaction.status.store.job.CompactionJobStatusStoreFactory; import sleeper.compaction.status.store.job.DynamoDBCompactionJobStatusStoreCreator; import sleeper.configuration.jars.ObjectFactory; import sleeper.configuration.properties.table.FixedTablePropertiesProvider; @@ -43,7 +41,7 @@ import sleeper.core.partition.PartitionTree; import sleeper.core.partition.PartitionsBuilder; import sleeper.core.record.Record; -import sleeper.core.record.process.RecordsProcessedSummary; +import sleeper.core.record.process.RecordsProcessed; import sleeper.core.schema.Schema; import sleeper.core.schema.type.LongType; import sleeper.core.statestore.FileReference; @@ -78,7 +76,6 @@ public class CompactSortedFilesLocalStackIT extends CompactSortedFilesTestBase { private static AmazonDynamoDB dynamoDBClient; private static AmazonS3 s3Client; private static S3AsyncClient s3AsyncClient; - private CompactionJobStatusStore jobStatusStore; @BeforeAll public static void beforeAll() { @@ -100,7 +97,6 @@ void setUp() { s3Client.createBucket(instanceProperties.get(DATA_BUCKET)); new S3StateStoreCreator(instanceProperties, dynamoDBClient).create(); DynamoDBCompactionJobStatusStoreCreator.create(instanceProperties, dynamoDBClient); - jobStatusStore = CompactionJobStatusStoreFactory.getStatusStore(dynamoDBClient, instanceProperties); } protected FileReference ingestRecordsGetFile(StateStore stateStore, List records) throws Exception { @@ -117,12 +113,12 @@ private StateStore createStateStore(Schema schema) { .getStateStore(tableProperties); } - private CompactSortedFiles createCompactSortedFiles(Schema schema, CompactionJob compactionJob, StateStore stateStore) throws Exception { + private CompactSortedFiles createCompactSortedFiles(Schema schema, StateStore stateStore) throws Exception { tableProperties.setSchema(schema); return new CompactSortedFiles(instanceProperties, new FixedTablePropertiesProvider(tableProperties), new FixedStateStoreProvider(tableProperties, stateStore), - ObjectFactory.noUserJars(), jobStatusStore, DEFAULT_TASK_ID); + ObjectFactory.noUserJars()); } @Test @@ -143,8 +139,8 @@ public void shouldUpdateStateStoreAfterRunningCompactionJob() throws Exception { assignJobIdToInputFiles(stateStore, compactionJob); // When - CompactSortedFiles compactSortedFiles = createCompactSortedFiles(schema, compactionJob, stateStore); - RecordsProcessedSummary summary = compactSortedFiles.compact(compactionJob); + CompactSortedFiles compactSortedFiles = createCompactSortedFiles(schema, stateStore); + RecordsProcessed summary = compactSortedFiles.compact(compactionJob); // Then // - Read output file and check that it contains the right results diff --git a/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/CompactSortedFilesReportingIT.java b/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/CompactSortedFilesReportingIT.java deleted file mode 100644 index 0a246d5f7e..0000000000 --- a/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/CompactSortedFilesReportingIT.java +++ /dev/null @@ -1,78 +0,0 @@ -/* - * Copyright 2022-2024 Crown Copyright - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package sleeper.compaction.job.execution; - -import org.junit.jupiter.api.Test; -import org.mockito.InOrder; -import org.mockito.Mockito; - -import sleeper.compaction.job.CompactionJob; -import sleeper.compaction.job.CompactionJobStatusStore; -import sleeper.compaction.job.execution.testutils.CompactSortedFilesTestBase; -import sleeper.compaction.job.execution.testutils.CompactSortedFilesTestData; -import sleeper.core.partition.PartitionsBuilder; -import sleeper.core.record.Record; -import sleeper.core.record.process.RecordsProcessedSummary; -import sleeper.core.schema.Schema; -import sleeper.core.schema.type.LongType; -import sleeper.core.statestore.FileReference; - -import java.time.Instant; -import java.util.List; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.mock; -import static sleeper.compaction.job.execution.testutils.CompactSortedFilesTestUtils.assignJobIdToInputFiles; -import static sleeper.compaction.job.execution.testutils.CompactSortedFilesTestUtils.createSchemaWithTypesForKeyAndTwoValues; - -class CompactSortedFilesReportingIT extends CompactSortedFilesTestBase { - - private final CompactionJobStatusStore jobStatusStore = mock(CompactionJobStatusStore.class); - - @Test - void shouldReportJobStatusUpdatesWhenCompacting() throws Exception { - // Given - Schema schema = createSchemaWithTypesForKeyAndTwoValues(new LongType(), new LongType(), new LongType()); - tableProperties.setSchema(schema); - stateStore.initialise(new PartitionsBuilder(schema).singlePartition("root").buildList()); - - List data1 = CompactSortedFilesTestData.keyAndTwoValuesSortedEvenLongs(); - List data2 = CompactSortedFilesTestData.keyAndTwoValuesSortedOddLongs(); - FileReference file1 = ingestRecordsGetFile(data1); - FileReference file2 = ingestRecordsGetFile(data2); - - CompactionJob compactionJob = compactionFactory().createCompactionJob(List.of(file1, file2), "root"); - assignJobIdToInputFiles(stateStore, compactionJob); - - // When - RecordsProcessedSummary summary = createCompactSortedFiles(schema, compactionJob, jobStatusStore).compact(compactionJob); - - // Then - InOrder order = Mockito.inOrder(jobStatusStore); - order.verify(jobStatusStore).jobStarted(eq(compactionJob), any(Instant.class), eq(DEFAULT_TASK_ID)); - order.verify(jobStatusStore).jobFinished(compactionJob, summary, DEFAULT_TASK_ID); - order.verifyNoMoreInteractions(); - - assertThat(summary.getStartTime()).isNotNull(); - assertThat(summary.getFinishTime()).isNotNull(); - assertThat(summary.getDurationInSeconds()).isPositive(); - assertThat(summary.getRecordsReadPerSecond()).isPositive(); - assertThat(summary.getRecordsWrittenPerSecond()).isPositive(); - } - -} diff --git a/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/CompactionTaskTest.java b/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/CompactionTaskTest.java index ca508f69b8..28922ad73b 100644 --- a/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/CompactionTaskTest.java +++ b/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/CompactionTaskTest.java @@ -28,6 +28,7 @@ import sleeper.compaction.task.CompactionTaskFinishedStatus; import sleeper.compaction.task.CompactionTaskStatus; import sleeper.compaction.task.CompactionTaskStatusStore; +import sleeper.compaction.testutils.InMemoryCompactionJobStatusStore; import sleeper.compaction.testutils.InMemoryCompactionTaskStatusStore; import sleeper.configuration.properties.PropertiesReloader; import sleeper.configuration.properties.instance.InstanceProperties; @@ -47,21 +48,24 @@ import java.util.stream.Stream; import static org.assertj.core.api.Assertions.assertThat; +import static sleeper.compaction.job.CompactionJobStatusTestData.finishedCompactionRun; +import static sleeper.compaction.job.CompactionJobStatusTestData.jobCreated; +import static sleeper.compaction.job.CompactionJobStatusTestData.startedCompactionRun; import static sleeper.configuration.properties.InstancePropertiesTestHelper.createTestInstanceProperties; import static sleeper.configuration.properties.instance.CompactionProperty.COMPACTION_TASK_DELAY_BEFORE_RETRY_IN_SECONDS; import static sleeper.configuration.properties.instance.CompactionProperty.COMPACTION_TASK_MAX_CONSECUTIVE_FAILURES; import static sleeper.configuration.properties.instance.CompactionProperty.COMPACTION_TASK_MAX_IDLE_TIME_IN_SECONDS; -import static sleeper.core.record.process.RecordsProcessedSummaryTestData.summary; public class CompactionTaskTest { + private static final String DEFAULT_TABLE_ID = "test-table-id"; private static final String DEFAULT_TASK_ID = "test-task-id"; - private static final Instant DEFAULT_START_TIME = Instant.parse("2024-03-04T11:00:00Z"); - private static final Duration DEFAULT_DURATION = Duration.ofSeconds(5); + private static final Instant DEFAULT_CREATED_TIME = Instant.parse("2024-03-04T10:50:00Z"); private final InstanceProperties instanceProperties = createTestInstanceProperties(); private final Queue jobsOnQueue = new LinkedList<>(); private final List successfulJobs = new ArrayList<>(); private final List failedJobs = new ArrayList<>(); + private final InMemoryCompactionJobStatusStore jobStore = new InMemoryCompactionJobStatusStore(); private final CompactionTaskStatusStore taskStore = new InMemoryCompactionTaskStatusStore(); private final List sleeps = new ArrayList<>(); @@ -165,7 +169,8 @@ void shouldTerminateAfterRunningJobAndWaitingForIdleTime() throws Exception { instanceProperties.setNumber(COMPACTION_TASK_DELAY_BEFORE_RETRY_IN_SECONDS, 2); Queue times = new LinkedList<>(List.of( Instant.parse("2024-02-22T13:50:00Z"), // Start - Instant.parse("2024-02-22T13:50:01Z"), // Job completed + Instant.parse("2024-02-22T13:50:01Z"), // Job started + Instant.parse("2024-02-22T13:50:02Z"), // Job completed Instant.parse("2024-02-22T13:50:05Z"))); // Idle time check with empty queue and finish CompactionJob job = createJobOnQueue("job1"); @@ -188,6 +193,7 @@ void shouldTerminateWhenMaxIdleTimeNotMetOnFirstCheckThenIdleAfterProcessingJob( Queue times = new LinkedList<>(List.of( Instant.parse("2024-02-22T13:50:00Z"), // Start Instant.parse("2024-02-22T13:50:01Z"), // First check + Instant.parse("2024-02-22T13:50:02Z"), // Job started Instant.parse("2024-02-22T13:50:02Z"), // Job completed Instant.parse("2024-02-22T13:50:06Z"))); // Second check + finish CompactionJob job = createJob("job1"); @@ -217,7 +223,8 @@ void shouldTerminateWhenMaxIdleTimeNotMetOnFirstCheckThenNotMetAfterProcessingJo Queue times = new LinkedList<>(List.of( Instant.parse("2024-02-22T13:50:00Z"), // Start Instant.parse("2024-02-22T13:50:01Z"), // First check - Instant.parse("2024-02-22T13:50:02Z"), // Job completed + Instant.parse("2024-02-22T13:50:02Z"), // Job started + Instant.parse("2024-02-22T13:50:03Z"), // Job completed Instant.parse("2024-02-22T13:50:04Z"), // Second check Instant.parse("2024-02-22T13:50:06Z"))); // Third check + finish CompactionJob job = createJob("job1"); @@ -299,86 +306,98 @@ void shouldResetConsecutiveFailureCountIfJobProcessedSuccessfully() throws Excep } @Nested - @DisplayName("Update task status store") - class UpdateTaskStatusStore { + @DisplayName("Update status stores") + class UpdateStatusStores { @Test - void shouldSaveTaskWhenOneJobSucceeds() throws Exception { + void shouldSaveTaskAndJobWhenOneJobSucceeds() throws Exception { // Given Queue times = new LinkedList<>(List.of( Instant.parse("2024-02-22T13:50:00Z"), // Start + Instant.parse("2024-02-22T13:50:01Z"), // Job started Instant.parse("2024-02-22T13:50:02Z"), // Job completed Instant.parse("2024-02-22T13:50:05Z"))); // Finish - createJobOnQueue("job1"); + CompactionJob job = createJobOnQueue("job1"); // When - RecordsProcessedSummary jobSummary = summary( - Instant.parse("2024-02-22T13:50:01Z"), - Instant.parse("2024-02-22T13:50:02Z"), 10L, 10L); + RecordsProcessed recordsProcessed = new RecordsProcessed(10L, 10L); runTask("test-task-1", processJobs( - jobSucceeds(jobSummary)), + jobSucceeds(recordsProcessed)), times::poll); // Then - assertThat(taskStore.getAllTasks()) - .containsExactly(CompactionTaskStatus.builder() - .startTime(Instant.parse("2024-02-22T13:50:00Z")) - .taskId("test-task-1") - .finished(Instant.parse("2024-02-22T13:50:05Z"), - withJobSummaries(jobSummary)) - .build()); + RecordsProcessedSummary jobSummary = new RecordsProcessedSummary(recordsProcessed, + Instant.parse("2024-02-22T13:50:01Z"), + Instant.parse("2024-02-22T13:50:02Z")); + assertThat(taskStore.getAllTasks()).containsExactly( + finishedCompactionTask("test-task-1", + Instant.parse("2024-02-22T13:50:00Z"), + Instant.parse("2024-02-22T13:50:05Z"), + jobSummary)); + assertThat(jobStore.getAllJobs(DEFAULT_TABLE_ID)).containsExactly( + jobCreated(job, DEFAULT_CREATED_TIME, + finishedCompactionRun("test-task-1", jobSummary))); } @Test - void shouldSaveTaskWhenMultipleJobsSucceed() throws Exception { + void shouldSaveTaskAndJobsWhenMultipleJobsSucceed() throws Exception { // Given Queue times = new LinkedList<>(List.of( Instant.parse("2024-02-22T13:50:00Z"), // Start + Instant.parse("2024-02-22T13:50:01Z"), // Job 1 started Instant.parse("2024-02-22T13:50:02Z"), // Job 1 completed + Instant.parse("2024-02-22T13:50:03Z"), // Job 2 started Instant.parse("2024-02-22T13:50:04Z"), // Job 2 completed Instant.parse("2024-02-22T13:50:05Z"))); // Finish - createJobOnQueue("job1"); - createJobOnQueue("job2"); + CompactionJob job1 = createJobOnQueue("job1"); + CompactionJob job2 = createJobOnQueue("job2"); // When - RecordsProcessedSummary job1Summary = summary( - Instant.parse("2024-02-22T13:50:01Z"), - Instant.parse("2024-02-22T13:50:02Z"), 10L, 10L); - RecordsProcessedSummary job2Summary = summary( - Instant.parse("2024-02-22T13:50:03Z"), - Instant.parse("2024-02-22T13:50:04Z"), 5L, 5L); + RecordsProcessed job1RecordsProcessed = new RecordsProcessed(10L, 10L); + RecordsProcessed job2RecordsProcessed = new RecordsProcessed(5L, 5L); runTask("test-task-1", processJobs( - jobSucceeds(job1Summary), - jobSucceeds(job2Summary)), + jobSucceeds(job1RecordsProcessed), + jobSucceeds(job2RecordsProcessed)), times::poll); // Then - assertThat(taskStore.getAllTasks()) - .containsExactly(CompactionTaskStatus.builder() - .startTime(Instant.parse("2024-02-22T13:50:00Z")) - .taskId("test-task-1") - .finished(Instant.parse("2024-02-22T13:50:05Z"), - withJobSummaries(job1Summary, job2Summary)) - .build()); + RecordsProcessedSummary job1Summary = new RecordsProcessedSummary(job1RecordsProcessed, + Instant.parse("2024-02-22T13:50:01Z"), + Instant.parse("2024-02-22T13:50:02Z")); + RecordsProcessedSummary job2Summary = new RecordsProcessedSummary(job2RecordsProcessed, + Instant.parse("2024-02-22T13:50:03Z"), + Instant.parse("2024-02-22T13:50:04Z")); + assertThat(taskStore.getAllTasks()).containsExactly( + finishedCompactionTask("test-task-1", + Instant.parse("2024-02-22T13:50:00Z"), + Instant.parse("2024-02-22T13:50:05Z"), + job1Summary, job2Summary)); + assertThat(jobStore.getAllJobs(DEFAULT_TABLE_ID)).containsExactlyInAnyOrder( + jobCreated(job1, DEFAULT_CREATED_TIME, + finishedCompactionRun("test-task-1", job1Summary)), + jobCreated(job2, DEFAULT_CREATED_TIME, + finishedCompactionRun("test-task-1", job2Summary))); } @Test - void shouldSaveTaskWhenOneJobFails() throws Exception { + void shouldSaveTaskAndJobWhenOneJobFails() throws Exception { // Given Queue times = new LinkedList<>(List.of( Instant.parse("2024-02-22T13:50:00Z"), // Start + Instant.parse("2024-02-22T13:50:01Z"), // Job started Instant.parse("2024-02-22T13:50:05Z"))); // Finish - createJobOnQueue("job1"); + CompactionJob job = createJobOnQueue("job1"); // When runTask("test-task-1", processJobs(jobFails()), times::poll); // Then - assertThat(taskStore.getAllTasks()) - .containsExactly(CompactionTaskStatus.builder() - .startTime(Instant.parse("2024-02-22T13:50:00Z")) - .taskId("test-task-1") - .finished(Instant.parse("2024-02-22T13:50:05Z"), noJobSummaries()) - .build()); + assertThat(taskStore.getAllTasks()).containsExactly( + finishedCompactionTask("test-task-1", + Instant.parse("2024-02-22T13:50:00Z"), + Instant.parse("2024-02-22T13:50:05Z"))); + assertThat(jobStore.getAllJobs(DEFAULT_TABLE_ID)).containsExactly( + jobCreated(job, DEFAULT_CREATED_TIME, + startedCompactionRun("test-task-1", Instant.parse("2024-02-22T13:50:01Z")))); } @Test @@ -392,16 +411,11 @@ void shouldSaveTaskWhenNoJobsFound() throws Exception { runTask("test-task-1", processNoJobs(), times::poll); // Then - assertThat(taskStore.getAllTasks()) - .containsExactly(CompactionTaskStatus.builder() - .startTime(Instant.parse("2024-02-22T13:50:00Z")) - .taskId("test-task-1") - .finished(Instant.parse("2024-02-22T13:50:05Z"), noJobSummaries()) - .build()); - } - - private CompactionTaskFinishedStatus.Builder noJobSummaries() { - return withJobSummaries(); + assertThat(taskStore.getAllTasks()).containsExactly( + finishedCompactionTask("test-task-1", + Instant.parse("2024-02-22T13:50:00Z"), + Instant.parse("2024-02-22T13:50:05Z"))); + assertThat(jobStore.getAllJobs(DEFAULT_TABLE_ID)).isEmpty(); } private CompactionTaskFinishedStatus.Builder withJobSummaries(RecordsProcessedSummary... summaries) { @@ -409,6 +423,14 @@ private CompactionTaskFinishedStatus.Builder withJobSummaries(RecordsProcessedSu Stream.of(summaries).forEach(taskFinishedBuilder::addJobSummary); return taskFinishedBuilder; } + + private CompactionTaskStatus finishedCompactionTask(String taskId, Instant startTime, Instant finishTime, RecordsProcessedSummary... summaries) { + return CompactionTaskStatus.builder() + .startTime(startTime) + .taskId(taskId) + .finished(finishTime, withJobSummaries(summaries)) + .build(); + } } private void runTask(CompactionRunner compactor) throws Exception { @@ -436,19 +458,20 @@ private void runTask( Supplier timeSupplier, String taskId) throws Exception { new CompactionTask(instanceProperties, PropertiesReloader.neverReload(), - messageReceiver, compactor, taskStore, taskId, timeSupplier, sleeps::add) + messageReceiver, compactor, jobStore, taskStore, taskId, timeSupplier, sleeps::add) .run(); } private CompactionJob createJobOnQueue(String jobId) { CompactionJob job = createJob(jobId); jobsOnQueue.add(job); + jobStore.jobCreated(job, DEFAULT_CREATED_TIME); return job; } private CompactionJob createJob(String jobId) { return CompactionJob.builder() - .tableId("test-table-id") + .tableId(DEFAULT_TABLE_ID) .jobId(jobId) .partitionId("root") .inputFiles(List.of(UUID.randomUUID().toString())) @@ -515,16 +538,16 @@ private CompactionRunner jobsSucceed(int numJobs) { .toArray(ProcessJob[]::new)); } - private ProcessJob jobSucceeds(RecordsProcessedSummary summary) { + private ProcessJob jobSucceeds(RecordsProcessed summary) { return new ProcessJob(true, summary); } private ProcessJob jobSucceeds() { - return new ProcessJob(true, 10L, DEFAULT_START_TIME, DEFAULT_DURATION); + return new ProcessJob(true, 10L); } private ProcessJob jobFails() { - return new ProcessJob(false, 0L, DEFAULT_START_TIME, DEFAULT_DURATION); + return new ProcessJob(false, 0L); } private CompactionRunner processNoJobs() { @@ -546,13 +569,13 @@ private CompactionRunner processJobs(ProcessJob... actions) { private class ProcessJob { private final boolean succeed; - private final RecordsProcessedSummary summary; + private final RecordsProcessed summary; - ProcessJob(boolean succeed, long records, Instant startTime, Duration duration) { - this(succeed, new RecordsProcessedSummary(new RecordsProcessed(records, records), startTime, duration)); + ProcessJob(boolean succeed, long records) { + this(succeed, new RecordsProcessed(records, records)); } - ProcessJob(boolean succeed, RecordsProcessedSummary summary) { + ProcessJob(boolean succeed, RecordsProcessed summary) { this.succeed = succeed; this.summary = summary; } diff --git a/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/ECSCompactionTaskRunnerLocalStackIT.java b/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/ECSCompactionTaskRunnerLocalStackIT.java index 8e563d7de1..5e69657bad 100644 --- a/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/ECSCompactionTaskRunnerLocalStackIT.java +++ b/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/ECSCompactionTaskRunnerLocalStackIT.java @@ -340,10 +340,10 @@ private CompactionTask createTask(String taskId) { private CompactionTask createTask(String taskId, StateStoreProvider stateStoreProvider) { CompactSortedFiles compactSortedFiles = new CompactSortedFiles(instanceProperties, tablePropertiesProvider, stateStoreProvider, - ObjectFactory.noUserJars(), jobStatusStore, taskId); + ObjectFactory.noUserJars()); CompactionTask task = new CompactionTask(instanceProperties, PropertiesReloader.neverReload(), new SqsCompactionQueueHandler(sqs, instanceProperties), - compactSortedFiles, taskStatusStore, taskId); + compactSortedFiles, jobStatusStore, taskStatusStore, taskId); return task; } diff --git a/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/testutils/CompactSortedFilesTestBase.java b/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/testutils/CompactSortedFilesTestBase.java index fe6c95c03f..46bc1a18f1 100644 --- a/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/testutils/CompactSortedFilesTestBase.java +++ b/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/testutils/CompactSortedFilesTestBase.java @@ -18,9 +18,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.io.TempDir; -import sleeper.compaction.job.CompactionJob; import sleeper.compaction.job.CompactionJobFactory; -import sleeper.compaction.job.CompactionJobStatusStore; import sleeper.compaction.job.execution.CompactSortedFiles; import sleeper.configuration.jars.ObjectFactory; import sleeper.configuration.properties.instance.InstanceProperties; @@ -68,17 +66,12 @@ protected CompactionJobFactory compactionFactory() { return new CompactionJobFactory(instanceProperties, tableProperties); } - protected CompactSortedFiles createCompactSortedFiles(Schema schema, CompactionJob compactionJob) throws Exception { - return createCompactSortedFiles(schema, compactionJob, CompactionJobStatusStore.NONE); - } - - protected CompactSortedFiles createCompactSortedFiles( - Schema schema, CompactionJob compactionJob, CompactionJobStatusStore statusStore) throws Exception { + protected CompactSortedFiles createCompactSortedFiles(Schema schema) throws Exception { tableProperties.setSchema(schema); return new CompactSortedFiles(instanceProperties, new FixedTablePropertiesProvider(tableProperties), new FixedStateStoreProvider(tableProperties, stateStore), - ObjectFactory.noUserJars(), statusStore, DEFAULT_TASK_ID); + ObjectFactory.noUserJars()); } protected FileReference ingestRecordsGetFile(List records) throws Exception {