Skip to content

Commit

Permalink
Rename usages of CompactionTaskTracker
Browse files Browse the repository at this point in the history
  • Loading branch information
patchwork01 committed Dec 18, 2024
1 parent b326f85 commit d7c18be
Show file tree
Hide file tree
Showing 26 changed files with 141 additions and 141 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import sleeper.cdk.stack.core.ManagedPoliciesStack;
import sleeper.compaction.status.store.job.DynamoDBCompactionJobTracker;
import sleeper.compaction.status.store.task.DynamoDBCompactionTaskStatusFormat;
import sleeper.compaction.status.store.task.DynamoDBCompactionTaskStatusStore;
import sleeper.compaction.status.store.task.DynamoDBCompactionTaskTracker;
import sleeper.core.properties.instance.InstanceProperties;

import static sleeper.cdk.util.Utils.removalPolicy;
Expand Down Expand Up @@ -77,7 +77,7 @@ public CompactionTrackerStack(

tasksTable = Table.Builder
.create(this, "DynamoDBCompactionTaskStatusTable")
.tableName(DynamoDBCompactionTaskStatusStore.taskStatusTableName(instanceId))
.tableName(DynamoDBCompactionTaskTracker.taskStatusTableName(instanceId))
.removalPolicy(removalPolicy)
.billingMode(BillingMode.PAY_PER_REQUEST)
.partitionKey(Attribute.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;

import sleeper.compaction.status.store.job.CompactionJobTrackerFactory;
import sleeper.compaction.status.store.task.CompactionTaskStatusStoreFactory;
import sleeper.compaction.status.store.task.CompactionTaskTrackerFactory;
import sleeper.core.properties.instance.InstanceProperties;
import sleeper.core.properties.table.TablePropertiesProvider;
import sleeper.core.tracker.compaction.job.CompactionJobTracker;
Expand All @@ -36,7 +36,7 @@ public interface AdminClientTrackerFactory {

CompactionJobTracker loadCompactionJobTracker(InstanceProperties instanceProperties);

CompactionTaskTracker loadCompactionTaskStatusStore(InstanceProperties instanceProperties);
CompactionTaskTracker loadCompactionTaskTracker(InstanceProperties instanceProperties);

IngestJobStatusStore loadIngestJobStatusStore(InstanceProperties instanceProperties);

Expand All @@ -52,8 +52,8 @@ public CompactionJobTracker loadCompactionJobTracker(InstanceProperties instance
}

@Override
public CompactionTaskTracker loadCompactionTaskStatusStore(InstanceProperties instanceProperties) {
return CompactionTaskStatusStoreFactory.getStatusStore(dynamoDB, instanceProperties);
public CompactionTaskTracker loadCompactionTaskTracker(InstanceProperties instanceProperties) {
return CompactionTaskTrackerFactory.getTracker(dynamoDB, instanceProperties);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ private void runCompactionJobStatusReport(InstanceProperties properties, TableSt
}

private void runCompactionTaskStatusReport(InstanceProperties properties, CompactionTaskQuery queryType) {
new CompactionTaskStatusReport(trackers.loadCompactionTaskStatusStore(properties),
new CompactionTaskStatusReport(trackers.loadCompactionTaskTracker(properties),
new StandardCompactionTaskStatusReporter(out.printStream()), queryType).run();
confirmReturnToMainScreen(out, in);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import software.amazon.awssdk.services.sqs.SqsClient;

import sleeper.compaction.status.store.job.DynamoDBCompactionJobTrackerCreator;
import sleeper.compaction.status.store.task.DynamoDBCompactionTaskStatusStoreCreator;
import sleeper.compaction.status.store.task.DynamoDBCompactionTaskTrackerCreator;
import sleeper.core.properties.instance.InstanceProperties;

import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.COMPACTION_JOB_QUEUE_URL;
Expand All @@ -44,7 +44,7 @@ public static CompactionDockerStack from(InstanceProperties instanceProperties,

public void deploy() {
DynamoDBCompactionJobTrackerCreator.create(instanceProperties, dynamoDB);
DynamoDBCompactionTaskStatusStoreCreator.create(instanceProperties, dynamoDB);
DynamoDBCompactionTaskTrackerCreator.create(instanceProperties, dynamoDB);
String queueName = "sleeper-" + instanceProperties.get(ID) + "-CompactionJobQ";
String queueUrl = sqsClient.createQueue(request -> request.queueName(queueName)).queueUrl();
instanceProperties.set(COMPACTION_JOB_QUEUE_URL, queueUrl);
Expand All @@ -53,7 +53,7 @@ public void deploy() {
@Override
public void tearDown() {
DynamoDBCompactionJobTrackerCreator.tearDown(instanceProperties, dynamoDB);
DynamoDBCompactionTaskStatusStoreCreator.tearDown(instanceProperties, dynamoDB);
DynamoDBCompactionTaskTrackerCreator.tearDown(instanceProperties, dynamoDB);
sqsClient.deleteQueue(request -> request.queueUrl(instanceProperties.get(COMPACTION_JOB_QUEUE_URL)));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import sleeper.clients.status.report.compaction.task.CompactionTaskQuery;
import sleeper.clients.status.report.compaction.task.CompactionTaskStatusReportArguments;
import sleeper.clients.status.report.compaction.task.CompactionTaskStatusReporter;
import sleeper.compaction.status.store.task.CompactionTaskStatusStoreFactory;
import sleeper.compaction.status.store.task.CompactionTaskTrackerFactory;
import sleeper.configuration.properties.S3InstanceProperties;
import sleeper.core.properties.instance.InstanceProperties;
import sleeper.core.tracker.compaction.task.CompactionTaskTracker;
Expand All @@ -32,21 +32,21 @@

public class CompactionTaskStatusReport {

private final CompactionTaskTracker store;
private final CompactionTaskTracker tracker;
private final CompactionTaskStatusReporter reporter;
private final CompactionTaskQuery query;

public CompactionTaskStatusReport(
CompactionTaskTracker store,
CompactionTaskTracker tracker,
CompactionTaskStatusReporter reporter,
CompactionTaskQuery query) {
this.store = store;
this.tracker = tracker;
this.reporter = reporter;
this.query = query;
}

public void run() {
reporter.report(query, query.run(store));
reporter.report(query, query.run(tracker));
}

public static void main(String[] args) {
Expand All @@ -65,8 +65,8 @@ public static void main(String[] args) {

try {
InstanceProperties instanceProperties = S3InstanceProperties.loadGivenInstanceId(s3Client, arguments.getInstanceId());
CompactionTaskTracker statusStore = CompactionTaskStatusStoreFactory.getStatusStore(dynamoDBClient, instanceProperties);
new CompactionTaskStatusReport(statusStore, arguments.getReporter(), arguments.getQuery()).run();
CompactionTaskTracker tracker = CompactionTaskTrackerFactory.getTracker(dynamoDBClient, instanceProperties);
new CompactionTaskStatusReport(tracker, arguments.getReporter(), arguments.getQuery()).run();
} finally {
s3Client.shutdown();
dynamoDBClient.shutdown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import sleeper.clients.status.report.job.query.JobQuery;
import sleeper.clients.status.report.partitions.PartitionsStatusReporter;
import sleeper.compaction.status.store.job.CompactionJobTrackerFactory;
import sleeper.compaction.status.store.task.CompactionTaskStatusStoreFactory;
import sleeper.compaction.status.store.task.CompactionTaskTrackerFactory;
import sleeper.configuration.properties.S3InstanceProperties;
import sleeper.configuration.properties.S3TableProperties;
import sleeper.core.properties.instance.InstanceProperties;
Expand All @@ -56,22 +56,22 @@ public class StatusReport {
private final boolean verbose;
private final StateStore stateStore;
private final CompactionJobTracker compactionJobTracker;
private final CompactionTaskTracker compactionTaskStatusStore;
private final CompactionTaskTracker compactionTaskTracker;
private final SqsClient sqsClient;
private final QueueMessageCount.Client messageCount;
private final TablePropertiesProvider tablePropertiesProvider;

public StatusReport(
InstanceProperties instanceProperties, TableProperties tableProperties,
boolean verbose, StateStore stateStore,
CompactionJobTracker compactionJobTracker, CompactionTaskTracker compactionTaskStatusStore,
CompactionJobTracker compactionJobTracker, CompactionTaskTracker compactionTaskTracker,
SqsClient sqsClient, QueueMessageCount.Client messageCount, TablePropertiesProvider tablePropertiesProvider) {
this.instanceProperties = instanceProperties;
this.tableProperties = tableProperties;
this.verbose = verbose;
this.stateStore = stateStore;
this.compactionJobTracker = compactionJobTracker;
this.compactionTaskStatusStore = compactionTaskStatusStore;
this.compactionTaskTracker = compactionTaskTracker;
this.sqsClient = sqsClient;
this.messageCount = messageCount;
this.tablePropertiesProvider = tablePropertiesProvider;
Expand All @@ -92,7 +92,7 @@ private void run() {
JobQuery.Type.UNFINISHED).run();

// Tasks
new CompactionTaskStatusReport(compactionTaskStatusStore,
new CompactionTaskStatusReport(compactionTaskTracker,
new StandardCompactionTaskStatusReporter(System.out),
CompactionTaskQuery.UNFINISHED).run();

Expand Down Expand Up @@ -120,11 +120,11 @@ public static void main(String[] args) {
StateStoreFactory stateStoreFactory = new StateStoreFactory(instanceProperties, s3Client, dynamoDBClient, new Configuration());
StateStore stateStore = stateStoreFactory.getStateStore(tableProperties);
CompactionJobTracker compactionJobTracker = CompactionJobTrackerFactory.getTracker(dynamoDBClient, instanceProperties);
CompactionTaskTracker compactionTaskStatusStore = CompactionTaskStatusStoreFactory.getStatusStore(dynamoDBClient, instanceProperties);
CompactionTaskTracker compactionTaskTracker = CompactionTaskTrackerFactory.getTracker(dynamoDBClient, instanceProperties);

StatusReport statusReport = new StatusReport(
instanceProperties, tableProperties, verbose,
stateStore, compactionJobTracker, compactionTaskStatusStore,
stateStore, compactionJobTracker, compactionTaskTracker,
sqsClient, QueueMessageCount.withSqsClient(sqsClientV1), tablePropertiesProvider);
statusReport.run();
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public interface CompactionTaskQuery {
CompactionTaskQuery UNFINISHED = CompactionTaskTracker::getTasksInProgress;
CompactionTaskQuery ALL = CompactionTaskTracker::getAllTasks;

List<CompactionTaskStatus> run(CompactionTaskTracker store);
List<CompactionTaskStatus> run(CompactionTaskTracker tracker);

static CompactionTaskQuery from(String type) {
switch (type) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
public class AdminClientProcessTrackerHolder implements AdminClientTrackerFactory {

private final Map<String, CompactionJobTracker> compactionJobTrackerByInstance = new HashMap<>();
private final Map<String, CompactionTaskTracker> compactionTaskStoreByInstance = new HashMap<>();
private final Map<String, CompactionTaskTracker> compactionTaskTrackerByInstance = new HashMap<>();
private final Map<String, IngestJobStatusStore> ingestJobStoreByInstance = new HashMap<>();
private final Map<String, IngestTaskStatusStore> ingestTaskStoreByInstance = new HashMap<>();
private final Map<String, IngestBatcherStore> ingestBatcherStoreByInstance = new HashMap<>();
Expand All @@ -43,7 +43,7 @@ public void setTracker(String instanceId, CompactionJobTracker tracker) {
}

public void setTracker(String instanceId, CompactionTaskTracker tracker) {
compactionTaskStoreByInstance.put(instanceId, tracker);
compactionTaskTrackerByInstance.put(instanceId, tracker);
}

public void setTracker(String instanceId, IngestJobStatusStore tracker) {
Expand All @@ -65,8 +65,8 @@ public CompactionJobTracker loadCompactionJobTracker(InstanceProperties instance
}

@Override
public CompactionTaskTracker loadCompactionTaskStatusStore(InstanceProperties instanceProperties) {
return Optional.ofNullable(compactionTaskStoreByInstance.get(instanceProperties.get(ID)))
public CompactionTaskTracker loadCompactionTaskTracker(InstanceProperties instanceProperties) {
return Optional.ofNullable(compactionTaskTrackerByInstance.get(instanceProperties.get(ID)))
.orElse(CompactionTaskTracker.NONE);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ void shouldSetCompactionJobTracker() {
}

@Test
void shouldSetCompactionTaskStatusStore() {
void shouldSetCompactionTaskTracker() {
// Given
InMemoryCompactionTaskTracker store = new InMemoryCompactionTaskTracker();
InstanceProperties properties = createValidInstanceProperties();
Expand All @@ -56,7 +56,7 @@ void shouldSetCompactionTaskStatusStore() {
RunAdminClient runner = runClient().tracker(store);

// Then
assertThat(runner.trackers().loadCompactionTaskStatusStore(properties))
assertThat(runner.trackers().loadCompactionTaskTracker(properties))
.isSameAs(store);
}

Expand Down Expand Up @@ -102,13 +102,13 @@ void shouldReturnNoCompactionJobTracker() {
}

@Test
void shouldReturnNoCompactionTaskStatusStore() {
void shouldReturnNoCompactionTaskTracker() {
// Given
InstanceProperties properties = createValidInstanceProperties();
setInstanceProperties(properties);

// When / Then
assertThat(runClient().trackers().loadCompactionTaskStatusStore(properties))
assertThat(runClient().trackers().loadCompactionTaskTracker(properties))
.isSameAs(CompactionTaskTracker.NONE);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public class CompactionTask {
private final MessageReceiver messageReceiver;
private final CompactionRunnerFactory selector;
private final CompactionJobTracker jobTracker;
private final CompactionTaskTracker taskStatusStore;
private final CompactionTaskTracker taskTracker;
private final CompactionJobCommitterOrSendToLambda jobCommitter;
private final String taskId;
private final Supplier<String> jobRunIdSupplier;
Expand All @@ -82,10 +82,10 @@ public CompactionTask(InstanceProperties instanceProperties, TablePropertiesProv
PropertiesReloader propertiesReloader, StateStoreProvider stateStoreProvider,
MessageReceiver messageReceiver, StateStoreWaitForFiles waitForFiles,
CompactionJobCommitterOrSendToLambda jobCommitter, CompactionJobTracker jobStore,
CompactionTaskTracker taskStore, CompactionRunnerFactory selector, String taskId) {
CompactionTaskTracker taskTracker, CompactionRunnerFactory selector, String taskId) {
this(instanceProperties, tablePropertiesProvider, propertiesReloader, stateStoreProvider,
messageReceiver, waitForFiles, jobCommitter,
jobStore, taskStore, selector, taskId,
jobStore, taskTracker, selector, taskId,
() -> UUID.randomUUID().toString(), Instant::now, threadSleep());
}

Expand All @@ -108,7 +108,7 @@ public CompactionTask(
this.messageReceiver = messageReceiver;
this.selector = selector;
this.jobTracker = jobTracker;
this.taskStatusStore = taskTracker;
this.taskTracker = taskTracker;
this.taskId = taskId;
this.jobRunIdSupplier = jobRunIdSupplier;
this.jobCommitter = jobCommitter;
Expand All @@ -119,14 +119,14 @@ public void run() throws IOException {
Instant startTime = timeSupplier.get();
CompactionTaskStatus.Builder taskStatusBuilder = CompactionTaskStatus.builder().taskId(taskId).startTime(startTime);
LOGGER.info("Starting task {}", taskId);
taskStatusStore.taskStarted(taskStatusBuilder.build());
taskTracker.taskStarted(taskStatusBuilder.build());
CompactionTaskFinishedStatus.Builder taskFinishedBuilder = CompactionTaskFinishedStatus.builder();
Instant finishTime = handleMessages(startTime, taskFinishedBuilder);
CompactionTaskStatus taskFinished = taskStatusBuilder.finished(finishTime, taskFinishedBuilder).build();
LOGGER.info("Total number of messages processed = {}", taskFinished.getJobRuns());
LOGGER.info("Total run time = {}", LoggedDuration.withFullOutput(startTime, finishTime));

taskStatusStore.taskFinished(taskFinished);
taskTracker.taskFinished(taskFinished);
}

private Instant handleMessages(Instant startTime, CompactionTaskFinishedStatus.Builder taskFinishedBuilder) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ void shouldSaveTaskAndJobWhenOneJobSucceeds() throws Exception {
// Then
RecordsProcessedSummary jobSummary = new RecordsProcessedSummary(recordsProcessed,
jobStartTime, jobFinishTime);
assertThat(taskStore.getAllTasks()).containsExactly(
assertThat(taskTracker.getAllTasks()).containsExactly(
finishedCompactionTask("test-task-1", taskStartTime, taskFinishTime, jobSummary));
assertThat(jobTracker.getAllJobs(DEFAULT_TABLE_ID)).containsExactly(
compactionJobCreated(job, DEFAULT_CREATED_TIME,
Expand Down Expand Up @@ -441,7 +441,7 @@ void shouldSaveTaskAndJobsWhenMultipleJobsSucceed() throws Exception {
job1StartTime, job1FinishTime);
RecordsProcessedSummary job2Summary = new RecordsProcessedSummary(job2RecordsProcessed,
job2StartTime, job2FinishTime);
assertThat(taskStore.getAllTasks()).containsExactly(
assertThat(taskTracker.getAllTasks()).containsExactly(
finishedCompactionTask("test-task-1", taskStartTime, taskFinishTime, job1Summary, job2Summary));
assertThat(jobTracker.getAllJobs(DEFAULT_TABLE_ID)).containsExactlyInAnyOrder(
compactionJobCreated(job1, DEFAULT_CREATED_TIME,
Expand All @@ -467,7 +467,7 @@ void shouldSaveTaskAndJobWhenOneJobFails() throws Exception {
runTask("test-task-1", processJobs(jobFails(failure)), times::poll);

// Then
assertThat(taskStore.getAllTasks()).containsExactly(
assertThat(taskTracker.getAllTasks()).containsExactly(
finishedCompactionTask("test-task-1",
Instant.parse("2024-02-22T13:50:00Z"),
Instant.parse("2024-02-22T13:50:06Z")));
Expand All @@ -491,7 +491,7 @@ void shouldSaveTaskWhenNoJobsFound() throws Exception {
runTask("test-task-1", processNoJobs(), times::poll);

// Then
assertThat(taskStore.getAllTasks()).containsExactly(
assertThat(taskTracker.getAllTasks()).containsExactly(
finishedCompactionTask("test-task-1",
Instant.parse("2024-02-22T13:50:00Z"),
Instant.parse("2024-02-22T13:50:05Z")));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public class CompactionTaskTestBase {
protected final List<CompactionJob> consumedJobs = new ArrayList<>();
protected final List<CompactionJob> jobsReturnedToQueue = new ArrayList<>();
protected final InMemoryCompactionJobTracker jobTracker = new InMemoryCompactionJobTracker();
protected final CompactionTaskTracker taskStore = new InMemoryCompactionTaskTracker();
protected final CompactionTaskTracker taskTracker = new InMemoryCompactionTaskTracker();
protected final List<Duration> sleeps = new ArrayList<>();
protected final List<CompactionJobCommitRequest> commitRequestsOnQueue = new ArrayList<>();
protected final List<Duration> foundWaitsForFileAssignment = new ArrayList<>();
Expand Down Expand Up @@ -150,7 +150,7 @@ private void runTask(
CompactionRunnerFactory selector = (job, properties) -> compactor;
new CompactionTask(instanceProperties, tablePropertiesProvider(), PropertiesReloader.neverReload(),
stateStoreProvider(), messageReceiver, fileAssignmentCheck,
committer, jobTracker, taskStore, selector, taskId, jobRunIdSupplier, timeSupplier, sleeps::add)
committer, jobTracker, taskTracker, selector, taskId, jobRunIdSupplier, timeSupplier, sleeps::add)
.run();
}

Expand Down
Loading

0 comments on commit d7c18be

Please sign in to comment.