Skip to content

Commit

Permalink
Move ingestStartedStatus to IngestJobStatusFromJobTestData
Browse files Browse the repository at this point in the history
  • Loading branch information
patchwork01 committed Dec 18, 2024
1 parent 5c14c25 commit b71aec1
Show file tree
Hide file tree
Showing 7 changed files with 24 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import static sleeper.core.record.process.RecordsProcessedSummaryTestHelper.summary;
import static sleeper.core.record.process.status.ProcessStatusUpdateTestHelper.defaultUpdateTime;
import static sleeper.ingest.core.job.status.IngestJobStatusFromJobTestData.ingestJobStatus;
import static sleeper.ingest.core.job.status.IngestJobStatusFromJobTestData.ingestStartedStatus;
import static sleeper.ingest.core.job.status.IngestJobStatusTestHelper.acceptedRun;
import static sleeper.ingest.core.job.status.IngestJobStatusTestHelper.acceptedRunWhichStarted;
import static sleeper.ingest.core.job.status.IngestJobStatusTestHelper.failedIngestJob;
Expand All @@ -45,7 +46,6 @@
import static sleeper.ingest.core.job.status.IngestJobStatusTestHelper.finishedIngestRun;
import static sleeper.ingest.core.job.status.IngestJobStatusTestHelper.ingestAddedFilesStatus;
import static sleeper.ingest.core.job.status.IngestJobStatusTestHelper.ingestFinishedStatusUncommitted;
import static sleeper.ingest.core.job.status.IngestJobStatusTestHelper.ingestStartedStatus;
import static sleeper.ingest.core.job.status.IngestJobStatusTestHelper.rejectedRun;
import static sleeper.ingest.core.job.status.IngestJobStatusTestHelper.startedIngestJob;
import static sleeper.ingest.core.job.status.IngestJobStatusTestHelper.startedIngestRun;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,7 @@ void shouldReportJobInProgressWithOneCommit() {
assertThat(store.getAllJobs(tableId))
.containsExactly(ingestJobStatus(job, ProcessRun.builder()
.taskId(taskId)
.startedStatus(ingestStartedStatus(job, startTime))
.startedStatus(ingestStartedStatus(startTime))
.statusUpdate(ingestAddedFilesStatus(writtenTime, 1))
.build()));
assertThat(store.streamTableRecords(tableId))
Expand Down Expand Up @@ -542,7 +542,7 @@ void shouldReportJobAddedOneFileWithTwoReferences() {
assertThat(store.getAllJobs(tableId))
.containsExactly(ingestJobStatus(job, ProcessRun.builder()
.taskId(taskId)
.startedStatus(ingestStartedStatus(job, startTime))
.startedStatus(ingestStartedStatus(startTime))
.statusUpdate(ingestAddedFilesStatus(writtenTime, 1))
.build()));
assertThat(store.streamTableRecords(tableId))
Expand Down Expand Up @@ -571,7 +571,7 @@ void shouldReportJobAddedTwoFiles() {
assertThat(store.getAllJobs(tableId))
.containsExactly(ingestJobStatus(job, ProcessRun.builder()
.taskId(taskId)
.startedStatus(ingestStartedStatus(job, startTime))
.startedStatus(ingestStartedStatus(startTime))
.statusUpdate(ingestAddedFilesStatus(writtenTime, 2))
.build()));
assertThat(store.streamTableRecords(tableId))
Expand Down Expand Up @@ -602,7 +602,7 @@ void shouldReportJobFinishedButUncommitted() {
assertThat(store.getAllJobs(tableId))
.containsExactly(ingestJobStatus(job, ProcessRun.builder()
.taskId(taskId)
.startedStatus(ingestStartedStatus(job, startTime))
.startedStatus(ingestStartedStatus(startTime))
.statusUpdate(ingestFinishedStatusUncommitted(job, summary, 1))
.build()));
assertThat(store.streamTableRecords(tableId))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import sleeper.core.record.process.status.ProcessRun;
import sleeper.core.tracker.ingest.job.IngestJobStatus;
import sleeper.core.tracker.ingest.job.query.IngestJobAcceptedStatus;
import sleeper.core.tracker.ingest.job.query.IngestJobStartedStatus;
import sleeper.ingest.core.job.IngestJob;

import java.time.Instant;
Expand Down Expand Up @@ -54,4 +55,15 @@ public static IngestJobAcceptedStatus ingestAcceptedStatus(IngestJob job, Instan
return IngestJobAcceptedStatus.from(job.getFileCount(), validationTime, defaultUpdateTime(validationTime));
}

/**
* Creates an ingest job started status.
*
* @param job the ingest job
* @param startTime the start time
* @return an ingest job started status
*/
public static IngestJobStartedStatus ingestStartedStatus(IngestJob job, Instant startTime) {
return IngestJobStatusTestHelper.ingestStartedStatus(job, startTime, defaultUpdateTime(startTime));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -417,12 +417,13 @@ public static IngestJobAcceptedStatus ingestAcceptedStatus(Instant validationTim
/**
* Creates an ingest job started status.
*
* @param job the ingest job
* @param startTime the start time
* @return an ingest job started status
*/
public static IngestJobStartedStatus ingestStartedStatus(IngestJob job, Instant startTime) {
return ingestStartedStatus(job, startTime, defaultUpdateTime(startTime));
public static IngestJobStartedStatus ingestStartedStatus(Instant startTime) {
return IngestJobStartedStatus.withStartOfRun(true).inputFileCount(1)
.startTime(startTime).updateTime(defaultUpdateTime(startTime))
.build();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,8 @@
import static sleeper.core.properties.testutils.InstancePropertiesTestHelper.createTestInstanceProperties;
import static sleeper.core.statestore.testutils.StateStoreTestHelper.inMemoryStateStoreWithFixedSinglePartition;
import static sleeper.ingest.core.job.status.IngestJobStatusFromJobTestData.ingestJobStatus;
import static sleeper.ingest.core.job.status.IngestJobStatusFromJobTestData.ingestStartedStatus;
import static sleeper.ingest.core.job.status.IngestJobStatusTestHelper.ingestAddedFilesStatus;
import static sleeper.ingest.core.job.status.IngestJobStatusTestHelper.ingestStartedStatus;
import static sleeper.ingest.runner.testutils.LocalStackAwsV2ClientHelper.buildAwsV2Client;
import static sleeper.ingest.runner.testutils.ResultVerifier.readMergedRecordsFromPartitionDataFiles;
import static sleeper.parquet.utils.HadoopConfigurationLocalStackUtils.getHadoopConfiguration;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,10 @@
import static sleeper.core.record.process.status.ProcessStatusUpdateTestHelper.defaultUpdateTime;
import static sleeper.core.statestore.AllReferencesToAFileTestHelper.filesWithReferences;
import static sleeper.ingest.core.job.status.IngestJobStatusFromJobTestData.ingestJobStatus;
import static sleeper.ingest.core.job.status.IngestJobStatusFromJobTestData.ingestStartedStatus;
import static sleeper.ingest.core.job.status.IngestJobStatusTestHelper.failedIngestJob;
import static sleeper.ingest.core.job.status.IngestJobStatusTestHelper.finishedIngestJob;
import static sleeper.ingest.core.job.status.IngestJobStatusTestHelper.finishedIngestRun;
import static sleeper.ingest.core.job.status.IngestJobStatusTestHelper.ingestStartedStatus;
import static sleeper.ingest.core.job.status.IngestJobStatusTestHelper.startedIngestJob;
import static sleeper.ingest.core.job.status.IngestJobStatusTestHelper.startedIngestRun;
import static sleeper.ingest.status.store.testutils.IngestStatusStoreTestUtils.createInstanceProperties;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,8 @@
import static sleeper.core.tracker.compaction.job.CompactionJobStatusTestData.compactionFinishedStatus;
import static sleeper.core.tracker.compaction.job.CompactionJobStatusTestData.compactionStartedStatus;
import static sleeper.ingest.core.job.status.IngestJobStatusFromJobTestData.ingestJobStatus;
import static sleeper.ingest.core.job.status.IngestJobStatusFromJobTestData.ingestStartedStatus;
import static sleeper.ingest.core.job.status.IngestJobStatusTestHelper.ingestAddedFilesStatus;
import static sleeper.ingest.core.job.status.IngestJobStatusTestHelper.ingestStartedStatus;

public class StateStoreCommitterTest {
private static final Instant DEFAULT_FILE_UPDATE_TIME = FilesReportTestHelper.DEFAULT_UPDATE_TIME;
Expand Down

0 comments on commit b71aec1

Please sign in to comment.