diff --git a/java/cdk/src/main/java/sleeper/cdk/stack/compaction/CompactionTrackerStack.java b/java/cdk/src/main/java/sleeper/cdk/stack/compaction/CompactionTrackerStack.java index c4646d58d6..0f9718fd36 100644 --- a/java/cdk/src/main/java/sleeper/cdk/stack/compaction/CompactionTrackerStack.java +++ b/java/cdk/src/main/java/sleeper/cdk/stack/compaction/CompactionTrackerStack.java @@ -27,7 +27,7 @@ import sleeper.cdk.stack.core.ManagedPoliciesStack; import sleeper.compaction.status.store.job.DynamoDBCompactionJobTracker; import sleeper.compaction.status.store.task.DynamoDBCompactionTaskStatusFormat; -import sleeper.compaction.status.store.task.DynamoDBCompactionTaskStatusStore; +import sleeper.compaction.status.store.task.DynamoDBCompactionTaskTracker; import sleeper.core.properties.instance.InstanceProperties; import static sleeper.cdk.util.Utils.removalPolicy; @@ -77,7 +77,7 @@ public CompactionTrackerStack( tasksTable = Table.Builder .create(this, "DynamoDBCompactionTaskStatusTable") - .tableName(DynamoDBCompactionTaskStatusStore.taskStatusTableName(instanceId)) + .tableName(DynamoDBCompactionTaskTracker.taskStatusTableName(instanceId)) .removalPolicy(removalPolicy) .billingMode(BillingMode.PAY_PER_REQUEST) .partitionKey(Attribute.builder() diff --git a/java/clients/src/main/java/sleeper/clients/admin/AdminClientTrackerFactory.java b/java/clients/src/main/java/sleeper/clients/admin/AdminClientTrackerFactory.java index 074b12bfae..aa45986b70 100644 --- a/java/clients/src/main/java/sleeper/clients/admin/AdminClientTrackerFactory.java +++ b/java/clients/src/main/java/sleeper/clients/admin/AdminClientTrackerFactory.java @@ -18,7 +18,7 @@ import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; import sleeper.compaction.status.store.job.CompactionJobTrackerFactory; -import sleeper.compaction.status.store.task.CompactionTaskStatusStoreFactory; +import sleeper.compaction.status.store.task.CompactionTaskTrackerFactory; import sleeper.core.properties.instance.InstanceProperties; import sleeper.core.properties.table.TablePropertiesProvider; import sleeper.core.tracker.compaction.job.CompactionJobTracker; @@ -36,7 +36,7 @@ public interface AdminClientTrackerFactory { CompactionJobTracker loadCompactionJobTracker(InstanceProperties instanceProperties); - CompactionTaskTracker loadCompactionTaskStatusStore(InstanceProperties instanceProperties); + CompactionTaskTracker loadCompactionTaskTracker(InstanceProperties instanceProperties); IngestJobStatusStore loadIngestJobStatusStore(InstanceProperties instanceProperties); @@ -52,8 +52,8 @@ public CompactionJobTracker loadCompactionJobTracker(InstanceProperties instance } @Override - public CompactionTaskTracker loadCompactionTaskStatusStore(InstanceProperties instanceProperties) { - return CompactionTaskStatusStoreFactory.getStatusStore(dynamoDB, instanceProperties); + public CompactionTaskTracker loadCompactionTaskTracker(InstanceProperties instanceProperties) { + return CompactionTaskTrackerFactory.getTracker(dynamoDB, instanceProperties); } @Override diff --git a/java/clients/src/main/java/sleeper/clients/admin/CompactionStatusReportScreen.java b/java/clients/src/main/java/sleeper/clients/admin/CompactionStatusReportScreen.java index 8d6d3ea555..5d7cdfdb31 100644 --- a/java/clients/src/main/java/sleeper/clients/admin/CompactionStatusReportScreen.java +++ b/java/clients/src/main/java/sleeper/clients/admin/CompactionStatusReportScreen.java @@ -104,7 +104,7 @@ private void runCompactionJobStatusReport(InstanceProperties properties, TableSt } private void runCompactionTaskStatusReport(InstanceProperties properties, CompactionTaskQuery queryType) { - new CompactionTaskStatusReport(trackers.loadCompactionTaskStatusStore(properties), + new CompactionTaskStatusReport(trackers.loadCompactionTaskTracker(properties), new StandardCompactionTaskStatusReporter(out.printStream()), queryType).run(); confirmReturnToMainScreen(out, in); } diff --git a/java/clients/src/main/java/sleeper/clients/docker/stack/CompactionDockerStack.java b/java/clients/src/main/java/sleeper/clients/docker/stack/CompactionDockerStack.java index c2ed79c2bd..e579231bca 100644 --- a/java/clients/src/main/java/sleeper/clients/docker/stack/CompactionDockerStack.java +++ b/java/clients/src/main/java/sleeper/clients/docker/stack/CompactionDockerStack.java @@ -20,7 +20,7 @@ import software.amazon.awssdk.services.sqs.SqsClient; import sleeper.compaction.status.store.job.DynamoDBCompactionJobTrackerCreator; -import sleeper.compaction.status.store.task.DynamoDBCompactionTaskStatusStoreCreator; +import sleeper.compaction.status.store.task.DynamoDBCompactionTaskTrackerCreator; import sleeper.core.properties.instance.InstanceProperties; import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.COMPACTION_JOB_QUEUE_URL; @@ -44,7 +44,7 @@ public static CompactionDockerStack from(InstanceProperties instanceProperties, public void deploy() { DynamoDBCompactionJobTrackerCreator.create(instanceProperties, dynamoDB); - DynamoDBCompactionTaskStatusStoreCreator.create(instanceProperties, dynamoDB); + DynamoDBCompactionTaskTrackerCreator.create(instanceProperties, dynamoDB); String queueName = "sleeper-" + instanceProperties.get(ID) + "-CompactionJobQ"; String queueUrl = sqsClient.createQueue(request -> request.queueName(queueName)).queueUrl(); instanceProperties.set(COMPACTION_JOB_QUEUE_URL, queueUrl); @@ -53,7 +53,7 @@ public void deploy() { @Override public void tearDown() { DynamoDBCompactionJobTrackerCreator.tearDown(instanceProperties, dynamoDB); - DynamoDBCompactionTaskStatusStoreCreator.tearDown(instanceProperties, dynamoDB); + DynamoDBCompactionTaskTrackerCreator.tearDown(instanceProperties, dynamoDB); sqsClient.deleteQueue(request -> request.queueUrl(instanceProperties.get(COMPACTION_JOB_QUEUE_URL))); } diff --git a/java/clients/src/main/java/sleeper/clients/status/report/CompactionTaskStatusReport.java b/java/clients/src/main/java/sleeper/clients/status/report/CompactionTaskStatusReport.java index e04a665d6f..1f5e4395f9 100644 --- a/java/clients/src/main/java/sleeper/clients/status/report/CompactionTaskStatusReport.java +++ b/java/clients/src/main/java/sleeper/clients/status/report/CompactionTaskStatusReport.java @@ -23,7 +23,7 @@ import sleeper.clients.status.report.compaction.task.CompactionTaskQuery; import sleeper.clients.status.report.compaction.task.CompactionTaskStatusReportArguments; import sleeper.clients.status.report.compaction.task.CompactionTaskStatusReporter; -import sleeper.compaction.status.store.task.CompactionTaskStatusStoreFactory; +import sleeper.compaction.status.store.task.CompactionTaskTrackerFactory; import sleeper.configuration.properties.S3InstanceProperties; import sleeper.core.properties.instance.InstanceProperties; import sleeper.core.tracker.compaction.task.CompactionTaskTracker; @@ -32,21 +32,21 @@ public class CompactionTaskStatusReport { - private final CompactionTaskTracker store; + private final CompactionTaskTracker tracker; private final CompactionTaskStatusReporter reporter; private final CompactionTaskQuery query; public CompactionTaskStatusReport( - CompactionTaskTracker store, + CompactionTaskTracker tracker, CompactionTaskStatusReporter reporter, CompactionTaskQuery query) { - this.store = store; + this.tracker = tracker; this.reporter = reporter; this.query = query; } public void run() { - reporter.report(query, query.run(store)); + reporter.report(query, query.run(tracker)); } public static void main(String[] args) { @@ -65,8 +65,8 @@ public static void main(String[] args) { try { InstanceProperties instanceProperties = S3InstanceProperties.loadGivenInstanceId(s3Client, arguments.getInstanceId()); - CompactionTaskTracker statusStore = CompactionTaskStatusStoreFactory.getStatusStore(dynamoDBClient, instanceProperties); - new CompactionTaskStatusReport(statusStore, arguments.getReporter(), arguments.getQuery()).run(); + CompactionTaskTracker tracker = CompactionTaskTrackerFactory.getTracker(dynamoDBClient, instanceProperties); + new CompactionTaskStatusReport(tracker, arguments.getReporter(), arguments.getQuery()).run(); } finally { s3Client.shutdown(); dynamoDBClient.shutdown(); diff --git a/java/clients/src/main/java/sleeper/clients/status/report/StatusReport.java b/java/clients/src/main/java/sleeper/clients/status/report/StatusReport.java index e2febac471..a803f06e5a 100644 --- a/java/clients/src/main/java/sleeper/clients/status/report/StatusReport.java +++ b/java/clients/src/main/java/sleeper/clients/status/report/StatusReport.java @@ -30,7 +30,7 @@ import sleeper.clients.status.report.job.query.JobQuery; import sleeper.clients.status.report.partitions.PartitionsStatusReporter; import sleeper.compaction.status.store.job.CompactionJobTrackerFactory; -import sleeper.compaction.status.store.task.CompactionTaskStatusStoreFactory; +import sleeper.compaction.status.store.task.CompactionTaskTrackerFactory; import sleeper.configuration.properties.S3InstanceProperties; import sleeper.configuration.properties.S3TableProperties; import sleeper.core.properties.instance.InstanceProperties; @@ -56,7 +56,7 @@ public class StatusReport { private final boolean verbose; private final StateStore stateStore; private final CompactionJobTracker compactionJobTracker; - private final CompactionTaskTracker compactionTaskStatusStore; + private final CompactionTaskTracker compactionTaskTracker; private final SqsClient sqsClient; private final QueueMessageCount.Client messageCount; private final TablePropertiesProvider tablePropertiesProvider; @@ -64,14 +64,14 @@ public class StatusReport { public StatusReport( InstanceProperties instanceProperties, TableProperties tableProperties, boolean verbose, StateStore stateStore, - CompactionJobTracker compactionJobTracker, CompactionTaskTracker compactionTaskStatusStore, + CompactionJobTracker compactionJobTracker, CompactionTaskTracker compactionTaskTracker, SqsClient sqsClient, QueueMessageCount.Client messageCount, TablePropertiesProvider tablePropertiesProvider) { this.instanceProperties = instanceProperties; this.tableProperties = tableProperties; this.verbose = verbose; this.stateStore = stateStore; this.compactionJobTracker = compactionJobTracker; - this.compactionTaskStatusStore = compactionTaskStatusStore; + this.compactionTaskTracker = compactionTaskTracker; this.sqsClient = sqsClient; this.messageCount = messageCount; this.tablePropertiesProvider = tablePropertiesProvider; @@ -92,7 +92,7 @@ private void run() { JobQuery.Type.UNFINISHED).run(); // Tasks - new CompactionTaskStatusReport(compactionTaskStatusStore, + new CompactionTaskStatusReport(compactionTaskTracker, new StandardCompactionTaskStatusReporter(System.out), CompactionTaskQuery.UNFINISHED).run(); @@ -120,11 +120,11 @@ public static void main(String[] args) { StateStoreFactory stateStoreFactory = new StateStoreFactory(instanceProperties, s3Client, dynamoDBClient, new Configuration()); StateStore stateStore = stateStoreFactory.getStateStore(tableProperties); CompactionJobTracker compactionJobTracker = CompactionJobTrackerFactory.getTracker(dynamoDBClient, instanceProperties); - CompactionTaskTracker compactionTaskStatusStore = CompactionTaskStatusStoreFactory.getStatusStore(dynamoDBClient, instanceProperties); + CompactionTaskTracker compactionTaskTracker = CompactionTaskTrackerFactory.getTracker(dynamoDBClient, instanceProperties); StatusReport statusReport = new StatusReport( instanceProperties, tableProperties, verbose, - stateStore, compactionJobTracker, compactionTaskStatusStore, + stateStore, compactionJobTracker, compactionTaskTracker, sqsClient, QueueMessageCount.withSqsClient(sqsClientV1), tablePropertiesProvider); statusReport.run(); } finally { diff --git a/java/clients/src/main/java/sleeper/clients/status/report/compaction/task/CompactionTaskQuery.java b/java/clients/src/main/java/sleeper/clients/status/report/compaction/task/CompactionTaskQuery.java index 77bc3895ca..286e72d5c9 100644 --- a/java/clients/src/main/java/sleeper/clients/status/report/compaction/task/CompactionTaskQuery.java +++ b/java/clients/src/main/java/sleeper/clients/status/report/compaction/task/CompactionTaskQuery.java @@ -26,7 +26,7 @@ public interface CompactionTaskQuery { CompactionTaskQuery UNFINISHED = CompactionTaskTracker::getTasksInProgress; CompactionTaskQuery ALL = CompactionTaskTracker::getAllTasks; - List run(CompactionTaskTracker store); + List run(CompactionTaskTracker tracker); static CompactionTaskQuery from(String type) { switch (type) { diff --git a/java/clients/src/test/java/sleeper/clients/admin/testutils/AdminClientProcessTrackerHolder.java b/java/clients/src/test/java/sleeper/clients/admin/testutils/AdminClientProcessTrackerHolder.java index d0d232430d..33daa6d3b9 100644 --- a/java/clients/src/test/java/sleeper/clients/admin/testutils/AdminClientProcessTrackerHolder.java +++ b/java/clients/src/test/java/sleeper/clients/admin/testutils/AdminClientProcessTrackerHolder.java @@ -33,7 +33,7 @@ public class AdminClientProcessTrackerHolder implements AdminClientTrackerFactory { private final Map compactionJobTrackerByInstance = new HashMap<>(); - private final Map compactionTaskStoreByInstance = new HashMap<>(); + private final Map compactionTaskTrackerByInstance = new HashMap<>(); private final Map ingestJobStoreByInstance = new HashMap<>(); private final Map ingestTaskStoreByInstance = new HashMap<>(); private final Map ingestBatcherStoreByInstance = new HashMap<>(); @@ -43,7 +43,7 @@ public void setTracker(String instanceId, CompactionJobTracker tracker) { } public void setTracker(String instanceId, CompactionTaskTracker tracker) { - compactionTaskStoreByInstance.put(instanceId, tracker); + compactionTaskTrackerByInstance.put(instanceId, tracker); } public void setTracker(String instanceId, IngestJobStatusStore tracker) { @@ -65,8 +65,8 @@ public CompactionJobTracker loadCompactionJobTracker(InstanceProperties instance } @Override - public CompactionTaskTracker loadCompactionTaskStatusStore(InstanceProperties instanceProperties) { - return Optional.ofNullable(compactionTaskStoreByInstance.get(instanceProperties.get(ID))) + public CompactionTaskTracker loadCompactionTaskTracker(InstanceProperties instanceProperties) { + return Optional.ofNullable(compactionTaskTrackerByInstance.get(instanceProperties.get(ID))) .orElse(CompactionTaskTracker.NONE); } diff --git a/java/clients/src/test/java/sleeper/clients/admin/testutils/AdminClientProcessTrackerHolderTest.java b/java/clients/src/test/java/sleeper/clients/admin/testutils/AdminClientProcessTrackerHolderTest.java index 4399bbb82f..3570a0016b 100644 --- a/java/clients/src/test/java/sleeper/clients/admin/testutils/AdminClientProcessTrackerHolderTest.java +++ b/java/clients/src/test/java/sleeper/clients/admin/testutils/AdminClientProcessTrackerHolderTest.java @@ -46,7 +46,7 @@ void shouldSetCompactionJobTracker() { } @Test - void shouldSetCompactionTaskStatusStore() { + void shouldSetCompactionTaskTracker() { // Given InMemoryCompactionTaskTracker store = new InMemoryCompactionTaskTracker(); InstanceProperties properties = createValidInstanceProperties(); @@ -56,7 +56,7 @@ void shouldSetCompactionTaskStatusStore() { RunAdminClient runner = runClient().tracker(store); // Then - assertThat(runner.trackers().loadCompactionTaskStatusStore(properties)) + assertThat(runner.trackers().loadCompactionTaskTracker(properties)) .isSameAs(store); } @@ -102,13 +102,13 @@ void shouldReturnNoCompactionJobTracker() { } @Test - void shouldReturnNoCompactionTaskStatusStore() { + void shouldReturnNoCompactionTaskTracker() { // Given InstanceProperties properties = createValidInstanceProperties(); setInstanceProperties(properties); // When / Then - assertThat(runClient().trackers().loadCompactionTaskStatusStore(properties)) + assertThat(runClient().trackers().loadCompactionTaskTracker(properties)) .isSameAs(CompactionTaskTracker.NONE); } diff --git a/java/compaction/compaction-core/src/main/java/sleeper/compaction/core/task/CompactionTask.java b/java/compaction/compaction-core/src/main/java/sleeper/compaction/core/task/CompactionTask.java index 28c3eaf846..c382eac98d 100644 --- a/java/compaction/compaction-core/src/main/java/sleeper/compaction/core/task/CompactionTask.java +++ b/java/compaction/compaction-core/src/main/java/sleeper/compaction/core/task/CompactionTask.java @@ -71,7 +71,7 @@ public class CompactionTask { private final MessageReceiver messageReceiver; private final CompactionRunnerFactory selector; private final CompactionJobTracker jobTracker; - private final CompactionTaskTracker taskStatusStore; + private final CompactionTaskTracker taskTracker; private final CompactionJobCommitterOrSendToLambda jobCommitter; private final String taskId; private final Supplier jobRunIdSupplier; @@ -82,10 +82,10 @@ public CompactionTask(InstanceProperties instanceProperties, TablePropertiesProv PropertiesReloader propertiesReloader, StateStoreProvider stateStoreProvider, MessageReceiver messageReceiver, StateStoreWaitForFiles waitForFiles, CompactionJobCommitterOrSendToLambda jobCommitter, CompactionJobTracker jobStore, - CompactionTaskTracker taskStore, CompactionRunnerFactory selector, String taskId) { + CompactionTaskTracker taskTracker, CompactionRunnerFactory selector, String taskId) { this(instanceProperties, tablePropertiesProvider, propertiesReloader, stateStoreProvider, messageReceiver, waitForFiles, jobCommitter, - jobStore, taskStore, selector, taskId, + jobStore, taskTracker, selector, taskId, () -> UUID.randomUUID().toString(), Instant::now, threadSleep()); } @@ -108,7 +108,7 @@ public CompactionTask( this.messageReceiver = messageReceiver; this.selector = selector; this.jobTracker = jobTracker; - this.taskStatusStore = taskTracker; + this.taskTracker = taskTracker; this.taskId = taskId; this.jobRunIdSupplier = jobRunIdSupplier; this.jobCommitter = jobCommitter; @@ -119,14 +119,14 @@ public void run() throws IOException { Instant startTime = timeSupplier.get(); CompactionTaskStatus.Builder taskStatusBuilder = CompactionTaskStatus.builder().taskId(taskId).startTime(startTime); LOGGER.info("Starting task {}", taskId); - taskStatusStore.taskStarted(taskStatusBuilder.build()); + taskTracker.taskStarted(taskStatusBuilder.build()); CompactionTaskFinishedStatus.Builder taskFinishedBuilder = CompactionTaskFinishedStatus.builder(); Instant finishTime = handleMessages(startTime, taskFinishedBuilder); CompactionTaskStatus taskFinished = taskStatusBuilder.finished(finishTime, taskFinishedBuilder).build(); LOGGER.info("Total number of messages processed = {}", taskFinished.getJobRuns()); LOGGER.info("Total run time = {}", LoggedDuration.withFullOutput(startTime, finishTime)); - taskStatusStore.taskFinished(taskFinished); + taskTracker.taskFinished(taskFinished); } private Instant handleMessages(Instant startTime, CompactionTaskFinishedStatus.Builder taskFinishedBuilder) throws IOException { diff --git a/java/compaction/compaction-core/src/test/java/sleeper/compaction/core/task/CompactionTaskCommitTest.java b/java/compaction/compaction-core/src/test/java/sleeper/compaction/core/task/CompactionTaskCommitTest.java index f6e9705067..7c2b985580 100644 --- a/java/compaction/compaction-core/src/test/java/sleeper/compaction/core/task/CompactionTaskCommitTest.java +++ b/java/compaction/compaction-core/src/test/java/sleeper/compaction/core/task/CompactionTaskCommitTest.java @@ -402,7 +402,7 @@ void shouldSaveTaskAndJobWhenOneJobSucceeds() throws Exception { // Then RecordsProcessedSummary jobSummary = new RecordsProcessedSummary(recordsProcessed, jobStartTime, jobFinishTime); - assertThat(taskStore.getAllTasks()).containsExactly( + assertThat(taskTracker.getAllTasks()).containsExactly( finishedCompactionTask("test-task-1", taskStartTime, taskFinishTime, jobSummary)); assertThat(jobTracker.getAllJobs(DEFAULT_TABLE_ID)).containsExactly( compactionJobCreated(job, DEFAULT_CREATED_TIME, @@ -441,7 +441,7 @@ void shouldSaveTaskAndJobsWhenMultipleJobsSucceed() throws Exception { job1StartTime, job1FinishTime); RecordsProcessedSummary job2Summary = new RecordsProcessedSummary(job2RecordsProcessed, job2StartTime, job2FinishTime); - assertThat(taskStore.getAllTasks()).containsExactly( + assertThat(taskTracker.getAllTasks()).containsExactly( finishedCompactionTask("test-task-1", taskStartTime, taskFinishTime, job1Summary, job2Summary)); assertThat(jobTracker.getAllJobs(DEFAULT_TABLE_ID)).containsExactlyInAnyOrder( compactionJobCreated(job1, DEFAULT_CREATED_TIME, @@ -467,7 +467,7 @@ void shouldSaveTaskAndJobWhenOneJobFails() throws Exception { runTask("test-task-1", processJobs(jobFails(failure)), times::poll); // Then - assertThat(taskStore.getAllTasks()).containsExactly( + assertThat(taskTracker.getAllTasks()).containsExactly( finishedCompactionTask("test-task-1", Instant.parse("2024-02-22T13:50:00Z"), Instant.parse("2024-02-22T13:50:06Z"))); @@ -491,7 +491,7 @@ void shouldSaveTaskWhenNoJobsFound() throws Exception { runTask("test-task-1", processNoJobs(), times::poll); // Then - assertThat(taskStore.getAllTasks()).containsExactly( + assertThat(taskTracker.getAllTasks()).containsExactly( finishedCompactionTask("test-task-1", Instant.parse("2024-02-22T13:50:00Z"), Instant.parse("2024-02-22T13:50:05Z"))); diff --git a/java/compaction/compaction-core/src/test/java/sleeper/compaction/core/task/CompactionTaskTestBase.java b/java/compaction/compaction-core/src/test/java/sleeper/compaction/core/task/CompactionTaskTestBase.java index 1a218eea8b..0e94c7c963 100644 --- a/java/compaction/compaction-core/src/test/java/sleeper/compaction/core/task/CompactionTaskTestBase.java +++ b/java/compaction/compaction-core/src/test/java/sleeper/compaction/core/task/CompactionTaskTestBase.java @@ -82,7 +82,7 @@ public class CompactionTaskTestBase { protected final List consumedJobs = new ArrayList<>(); protected final List jobsReturnedToQueue = new ArrayList<>(); protected final InMemoryCompactionJobTracker jobTracker = new InMemoryCompactionJobTracker(); - protected final CompactionTaskTracker taskStore = new InMemoryCompactionTaskTracker(); + protected final CompactionTaskTracker taskTracker = new InMemoryCompactionTaskTracker(); protected final List sleeps = new ArrayList<>(); protected final List commitRequestsOnQueue = new ArrayList<>(); protected final List foundWaitsForFileAssignment = new ArrayList<>(); @@ -150,7 +150,7 @@ private void runTask( CompactionRunnerFactory selector = (job, properties) -> compactor; new CompactionTask(instanceProperties, tablePropertiesProvider(), PropertiesReloader.neverReload(), stateStoreProvider(), messageReceiver, fileAssignmentCheck, - committer, jobTracker, taskStore, selector, taskId, jobRunIdSupplier, timeSupplier, sleeps::add) + committer, jobTracker, taskTracker, selector, taskId, jobRunIdSupplier, timeSupplier, sleeps::add) .run(); } diff --git a/java/compaction/compaction-job-execution/src/main/java/sleeper/compaction/job/execution/ECSCompactionTaskRunner.java b/java/compaction/compaction-job-execution/src/main/java/sleeper/compaction/job/execution/ECSCompactionTaskRunner.java index e8a89c0375..4b9b7b062b 100644 --- a/java/compaction/compaction-job-execution/src/main/java/sleeper/compaction/job/execution/ECSCompactionTaskRunner.java +++ b/java/compaction/compaction-job-execution/src/main/java/sleeper/compaction/job/execution/ECSCompactionTaskRunner.java @@ -33,7 +33,7 @@ import sleeper.compaction.core.task.CompactionTask; import sleeper.compaction.core.task.StateStoreWaitForFiles; import sleeper.compaction.status.store.job.CompactionJobTrackerFactory; -import sleeper.compaction.status.store.task.CompactionTaskStatusStoreFactory; +import sleeper.compaction.status.store.task.CompactionTaskTrackerFactory; import sleeper.configuration.jars.S3UserJarsLoader; import sleeper.configuration.properties.S3InstanceProperties; import sleeper.configuration.properties.S3PropertiesReloader; @@ -94,7 +94,7 @@ public static void main(String[] args) throws IOException, ObjectFactoryExceptio HadoopConfigurationProvider.getConfigurationForECS(instanceProperties)); CompactionJobTracker jobTracker = CompactionJobTrackerFactory.getTracker(dynamoDBClient, instanceProperties); - CompactionTaskTracker taskStatusStore = CompactionTaskStatusStoreFactory.getStatusStore(dynamoDBClient, + CompactionTaskTracker taskTracker = CompactionTaskTrackerFactory.getTracker(dynamoDBClient, instanceProperties); String taskId = UUID.randomUUID().toString(); @@ -109,7 +109,7 @@ public static void main(String[] args) throws IOException, ObjectFactoryExceptio tablePropertiesProvider, stateStoreProvider, jobTracker, instanceProperties, sqsClient); CompactionTask task = new CompactionTask(instanceProperties, tablePropertiesProvider, propertiesReloader, stateStoreProvider, new SqsCompactionQueueHandler(sqsClient, instanceProperties), - waitForFiles, committerOrLambda, jobTracker, taskStatusStore, compactionSelector, taskId); + waitForFiles, committerOrLambda, jobTracker, taskTracker, compactionSelector, taskId); task.run(); } finally { sqsClient.shutdown(); diff --git a/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/ECSCompactionTaskRunnerLocalStackIT.java b/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/ECSCompactionTaskRunnerLocalStackIT.java index d97d67d4e9..82b5e0bf1e 100644 --- a/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/ECSCompactionTaskRunnerLocalStackIT.java +++ b/java/compaction/compaction-job-execution/src/test/java/sleeper/compaction/job/execution/ECSCompactionTaskRunnerLocalStackIT.java @@ -50,8 +50,8 @@ import sleeper.compaction.core.task.StateStoreWaitForFiles; import sleeper.compaction.status.store.job.CompactionJobTrackerFactory; import sleeper.compaction.status.store.job.DynamoDBCompactionJobTrackerCreator; -import sleeper.compaction.status.store.task.CompactionTaskStatusStoreFactory; -import sleeper.compaction.status.store.task.DynamoDBCompactionTaskStatusStoreCreator; +import sleeper.compaction.status.store.task.CompactionTaskTrackerFactory; +import sleeper.compaction.status.store.task.DynamoDBCompactionTaskTrackerCreator; import sleeper.configuration.properties.S3InstanceProperties; import sleeper.configuration.properties.S3TableProperties; import sleeper.configuration.table.index.DynamoDBTableIndexCreator; @@ -146,7 +146,7 @@ public class ECSCompactionTaskRunnerLocalStackIT { private final TableProperties tableProperties = createTable(); private final String tableId = tableProperties.get(TABLE_ID); private final CompactionJobTracker jobTracker = CompactionJobTrackerFactory.getTracker(dynamoDB, instanceProperties); - private final CompactionTaskTracker taskStatusStore = CompactionTaskStatusStoreFactory.getStatusStore(dynamoDB, instanceProperties); + private final CompactionTaskTracker taskTracker = CompactionTaskTrackerFactory.getTracker(dynamoDB, instanceProperties); @AfterEach void tearDown() { @@ -158,7 +158,7 @@ void tearDown() { @BeforeEach void setUp() { DynamoDBCompactionJobTrackerCreator.create(instanceProperties, dynamoDB); - DynamoDBCompactionTaskStatusStoreCreator.create(instanceProperties, dynamoDB); + DynamoDBCompactionTaskTrackerCreator.create(instanceProperties, dynamoDB); } @TempDir @@ -457,7 +457,7 @@ private CompactionTask createTask( StateStoreWaitForFiles waitForFiles = new StateStoreWaitForFiles(tablePropertiesProvider, stateStoreProvider, jobTracker); CompactionTask task = new CompactionTask(instanceProperties, tablePropertiesProvider, PropertiesReloader.neverReload(), stateStoreProvider, new SqsCompactionQueueHandler(sqs, instanceProperties), - waitForFiles, committer, jobTracker, taskStatusStore, selector, taskId, + waitForFiles, committer, jobTracker, taskTracker, selector, taskId, jobRunIdSupplier, timeSupplier, duration -> { }); return task; diff --git a/java/compaction/compaction-status-store/src/main/java/sleeper/compaction/status/store/task/CompactionTaskStatusStoreFactory.java b/java/compaction/compaction-status-store/src/main/java/sleeper/compaction/status/store/task/CompactionTaskTrackerFactory.java similarity index 79% rename from java/compaction/compaction-status-store/src/main/java/sleeper/compaction/status/store/task/CompactionTaskStatusStoreFactory.java rename to java/compaction/compaction-status-store/src/main/java/sleeper/compaction/status/store/task/CompactionTaskTrackerFactory.java index af8785c52b..1d77aba391 100644 --- a/java/compaction/compaction-status-store/src/main/java/sleeper/compaction/status/store/task/CompactionTaskStatusStoreFactory.java +++ b/java/compaction/compaction-status-store/src/main/java/sleeper/compaction/status/store/task/CompactionTaskTrackerFactory.java @@ -23,14 +23,14 @@ import static sleeper.core.properties.instance.CompactionProperty.COMPACTION_STATUS_STORE_ENABLED; -public class CompactionTaskStatusStoreFactory { +public class CompactionTaskTrackerFactory { - private CompactionTaskStatusStoreFactory() { + private CompactionTaskTrackerFactory() { } - public static CompactionTaskTracker getStatusStore(AmazonDynamoDB dynamoDB, InstanceProperties properties) { + public static CompactionTaskTracker getTracker(AmazonDynamoDB dynamoDB, InstanceProperties properties) { if (properties.getBoolean(COMPACTION_STATUS_STORE_ENABLED)) { - return new DynamoDBCompactionTaskStatusStore(dynamoDB, properties); + return new DynamoDBCompactionTaskTracker(dynamoDB, properties); } else { return CompactionTaskTracker.NONE; } diff --git a/java/compaction/compaction-status-store/src/main/java/sleeper/compaction/status/store/task/DynamoDBCompactionTaskStatusStore.java b/java/compaction/compaction-status-store/src/main/java/sleeper/compaction/status/store/task/DynamoDBCompactionTaskTracker.java similarity index 95% rename from java/compaction/compaction-status-store/src/main/java/sleeper/compaction/status/store/task/DynamoDBCompactionTaskStatusStore.java rename to java/compaction/compaction-status-store/src/main/java/sleeper/compaction/status/store/task/DynamoDBCompactionTaskTracker.java index a3b19d5fa0..b5802e65ac 100644 --- a/java/compaction/compaction-status-store/src/main/java/sleeper/compaction/status/store/task/DynamoDBCompactionTaskStatusStore.java +++ b/java/compaction/compaction-status-store/src/main/java/sleeper/compaction/status/store/task/DynamoDBCompactionTaskTracker.java @@ -48,17 +48,17 @@ import static sleeper.dynamodb.tools.DynamoDBUtils.instanceTableName; import static sleeper.dynamodb.tools.DynamoDBUtils.streamPagedItems; -public class DynamoDBCompactionTaskStatusStore implements CompactionTaskTracker { - private static final Logger LOGGER = LoggerFactory.getLogger(DynamoDBCompactionTaskStatusStore.class); +public class DynamoDBCompactionTaskTracker implements CompactionTaskTracker { + private static final Logger LOGGER = LoggerFactory.getLogger(DynamoDBCompactionTaskTracker.class); private final AmazonDynamoDB dynamoDB; private final String statusTableName; private final DynamoDBCompactionTaskStatusFormat format; - public DynamoDBCompactionTaskStatusStore(AmazonDynamoDB dynamoDB, InstanceProperties properties) { + public DynamoDBCompactionTaskTracker(AmazonDynamoDB dynamoDB, InstanceProperties properties) { this(dynamoDB, properties, Instant::now); } - public DynamoDBCompactionTaskStatusStore( + public DynamoDBCompactionTaskTracker( AmazonDynamoDB dynamoDB, InstanceProperties properties, Supplier getTimeNow) { this.dynamoDB = dynamoDB; this.statusTableName = taskStatusTableName(properties.get(ID)); diff --git a/java/compaction/compaction-status-store/src/main/java/sleeper/compaction/status/store/task/DynamoDBCompactionTaskStatusStoreCreator.java b/java/compaction/compaction-status-store/src/main/java/sleeper/compaction/status/store/task/DynamoDBCompactionTaskTrackerCreator.java similarity index 93% rename from java/compaction/compaction-status-store/src/main/java/sleeper/compaction/status/store/task/DynamoDBCompactionTaskStatusStoreCreator.java rename to java/compaction/compaction-status-store/src/main/java/sleeper/compaction/status/store/task/DynamoDBCompactionTaskTrackerCreator.java index bc5c8d8bdd..4a23200433 100644 --- a/java/compaction/compaction-status-store/src/main/java/sleeper/compaction/status/store/task/DynamoDBCompactionTaskStatusStoreCreator.java +++ b/java/compaction/compaction-status-store/src/main/java/sleeper/compaction/status/store/task/DynamoDBCompactionTaskTrackerCreator.java @@ -30,16 +30,16 @@ import static sleeper.compaction.status.store.task.DynamoDBCompactionTaskStatusFormat.EXPIRY_DATE; import static sleeper.compaction.status.store.task.DynamoDBCompactionTaskStatusFormat.TASK_ID; import static sleeper.compaction.status.store.task.DynamoDBCompactionTaskStatusFormat.UPDATE_TIME; -import static sleeper.compaction.status.store.task.DynamoDBCompactionTaskStatusStore.taskStatusTableName; +import static sleeper.compaction.status.store.task.DynamoDBCompactionTaskTracker.taskStatusTableName; import static sleeper.core.properties.instance.CommonProperty.ID; import static sleeper.core.properties.instance.CompactionProperty.COMPACTION_STATUS_STORE_ENABLED; import static sleeper.dynamodb.tools.DynamoDBUtils.configureTimeToLive; import static sleeper.dynamodb.tools.DynamoDBUtils.initialiseTable; -public class DynamoDBCompactionTaskStatusStoreCreator { - private static final Logger LOGGER = LoggerFactory.getLogger(DynamoDBCompactionTaskStatusStoreCreator.class); +public class DynamoDBCompactionTaskTrackerCreator { + private static final Logger LOGGER = LoggerFactory.getLogger(DynamoDBCompactionTaskTrackerCreator.class); - private DynamoDBCompactionTaskStatusStoreCreator() { + private DynamoDBCompactionTaskTrackerCreator() { } public static void create(InstanceProperties properties, AmazonDynamoDB dynamoDB) { diff --git a/java/compaction/compaction-status-store/src/test/java/sleeper/compaction/status/store/task/DynamoDBCompactionTaskStatusStoreCreatorIT.java b/java/compaction/compaction-status-store/src/test/java/sleeper/compaction/status/store/task/DynamoDBCompactionTaskTrackerCreatorIT.java similarity index 64% rename from java/compaction/compaction-status-store/src/test/java/sleeper/compaction/status/store/task/DynamoDBCompactionTaskStatusStoreCreatorIT.java rename to java/compaction/compaction-status-store/src/test/java/sleeper/compaction/status/store/task/DynamoDBCompactionTaskTrackerCreatorIT.java index d3ef448476..71eb7b544d 100644 --- a/java/compaction/compaction-status-store/src/test/java/sleeper/compaction/status/store/task/DynamoDBCompactionTaskStatusStoreCreatorIT.java +++ b/java/compaction/compaction-status-store/src/test/java/sleeper/compaction/status/store/task/DynamoDBCompactionTaskTrackerCreatorIT.java @@ -30,21 +30,21 @@ import static sleeper.core.properties.instance.CompactionProperty.COMPACTION_STATUS_STORE_ENABLED; import static sleeper.core.properties.testutils.InstancePropertiesTestHelper.createTestInstanceProperties; -public class DynamoDBCompactionTaskStatusStoreCreatorIT extends DynamoDBTestBase { +public class DynamoDBCompactionTaskTrackerCreatorIT extends DynamoDBTestBase { private final InstanceProperties instanceProperties = createTestInstanceProperties(); - private final String tableName = DynamoDBCompactionTaskStatusStore.taskStatusTableName(instanceProperties.get(ID)); + private final String tableName = DynamoDBCompactionTaskTracker.taskStatusTableName(instanceProperties.get(ID)); @Test public void shouldCreateStore() { // When - DynamoDBCompactionTaskStatusStoreCreator.create(instanceProperties, dynamoDBClient); - CompactionTaskTracker store = CompactionTaskStatusStoreFactory.getStatusStore(dynamoDBClient, instanceProperties); + DynamoDBCompactionTaskTrackerCreator.create(instanceProperties, dynamoDBClient); + CompactionTaskTracker tracker = CompactionTaskTrackerFactory.getTracker(dynamoDBClient, instanceProperties); // Then assertThat(dynamoDBClient.describeTable(tableName)) .extracting(DescribeTableResult::getTable).isNotNull(); - assertThat(store).isInstanceOf(DynamoDBCompactionTaskStatusStore.class); + assertThat(tracker).isInstanceOf(DynamoDBCompactionTaskTracker.class); } @Test @@ -53,20 +53,20 @@ public void shouldNotCreateStoreIfDisabled() { instanceProperties.set(COMPACTION_STATUS_STORE_ENABLED, "false"); // When - DynamoDBCompactionTaskStatusStoreCreator.create(instanceProperties, dynamoDBClient); - CompactionTaskTracker store = CompactionTaskStatusStoreFactory.getStatusStore(dynamoDBClient, instanceProperties); + DynamoDBCompactionTaskTrackerCreator.create(instanceProperties, dynamoDBClient); + CompactionTaskTracker tracker = CompactionTaskTrackerFactory.getTracker(dynamoDBClient, instanceProperties); // Then assertThatThrownBy(() -> dynamoDBClient.describeTable(tableName)) .isInstanceOf(ResourceNotFoundException.class); - assertThat(store).isSameAs(CompactionTaskTracker.NONE); - assertThatThrownBy(store::getAllTasks).isInstanceOf(UnsupportedOperationException.class); - assertThatThrownBy(store::getTasksInProgress).isInstanceOf(UnsupportedOperationException.class); - assertThatThrownBy(() -> store.getTask("some-task")).isInstanceOf(UnsupportedOperationException.class); + assertThat(tracker).isSameAs(CompactionTaskTracker.NONE); + assertThatThrownBy(tracker::getAllTasks).isInstanceOf(UnsupportedOperationException.class); + assertThatThrownBy(tracker::getTasksInProgress).isInstanceOf(UnsupportedOperationException.class); + assertThatThrownBy(() -> tracker.getTask("some-task")).isInstanceOf(UnsupportedOperationException.class); } @AfterEach public void tearDown() { - DynamoDBCompactionTaskStatusStoreCreator.tearDown(instanceProperties, dynamoDBClient); + DynamoDBCompactionTaskTrackerCreator.tearDown(instanceProperties, dynamoDBClient); } } diff --git a/java/compaction/compaction-status-store/src/test/java/sleeper/compaction/status/store/task/QueryAllCompactionTasksIT.java b/java/compaction/compaction-status-store/src/test/java/sleeper/compaction/status/store/task/QueryAllCompactionTasksIT.java index 70b367da78..3e4e19498e 100644 --- a/java/compaction/compaction-status-store/src/test/java/sleeper/compaction/status/store/task/QueryAllCompactionTasksIT.java +++ b/java/compaction/compaction-status-store/src/test/java/sleeper/compaction/status/store/task/QueryAllCompactionTasksIT.java @@ -41,15 +41,15 @@ public void shouldReportMultipleCompactionTasksSortedMostRecentFirst() { Instant.parse("2022-10-06T11:22:00.001Z")); // When - store.taskStarted(task1); - store.taskStarted(task2); - store.taskStarted(task3); - store.taskFinished(task2); - store.taskStarted(task4); - store.taskFinished(task4); + tracker.taskStarted(task1); + tracker.taskStarted(task2); + tracker.taskStarted(task3); + tracker.taskFinished(task2); + tracker.taskStarted(task4); + tracker.taskFinished(task4); // Then - assertThat(store.getAllTasks()) + assertThat(tracker.getAllTasks()) .usingRecursiveFieldByFieldElementComparator(IGNORE_EXPIRY_DATE) .containsExactly(task4, task3, task2, task1); } @@ -57,6 +57,6 @@ public void shouldReportMultipleCompactionTasksSortedMostRecentFirst() { @Test public void shouldReportNoCompactionTasks() { // When / Then - assertThat(store.getAllTasks()).isEmpty(); + assertThat(tracker.getAllTasks()).isEmpty(); } } diff --git a/java/compaction/compaction-status-store/src/test/java/sleeper/compaction/status/store/task/QueryCompactionTasksByPeriodIT.java b/java/compaction/compaction-status-store/src/test/java/sleeper/compaction/status/store/task/QueryCompactionTasksByPeriodIT.java index 8ccca9b07e..7322c09c71 100644 --- a/java/compaction/compaction-status-store/src/test/java/sleeper/compaction/status/store/task/QueryCompactionTasksByPeriodIT.java +++ b/java/compaction/compaction-status-store/src/test/java/sleeper/compaction/status/store/task/QueryCompactionTasksByPeriodIT.java @@ -36,11 +36,11 @@ public void shouldReportCompactionTaskFinished() { Instant periodEnd = Instant.parse("2022-10-06T12:00:00.000Z"); // When - store.taskStarted(task); - store.taskFinished(task); + tracker.taskStarted(task); + tracker.taskFinished(task); // Then - assertThat(store.getTasksInTimePeriod(periodStart, periodEnd)) + assertThat(tracker.getTasksInTimePeriod(periodStart, periodEnd)) .usingRecursiveFieldByFieldElementComparator(IGNORE_EXPIRY_DATE) .containsExactly(task); } @@ -55,11 +55,11 @@ public void shouldExcludeCompactionTaskOutsidePeriod() { Instant periodEnd = Instant.parse("2022-10-06T11:00:00.000Z"); // When - store.taskStarted(task); - store.taskFinished(task); + tracker.taskStarted(task); + tracker.taskFinished(task); // Then - assertThat(store.getTasksInTimePeriod(periodStart, periodEnd)).isEmpty(); + assertThat(tracker.getTasksInTimePeriod(periodStart, periodEnd)).isEmpty(); } @Test @@ -75,13 +75,13 @@ public void shouldSortByStartTimeMostRecentFirst() { Instant periodEnd = Instant.parse("2022-10-06T12:00:00.000Z"); // When - store.taskStarted(task1); - store.taskFinished(task1); - store.taskStarted(task2); - store.taskFinished(task2); + tracker.taskStarted(task1); + tracker.taskFinished(task1); + tracker.taskStarted(task2); + tracker.taskFinished(task2); // Then - assertThat(store.getTasksInTimePeriod(periodStart, periodEnd)) + assertThat(tracker.getTasksInTimePeriod(periodStart, periodEnd)) .usingRecursiveFieldByFieldElementComparator(IGNORE_EXPIRY_DATE) .containsExactly(task2, task1); } diff --git a/java/compaction/compaction-status-store/src/test/java/sleeper/compaction/status/store/task/QueryCompactionTasksInProgressIT.java b/java/compaction/compaction-status-store/src/test/java/sleeper/compaction/status/store/task/QueryCompactionTasksInProgressIT.java index 4b9700bb24..7d0248b6dc 100644 --- a/java/compaction/compaction-status-store/src/test/java/sleeper/compaction/status/store/task/QueryCompactionTasksInProgressIT.java +++ b/java/compaction/compaction-status-store/src/test/java/sleeper/compaction/status/store/task/QueryCompactionTasksInProgressIT.java @@ -32,10 +32,10 @@ public void shouldIncludeUnfinishedTask() { CompactionTaskStatus task = startedTaskWithDefaults(); // When - store.taskStarted(task); + tracker.taskStarted(task); // Then - assertThat(store.getTasksInProgress()) + assertThat(tracker.getTasksInProgress()) .usingRecursiveFieldByFieldElementComparator(IGNORE_EXPIRY_DATE) .containsExactly(task); } @@ -46,11 +46,11 @@ public void shouldExcludeFinishedTask() { CompactionTaskStatus task = finishedTaskWithDefaults(); // When - store.taskStarted(task); - store.taskFinished(task); + tracker.taskStarted(task); + tracker.taskFinished(task); // Then - assertThat(store.getTasksInProgress()).isEmpty(); + assertThat(tracker.getTasksInProgress()).isEmpty(); } @Test @@ -62,11 +62,11 @@ public void shouldSortByStartTimeMostRecentFirst() { .startTime(Instant.parse("2022-10-06T11:19:10.001Z")).build(); // When - store.taskStarted(task1); - store.taskStarted(task2); + tracker.taskStarted(task1); + tracker.taskStarted(task2); // Then - assertThat(store.getTasksInProgress()) + assertThat(tracker.getTasksInProgress()) .usingRecursiveFieldByFieldElementComparator(IGNORE_EXPIRY_DATE) .containsExactly(task2, task1); } diff --git a/java/compaction/compaction-status-store/src/test/java/sleeper/compaction/status/store/task/StoreCompactionTaskExpiryIT.java b/java/compaction/compaction-status-store/src/test/java/sleeper/compaction/status/store/task/StoreCompactionTaskExpiryIT.java index a9ac08042a..bee8b0647f 100644 --- a/java/compaction/compaction-status-store/src/test/java/sleeper/compaction/status/store/task/StoreCompactionTaskExpiryIT.java +++ b/java/compaction/compaction-status-store/src/test/java/sleeper/compaction/status/store/task/StoreCompactionTaskExpiryIT.java @@ -34,13 +34,13 @@ public void shouldUpdateExpiryDateForCompactionTaskStatusStarted() { // Given CompactionTaskStatus taskStatus = startedTaskWithDefaults(); Duration timeToLive = Duration.ofDays(7); - CompactionTaskTracker store = storeWithTimeToLiveAndUpdateTimes(timeToLive, defaultTaskStartTime()); + CompactionTaskTracker tracker = trackerWithTimeToLiveAndUpdateTimes(timeToLive, defaultTaskStartTime()); // When - store.taskStarted(taskStatus); + tracker.taskStarted(taskStatus); // Then - assertThat(store.getTask(taskStatus.getTaskId()).getExpiryDate()) + assertThat(tracker.getTask(taskStatus.getTaskId()).getExpiryDate()) .isEqualTo(timePlusDurationAsExpiry(defaultTaskStartTime(), timeToLive)); } @@ -49,15 +49,15 @@ public void shouldUpdateExpiryDateForCompactionTaskStatusFinished() { // Given CompactionTaskStatus taskStatus = finishedTaskWithDefaults(); Duration timeToLive = Duration.ofDays(7); - CompactionTaskTracker store = storeWithTimeToLiveAndUpdateTimes(timeToLive, + CompactionTaskTracker tracker = trackerWithTimeToLiveAndUpdateTimes(timeToLive, defaultTaskStartTime(), defaultTaskFinishTime()); // When - store.taskStarted(taskStatus); - store.taskFinished(taskStatus); + tracker.taskStarted(taskStatus); + tracker.taskFinished(taskStatus); // Then - assertThat(store.getTask(taskStatus.getTaskId()).getExpiryDate()) + assertThat(tracker.getTask(taskStatus.getTaskId()).getExpiryDate()) .isEqualTo(timePlusDurationAsExpiry(defaultTaskStartTime(), timeToLive)); } @@ -66,13 +66,13 @@ public void shouldUpdateDifferentExpiryDateForCompactionTaskStatusStarted() { // Given CompactionTaskStatus taskStatus = startedTaskWithDefaults(); Duration timeToLive = Duration.ofDays(1); - CompactionTaskTracker store = storeWithTimeToLiveAndUpdateTimes(timeToLive, defaultTaskStartTime()); + CompactionTaskTracker tracker = trackerWithTimeToLiveAndUpdateTimes(timeToLive, defaultTaskStartTime()); // When - store.taskStarted(taskStatus); + tracker.taskStarted(taskStatus); // Then - assertThat(store.getTask(taskStatus.getTaskId()).getExpiryDate()) + assertThat(tracker.getTask(taskStatus.getTaskId()).getExpiryDate()) .isEqualTo(timePlusDurationAsExpiry(defaultTaskStartTime(), timeToLive)); } diff --git a/java/compaction/compaction-status-store/src/test/java/sleeper/compaction/status/store/task/StoreCompactionTaskIT.java b/java/compaction/compaction-status-store/src/test/java/sleeper/compaction/status/store/task/StoreCompactionTaskIT.java index 77a06e161f..e0b9da7ef0 100644 --- a/java/compaction/compaction-status-store/src/test/java/sleeper/compaction/status/store/task/StoreCompactionTaskIT.java +++ b/java/compaction/compaction-status-store/src/test/java/sleeper/compaction/status/store/task/StoreCompactionTaskIT.java @@ -30,10 +30,10 @@ public void shouldReportCompactionTaskStarted() { CompactionTaskStatus taskStatus = startedTaskWithDefaults(); // When - store.taskStarted(taskStatus); + tracker.taskStarted(taskStatus); // Then - assertThat(store.getTask(taskStatus.getTaskId())) + assertThat(tracker.getTask(taskStatus.getTaskId())) .usingRecursiveComparison(IGNORE_EXPIRY_DATE) .isEqualTo(taskStatus); } @@ -44,11 +44,11 @@ public void shouldReportCompactionTaskFinished() { CompactionTaskStatus taskStatus = finishedTaskWithDefaults(); // When - store.taskStarted(taskStatus); - store.taskFinished(taskStatus); + tracker.taskStarted(taskStatus); + tracker.taskFinished(taskStatus); // Then - assertThat(store.getTask(taskStatus.getTaskId())) + assertThat(tracker.getTask(taskStatus.getTaskId())) .usingRecursiveComparison(IGNORE_EXPIRY_DATE) .isEqualTo(taskStatus); } @@ -59,11 +59,11 @@ public void shouldReportCompactionTaskFinishedWithDurationInSecondsNotAWholeNumb CompactionTaskStatus taskStatus = finishedTaskWithDefaultsAndDurationInSecondsNotAWholeNumber(); // When - store.taskStarted(taskStatus); - store.taskFinished(taskStatus); + tracker.taskStarted(taskStatus); + tracker.taskFinished(taskStatus); // Then - assertThat(store.getTask(taskStatus.getTaskId())) + assertThat(tracker.getTask(taskStatus.getTaskId())) .usingRecursiveComparison(IGNORE_EXPIRY_DATE) .isEqualTo(taskStatus); } @@ -74,7 +74,7 @@ public void shouldReportNoCompactionTaskExistsInStore() { CompactionTaskStatus taskStatus = startedTaskWithDefaults(); // When/Then - assertThat(store.getTask(taskStatus.getTaskId())) + assertThat(tracker.getTask(taskStatus.getTaskId())) .usingRecursiveComparison(IGNORE_EXPIRY_DATE) .isNull(); } diff --git a/java/compaction/compaction-status-store/src/test/java/sleeper/compaction/status/store/testutils/DynamoDBCompactionTaskStatusStoreTestBase.java b/java/compaction/compaction-status-store/src/test/java/sleeper/compaction/status/store/testutils/DynamoDBCompactionTaskStatusStoreTestBase.java index bbddf9f8e2..575e059fa4 100644 --- a/java/compaction/compaction-status-store/src/test/java/sleeper/compaction/status/store/testutils/DynamoDBCompactionTaskStatusStoreTestBase.java +++ b/java/compaction/compaction-status-store/src/test/java/sleeper/compaction/status/store/testutils/DynamoDBCompactionTaskStatusStoreTestBase.java @@ -19,9 +19,9 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; -import sleeper.compaction.status.store.task.CompactionTaskStatusStoreFactory; -import sleeper.compaction.status.store.task.DynamoDBCompactionTaskStatusStore; -import sleeper.compaction.status.store.task.DynamoDBCompactionTaskStatusStoreCreator; +import sleeper.compaction.status.store.task.CompactionTaskTrackerFactory; +import sleeper.compaction.status.store.task.DynamoDBCompactionTaskTracker; +import sleeper.compaction.status.store.task.DynamoDBCompactionTaskTrackerCreator; import sleeper.core.properties.instance.InstanceProperties; import sleeper.core.record.process.RecordsProcessed; import sleeper.core.record.process.RecordsProcessedSummary; @@ -35,7 +35,7 @@ import java.util.Arrays; import java.util.UUID; -import static sleeper.compaction.status.store.task.DynamoDBCompactionTaskStatusStore.taskStatusTableName; +import static sleeper.compaction.status.store.task.DynamoDBCompactionTaskTracker.taskStatusTableName; import static sleeper.core.properties.instance.CommonProperty.ID; import static sleeper.core.properties.instance.CompactionProperty.COMPACTION_TASK_STATUS_TTL_IN_SECONDS; import static sleeper.core.properties.testutils.InstancePropertiesTestHelper.createTestInstanceProperties; @@ -46,11 +46,11 @@ public class DynamoDBCompactionTaskStatusStoreTestBase extends DynamoDBTestBase .withIgnoredFields("expiryDate").build(); private final InstanceProperties instanceProperties = createTestInstanceProperties(); private final String taskStatusTableName = taskStatusTableName(instanceProperties.get(ID)); - protected final CompactionTaskTracker store = CompactionTaskStatusStoreFactory.getStatusStore(dynamoDBClient, instanceProperties); + protected final CompactionTaskTracker tracker = CompactionTaskTrackerFactory.getTracker(dynamoDBClient, instanceProperties); @BeforeEach public void setUp() { - DynamoDBCompactionTaskStatusStoreCreator.create(instanceProperties, dynamoDBClient); + DynamoDBCompactionTaskTrackerCreator.create(instanceProperties, dynamoDBClient); } @AfterEach @@ -58,9 +58,9 @@ public void tearDown() { dynamoDBClient.deleteTable(taskStatusTableName); } - protected CompactionTaskTracker storeWithTimeToLiveAndUpdateTimes(Duration timeToLive, Instant... updateTimes) { + protected CompactionTaskTracker trackerWithTimeToLiveAndUpdateTimes(Duration timeToLive, Instant... updateTimes) { instanceProperties.set(COMPACTION_TASK_STATUS_TTL_IN_SECONDS, "" + timeToLive.getSeconds()); - return new DynamoDBCompactionTaskStatusStore(dynamoDBClient, instanceProperties, + return new DynamoDBCompactionTaskTracker(dynamoDBClient, instanceProperties, Arrays.stream(updateTimes).iterator()::next); } diff --git a/java/system-test/system-test-drivers/src/main/java/sleeper/systemtest/drivers/compaction/AwsCompactionReportsDriver.java b/java/system-test/system-test-drivers/src/main/java/sleeper/systemtest/drivers/compaction/AwsCompactionReportsDriver.java index fa379435ef..66a37f9272 100644 --- a/java/system-test/system-test-drivers/src/main/java/sleeper/systemtest/drivers/compaction/AwsCompactionReportsDriver.java +++ b/java/system-test/system-test-drivers/src/main/java/sleeper/systemtest/drivers/compaction/AwsCompactionReportsDriver.java @@ -25,7 +25,7 @@ import sleeper.clients.status.report.compaction.task.StandardCompactionTaskStatusReporter; import sleeper.clients.status.report.job.query.RangeJobsQuery; import sleeper.compaction.status.store.job.CompactionJobTrackerFactory; -import sleeper.compaction.status.store.task.CompactionTaskStatusStoreFactory; +import sleeper.compaction.status.store.task.CompactionTaskTrackerFactory; import sleeper.core.tracker.compaction.job.CompactionJobTracker; import sleeper.core.tracker.compaction.job.query.CompactionJobStatus; import sleeper.core.tracker.compaction.task.CompactionTaskStatus; @@ -49,11 +49,11 @@ public AwsCompactionReportsDriver(SystemTestInstanceContext instance, AmazonDyna public SystemTestReport tasksAndJobsReport() { return (out, startTime) -> { - new CompactionTaskStatusReport(taskStore(), + new CompactionTaskStatusReport(taskTracker(), new StandardCompactionTaskStatusReporter(out), CompactionTaskQuery.forPeriod(startTime, Instant.MAX)) .run(); - new CompactionJobStatusReport(jobStore(), + new CompactionJobStatusReport(jobTracker(), new StandardCompactionJobStatusReporter(out), new RangeJobsQuery(instance.getTableStatus(), startTime, Instant.MAX)) .run(); @@ -62,20 +62,20 @@ public SystemTestReport tasksAndJobsReport() { public List jobs(ReportingContext reportingContext) { return new RangeJobsQuery(instance.getTableStatus(), reportingContext.getRecordingStartTime(), Instant.MAX) - .run(jobStore()); + .run(jobTracker()); } @Override public List tasks(ReportingContext reportingContext) { return CompactionTaskQuery.forPeriod(reportingContext.getRecordingStartTime(), Instant.MAX) - .run(taskStore()); + .run(taskTracker()); } - private CompactionJobTracker jobStore() { + private CompactionJobTracker jobTracker() { return CompactionJobTrackerFactory.getTracker(dynamoDB, instance.getInstanceProperties()); } - private CompactionTaskTracker taskStore() { - return CompactionTaskStatusStoreFactory.getStatusStore(dynamoDB, instance.getInstanceProperties()); + private CompactionTaskTracker taskTracker() { + return CompactionTaskTrackerFactory.getTracker(dynamoDB, instance.getInstanceProperties()); } } diff --git a/java/system-test/system-test-drivers/src/main/java/sleeper/systemtest/drivers/util/AwsWaitForJobs.java b/java/system-test/system-test-drivers/src/main/java/sleeper/systemtest/drivers/util/AwsWaitForJobs.java index 3e3f5a1322..a9332b2c7c 100644 --- a/java/system-test/system-test-drivers/src/main/java/sleeper/systemtest/drivers/util/AwsWaitForJobs.java +++ b/java/system-test/system-test-drivers/src/main/java/sleeper/systemtest/drivers/util/AwsWaitForJobs.java @@ -19,7 +19,7 @@ import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; import sleeper.compaction.status.store.job.CompactionJobTrackerFactory; -import sleeper.compaction.status.store.task.CompactionTaskStatusStoreFactory; +import sleeper.compaction.status.store.task.CompactionTaskTrackerFactory; import sleeper.ingest.status.store.job.IngestJobStatusStoreFactory; import sleeper.ingest.status.store.task.IngestTaskStatusStoreFactory; import sleeper.systemtest.dsl.instance.SystemTestInstanceContext; @@ -47,7 +47,7 @@ public static WaitForJobs forBulkImport(SystemTestInstanceContext instance, Amaz public static WaitForJobs forCompaction(SystemTestInstanceContext instance, AmazonDynamoDB dynamoDBClient, PollWithRetriesDriver pollDriver) { return WaitForJobs.forCompaction(instance, properties -> CompactionJobTrackerFactory.getTracker(dynamoDBClient, properties), - properties -> CompactionTaskStatusStoreFactory.getStatusStore(dynamoDBClient, properties), + properties -> CompactionTaskTrackerFactory.getTracker(dynamoDBClient, properties), pollDriver); } }