Skip to content

Commit

Permalink
Remove job status store from CompactSortedFiles
Browse files (browse the repository at this point in the history)
  • Loading branch information
kr565370 committed Mar 11, 2024
1 parent 4267703 commit a607bf7
Show file tree
Hide file tree
Showing 11 changed files with 47 additions and 156 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.slf4j.LoggerFactory;

import sleeper.compaction.job.CompactionJob;
import sleeper.compaction.job.CompactionJobStatusStore;
import sleeper.configuration.jars.ObjectFactory;
import sleeper.configuration.jars.ObjectFactoryException;
import sleeper.configuration.properties.instance.InstanceProperties;
Expand All @@ -38,7 +37,6 @@
import sleeper.core.partition.Partition;
import sleeper.core.record.Record;
import sleeper.core.record.process.RecordsProcessed;
import sleeper.core.record.process.RecordsProcessedSummary;
import sleeper.core.schema.Schema;
import sleeper.core.statestore.FileReference;
import sleeper.core.statestore.StateStore;
Expand All @@ -53,14 +51,12 @@
import sleeper.statestore.StateStoreProvider;

import java.io.IOException;
import java.time.Instant;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;

import static sleeper.core.metrics.MetricsLogger.METRICS_LOGGER;
import static sleeper.sketches.s3.SketchesSerDeToS3.sketchesPathForDataFile;

