Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue 1961 - Update compaction job status store in CompactionTask #1971

Merged
merged 22 commits into from
Mar 11, 2024
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
4880fcd
Extract finishedCompactionTask in test
kr565370 Mar 11, 2024
8c70cbe
Merge remote-tracking branch 'origin/develop' into 1961-update-job-st…
kr565370 Mar 11, 2024
f903894
Fix compilation failures
kr565370 Mar 11, 2024
47c44c5
Start updating tests in CompactionTaskTest to assert on job store
kr565370 Mar 11, 2024
4dea26f
Fix failing tests
kr565370 Mar 11, 2024
4267703
Update remaining tests to assert on job store
kr565370 Mar 11, 2024
a607bf7
Remove job status store from CompactSortedFiles
kr565370 Mar 11, 2024
91c9c58
Introduce variable for jobFinishTime
kr565370 Mar 11, 2024
fdac1e6
Remove unnecessary delegation
kr565370 Mar 11, 2024
a6b06f9
Merge branch 'develop' into 1961-update-job-status-store-in-ingestjob…
kr565370 Mar 11, 2024
0c41e3c
Remove unused test variables
kr565370 Mar 11, 2024
b6ee9b5
Move job started call into try/catch block
kr565370 Mar 11, 2024
6fb8d60
Extract methods from handleMessages method
kr565370 Mar 11, 2024
0a6616f
Remove incorrect copied lines
kr565370 Mar 11, 2024
61ab616
Remove lone semicolon
kr565370 Mar 11, 2024
0903388
Remove comment
kr565370 Mar 11, 2024
9ae61e3
Change type of parameter to CompactionTaskFinishedStatus.Builder
kr565370 Mar 11, 2024
c5001e8
Move definition of summaries to assertions
kr565370 Mar 11, 2024
e213bec
Merge remote-tracking branch 'origin/1978-fix-compaction-task-termina…
kr565370 Mar 11, 2024
0e37a0a
Merge branch 'develop' into 1961-update-job-status-store-in-ingestjob…
kr565370 Mar 11, 2024
97a161d
Remove duplicate call to jobFinished
kr565370 Mar 11, 2024
77412cd
Merge branch 'develop' into 1961-update-job-status-store-in-ingestjob…
patchwork01 Mar 11, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.slf4j.LoggerFactory;

import sleeper.compaction.job.CompactionJob;
import sleeper.compaction.job.CompactionJobStatusStore;
import sleeper.configuration.jars.ObjectFactory;
import sleeper.configuration.jars.ObjectFactoryException;
import sleeper.configuration.properties.instance.InstanceProperties;
Expand All @@ -38,7 +37,6 @@
import sleeper.core.partition.Partition;
import sleeper.core.record.Record;
import sleeper.core.record.process.RecordsProcessed;
import sleeper.core.record.process.RecordsProcessedSummary;
import sleeper.core.schema.Schema;
import sleeper.core.statestore.FileReference;
import sleeper.core.statestore.StateStore;
Expand All @@ -53,14 +51,12 @@
import sleeper.statestore.StateStoreProvider;

import java.io.IOException;
import java.time.Instant;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;

import static sleeper.core.metrics.MetricsLogger.METRICS_LOGGER;
import static sleeper.sketches.s3.SketchesSerDeToS3.sketchesPathForDataFile;

