From b71aec1034d8796e8e6a06d35ee1d04c8c479923 Mon Sep 17 00:00:00 2001 From: patchwork01 <110390516+patchwork01@users.noreply.github.com> Date: Wed, 18 Dec 2024 15:53:37 +0000 Subject: [PATCH] Move ingestStartedStatus to IngestJobStatusFromJobTestData --- .../ingest/job/IngestJobStatusReporterTestData.java | 2 +- .../job/status/InMemoryIngestJobStatusStoreTest.java | 8 ++++---- .../job/status/IngestJobStatusFromJobTestData.java | 12 ++++++++++++ .../core/job/status/IngestJobStatusTestHelper.java | 7 ++++--- .../ingest/runner/task/IngestJobRunnerIT.java | 2 +- .../DynamoDBIngestJobStatusStoreTestBase.java | 2 +- .../committer/StateStoreCommitterTest.java | 2 +- 7 files changed, 24 insertions(+), 11 deletions(-) diff --git a/java/clients/src/test/java/sleeper/clients/status/report/ingest/job/IngestJobStatusReporterTestData.java b/java/clients/src/test/java/sleeper/clients/status/report/ingest/job/IngestJobStatusReporterTestData.java index d5d3f05e71..410434c5d7 100644 --- a/java/clients/src/test/java/sleeper/clients/status/report/ingest/job/IngestJobStatusReporterTestData.java +++ b/java/clients/src/test/java/sleeper/clients/status/report/ingest/job/IngestJobStatusReporterTestData.java @@ -36,6 +36,7 @@ import static sleeper.core.record.process.RecordsProcessedSummaryTestHelper.summary; import static sleeper.core.record.process.status.ProcessStatusUpdateTestHelper.defaultUpdateTime; import static sleeper.ingest.core.job.status.IngestJobStatusFromJobTestData.ingestJobStatus; +import static sleeper.ingest.core.job.status.IngestJobStatusFromJobTestData.ingestStartedStatus; import static sleeper.ingest.core.job.status.IngestJobStatusTestHelper.acceptedRun; import static sleeper.ingest.core.job.status.IngestJobStatusTestHelper.acceptedRunWhichStarted; import static sleeper.ingest.core.job.status.IngestJobStatusTestHelper.failedIngestJob; @@ -45,7 +46,6 @@ import static sleeper.ingest.core.job.status.IngestJobStatusTestHelper.finishedIngestRun; import static sleeper.ingest.core.job.status.IngestJobStatusTestHelper.ingestAddedFilesStatus; import static sleeper.ingest.core.job.status.IngestJobStatusTestHelper.ingestFinishedStatusUncommitted; -import static sleeper.ingest.core.job.status.IngestJobStatusTestHelper.ingestStartedStatus; import static sleeper.ingest.core.job.status.IngestJobStatusTestHelper.rejectedRun; import static sleeper.ingest.core.job.status.IngestJobStatusTestHelper.startedIngestJob; import static sleeper.ingest.core.job.status.IngestJobStatusTestHelper.startedIngestRun; diff --git a/java/ingest/ingest-core/src/test/java/sleeper/ingest/core/job/status/InMemoryIngestJobStatusStoreTest.java b/java/ingest/ingest-core/src/test/java/sleeper/ingest/core/job/status/InMemoryIngestJobStatusStoreTest.java index 3dda2e6d08..683c209590 100644 --- a/java/ingest/ingest-core/src/test/java/sleeper/ingest/core/job/status/InMemoryIngestJobStatusStoreTest.java +++ b/java/ingest/ingest-core/src/test/java/sleeper/ingest/core/job/status/InMemoryIngestJobStatusStoreTest.java @@ -510,7 +510,7 @@ void shouldReportJobInProgressWithOneCommit() { assertThat(store.getAllJobs(tableId)) .containsExactly(ingestJobStatus(job, ProcessRun.builder() .taskId(taskId) - .startedStatus(ingestStartedStatus(job, startTime)) + .startedStatus(ingestStartedStatus(startTime)) .statusUpdate(ingestAddedFilesStatus(writtenTime, 1)) .build())); assertThat(store.streamTableRecords(tableId)) @@ -542,7 +542,7 @@ void shouldReportJobAddedOneFileWithTwoReferences() { assertThat(store.getAllJobs(tableId)) .containsExactly(ingestJobStatus(job, ProcessRun.builder() .taskId(taskId) - .startedStatus(ingestStartedStatus(job, startTime)) + .startedStatus(ingestStartedStatus(startTime)) .statusUpdate(ingestAddedFilesStatus(writtenTime, 1)) .build())); assertThat(store.streamTableRecords(tableId)) @@ -571,7 +571,7 @@ void shouldReportJobAddedTwoFiles() { assertThat(store.getAllJobs(tableId)) .containsExactly(ingestJobStatus(job, ProcessRun.builder() .taskId(taskId) - .startedStatus(ingestStartedStatus(job, startTime)) + .startedStatus(ingestStartedStatus(startTime)) .statusUpdate(ingestAddedFilesStatus(writtenTime, 2)) .build())); assertThat(store.streamTableRecords(tableId)) @@ -602,7 +602,7 @@ void shouldReportJobFinishedButUncommitted() { assertThat(store.getAllJobs(tableId)) .containsExactly(ingestJobStatus(job, ProcessRun.builder() .taskId(taskId) - .startedStatus(ingestStartedStatus(job, startTime)) + .startedStatus(ingestStartedStatus(startTime)) .statusUpdate(ingestFinishedStatusUncommitted(job, summary, 1)) .build())); assertThat(store.streamTableRecords(tableId)) diff --git a/java/ingest/ingest-core/src/test/java/sleeper/ingest/core/job/status/IngestJobStatusFromJobTestData.java b/java/ingest/ingest-core/src/test/java/sleeper/ingest/core/job/status/IngestJobStatusFromJobTestData.java index 94d337857b..4e85f607f7 100644 --- a/java/ingest/ingest-core/src/test/java/sleeper/ingest/core/job/status/IngestJobStatusFromJobTestData.java +++ b/java/ingest/ingest-core/src/test/java/sleeper/ingest/core/job/status/IngestJobStatusFromJobTestData.java @@ -18,6 +18,7 @@ import sleeper.core.record.process.status.ProcessRun; import sleeper.core.tracker.ingest.job.IngestJobStatus; import sleeper.core.tracker.ingest.job.query.IngestJobAcceptedStatus; +import sleeper.core.tracker.ingest.job.query.IngestJobStartedStatus; import sleeper.ingest.core.job.IngestJob; import java.time.Instant; @@ -54,4 +55,15 @@ public static IngestJobAcceptedStatus ingestAcceptedStatus(IngestJob job, Instan return IngestJobAcceptedStatus.from(job.getFileCount(), validationTime, defaultUpdateTime(validationTime)); } + /** + * Creates an ingest job started status. + * + * @param job the ingest job + * @param startTime the start time + * @return an ingest job started status + */ + public static IngestJobStartedStatus ingestStartedStatus(IngestJob job, Instant startTime) { + return IngestJobStatusTestHelper.ingestStartedStatus(job, startTime, defaultUpdateTime(startTime)); + } + } diff --git a/java/ingest/ingest-core/src/test/java/sleeper/ingest/core/job/status/IngestJobStatusTestHelper.java b/java/ingest/ingest-core/src/test/java/sleeper/ingest/core/job/status/IngestJobStatusTestHelper.java index 663e909be9..d8f34a7892 100644 --- a/java/ingest/ingest-core/src/test/java/sleeper/ingest/core/job/status/IngestJobStatusTestHelper.java +++ b/java/ingest/ingest-core/src/test/java/sleeper/ingest/core/job/status/IngestJobStatusTestHelper.java @@ -417,12 +417,13 @@ public static IngestJobAcceptedStatus ingestAcceptedStatus(Instant validationTim /** * Creates an ingest job started status. * - * @param job the ingest job * @param startTime the start time * @return an ingest job started status */ - public static IngestJobStartedStatus ingestStartedStatus(IngestJob job, Instant startTime) { - return ingestStartedStatus(job, startTime, defaultUpdateTime(startTime)); + public static IngestJobStartedStatus ingestStartedStatus(Instant startTime) { + return IngestJobStartedStatus.withStartOfRun(true).inputFileCount(1) + .startTime(startTime).updateTime(defaultUpdateTime(startTime)) + .build(); } /** diff --git a/java/ingest/ingest-runner/src/test/java/sleeper/ingest/runner/task/IngestJobRunnerIT.java b/java/ingest/ingest-runner/src/test/java/sleeper/ingest/runner/task/IngestJobRunnerIT.java index fea1d04ac0..b46a5d7892 100644 --- a/java/ingest/ingest-runner/src/test/java/sleeper/ingest/runner/task/IngestJobRunnerIT.java +++ b/java/ingest/ingest-runner/src/test/java/sleeper/ingest/runner/task/IngestJobRunnerIT.java @@ -86,8 +86,8 @@ import static sleeper.core.properties.testutils.InstancePropertiesTestHelper.createTestInstanceProperties; import static sleeper.core.statestore.testutils.StateStoreTestHelper.inMemoryStateStoreWithFixedSinglePartition; import static sleeper.ingest.core.job.status.IngestJobStatusFromJobTestData.ingestJobStatus; +import static sleeper.ingest.core.job.status.IngestJobStatusFromJobTestData.ingestStartedStatus; import static sleeper.ingest.core.job.status.IngestJobStatusTestHelper.ingestAddedFilesStatus; -import static sleeper.ingest.core.job.status.IngestJobStatusTestHelper.ingestStartedStatus; import static sleeper.ingest.runner.testutils.LocalStackAwsV2ClientHelper.buildAwsV2Client; import static sleeper.ingest.runner.testutils.ResultVerifier.readMergedRecordsFromPartitionDataFiles; import static sleeper.parquet.utils.HadoopConfigurationLocalStackUtils.getHadoopConfiguration; diff --git a/java/ingest/ingest-status-store/src/test/java/sleeper/ingest/status/store/testutils/DynamoDBIngestJobStatusStoreTestBase.java b/java/ingest/ingest-status-store/src/test/java/sleeper/ingest/status/store/testutils/DynamoDBIngestJobStatusStoreTestBase.java index 07b36efe65..06069e1e3b 100644 --- a/java/ingest/ingest-status-store/src/test/java/sleeper/ingest/status/store/testutils/DynamoDBIngestJobStatusStoreTestBase.java +++ b/java/ingest/ingest-status-store/src/test/java/sleeper/ingest/status/store/testutils/DynamoDBIngestJobStatusStoreTestBase.java @@ -58,10 +58,10 @@ import static sleeper.core.record.process.status.ProcessStatusUpdateTestHelper.defaultUpdateTime; import static sleeper.core.statestore.AllReferencesToAFileTestHelper.filesWithReferences; import static sleeper.ingest.core.job.status.IngestJobStatusFromJobTestData.ingestJobStatus; +import static sleeper.ingest.core.job.status.IngestJobStatusFromJobTestData.ingestStartedStatus; import static sleeper.ingest.core.job.status.IngestJobStatusTestHelper.failedIngestJob; import static sleeper.ingest.core.job.status.IngestJobStatusTestHelper.finishedIngestJob; import static sleeper.ingest.core.job.status.IngestJobStatusTestHelper.finishedIngestRun; -import static sleeper.ingest.core.job.status.IngestJobStatusTestHelper.ingestStartedStatus; import static sleeper.ingest.core.job.status.IngestJobStatusTestHelper.startedIngestJob; import static sleeper.ingest.core.job.status.IngestJobStatusTestHelper.startedIngestRun; import static sleeper.ingest.status.store.testutils.IngestStatusStoreTestUtils.createInstanceProperties; diff --git a/java/statestore-committer-core/src/test/java/sleeper/statestore/committer/StateStoreCommitterTest.java b/java/statestore-committer-core/src/test/java/sleeper/statestore/committer/StateStoreCommitterTest.java index 3ef3a761ae..e92d515d3f 100644 --- a/java/statestore-committer-core/src/test/java/sleeper/statestore/committer/StateStoreCommitterTest.java +++ b/java/statestore-committer-core/src/test/java/sleeper/statestore/committer/StateStoreCommitterTest.java @@ -92,8 +92,8 @@ import static sleeper.core.tracker.compaction.job.CompactionJobStatusTestData.compactionFinishedStatus; import static sleeper.core.tracker.compaction.job.CompactionJobStatusTestData.compactionStartedStatus; import static sleeper.ingest.core.job.status.IngestJobStatusFromJobTestData.ingestJobStatus; +import static sleeper.ingest.core.job.status.IngestJobStatusFromJobTestData.ingestStartedStatus; import static sleeper.ingest.core.job.status.IngestJobStatusTestHelper.ingestAddedFilesStatus; -import static sleeper.ingest.core.job.status.IngestJobStatusTestHelper.ingestStartedStatus; public class StateStoreCommitterTest { private static final Instant DEFAULT_FILE_UPDATE_TIME = FilesReportTestHelper.DEFAULT_UPDATE_TIME;