/**
Expand All @@ -72,41 +68,21 @@ public class CompactSortedFiles {
private final TablePropertiesProvider tablePropertiesProvider;
private final ObjectFactory objectFactory;
private final StateStoreProvider stateStoreProvider;
private final CompactionJobStatusStore jobStatusStore;
private final String taskId;

private static final Logger LOGGER = LoggerFactory.getLogger(CompactSortedFiles.class);

public CompactSortedFiles(
InstanceProperties instanceProperties, TablePropertiesProvider tablePropertiesProvider,
StateStoreProvider stateStoreProvider, ObjectFactory objectFactory, CompactionJobStatusStore jobStatusStore,
String taskId) {
StateStoreProvider stateStoreProvider, ObjectFactory objectFactory) {
this.instanceProperties = instanceProperties;
this.tablePropertiesProvider = tablePropertiesProvider;
this.objectFactory = objectFactory;
this.stateStoreProvider = stateStoreProvider;
this.jobStatusStore = jobStatusStore;
this.taskId = taskId;
}

public RecordsProcessedSummary run(CompactionJob compactionJob) throws IOException, IteratorException, StateStoreException {
Instant startTime = Instant.now();
String id = compactionJob.getId();
LOGGER.info("Compaction job {}: compaction called at {}", id, startTime);
jobStatusStore.jobStarted(compactionJob, startTime, taskId);

public RecordsProcessed run(CompactionJob compactionJob) throws IOException, IteratorException, StateStoreException {
RecordsProcessed recordsProcessed = compact(compactionJob);

Instant finishTime = Instant.now();
// Print summary
LOGGER.info("Compaction job {}: finished at {}", id, finishTime);

RecordsProcessedSummary summary = new RecordsProcessedSummary(recordsProcessed, startTime, finishTime);
METRICS_LOGGER.info("Compaction job {}: compaction run time = {}", id, summary.getDurationInSeconds());
METRICS_LOGGER.info("Compaction job {}: compaction read {} records at {} per second", id, summary.getRecordsRead(), String.format("%.1f", summary.getRecordsReadPerSecond()));
METRICS_LOGGER.info("Compaction job {}: compaction wrote {} records at {} per second", id, summary.getRecordsWritten(), String.format("%.1f", summary.getRecordsWrittenPerSecond()));
jobStatusStore.jobFinished(compactionJob, summary, taskId);
return summary;
return recordsProcessed;
}

private RecordsProcessed compact(CompactionJob compactionJob) throws IOException, IteratorException, StateStoreException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import sleeper.compaction.task.CompactionTaskStatusStore;
import sleeper.configuration.properties.PropertiesReloader;
import sleeper.configuration.properties.instance.InstanceProperties;
import sleeper.core.record.process.RecordsProcessed;
import sleeper.core.record.process.RecordsProcessedSummary;
import sleeper.core.util.LoggedDuration;

Expand Down Expand Up @@ -114,12 +115,13 @@ public Instant handleMessages(Instant startTime, Consumer<RecordsProcessedSummar
jobStatusStore.jobStarted(job, jobStartTime, taskId);
try {
propertiesReloader.reloadIfNeeded();
RecordsProcessedSummary summary = compactor.compact(job);
RecordsProcessed recordsProcessed = compactor.compact(job);
lastActiveTime = timeSupplier.get();
RecordsProcessedSummary summary = new RecordsProcessedSummary(recordsProcessed, jobStartTime, lastActiveTime);
summaryConsumer.accept(summary);
message.completed();
totalNumberOfMessagesProcessed++;
numConsecutiveFailures = 0;
lastActiveTime = timeSupplier.get();
// Print summary
LOGGER.info("Compaction job {}: finished at {}", id, lastActiveTime);
METRICS_LOGGER.info("Compaction job {}: compaction run time = {}", id, summary.getDurationInSeconds());
Expand All @@ -144,7 +146,7 @@ interface MessageReceiver {

@FunctionalInterface
interface CompactionRunner {
RecordsProcessedSummary compact(CompactionJob job) throws Exception;
RecordsProcessed compact(CompactionJob job) throws Exception;
}

interface MessageHandle extends AutoCloseable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,7 @@ public static void main(String[] args) throws InterruptedException, IOException,

ObjectFactory objectFactory = new ObjectFactory(instanceProperties, s3Client, "/tmp");
CompactSortedFiles compactSortedFiles = new CompactSortedFiles(instanceProperties,
tablePropertiesProvider, stateStoreProvider,
objectFactory, jobStatusStore, taskId);
tablePropertiesProvider, stateStoreProvider, objectFactory);
CompactionTask task = new CompactionTask(instanceProperties, propertiesReloader, Instant::now,
new SqsCompactionQueueHandler(sqsClient, instanceProperties)::receiveFromSqs,
job -> compactSortedFiles.run(job), jobStatusStore, taskStatusStore, taskId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import sleeper.compaction.job.execution.testutils.CompactSortedFilesTestBase;
import sleeper.core.partition.PartitionsBuilder;
import sleeper.core.record.Record;
import sleeper.core.record.process.RecordsProcessedSummary;
import sleeper.core.record.process.RecordsProcessed;
import sleeper.core.schema.Schema;
import sleeper.core.schema.type.LongType;
import sleeper.core.statestore.FileReference;
Expand Down Expand Up @@ -54,8 +54,8 @@ void shouldMergeFilesCorrectlyWhenSomeAreEmpty() throws Exception {
assignJobIdToInputFiles(stateStore, compactionJob);

// When
CompactSortedFiles compactSortedFiles = createCompactSortedFiles(schema, compactionJob);
RecordsProcessedSummary summary = compactSortedFiles.run(compactionJob);
CompactSortedFiles compactSortedFiles = createCompactSortedFiles(schema);
RecordsProcessed summary = compactSortedFiles.run(compactionJob);

// Then
// - Read output file and check that it contains the right results
Expand Down Expand Up @@ -88,8 +88,8 @@ void shouldMergeFilesCorrectlyWhenAllAreEmpty() throws Exception {
assignJobIdToInputFiles(stateStore, compactionJob);

// When
CompactSortedFiles compactSortedFiles = createCompactSortedFiles(schema, compactionJob);
RecordsProcessedSummary summary = compactSortedFiles.run(compactionJob);
CompactSortedFiles compactSortedFiles = createCompactSortedFiles(schema);
RecordsProcessed summary = compactSortedFiles.run(compactionJob);

// Then
// - Read output file and check that it contains the right results
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import sleeper.compaction.job.execution.testutils.CompactSortedFilesTestData;
import sleeper.core.partition.PartitionsBuilder;
import sleeper.core.record.Record;
import sleeper.core.record.process.RecordsProcessedSummary;
import sleeper.core.record.process.RecordsProcessed;
import sleeper.core.schema.Schema;
import sleeper.core.schema.type.ByteArrayType;
import sleeper.core.schema.type.LongType;
Expand Down Expand Up @@ -58,8 +58,8 @@ void shouldMergeFilesCorrectlyAndUpdateStateStoreWithLongKey() throws Exception
assignJobIdToInputFiles(stateStore, compactionJob);

// When
CompactSortedFiles compactSortedFiles = createCompactSortedFiles(schema, compactionJob);
RecordsProcessedSummary summary = compactSortedFiles.run(compactionJob);
CompactSortedFiles compactSortedFiles = createCompactSortedFiles(schema);
RecordsProcessed summary = compactSortedFiles.run(compactionJob);

// Then
// - Read output file and check that it contains the right results
Expand Down Expand Up @@ -115,8 +115,8 @@ void shouldMergeFilesCorrectlyAndUpdateStateStoreWithStringKey() throws Exceptio
assignJobIdToInputFiles(stateStore, compactionJob);

// When
CompactSortedFiles compactSortedFiles = createCompactSortedFiles(schema, compactionJob);
RecordsProcessedSummary summary = compactSortedFiles.run(compactionJob);
CompactSortedFiles compactSortedFiles = createCompactSortedFiles(schema);
RecordsProcessed summary = compactSortedFiles.run(compactionJob);

// Then
// - Read output file and check that it contains the right results
Expand Down Expand Up @@ -179,8 +179,8 @@ void shouldMergeFilesCorrectlyAndUpdateStateStoreWithByteArrayKey() throws Excep
assignJobIdToInputFiles(stateStore, compactionJob);

// When
CompactSortedFiles compactSortedFiles = createCompactSortedFiles(schema, compactionJob);
RecordsProcessedSummary summary = compactSortedFiles.run(compactionJob);
CompactSortedFiles compactSortedFiles = createCompactSortedFiles(schema);
RecordsProcessed summary = compactSortedFiles.run(compactionJob);

// Then
// - Read output file and check that it contains the right results
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import sleeper.core.iterator.impl.AgeOffIterator;
import sleeper.core.partition.PartitionsBuilder;
import sleeper.core.record.Record;
import sleeper.core.record.process.RecordsProcessedSummary;
import sleeper.core.record.process.RecordsProcessed;
import sleeper.core.schema.Schema;
import sleeper.core.statestore.FileReference;
import sleeper.core.statestore.FileReferenceFactory;
Expand Down Expand Up @@ -67,8 +67,8 @@ void shouldApplyIteratorDuringCompaction() throws Exception {
assignJobIdToInputFiles(stateStore, compactionJob);

// When
CompactSortedFiles compactSortedFiles = createCompactSortedFiles(schema, compactionJob);
RecordsProcessedSummary summary = compactSortedFiles.run(compactionJob);
CompactSortedFiles compactSortedFiles = createCompactSortedFiles(schema);
RecordsProcessed summary = compactSortedFiles.run(compactionJob);

// Then
// - Read output files and check that they contain the right results
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,8 @@
import software.amazon.awssdk.services.s3.S3AsyncClient;

import sleeper.compaction.job.CompactionJob;
import sleeper.compaction.job.CompactionJobStatusStore;
import sleeper.compaction.job.execution.testutils.CompactSortedFilesTestBase;
import sleeper.compaction.job.execution.testutils.CompactSortedFilesTestData;
import sleeper.compaction.status.store.job.CompactionJobStatusStoreFactory;
import sleeper.compaction.status.store.job.DynamoDBCompactionJobStatusStoreCreator;
import sleeper.configuration.jars.ObjectFactory;
import sleeper.configuration.properties.table.FixedTablePropertiesProvider;
Expand All @@ -43,7 +41,7 @@
import sleeper.core.partition.PartitionTree;
import sleeper.core.partition.PartitionsBuilder;
import sleeper.core.record.Record;
import sleeper.core.record.process.RecordsProcessedSummary;
import sleeper.core.record.process.RecordsProcessed;
import sleeper.core.schema.Schema;
import sleeper.core.schema.type.LongType;
import sleeper.core.statestore.FileReference;
Expand Down Expand Up @@ -78,7 +76,6 @@ public class CompactSortedFilesLocalStackIT extends CompactSortedFilesTestBase {
private static AmazonDynamoDB dynamoDBClient;
private static AmazonS3 s3Client;
private static S3AsyncClient s3AsyncClient;
private CompactionJobStatusStore jobStatusStore;

@BeforeAll
public static void beforeAll() {
Expand All @@ -100,7 +97,6 @@ void setUp() {
s3Client.createBucket(instanceProperties.get(DATA_BUCKET));
new S3StateStoreCreator(instanceProperties, dynamoDBClient).create();
DynamoDBCompactionJobStatusStoreCreator.create(instanceProperties, dynamoDBClient);
jobStatusStore = CompactionJobStatusStoreFactory.getStatusStore(dynamoDBClient, instanceProperties);
}

protected FileReference ingestRecordsGetFile(StateStore stateStore, List<Record> records) throws Exception {
Expand All @@ -117,12 +113,12 @@ private StateStore createStateStore(Schema schema) {
.getStateStore(tableProperties);
}

private CompactSortedFiles createCompactSortedFiles(Schema schema, CompactionJob compactionJob, StateStore stateStore) throws Exception {
private CompactSortedFiles createCompactSortedFiles(Schema schema, StateStore stateStore) throws Exception {
tableProperties.setSchema(schema);
return new CompactSortedFiles(instanceProperties,
new FixedTablePropertiesProvider(tableProperties),
new FixedStateStoreProvider(tableProperties, stateStore),
ObjectFactory.noUserJars(), jobStatusStore, DEFAULT_TASK_ID);
ObjectFactory.noUserJars());
}

@Test
Expand All @@ -143,8 +139,8 @@ public void shouldUpdateStateStoreAfterRunningCompactionJob() throws Exception {
assignJobIdToInputFiles(stateStore, compactionJob);

// When
CompactSortedFiles compactSortedFiles = createCompactSortedFiles(schema, compactionJob, stateStore);
RecordsProcessedSummary summary = compactSortedFiles.run(compactionJob);
CompactSortedFiles compactSortedFiles = createCompactSortedFiles(schema, stateStore);
RecordsProcessed summary = compactSortedFiles.run(compactionJob);

// Then
// - Read output file and check that it contains the right results
Expand Down

This file was deleted.

Loading

0 comments on commit a607bf7

Please sign in to comment.