/**
Expand All @@ -72,44 +68,19 @@ public class CompactSortedFiles {
private final TablePropertiesProvider tablePropertiesProvider;
private final ObjectFactory objectFactory;
private final StateStoreProvider stateStoreProvider;
private final CompactionJobStatusStore jobStatusStore;
private final String taskId;

private static final Logger LOGGER = LoggerFactory.getLogger(CompactSortedFiles.class);

public CompactSortedFiles(
InstanceProperties instanceProperties, TablePropertiesProvider tablePropertiesProvider,
StateStoreProvider stateStoreProvider, ObjectFactory objectFactory, CompactionJobStatusStore jobStatusStore,
String taskId) {
StateStoreProvider stateStoreProvider, ObjectFactory objectFactory) {
this.instanceProperties = instanceProperties;
this.tablePropertiesProvider = tablePropertiesProvider;
this.objectFactory = objectFactory;
this.stateStoreProvider = stateStoreProvider;
this.jobStatusStore = jobStatusStore;
this.taskId = taskId;
}

public RecordsProcessedSummary run(CompactionJob compactionJob) throws IOException, IteratorException, StateStoreException {
Instant startTime = Instant.now();
String id = compactionJob.getId();
LOGGER.info("Compaction job {}: compaction called at {}", id, startTime);
jobStatusStore.jobStarted(compactionJob, startTime, taskId);

RecordsProcessed recordsProcessed = compact(compactionJob);

Instant finishTime = Instant.now();
// Print summary
LOGGER.info("Compaction job {}: finished at {}", id, finishTime);

RecordsProcessedSummary summary = new RecordsProcessedSummary(recordsProcessed, startTime, finishTime);
METRICS_LOGGER.info("Compaction job {}: compaction run time = {}", id, summary.getDurationInSeconds());
METRICS_LOGGER.info("Compaction job {}: compaction read {} records at {} per second", id, summary.getRecordsRead(), String.format("%.1f", summary.getRecordsReadPerSecond()));
METRICS_LOGGER.info("Compaction job {}: compaction wrote {} records at {} per second", id, summary.getRecordsWritten(), String.format("%.1f", summary.getRecordsWrittenPerSecond()));
jobStatusStore.jobFinished(compactionJob, summary, taskId);
return summary;
}

private RecordsProcessed compact(CompactionJob compactionJob) throws IOException, IteratorException, StateStoreException {
public RecordsProcessed run(CompactionJob compactionJob) throws IOException, IteratorException, StateStoreException {
TableProperties tableProperties = tablePropertiesProvider.getById(compactionJob.getTableId());
Schema schema = tableProperties.getSchema();
StateStore stateStore = stateStoreProvider.getStateStore(tableProperties);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@
import org.slf4j.LoggerFactory;

import sleeper.compaction.job.CompactionJob;
import sleeper.compaction.job.CompactionJobStatusStore;
import sleeper.compaction.task.CompactionTaskFinishedStatus;
import sleeper.compaction.task.CompactionTaskStatus;
import sleeper.compaction.task.CompactionTaskStatusStore;
import sleeper.configuration.properties.PropertiesReloader;
import sleeper.configuration.properties.instance.InstanceProperties;
import sleeper.core.record.process.RecordsProcessed;
import sleeper.core.record.process.RecordsProcessedSummary;
import sleeper.core.util.LoggedDuration;

Expand All @@ -37,6 +39,7 @@

import static sleeper.configuration.properties.instance.CompactionProperty.COMPACTION_TASK_MAX_CONSECUTIVE_FAILURES;
import static sleeper.configuration.properties.instance.CompactionProperty.COMPACTION_TASK_MAX_IDLE_TIME_IN_SECONDS;
import static sleeper.core.metrics.MetricsLogger.METRICS_LOGGER;

/**
* Runs a compaction task, updating the {@link CompactionTaskStatusStore} with progress of the task.
Expand All @@ -49,20 +52,23 @@ public class CompactionTask {
private final Duration maxIdleTime;
private final MessageReceiver messageReceiver;
private final CompactionRunner compactor;
private final CompactionJobStatusStore jobStatusStore;
private final CompactionTaskStatusStore taskStatusStore;
private final String taskId;
private final PropertiesReloader propertiesReloader;
private int numConsecutiveFailures = 0;
private int totalNumberOfMessagesProcessed = 0;

public CompactionTask(InstanceProperties instanceProperties, PropertiesReloader propertiesReloader, Supplier<Instant> timeSupplier,
MessageReceiver messageReceiver, CompactionRunner compactor, CompactionTaskStatusStore taskStore, String taskId) {
public CompactionTask(InstanceProperties instanceProperties, PropertiesReloader propertiesReloader,
Supplier<Instant> timeSupplier, MessageReceiver messageReceiver, CompactionRunner compactor,
CompactionJobStatusStore jobStatusStore, CompactionTaskStatusStore taskStore, String taskId) {
maxIdleTime = Duration.ofSeconds(instanceProperties.getInt(COMPACTION_TASK_MAX_IDLE_TIME_IN_SECONDS));
maxConsecutiveFailures = instanceProperties.getInt(COMPACTION_TASK_MAX_CONSECUTIVE_FAILURES);
this.propertiesReloader = propertiesReloader;
this.timeSupplier = timeSupplier;
this.messageReceiver = messageReceiver;
this.compactor = compactor;
this.jobStatusStore = jobStatusStore;
this.taskStatusStore = taskStore;
this.taskId = taskId;
}
Expand Down Expand Up @@ -103,14 +109,27 @@ public Instant handleMessages(Instant startTime, Consumer<RecordsProcessedSummar
}
try (MessageHandle message = messageOpt.get()) {
CompactionJob job = message.getJob();
LOGGER.info("CompactionJob is: {}", job);
Instant jobStartTime = timeSupplier.get();
String id = job.getId();
LOGGER.info("Compaction job {}: compaction called at {}", id, jobStartTime);
jobStatusStore.jobStarted(job, jobStartTime, taskId);
patchwork01 marked this conversation as resolved.
Show resolved Hide resolved
try {
propertiesReloader.reloadIfNeeded();
summaryConsumer.accept(compactor.compact(job));
RecordsProcessed recordsProcessed = compactor.compact(job);
Instant jobFinishTime = timeSupplier.get();
RecordsProcessedSummary summary = new RecordsProcessedSummary(recordsProcessed, jobStartTime, jobFinishTime);
summaryConsumer.accept(summary);
patchwork01 marked this conversation as resolved.
Show resolved Hide resolved
message.completed();
totalNumberOfMessagesProcessed++;
numConsecutiveFailures = 0;
lastActiveTime = timeSupplier.get();
// Print summary
LOGGER.info("Compaction job {}: finished at {}", id, lastActiveTime);
METRICS_LOGGER.info("Compaction job {}: compaction run time = {}", id, summary.getDurationInSeconds());
METRICS_LOGGER.info("Compaction job {}: compaction read {} records at {} per second", id, summary.getRecordsRead(), String.format("%.1f", summary.getRecordsReadPerSecond()));
METRICS_LOGGER.info("Compaction job {}: compaction wrote {} records at {} per second", id, summary.getRecordsWritten(),
String.format("%.1f", summary.getRecordsWrittenPerSecond()));
jobStatusStore.jobFinished(job, summary, taskId);
lastActiveTime = jobFinishTime;
patchwork01 marked this conversation as resolved.
Show resolved Hide resolved
} catch (Exception e) {
LOGGER.error("Failed processing compaction job, putting job back on queue", e);
numConsecutiveFailures++;
Expand All @@ -128,7 +147,7 @@ interface MessageReceiver {

@FunctionalInterface
interface CompactionRunner {
RecordsProcessedSummary compact(CompactionJob job) throws Exception;
RecordsProcessed compact(CompactionJob job) throws Exception;
}

interface MessageHandle extends AutoCloseable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,10 @@ public static void main(String[] args) throws InterruptedException, IOException,

ObjectFactory objectFactory = new ObjectFactory(instanceProperties, s3Client, "/tmp");
CompactSortedFiles compactSortedFiles = new CompactSortedFiles(instanceProperties,
tablePropertiesProvider, stateStoreProvider,
objectFactory, jobStatusStore, taskId);
tablePropertiesProvider, stateStoreProvider, objectFactory);
CompactionTask task = new CompactionTask(instanceProperties, propertiesReloader, Instant::now,
new SqsCompactionQueueHandler(sqsClient, instanceProperties)::receiveFromSqs,
job -> compactSortedFiles.run(job), taskStatusStore, taskId);
job -> compactSortedFiles.run(job), jobStatusStore, taskStatusStore, taskId);
task.run();

sqsClient.shutdown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import sleeper.compaction.job.execution.testutils.CompactSortedFilesTestBase;
import sleeper.core.partition.PartitionsBuilder;
import sleeper.core.record.Record;
import sleeper.core.record.process.RecordsProcessedSummary;
import sleeper.core.record.process.RecordsProcessed;
import sleeper.core.schema.Schema;
import sleeper.core.schema.type.LongType;
import sleeper.core.statestore.FileReference;
Expand Down Expand Up @@ -54,8 +54,8 @@ void shouldMergeFilesCorrectlyWhenSomeAreEmpty() throws Exception {
assignJobIdToInputFiles(stateStore, compactionJob);

// When
CompactSortedFiles compactSortedFiles = createCompactSortedFiles(schema, compactionJob);
RecordsProcessedSummary summary = compactSortedFiles.run(compactionJob);
CompactSortedFiles compactSortedFiles = createCompactSortedFiles(schema);
RecordsProcessed summary = compactSortedFiles.run(compactionJob);

// Then
// - Read output file and check that it contains the right results
Expand Down Expand Up @@ -88,8 +88,8 @@ void shouldMergeFilesCorrectlyWhenAllAreEmpty() throws Exception {
assignJobIdToInputFiles(stateStore, compactionJob);

// When
CompactSortedFiles compactSortedFiles = createCompactSortedFiles(schema, compactionJob);
RecordsProcessedSummary summary = compactSortedFiles.run(compactionJob);
CompactSortedFiles compactSortedFiles = createCompactSortedFiles(schema);
RecordsProcessed summary = compactSortedFiles.run(compactionJob);

// Then
// - Read output file and check that it contains the right results
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import sleeper.compaction.job.execution.testutils.CompactSortedFilesTestData;
import sleeper.core.partition.PartitionsBuilder;
import sleeper.core.record.Record;
import sleeper.core.record.process.RecordsProcessedSummary;
import sleeper.core.record.process.RecordsProcessed;
import sleeper.core.schema.Schema;
import sleeper.core.schema.type.ByteArrayType;
import sleeper.core.schema.type.LongType;
Expand Down Expand Up @@ -58,8 +58,8 @@ void shouldMergeFilesCorrectlyAndUpdateStateStoreWithLongKey() throws Exception
assignJobIdToInputFiles(stateStore, compactionJob);

// When
CompactSortedFiles compactSortedFiles = createCompactSortedFiles(schema, compactionJob);
RecordsProcessedSummary summary = compactSortedFiles.run(compactionJob);
CompactSortedFiles compactSortedFiles = createCompactSortedFiles(schema);
RecordsProcessed summary = compactSortedFiles.run(compactionJob);

// Then
// - Read output file and check that it contains the right results
Expand Down Expand Up @@ -115,8 +115,8 @@ void shouldMergeFilesCorrectlyAndUpdateStateStoreWithStringKey() throws Exceptio
assignJobIdToInputFiles(stateStore, compactionJob);

// When
CompactSortedFiles compactSortedFiles = createCompactSortedFiles(schema, compactionJob);
RecordsProcessedSummary summary = compactSortedFiles.run(compactionJob);
CompactSortedFiles compactSortedFiles = createCompactSortedFiles(schema);
RecordsProcessed summary = compactSortedFiles.run(compactionJob);

// Then
// - Read output file and check that it contains the right results
Expand Down Expand Up @@ -179,8 +179,8 @@ void shouldMergeFilesCorrectlyAndUpdateStateStoreWithByteArrayKey() throws Excep
assignJobIdToInputFiles(stateStore, compactionJob);

// When
CompactSortedFiles compactSortedFiles = createCompactSortedFiles(schema, compactionJob);
RecordsProcessedSummary summary = compactSortedFiles.run(compactionJob);
CompactSortedFiles compactSortedFiles = createCompactSortedFiles(schema);
RecordsProcessed summary = compactSortedFiles.run(compactionJob);

// Then
// - Read output file and check that it contains the right results
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import sleeper.core.iterator.impl.AgeOffIterator;
import sleeper.core.partition.PartitionsBuilder;
import sleeper.core.record.Record;
import sleeper.core.record.process.RecordsProcessedSummary;
import sleeper.core.record.process.RecordsProcessed;
import sleeper.core.schema.Schema;
import sleeper.core.statestore.FileReference;
import sleeper.core.statestore.FileReferenceFactory;
Expand Down Expand Up @@ -67,8 +67,8 @@ void shouldApplyIteratorDuringCompaction() throws Exception {
assignJobIdToInputFiles(stateStore, compactionJob);

// When
CompactSortedFiles compactSortedFiles = createCompactSortedFiles(schema, compactionJob);
RecordsProcessedSummary summary = compactSortedFiles.run(compactionJob);
CompactSortedFiles compactSortedFiles = createCompactSortedFiles(schema);
RecordsProcessed summary = compactSortedFiles.run(compactionJob);

// Then
// - Read output files and check that they contain the right results
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,8 @@
import software.amazon.awssdk.services.s3.S3AsyncClient;

import sleeper.compaction.job.CompactionJob;
import sleeper.compaction.job.CompactionJobStatusStore;
import sleeper.compaction.job.execution.testutils.CompactSortedFilesTestBase;
import sleeper.compaction.job.execution.testutils.CompactSortedFilesTestData;
import sleeper.compaction.status.store.job.CompactionJobStatusStoreFactory;
import sleeper.compaction.status.store.job.DynamoDBCompactionJobStatusStoreCreator;
import sleeper.configuration.jars.ObjectFactory;
import sleeper.configuration.properties.table.FixedTablePropertiesProvider;
Expand All @@ -43,7 +41,7 @@
import sleeper.core.partition.PartitionTree;
import sleeper.core.partition.PartitionsBuilder;
import sleeper.core.record.Record;
import sleeper.core.record.process.RecordsProcessedSummary;
import sleeper.core.record.process.RecordsProcessed;
import sleeper.core.schema.Schema;
import sleeper.core.schema.type.LongType;
import sleeper.core.statestore.FileReference;
Expand Down Expand Up @@ -78,7 +76,6 @@ public class CompactSortedFilesLocalStackIT extends CompactSortedFilesTestBase {
private static AmazonDynamoDB dynamoDBClient;
private static AmazonS3 s3Client;
private static S3AsyncClient s3AsyncClient;
private CompactionJobStatusStore jobStatusStore;

@BeforeAll
public static void beforeAll() {
Expand All @@ -100,7 +97,6 @@ void setUp() {
s3Client.createBucket(instanceProperties.get(DATA_BUCKET));
new S3StateStoreCreator(instanceProperties, dynamoDBClient).create();
DynamoDBCompactionJobStatusStoreCreator.create(instanceProperties, dynamoDBClient);
jobStatusStore = CompactionJobStatusStoreFactory.getStatusStore(dynamoDBClient, instanceProperties);
}

protected FileReference ingestRecordsGetFile(StateStore stateStore, List<Record> records) throws Exception {
Expand All @@ -117,12 +113,12 @@ private StateStore createStateStore(Schema schema) {
.getStateStore(tableProperties);
}

private CompactSortedFiles createCompactSortedFiles(Schema schema, CompactionJob compactionJob, StateStore stateStore) throws Exception {
private CompactSortedFiles createCompactSortedFiles(Schema schema, StateStore stateStore) throws Exception {
tableProperties.setSchema(schema);
return new CompactSortedFiles(instanceProperties,
new FixedTablePropertiesProvider(tableProperties),
new FixedStateStoreProvider(tableProperties, stateStore),
ObjectFactory.noUserJars(), jobStatusStore, DEFAULT_TASK_ID);
ObjectFactory.noUserJars());
}

@Test
Expand All @@ -143,8 +139,8 @@ public void shouldUpdateStateStoreAfterRunningCompactionJob() throws Exception {
assignJobIdToInputFiles(stateStore, compactionJob);

// When
CompactSortedFiles compactSortedFiles = createCompactSortedFiles(schema, compactionJob, stateStore);
RecordsProcessedSummary summary = compactSortedFiles.run(compactionJob);
CompactSortedFiles compactSortedFiles = createCompactSortedFiles(schema, stateStore);
RecordsProcessed summary = compactSortedFiles.run(compactionJob);

// Then
// - Read output file and check that it contains the right results
Expand Down
Loading
Loading