diff --git a/java/compaction/compaction-core/src/main/java/sleeper/compaction/core/job/dispatch/CompactionJobBatchExpiredException.java b/java/compaction/compaction-core/src/main/java/sleeper/compaction/core/job/dispatch/CompactionJobBatchExpiredException.java index e750c70a8c..9447c6a5fa 100644 --- a/java/compaction/compaction-core/src/main/java/sleeper/compaction/core/job/dispatch/CompactionJobBatchExpiredException.java +++ b/java/compaction/compaction-core/src/main/java/sleeper/compaction/core/job/dispatch/CompactionJobBatchExpiredException.java @@ -15,10 +15,12 @@ */ package sleeper.compaction.core.job.dispatch; +import java.time.Instant; + public class CompactionJobBatchExpiredException extends RuntimeException { - public CompactionJobBatchExpiredException(CompactionJobDispatchRequest request) { - super("Dispatch request for table " + request.getTableId() + " expired at " + request.getExpiryTime() + ", batch key: " + request.getBatchKey()); + public CompactionJobBatchExpiredException(CompactionJobDispatchRequest request, Instant expiryTime) { + super("Dispatch request for table " + request.getTableId() + " expired at " + expiryTime + ", batch key: " + request.getBatchKey()); } } diff --git a/java/compaction/compaction-core/src/main/java/sleeper/compaction/core/job/dispatch/CompactionJobDispatchRequest.java b/java/compaction/compaction-core/src/main/java/sleeper/compaction/core/job/dispatch/CompactionJobDispatchRequest.java index bf800863fc..149cdad195 100644 --- a/java/compaction/compaction-core/src/main/java/sleeper/compaction/core/job/dispatch/CompactionJobDispatchRequest.java +++ b/java/compaction/compaction-core/src/main/java/sleeper/compaction/core/job/dispatch/CompactionJobDispatchRequest.java @@ -19,31 +19,28 @@ import sleeper.core.properties.table.TableProperties; import sleeper.core.table.TableFilePaths; -import java.time.Duration; import java.time.Instant; import java.util.Objects; -import static 
sleeper.core.properties.table.TableProperty.COMPACTION_JOB_SEND_TIMEOUT_SECS; import static sleeper.core.properties.table.TableProperty.TABLE_ID; public class CompactionJobDispatchRequest { private final String tableId; private final String batchKey; - private final Instant expiryTime; + private final Instant createTime; - private CompactionJobDispatchRequest(String tableId, String batchKey, Instant expiryTime) { + private CompactionJobDispatchRequest(String tableId, String batchKey, Instant createTime) { this.tableId = tableId; this.batchKey = batchKey; - this.expiryTime = expiryTime; + this.createTime = createTime; } public static CompactionJobDispatchRequest forTableWithBatchIdAtTime( InstanceProperties instanceProperties, TableProperties tableProperties, String batchId, Instant timeNow) { String batchKey = TableFilePaths.buildDataFilePathPrefix(instanceProperties, tableProperties) .constructCompactionJobBatchPath(batchId); - Duration sendTimeout = Duration.ofSeconds(tableProperties.getInt(COMPACTION_JOB_SEND_TIMEOUT_SECS)); - return new CompactionJobDispatchRequest(tableProperties.get(TABLE_ID), batchKey, timeNow.plus(sendTimeout)); + return new CompactionJobDispatchRequest(tableProperties.get(TABLE_ID), batchKey, timeNow); } public String getTableId() { @@ -54,13 +51,13 @@ public String getBatchKey() { return batchKey; } - public Instant getExpiryTime() { - return expiryTime; + public Instant getCreateTime() { + return createTime; } @Override public int hashCode() { - return Objects.hash(tableId, batchKey, expiryTime); + return Objects.hash(tableId, batchKey, createTime); } @Override @@ -72,11 +69,11 @@ public boolean equals(Object obj) { return false; } CompactionJobDispatchRequest other = (CompactionJobDispatchRequest) obj; - return Objects.equals(tableId, other.tableId) && Objects.equals(batchKey, other.batchKey) && Objects.equals(expiryTime, other.expiryTime); + return Objects.equals(tableId, other.tableId) && Objects.equals(batchKey, other.batchKey) && 
Objects.equals(createTime, other.createTime); } @Override public String toString() { - return "CompactionJobDispatchRequest{tableId=" + tableId + ", batchKey=" + batchKey + ", expiryTime=" + expiryTime + "}"; + return "CompactionJobDispatchRequest{tableId=" + tableId + ", batchKey=" + batchKey + ", createTime=" + createTime + "}"; } } diff --git a/java/compaction/compaction-core/src/main/java/sleeper/compaction/core/job/dispatch/CompactionJobDispatcher.java b/java/compaction/compaction-core/src/main/java/sleeper/compaction/core/job/dispatch/CompactionJobDispatcher.java index c88556abee..28fb8aa3d4 100644 --- a/java/compaction/compaction-core/src/main/java/sleeper/compaction/core/job/dispatch/CompactionJobDispatcher.java +++ b/java/compaction/compaction-core/src/main/java/sleeper/compaction/core/job/dispatch/CompactionJobDispatcher.java @@ -22,12 +22,14 @@ import sleeper.core.statestore.StateStore; import sleeper.core.statestore.StateStoreProvider; +import java.time.Duration; import java.time.Instant; import java.util.List; import java.util.function.Supplier; import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.DATA_BUCKET; import static sleeper.core.properties.table.TableProperty.COMPACTION_JOB_SEND_RETRY_DELAY_SECS; +import static sleeper.core.properties.table.TableProperty.COMPACTION_JOB_SEND_TIMEOUT_SECS; public class CompactionJobDispatcher { @@ -60,10 +62,12 @@ public void dispatch(CompactionJobDispatchRequest request) { sendJob.send(job); } } else { - if (timeSupplier.get().isAfter(request.getExpiryTime())) { - throw new CompactionJobBatchExpiredException(request); - } TableProperties tableProperties = tablePropertiesProvider.getById(request.getTableId()); + Instant expiryTime = request.getCreateTime().plus( + Duration.ofSeconds(tableProperties.getInt(COMPACTION_JOB_SEND_TIMEOUT_SECS))); + if (timeSupplier.get().isAfter(expiryTime)) { + throw new CompactionJobBatchExpiredException(request, expiryTime); + } 
returnToPendingQueue.sendWithDelay(request, tableProperties.getInt(COMPACTION_JOB_SEND_RETRY_DELAY_SECS)); } } diff --git a/java/compaction/compaction-core/src/test/java/sleeper/compaction/core/job/dispatch/CompactionJobDispatchRequestSerDeTest.shouldConvertRequestToFromJson.approved.json b/java/compaction/compaction-core/src/test/java/sleeper/compaction/core/job/dispatch/CompactionJobDispatchRequestSerDeTest.shouldConvertRequestToFromJson.approved.json index 57074c06cb..f84c853064 100644 --- a/java/compaction/compaction-core/src/test/java/sleeper/compaction/core/job/dispatch/CompactionJobDispatchRequestSerDeTest.shouldConvertRequestToFromJson.approved.json +++ b/java/compaction/compaction-core/src/test/java/sleeper/compaction/core/job/dispatch/CompactionJobDispatchRequestSerDeTest.shouldConvertRequestToFromJson.approved.json @@ -1,5 +1,5 @@ { "tableId": "test-table", "batchKey": "s3a://test-bucket/test-table/compactions/test-batch.json", - "expiryTime": 1731931350000 -} \ No newline at end of file + "createTime": 1731931260000 +} diff --git a/java/compaction/compaction-job-creation-lambda/pom.xml b/java/compaction/compaction-job-creation-lambda/pom.xml index 2c530b2ae2..d913500fda 100644 --- a/java/compaction/compaction-job-creation-lambda/pom.xml +++ b/java/compaction/compaction-job-creation-lambda/pom.xml @@ -72,6 +72,43 @@ common-invoke-tables ${project.parent.version} + + + org.testcontainers + localstack + test + + + org.testcontainers + testcontainers + test + + + org.testcontainers + junit-jupiter + test + + + sleeper + core + ${project.parent.version} + test-jar + test + + + sleeper + configuration + ${project.parent.version} + test-jar + test + + + sleeper + parquet + ${project.parent.version} + test-jar + test + diff --git a/java/compaction/compaction-job-creation-lambda/src/main/java/sleeper/compaction/job/creation/lambda/CompactionJobDispatchLambda.java 
b/java/compaction/compaction-job-creation-lambda/src/main/java/sleeper/compaction/job/creation/lambda/CompactionJobDispatchLambda.java new file mode 100644 index 0000000000..c91570bdb9 --- /dev/null +++ b/java/compaction/compaction-job-creation-lambda/src/main/java/sleeper/compaction/job/creation/lambda/CompactionJobDispatchLambda.java @@ -0,0 +1,104 @@ +/* + * Copyright 2022-2024 Crown Copyright + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package sleeper.compaction.job.creation.lambda; + +import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder; +import com.amazonaws.services.lambda.runtime.Context; +import com.amazonaws.services.lambda.runtime.RequestHandler; +import com.amazonaws.services.lambda.runtime.events.SQSEvent; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3ClientBuilder; +import com.amazonaws.services.sqs.AmazonSQS; +import com.amazonaws.services.sqs.AmazonSQSClientBuilder; +import com.amazonaws.services.sqs.model.SendMessageRequest; +import org.apache.hadoop.conf.Configuration; + +import sleeper.compaction.core.job.CompactionJobSerDe; +import sleeper.compaction.core.job.dispatch.CompactionJobDispatchRequestSerDe; +import sleeper.compaction.core.job.dispatch.CompactionJobDispatcher; +import sleeper.compaction.core.job.dispatch.CompactionJobDispatcher.ReadBatch; +import 
sleeper.compaction.core.job.dispatch.CompactionJobDispatcher.ReturnRequestToPendingQueue; +import sleeper.compaction.core.job.dispatch.CompactionJobDispatcher.SendJob; +import sleeper.configuration.properties.S3InstanceProperties; +import sleeper.configuration.properties.S3TableProperties; +import sleeper.core.properties.instance.InstanceProperties; +import sleeper.parquet.utils.HadoopConfigurationProvider; +import sleeper.statestore.StateStoreFactory; + +import java.time.Instant; +import java.util.function.Supplier; + +import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.COMPACTION_JOB_QUEUE_URL; +import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.COMPACTION_PENDING_QUEUE_URL; +import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.CONFIG_BUCKET; + +/** + * Sends compaction jobs in batches from the pending jobs queue, running in AWS Lambda. + * The jobs are created by {@link CreateCompactionJobsLambda}, then are sent in batches in this class. + * This lambda also handles waiting for input files to be assigned to the jobs, + * when that is done asynchronously. Runs batches with {@link CompactionJobDispatcher}. 
+ */ +public class CompactionJobDispatchLambda implements RequestHandler<SQSEvent, Void> { + + private final CompactionJobDispatcher dispatcher; + private final CompactionJobDispatchRequestSerDe serDe = new CompactionJobDispatchRequestSerDe(); + + public CompactionJobDispatchLambda() { + AmazonS3 s3 = AmazonS3ClientBuilder.defaultClient(); + AmazonDynamoDB dynamoDB = AmazonDynamoDBClientBuilder.defaultClient(); + AmazonSQS sqs = AmazonSQSClientBuilder.defaultClient(); + String configBucket = System.getenv(CONFIG_BUCKET.toEnvironmentVariable()); + InstanceProperties instanceProperties = S3InstanceProperties.loadFromBucket(s3, configBucket); + Configuration conf = HadoopConfigurationProvider.getConfigurationForLambdas(instanceProperties); + dispatcher = dispatcher(s3, dynamoDB, sqs, conf, instanceProperties, Instant::now); + } + + @Override + public Void handleRequest(SQSEvent event, Context context) { + event.getRecords().forEach(message -> dispatcher.dispatch(serDe.fromJson(message.getBody()))); + return null; + } + + public static CompactionJobDispatcher dispatcher( + AmazonS3 s3, AmazonDynamoDB dynamoDB, AmazonSQS sqs, Configuration conf, InstanceProperties instanceProperties, Supplier<Instant> timeSupplier) { + CompactionJobSerDe compactionJobSerDe = new CompactionJobSerDe(); + return new CompactionJobDispatcher(instanceProperties, + S3TableProperties.createProvider(instanceProperties, s3, dynamoDB), + StateStoreFactory.createProvider(instanceProperties, s3, dynamoDB, conf), + readBatch(s3, compactionJobSerDe), + sendJob(instanceProperties, sqs, compactionJobSerDe), + returnToQueue(instanceProperties, sqs), timeSupplier); + } + + private static SendJob sendJob(InstanceProperties instanceProperties, AmazonSQS sqs, CompactionJobSerDe compactionJobSerDe) { + return compactionJob -> sqs.sendMessage( + instanceProperties.get(COMPACTION_JOB_QUEUE_URL), + compactionJobSerDe.toJson(compactionJob)); + } + + private static ReadBatch readBatch(AmazonS3 s3, CompactionJobSerDe compactionJobSerDe) { 
+ return (bucketName, key) -> compactionJobSerDe.batchFromJson(s3.getObjectAsString(bucketName, key)); + } + + private static ReturnRequestToPendingQueue returnToQueue(InstanceProperties instanceProperties, AmazonSQS sqs) { + CompactionJobDispatchRequestSerDe serDe = new CompactionJobDispatchRequestSerDe(); + return (request, delaySeconds) -> sqs.sendMessage(new SendMessageRequest() + .withQueueUrl(instanceProperties.get(COMPACTION_PENDING_QUEUE_URL)) + .withMessageBody(serDe.toJson(request)) + .withDelaySeconds(delaySeconds)); + } +} diff --git a/java/compaction/compaction-job-creation-lambda/src/test/java/sleeper/compaction/job/creation/lambda/CompactionJobDispatchLambdaIT.java b/java/compaction/compaction-job-creation-lambda/src/test/java/sleeper/compaction/job/creation/lambda/CompactionJobDispatchLambdaIT.java new file mode 100644 index 0000000000..be7d327b12 --- /dev/null +++ b/java/compaction/compaction-job-creation-lambda/src/test/java/sleeper/compaction/job/creation/lambda/CompactionJobDispatchLambdaIT.java @@ -0,0 +1,218 @@ +/* + * Copyright 2022-2024 Crown Copyright + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package sleeper.compaction.job.creation.lambda; + +import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; +import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3ClientBuilder; +import com.amazonaws.services.sqs.AmazonSQS; +import com.amazonaws.services.sqs.AmazonSQSClientBuilder; +import com.amazonaws.services.sqs.model.Message; +import com.amazonaws.services.sqs.model.ReceiveMessageRequest; +import com.amazonaws.services.sqs.model.ReceiveMessageResult; +import org.apache.hadoop.conf.Configuration; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.localstack.LocalStackContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; + +import sleeper.compaction.core.job.CompactionJob; +import sleeper.compaction.core.job.CompactionJobFactory; +import sleeper.compaction.core.job.CompactionJobSerDe; +import sleeper.compaction.core.job.dispatch.CompactionJobDispatchRequest; +import sleeper.compaction.core.job.dispatch.CompactionJobDispatchRequestSerDe; +import sleeper.compaction.core.job.dispatch.CompactionJobDispatcher; +import sleeper.configuration.properties.S3InstanceProperties; +import sleeper.configuration.properties.S3TableProperties; +import sleeper.configuration.table.index.DynamoDBTableIndexCreator; +import sleeper.core.CommonTestConstants; +import sleeper.core.partition.PartitionTree; +import sleeper.core.partition.PartitionsBuilder; +import sleeper.core.properties.instance.InstanceProperties; +import sleeper.core.properties.table.TableProperties; +import sleeper.core.schema.Schema; +import sleeper.core.statestore.FileReference; +import sleeper.core.statestore.FileReferenceFactory; +import sleeper.core.statestore.StateStore; +import sleeper.core.statestore.StateStoreProvider; +import 
sleeper.parquet.utils.HadoopConfigurationLocalStackUtils; +import sleeper.statestore.StateStoreFactory; +import sleeper.statestore.transactionlog.TransactionLogStateStoreCreator; + +import java.time.Instant; +import java.util.List; +import java.util.UUID; + +import static org.assertj.core.api.Assertions.assertThat; +import static sleeper.configuration.testutils.LocalStackAwsV1ClientHelper.buildAwsV1Client; +import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.COMPACTION_JOB_QUEUE_URL; +import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.COMPACTION_PENDING_QUEUE_URL; +import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.CONFIG_BUCKET; +import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.DATA_BUCKET; +import static sleeper.core.properties.table.TableProperty.COMPACTION_JOB_SEND_RETRY_DELAY_SECS; +import static sleeper.core.properties.table.TableProperty.COMPACTION_JOB_SEND_TIMEOUT_SECS; +import static sleeper.core.properties.testutils.InstancePropertiesTestHelper.createTestInstanceProperties; +import static sleeper.core.properties.testutils.TablePropertiesTestHelper.createTestTableProperties; +import static sleeper.core.schema.SchemaTestHelper.schemaWithKey; + +@Testcontainers +public class CompactionJobDispatchLambdaIT { + + @Container + public static LocalStackContainer localStackContainer = new LocalStackContainer(DockerImageName.parse(CommonTestConstants.LOCALSTACK_DOCKER_IMAGE)) + .withServices(LocalStackContainer.Service.S3, LocalStackContainer.Service.SQS, LocalStackContainer.Service.DYNAMODB); + + AmazonS3 s3 = buildAwsV1Client(localStackContainer, LocalStackContainer.Service.S3, AmazonS3ClientBuilder.standard()); + AmazonDynamoDB dynamoDB = buildAwsV1Client(localStackContainer, LocalStackContainer.Service.DYNAMODB, AmazonDynamoDBClientBuilder.standard()); + AmazonSQS sqs = buildAwsV1Client(localStackContainer, LocalStackContainer.Service.SQS, 
AmazonSQSClientBuilder.standard()); + Configuration conf = HadoopConfigurationLocalStackUtils.getHadoopConfiguration(localStackContainer); + + InstanceProperties instanceProperties = createInstance(); + StateStoreProvider stateStoreProvider = StateStoreFactory.createProvider(instanceProperties, s3, dynamoDB, conf); + Schema schema = schemaWithKey("key"); + PartitionTree partitions = new PartitionsBuilder(schema).singlePartition("root").buildTree(); + TableProperties tableProperties = addTable(instanceProperties, schema, partitions); + FileReferenceFactory fileFactory = FileReferenceFactory.from(partitions); + CompactionJobFactory compactionFactory = new CompactionJobFactory(instanceProperties, tableProperties); + StateStore stateStore = stateStoreProvider.getStateStore(tableProperties); + + @Test + void shouldSendCompactionJobsInABatchWhenAllFilesAreAssigned() { + + // Given + FileReference file1 = fileFactory.rootFile("test1.parquet", 1234); + FileReference file2 = fileFactory.rootFile("test2.parquet", 5678); + CompactionJob job1 = compactionFactory.createCompactionJob("test-job-1", List.of(file1), "root"); + CompactionJob job2 = compactionFactory.createCompactionJob("test-job-2", List.of(file2), "root"); + stateStore.addFiles(List.of(file1, file2)); + assignJobIds(List.of(job1, job2)); + + CompactionJobDispatchRequest request = generateBatchRequestAtTime( + "test-batch", Instant.parse("2024-11-15T10:30:00Z")); + putCompactionJobBatch(request, List.of(job1, job2)); + + // When + dispatchWithNoRetry(request); + + // Then + assertThat(receiveCompactionJobs()).containsExactly(job1, job2); + } + + @Test + void shouldReturnBatchToTheQueueIfTheFilesForTheBatchAreUnassigned() throws Exception { + + // Given + FileReference file1 = fileFactory.rootFile("test3.parquet", 1234); + FileReference file2 = fileFactory.rootFile("test4.parquet", 5678); + CompactionJob job1 = compactionFactory.createCompactionJob("test-job-3", List.of(file1), "root"); + CompactionJob job2 = 
compactionFactory.createCompactionJob("test-job-4", List.of(file2), "root"); + stateStore.addFiles(List.of(file1, file2)); + + tableProperties.setNumber(COMPACTION_JOB_SEND_TIMEOUT_SECS, 123); + tableProperties.setNumber(COMPACTION_JOB_SEND_RETRY_DELAY_SECS, 0); + saveTableProperties(); + + CompactionJobDispatchRequest request = generateBatchRequestAtTime( + "test-batch", Instant.parse("2024-11-21T10:20:00Z")); + Instant retryTime = Instant.parse("2024-11-21T10:22:00Z"); + putCompactionJobBatch(request, List.of(job1, job2)); + + // When + dispatchWithTimeAtRetryCheck(request, retryTime); + + // Then + assertThat(receivePendingBatches()).containsExactly(request); + } + + private InstanceProperties createInstance() { + InstanceProperties instanceProperties = createTestInstanceProperties(); + instanceProperties.set(COMPACTION_JOB_QUEUE_URL, sqs.createQueue(UUID.randomUUID().toString()).getQueueUrl()); + instanceProperties.set(COMPACTION_PENDING_QUEUE_URL, sqs.createQueue(UUID.randomUUID().toString()).getQueueUrl()); + + DynamoDBTableIndexCreator.create(dynamoDB, instanceProperties); + new TransactionLogStateStoreCreator(instanceProperties, dynamoDB).create(); + + s3.createBucket(instanceProperties.get(CONFIG_BUCKET)); + s3.createBucket(instanceProperties.get(DATA_BUCKET)); + S3InstanceProperties.saveToS3(s3, instanceProperties); + + return instanceProperties; + } + + private TableProperties addTable(InstanceProperties instanceProperties, Schema schema, PartitionTree partitions) { + TableProperties tableProperties = createTestTableProperties(instanceProperties, schema); + S3TableProperties.createStore(instanceProperties, s3, dynamoDB) + .createTable(tableProperties); + stateStoreProvider.getStateStore(tableProperties) + .initialise(partitions.getAllPartitions()); + return tableProperties; + } + + private void saveTableProperties() { + S3TableProperties.createStore(instanceProperties, s3, dynamoDB) + .save(tableProperties); + } + + private void assignJobIds(List<CompactionJob> jobs) { 
+ for (CompactionJob job : jobs) { + stateStore.assignJobIds(List.of(job.createAssignJobIdRequest())); + } + } + + private CompactionJobDispatchRequest generateBatchRequestAtTime(String batchId, Instant timeNow) { + return CompactionJobDispatchRequest.forTableWithBatchIdAtTime( + instanceProperties, tableProperties, batchId, timeNow); + } + + private void putCompactionJobBatch(CompactionJobDispatchRequest request, List<CompactionJob> jobs) { + s3.putObject( + instanceProperties.get(DATA_BUCKET), + request.getBatchKey(), + new CompactionJobSerDe().toJson(jobs)); + } + + private void dispatchWithNoRetry(CompactionJobDispatchRequest request) { + dispatcher(List.of()).dispatch(request); + } + + private void dispatchWithTimeAtRetryCheck(CompactionJobDispatchRequest request, Instant time) { + dispatcher(List.of(time)).dispatch(request); + } + + private CompactionJobDispatcher dispatcher(List<Instant> times) { + return CompactionJobDispatchLambda.dispatcher(s3, dynamoDB, sqs, conf, instanceProperties, times.iterator()::next); + } + + private List<CompactionJob> receiveCompactionJobs() { + ReceiveMessageResult result = sqs.receiveMessage(new ReceiveMessageRequest() + .withQueueUrl(instanceProperties.get(COMPACTION_JOB_QUEUE_URL)) + .withMaxNumberOfMessages(10)); + return result.getMessages().stream() + .map(Message::getBody) + .map(new CompactionJobSerDe()::fromJson).toList(); + } + + private List<CompactionJobDispatchRequest> receivePendingBatches() { + ReceiveMessageResult result = sqs.receiveMessage(new ReceiveMessageRequest() + .withQueueUrl(instanceProperties.get(COMPACTION_PENDING_QUEUE_URL)) + .withMaxNumberOfMessages(10)); + return result.getMessages().stream() + .map(Message::getBody) + .map(new CompactionJobDispatchRequestSerDe()::fromJson).toList(); + } +} diff --git a/java/compaction/compaction-job-creation/src/test/java/sleeper/compaction/job/creation/CreateCompactionJobsIT.java b/java/compaction/compaction-job-creation/src/test/java/sleeper/compaction/job/creation/CreateCompactionJobsIT.java index cb6fd25d33..0726e2008f 100644 
--- a/java/compaction/compaction-job-creation/src/test/java/sleeper/compaction/job/creation/CreateCompactionJobsIT.java +++ b/java/compaction/compaction-job-creation/src/test/java/sleeper/compaction/job/creation/CreateCompactionJobsIT.java @@ -81,7 +81,7 @@ public class CreateCompactionJobsIT { @Container public static LocalStackContainer localStackContainer = new LocalStackContainer(DockerImageName.parse(CommonTestConstants.LOCALSTACK_DOCKER_IMAGE)).withServices( - LocalStackContainer.Service.S3, LocalStackContainer.Service.SQS, LocalStackContainer.Service.DYNAMODB, LocalStackContainer.Service.IAM); + LocalStackContainer.Service.S3, LocalStackContainer.Service.SQS, LocalStackContainer.Service.DYNAMODB); private final AmazonS3 s3 = buildAwsV1Client(localStackContainer, LocalStackContainer.Service.S3, AmazonS3ClientBuilder.standard()); private final AmazonDynamoDB dynamoDB = buildAwsV1Client(localStackContainer, LocalStackContainer.Service.DYNAMODB, AmazonDynamoDBClientBuilder.standard()); diff --git a/java/core/src/main/java/sleeper/core/properties/instance/CdkDefinedInstanceProperty.java b/java/core/src/main/java/sleeper/core/properties/instance/CdkDefinedInstanceProperty.java index dc60f5a9a1..3350b56752 100644 --- a/java/core/src/main/java/sleeper/core/properties/instance/CdkDefinedInstanceProperty.java +++ b/java/core/src/main/java/sleeper/core/properties/instance/CdkDefinedInstanceProperty.java @@ -328,6 +328,22 @@ public interface CdkDefinedInstanceProperty extends InstanceProperty { .description("The ARN of the dead letter queue for compaction jobs.") .propertyGroup(InstancePropertyGroup.COMPACTION) .build(); + CdkDefinedInstanceProperty COMPACTION_PENDING_QUEUE_URL = Index.propertyBuilder("sleeper.compaction.pending.queue.url") + .description("The URL of the queue for pending compaction job batches.") + .propertyGroup(InstancePropertyGroup.COMPACTION) + .build(); + CdkDefinedInstanceProperty COMPACTION_PENDING_QUEUE_ARN = 
Index.propertyBuilder("sleeper.compaction.pending.queue.arn") + .description("The ARN of the queue for pending compaction job batches.") + .propertyGroup(InstancePropertyGroup.COMPACTION) + .build(); + CdkDefinedInstanceProperty COMPACTION_PENDING_DLQ_URL = Index.propertyBuilder("sleeper.compaction.pending.dlq.url") + .description("The URL of the dead letter queue for pending compaction job batches.") + .propertyGroup(InstancePropertyGroup.COMPACTION) + .build(); + CdkDefinedInstanceProperty COMPACTION_PENDING_DLQ_ARN = Index.propertyBuilder("sleeper.compaction.pending.dlq.arn") + .description("The ARN of the dead letter queue for pending compaction job batches.") + .propertyGroup(InstancePropertyGroup.COMPACTION) + .build(); CdkDefinedInstanceProperty COMPACTION_TASK_CREATION_LAMBDA_FUNCTION = Index.propertyBuilder("sleeper.compaction.task.creation.lambda.function") .description("The function name of the compaction task creation lambda.") diff --git a/java/core/src/main/java/sleeper/core/properties/instance/CompactionProperty.java b/java/core/src/main/java/sleeper/core/properties/instance/CompactionProperty.java index a75e2accca..8e51119e83 100644 --- a/java/core/src/main/java/sleeper/core/properties/instance/CompactionProperty.java +++ b/java/core/src/main/java/sleeper/core/properties/instance/CompactionProperty.java @@ -293,7 +293,7 @@ public interface CompactionProperty { "The batch will be sent if all input files have been successfully assigned to the jobs, otherwise " + "the batch will be retried after a delay.") .defaultValue("30") - .validationPredicate(SleeperPropertyValueUtils::isPositiveInteger) + .validationPredicate(SleeperPropertyValueUtils::isNonNegativeInteger) .propertyGroup(InstancePropertyGroup.COMPACTION).build(); UserDefinedInstanceProperty DEFAULT_SIZERATIO_COMPACTION_STRATEGY_RATIO = Index.propertyBuilder("sleeper.default.table.compaction.strategy.sizeratio.ratio") .description("Used by the SizeRatioCompactionStrategy to decide if a group of 
files should be compacted.\n" +