Skip to content

Commit

Permalink
Rename CompactionTaskTracker
Browse files Browse the repository at this point in the history
  • Loading branch information
patchwork01 committed Dec 18, 2024
1 parent 224cdfd commit b326f85
Show file tree
Hide file tree
Showing 24 changed files with 111 additions and 111 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import sleeper.core.properties.instance.InstanceProperties;
import sleeper.core.properties.table.TablePropertiesProvider;
import sleeper.core.tracker.compaction.job.CompactionJobTracker;
import sleeper.core.tracker.compaction.task.CompactionTaskStatusStore;
import sleeper.core.tracker.compaction.task.CompactionTaskTracker;
import sleeper.ingest.batcher.core.IngestBatcherStore;
import sleeper.ingest.batcher.store.IngestBatcherStoreFactory;
import sleeper.ingest.core.job.status.IngestJobStatusStore;
Expand All @@ -36,7 +36,7 @@ public interface AdminClientTrackerFactory {

CompactionJobTracker loadCompactionJobTracker(InstanceProperties instanceProperties);

CompactionTaskStatusStore loadCompactionTaskStatusStore(InstanceProperties instanceProperties);
CompactionTaskTracker loadCompactionTaskStatusStore(InstanceProperties instanceProperties);

IngestJobStatusStore loadIngestJobStatusStore(InstanceProperties instanceProperties);

Expand All @@ -52,7 +52,7 @@ public CompactionJobTracker loadCompactionJobTracker(InstanceProperties instance
}

@Override
public CompactionTaskStatusStore loadCompactionTaskStatusStore(InstanceProperties instanceProperties) {
public CompactionTaskTracker loadCompactionTaskStatusStore(InstanceProperties instanceProperties) {
return CompactionTaskStatusStoreFactory.getStatusStore(dynamoDB, instanceProperties);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,18 @@
import sleeper.compaction.status.store.task.CompactionTaskStatusStoreFactory;
import sleeper.configuration.properties.S3InstanceProperties;
import sleeper.core.properties.instance.InstanceProperties;
import sleeper.core.tracker.compaction.task.CompactionTaskStatusStore;
import sleeper.core.tracker.compaction.task.CompactionTaskTracker;

import static sleeper.configuration.utils.AwsV1ClientHelper.buildAwsV1Client;

public class CompactionTaskStatusReport {

private final CompactionTaskStatusStore store;
private final CompactionTaskTracker store;
private final CompactionTaskStatusReporter reporter;
private final CompactionTaskQuery query;

public CompactionTaskStatusReport(
CompactionTaskStatusStore store,
CompactionTaskTracker store,
CompactionTaskStatusReporter reporter,
CompactionTaskQuery query) {
this.store = store;
Expand Down Expand Up @@ -65,7 +65,7 @@ public static void main(String[] args) {

try {
InstanceProperties instanceProperties = S3InstanceProperties.loadGivenInstanceId(s3Client, arguments.getInstanceId());
CompactionTaskStatusStore statusStore = CompactionTaskStatusStoreFactory.getStatusStore(dynamoDBClient, instanceProperties);
CompactionTaskTracker statusStore = CompactionTaskStatusStoreFactory.getStatusStore(dynamoDBClient, instanceProperties);
new CompactionTaskStatusReport(statusStore, arguments.getReporter(), arguments.getQuery()).run();
} finally {
s3Client.shutdown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
import sleeper.core.properties.table.TablePropertiesProvider;
import sleeper.core.statestore.StateStore;
import sleeper.core.tracker.compaction.job.CompactionJobTracker;
import sleeper.core.tracker.compaction.task.CompactionTaskStatusStore;
import sleeper.core.tracker.compaction.task.CompactionTaskTracker;
import sleeper.statestore.StateStoreFactory;
import sleeper.task.common.QueueMessageCount;

Expand All @@ -56,15 +56,15 @@ public class StatusReport {
private final boolean verbose;
private final StateStore stateStore;
private final CompactionJobTracker compactionJobTracker;
private final CompactionTaskStatusStore compactionTaskStatusStore;
private final CompactionTaskTracker compactionTaskStatusStore;
private final SqsClient sqsClient;
private final QueueMessageCount.Client messageCount;
private final TablePropertiesProvider tablePropertiesProvider;

public StatusReport(
InstanceProperties instanceProperties, TableProperties tableProperties,
boolean verbose, StateStore stateStore,
CompactionJobTracker compactionJobTracker, CompactionTaskStatusStore compactionTaskStatusStore,
CompactionJobTracker compactionJobTracker, CompactionTaskTracker compactionTaskStatusStore,
SqsClient sqsClient, QueueMessageCount.Client messageCount, TablePropertiesProvider tablePropertiesProvider) {
this.instanceProperties = instanceProperties;
this.tableProperties = tableProperties;
Expand Down Expand Up @@ -120,7 +120,7 @@ public static void main(String[] args) {
StateStoreFactory stateStoreFactory = new StateStoreFactory(instanceProperties, s3Client, dynamoDBClient, new Configuration());
StateStore stateStore = stateStoreFactory.getStateStore(tableProperties);
CompactionJobTracker compactionJobTracker = CompactionJobTrackerFactory.getTracker(dynamoDBClient, instanceProperties);
CompactionTaskStatusStore compactionTaskStatusStore = CompactionTaskStatusStoreFactory.getStatusStore(dynamoDBClient, instanceProperties);
CompactionTaskTracker compactionTaskStatusStore = CompactionTaskStatusStoreFactory.getStatusStore(dynamoDBClient, instanceProperties);

StatusReport statusReport = new StatusReport(
instanceProperties, tableProperties, verbose,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,17 @@
package sleeper.clients.status.report.compaction.task;

import sleeper.core.tracker.compaction.task.CompactionTaskStatus;
import sleeper.core.tracker.compaction.task.CompactionTaskStatusStore;
import sleeper.core.tracker.compaction.task.CompactionTaskTracker;

import java.time.Instant;
import java.util.List;

@FunctionalInterface
public interface CompactionTaskQuery {
CompactionTaskQuery UNFINISHED = CompactionTaskStatusStore::getTasksInProgress;
CompactionTaskQuery ALL = CompactionTaskStatusStore::getAllTasks;
CompactionTaskQuery UNFINISHED = CompactionTaskTracker::getTasksInProgress;
CompactionTaskQuery ALL = CompactionTaskTracker::getAllTasks;

List<CompactionTaskStatus> run(CompactionTaskStatusStore store);
List<CompactionTaskStatus> run(CompactionTaskTracker store);

static CompactionTaskQuery from(String type) {
switch (type) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import sleeper.core.properties.table.TableProperties;
import sleeper.core.tracker.compaction.job.InMemoryCompactionJobTracker;
import sleeper.core.tracker.compaction.task.CompactionTaskStatus;
import sleeper.core.tracker.compaction.task.InMemoryCompactionTaskStatusStore;
import sleeper.core.tracker.compaction.task.InMemoryCompactionTaskTracker;

import java.time.Instant;
import java.util.List;
Expand Down Expand Up @@ -158,7 +158,7 @@ private RunAdminClient runCompactionJobStatusReport() {
@Nested
@DisplayName("Compaction task status report")
class CompactionTaskStatusReport {
private final InMemoryCompactionTaskStatusStore compactionTaskStatusStore = new InMemoryCompactionTaskStatusStore();
private final InMemoryCompactionTaskTracker compactionTaskStatusStore = new InMemoryCompactionTaskTracker();

private List<CompactionTaskStatus> exampleTaskStartedStatuses() {
return List.of(startedTask("task-1", "2023-03-15T18:53:12.001Z"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import sleeper.core.properties.instance.InstanceProperties;
import sleeper.core.properties.table.TablePropertiesProvider;
import sleeper.core.tracker.compaction.job.CompactionJobTracker;
import sleeper.core.tracker.compaction.task.CompactionTaskStatusStore;
import sleeper.core.tracker.compaction.task.CompactionTaskTracker;
import sleeper.ingest.batcher.core.IngestBatcherStore;
import sleeper.ingest.core.job.status.IngestJobStatusStore;
import sleeper.ingest.core.task.IngestTaskStatusStore;
Expand All @@ -33,7 +33,7 @@
public class AdminClientProcessTrackerHolder implements AdminClientTrackerFactory {

private final Map<String, CompactionJobTracker> compactionJobTrackerByInstance = new HashMap<>();
private final Map<String, CompactionTaskStatusStore> compactionTaskStoreByInstance = new HashMap<>();
private final Map<String, CompactionTaskTracker> compactionTaskStoreByInstance = new HashMap<>();
private final Map<String, IngestJobStatusStore> ingestJobStoreByInstance = new HashMap<>();
private final Map<String, IngestTaskStatusStore> ingestTaskStoreByInstance = new HashMap<>();
private final Map<String, IngestBatcherStore> ingestBatcherStoreByInstance = new HashMap<>();
Expand All @@ -42,7 +42,7 @@ public void setTracker(String instanceId, CompactionJobTracker tracker) {
compactionJobTrackerByInstance.put(instanceId, tracker);
}

public void setTracker(String instanceId, CompactionTaskStatusStore tracker) {
public void setTracker(String instanceId, CompactionTaskTracker tracker) {
compactionTaskStoreByInstance.put(instanceId, tracker);
}

Expand All @@ -65,9 +65,9 @@ public CompactionJobTracker loadCompactionJobTracker(InstanceProperties instance
}

@Override
public CompactionTaskStatusStore loadCompactionTaskStatusStore(InstanceProperties instanceProperties) {
public CompactionTaskTracker loadCompactionTaskStatusStore(InstanceProperties instanceProperties) {
return Optional.ofNullable(compactionTaskStoreByInstance.get(instanceProperties.get(ID)))
.orElse(CompactionTaskStatusStore.NONE);
.orElse(CompactionTaskTracker.NONE);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
import sleeper.core.properties.instance.InstanceProperties;
import sleeper.core.tracker.compaction.job.CompactionJobTracker;
import sleeper.core.tracker.compaction.job.InMemoryCompactionJobTracker;
import sleeper.core.tracker.compaction.task.CompactionTaskStatusStore;
import sleeper.core.tracker.compaction.task.InMemoryCompactionTaskStatusStore;
import sleeper.core.tracker.compaction.task.CompactionTaskTracker;
import sleeper.core.tracker.compaction.task.InMemoryCompactionTaskTracker;
import sleeper.ingest.core.job.status.IngestJobStatusStore;
import sleeper.ingest.core.task.IngestTaskStatusStore;

Expand All @@ -48,7 +48,7 @@ void shouldSetCompactionJobTracker() {
@Test
void shouldSetCompactionTaskStatusStore() {
// Given
InMemoryCompactionTaskStatusStore store = new InMemoryCompactionTaskStatusStore();
InMemoryCompactionTaskTracker store = new InMemoryCompactionTaskTracker();
InstanceProperties properties = createValidInstanceProperties();
setInstanceProperties(properties);

Expand Down Expand Up @@ -109,7 +109,7 @@ void shouldReturnNoCompactionTaskStatusStore() {

// When / Then
assertThat(runClient().trackers().loadCompactionTaskStatusStore(properties))
.isSameAs(CompactionTaskStatusStore.NONE);
.isSameAs(CompactionTaskTracker.NONE);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import sleeper.core.properties.instance.InstanceProperties;
import sleeper.core.properties.table.TableProperties;
import sleeper.core.tracker.compaction.job.CompactionJobTracker;
import sleeper.core.tracker.compaction.task.CompactionTaskStatusStore;
import sleeper.core.tracker.compaction.task.CompactionTaskTracker;
import sleeper.ingest.batcher.core.IngestBatcherStore;
import sleeper.ingest.core.job.status.IngestJobStatusStore;
import sleeper.ingest.core.task.IngestTaskStatusStore;
Expand Down Expand Up @@ -140,7 +140,7 @@ public RunAdminClient tracker(CompactionJobTracker store) {
return this;
}

public RunAdminClient tracker(CompactionTaskStatusStore store) {
public RunAdminClient tracker(CompactionTaskTracker store) {
trackers.setTracker(harness.getInstanceId(), store);
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import sleeper.clients.status.report.CompactionTaskStatusReport;
import sleeper.clients.testutil.ToStringConsoleOutput;
import sleeper.core.tracker.compaction.task.CompactionTaskStatus;
import sleeper.core.tracker.compaction.task.InMemoryCompactionTaskStatusStore;
import sleeper.core.tracker.compaction.task.InMemoryCompactionTaskTracker;

import java.io.PrintStream;
import java.time.Duration;
Expand All @@ -35,7 +35,7 @@

public class CompactionTaskStatusReportTest {

private final InMemoryCompactionTaskStatusStore store = new InMemoryCompactionTaskStatusStore();
private final InMemoryCompactionTaskTracker store = new InMemoryCompactionTaskTracker();

@Test
public void shouldReportCompactionTaskUnfinished() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
import sleeper.core.tracker.compaction.job.CompactionJobTracker;
import sleeper.core.tracker.compaction.task.CompactionTaskFinishedStatus;
import sleeper.core.tracker.compaction.task.CompactionTaskStatus;
import sleeper.core.tracker.compaction.task.CompactionTaskStatusStore;
import sleeper.core.tracker.compaction.task.CompactionTaskTracker;
import sleeper.core.util.LoggedDuration;

import java.io.IOException;
Expand Down Expand Up @@ -71,7 +71,7 @@ public class CompactionTask {
private final MessageReceiver messageReceiver;
private final CompactionRunnerFactory selector;
private final CompactionJobTracker jobTracker;
private final CompactionTaskStatusStore taskStatusStore;
private final CompactionTaskTracker taskStatusStore;
private final CompactionJobCommitterOrSendToLambda jobCommitter;
private final String taskId;
private final Supplier<String> jobRunIdSupplier;
Expand All @@ -82,7 +82,7 @@ public CompactionTask(InstanceProperties instanceProperties, TablePropertiesProv
PropertiesReloader propertiesReloader, StateStoreProvider stateStoreProvider,
MessageReceiver messageReceiver, StateStoreWaitForFiles waitForFiles,
CompactionJobCommitterOrSendToLambda jobCommitter, CompactionJobTracker jobStore,
CompactionTaskStatusStore taskStore, CompactionRunnerFactory selector, String taskId) {
CompactionTaskTracker taskStore, CompactionRunnerFactory selector, String taskId) {
this(instanceProperties, tablePropertiesProvider, propertiesReloader, stateStoreProvider,
messageReceiver, waitForFiles, jobCommitter,
jobStore, taskStore, selector, taskId,
Expand All @@ -97,7 +97,7 @@ public CompactionTask(
StateStoreProvider stateStoreProvider,
MessageReceiver messageReceiver, StateStoreWaitForFiles waitForFiles,
CompactionJobCommitterOrSendToLambda jobCommitter,
CompactionJobTracker jobTracker, CompactionTaskStatusStore taskTracker, CompactionRunnerFactory selector,
CompactionJobTracker jobTracker, CompactionTaskTracker taskTracker, CompactionRunnerFactory selector,
String taskId, Supplier<String> jobRunIdSupplier, Supplier<Instant> timeSupplier, Consumer<Duration> sleepForTime) {
this.instanceProperties = instanceProperties;
this.tablePropertiesProvider = tablePropertiesProvider;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@
import sleeper.core.statestore.StateStoreProvider;
import sleeper.core.statestore.testutils.FixedStateStoreProvider;
import sleeper.core.tracker.compaction.job.InMemoryCompactionJobTracker;
import sleeper.core.tracker.compaction.task.CompactionTaskStatusStore;
import sleeper.core.tracker.compaction.task.InMemoryCompactionTaskStatusStore;
import sleeper.core.tracker.compaction.task.CompactionTaskTracker;
import sleeper.core.tracker.compaction.task.InMemoryCompactionTaskTracker;
import sleeper.core.util.ThreadSleep;
import sleeper.core.util.ThreadSleepTestHelper;

Expand Down Expand Up @@ -82,7 +82,7 @@ public class CompactionTaskTestBase {
protected final List<CompactionJob> consumedJobs = new ArrayList<>();
protected final List<CompactionJob> jobsReturnedToQueue = new ArrayList<>();
protected final InMemoryCompactionJobTracker jobTracker = new InMemoryCompactionJobTracker();
protected final CompactionTaskStatusStore taskStore = new InMemoryCompactionTaskStatusStore();
protected final CompactionTaskTracker taskStore = new InMemoryCompactionTaskTracker();
protected final List<Duration> sleeps = new ArrayList<>();
protected final List<CompactionJobCommitRequest> commitRequestsOnQueue = new ArrayList<>();
protected final List<Duration> foundWaitsForFileAssignment = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
import sleeper.core.properties.table.TablePropertiesProvider;
import sleeper.core.statestore.StateStoreProvider;
import sleeper.core.tracker.compaction.job.CompactionJobTracker;
import sleeper.core.tracker.compaction.task.CompactionTaskStatusStore;
import sleeper.core.tracker.compaction.task.CompactionTaskTracker;
import sleeper.core.util.LoggedDuration;
import sleeper.core.util.ObjectFactory;
import sleeper.core.util.ObjectFactoryException;
Expand Down Expand Up @@ -94,7 +94,7 @@ public static void main(String[] args) throws IOException, ObjectFactoryExceptio
HadoopConfigurationProvider.getConfigurationForECS(instanceProperties));
CompactionJobTracker jobTracker = CompactionJobTrackerFactory.getTracker(dynamoDBClient,
instanceProperties);
CompactionTaskStatusStore taskStatusStore = CompactionTaskStatusStoreFactory.getStatusStore(dynamoDBClient,
CompactionTaskTracker taskStatusStore = CompactionTaskStatusStoreFactory.getStatusStore(dynamoDBClient,
instanceProperties);
String taskId = UUID.randomUUID().toString();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@
import sleeper.core.statestore.exception.ReplaceRequestsFailedException;
import sleeper.core.statestore.testutils.FixedStateStoreProvider;
import sleeper.core.tracker.compaction.job.CompactionJobTracker;
import sleeper.core.tracker.compaction.task.CompactionTaskStatusStore;
import sleeper.core.tracker.compaction.task.CompactionTaskTracker;
import sleeper.core.util.ObjectFactory;
import sleeper.ingest.runner.IngestFactory;
import sleeper.ingest.runner.impl.IngestCoordinator;
Expand Down Expand Up @@ -146,7 +146,7 @@ public class ECSCompactionTaskRunnerLocalStackIT {
private final TableProperties tableProperties = createTable();
private final String tableId = tableProperties.get(TABLE_ID);
private final CompactionJobTracker jobTracker = CompactionJobTrackerFactory.getTracker(dynamoDB, instanceProperties);
private final CompactionTaskStatusStore taskStatusStore = CompactionTaskStatusStoreFactory.getStatusStore(dynamoDB, instanceProperties);
private final CompactionTaskTracker taskStatusStore = CompactionTaskStatusStoreFactory.getStatusStore(dynamoDB, instanceProperties);

@AfterEach
void tearDown() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;

import sleeper.core.properties.instance.InstanceProperties;
import sleeper.core.tracker.compaction.task.CompactionTaskStatusStore;
import sleeper.core.tracker.compaction.task.CompactionTaskTracker;

import static sleeper.core.properties.instance.CompactionProperty.COMPACTION_STATUS_STORE_ENABLED;

Expand All @@ -28,11 +28,11 @@ public class CompactionTaskStatusStoreFactory {
private CompactionTaskStatusStoreFactory() {
}

public static CompactionTaskStatusStore getStatusStore(AmazonDynamoDB dynamoDB, InstanceProperties properties) {
public static CompactionTaskTracker getStatusStore(AmazonDynamoDB dynamoDB, InstanceProperties properties) {
if (properties.getBoolean(COMPACTION_STATUS_STORE_ENABLED)) {
return new DynamoDBCompactionTaskStatusStore(dynamoDB, properties);
} else {
return CompactionTaskStatusStore.NONE;
return CompactionTaskTracker.NONE;
}
}
}
Loading

0 comments on commit b326f85

Please sign in to comment.