diff --git a/java/compaction/compaction-core/src/main/java/sleeper/compaction/core/job/dispatch/CompactionJobBatchExpiredException.java b/java/compaction/compaction-core/src/main/java/sleeper/compaction/core/job/dispatch/CompactionJobBatchExpiredException.java
index e750c70a8c..9447c6a5fa 100644
--- a/java/compaction/compaction-core/src/main/java/sleeper/compaction/core/job/dispatch/CompactionJobBatchExpiredException.java
+++ b/java/compaction/compaction-core/src/main/java/sleeper/compaction/core/job/dispatch/CompactionJobBatchExpiredException.java
@@ -15,10 +15,12 @@
*/
package sleeper.compaction.core.job.dispatch;
+import java.time.Instant;
+
public class CompactionJobBatchExpiredException extends RuntimeException {
- public CompactionJobBatchExpiredException(CompactionJobDispatchRequest request) {
- super("Dispatch request for table " + request.getTableId() + " expired at " + request.getExpiryTime() + ", batch key: " + request.getBatchKey());
+ public CompactionJobBatchExpiredException(CompactionJobDispatchRequest request, Instant expiryTime) {
+ super("Dispatch request for table " + request.getTableId() + " expired at " + expiryTime + ", batch key: " + request.getBatchKey());
}
}
diff --git a/java/compaction/compaction-core/src/main/java/sleeper/compaction/core/job/dispatch/CompactionJobDispatchRequest.java b/java/compaction/compaction-core/src/main/java/sleeper/compaction/core/job/dispatch/CompactionJobDispatchRequest.java
index bf800863fc..149cdad195 100644
--- a/java/compaction/compaction-core/src/main/java/sleeper/compaction/core/job/dispatch/CompactionJobDispatchRequest.java
+++ b/java/compaction/compaction-core/src/main/java/sleeper/compaction/core/job/dispatch/CompactionJobDispatchRequest.java
@@ -19,31 +19,28 @@
import sleeper.core.properties.table.TableProperties;
import sleeper.core.table.TableFilePaths;
-import java.time.Duration;
import java.time.Instant;
import java.util.Objects;
-import static sleeper.core.properties.table.TableProperty.COMPACTION_JOB_SEND_TIMEOUT_SECS;
import static sleeper.core.properties.table.TableProperty.TABLE_ID;
public class CompactionJobDispatchRequest {
private final String tableId;
private final String batchKey;
- private final Instant expiryTime;
+ private final Instant createTime;
- private CompactionJobDispatchRequest(String tableId, String batchKey, Instant expiryTime) {
+ private CompactionJobDispatchRequest(String tableId, String batchKey, Instant createTime) {
this.tableId = tableId;
this.batchKey = batchKey;
- this.expiryTime = expiryTime;
+ this.createTime = createTime;
}
public static CompactionJobDispatchRequest forTableWithBatchIdAtTime(
InstanceProperties instanceProperties, TableProperties tableProperties, String batchId, Instant timeNow) {
String batchKey = TableFilePaths.buildDataFilePathPrefix(instanceProperties, tableProperties)
.constructCompactionJobBatchPath(batchId);
- Duration sendTimeout = Duration.ofSeconds(tableProperties.getInt(COMPACTION_JOB_SEND_TIMEOUT_SECS));
- return new CompactionJobDispatchRequest(tableProperties.get(TABLE_ID), batchKey, timeNow.plus(sendTimeout));
+ return new CompactionJobDispatchRequest(tableProperties.get(TABLE_ID), batchKey, timeNow);
}
public String getTableId() {
@@ -54,13 +51,13 @@ public String getBatchKey() {
return batchKey;
}
- public Instant getExpiryTime() {
- return expiryTime;
+ public Instant getCreateTime() {
+ return createTime;
}
@Override
public int hashCode() {
- return Objects.hash(tableId, batchKey, expiryTime);
+ return Objects.hash(tableId, batchKey, createTime);
}
@Override
@@ -72,11 +69,11 @@ public boolean equals(Object obj) {
return false;
}
CompactionJobDispatchRequest other = (CompactionJobDispatchRequest) obj;
- return Objects.equals(tableId, other.tableId) && Objects.equals(batchKey, other.batchKey) && Objects.equals(expiryTime, other.expiryTime);
+ return Objects.equals(tableId, other.tableId) && Objects.equals(batchKey, other.batchKey) && Objects.equals(createTime, other.createTime);
}
@Override
public String toString() {
- return "CompactionJobDispatchRequest{tableId=" + tableId + ", batchKey=" + batchKey + ", expiryTime=" + expiryTime + "}";
+ return "CompactionJobDispatchRequest{tableId=" + tableId + ", batchKey=" + batchKey + ", createTime=" + createTime + "}";
}
}
diff --git a/java/compaction/compaction-core/src/main/java/sleeper/compaction/core/job/dispatch/CompactionJobDispatcher.java b/java/compaction/compaction-core/src/main/java/sleeper/compaction/core/job/dispatch/CompactionJobDispatcher.java
index c88556abee..28fb8aa3d4 100644
--- a/java/compaction/compaction-core/src/main/java/sleeper/compaction/core/job/dispatch/CompactionJobDispatcher.java
+++ b/java/compaction/compaction-core/src/main/java/sleeper/compaction/core/job/dispatch/CompactionJobDispatcher.java
@@ -22,12 +22,14 @@
import sleeper.core.statestore.StateStore;
import sleeper.core.statestore.StateStoreProvider;
+import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.function.Supplier;
import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.DATA_BUCKET;
import static sleeper.core.properties.table.TableProperty.COMPACTION_JOB_SEND_RETRY_DELAY_SECS;
+import static sleeper.core.properties.table.TableProperty.COMPACTION_JOB_SEND_TIMEOUT_SECS;
public class CompactionJobDispatcher {
@@ -60,10 +62,12 @@ public void dispatch(CompactionJobDispatchRequest request) {
sendJob.send(job);
}
} else {
- if (timeSupplier.get().isAfter(request.getExpiryTime())) {
- throw new CompactionJobBatchExpiredException(request);
- }
TableProperties tableProperties = tablePropertiesProvider.getById(request.getTableId());
+ Instant expiryTime = request.getCreateTime().plus(
+ Duration.ofSeconds(tableProperties.getInt(COMPACTION_JOB_SEND_TIMEOUT_SECS)));
+ if (timeSupplier.get().isAfter(expiryTime)) {
+ throw new CompactionJobBatchExpiredException(request, expiryTime);
+ }
returnToPendingQueue.sendWithDelay(request, tableProperties.getInt(COMPACTION_JOB_SEND_RETRY_DELAY_SECS));
}
}
diff --git a/java/compaction/compaction-core/src/test/java/sleeper/compaction/core/job/dispatch/CompactionJobDispatchRequestSerDeTest.shouldConvertRequestToFromJson.approved.json b/java/compaction/compaction-core/src/test/java/sleeper/compaction/core/job/dispatch/CompactionJobDispatchRequestSerDeTest.shouldConvertRequestToFromJson.approved.json
index 57074c06cb..f84c853064 100644
--- a/java/compaction/compaction-core/src/test/java/sleeper/compaction/core/job/dispatch/CompactionJobDispatchRequestSerDeTest.shouldConvertRequestToFromJson.approved.json
+++ b/java/compaction/compaction-core/src/test/java/sleeper/compaction/core/job/dispatch/CompactionJobDispatchRequestSerDeTest.shouldConvertRequestToFromJson.approved.json
@@ -1,5 +1,5 @@
{
"tableId": "test-table",
"batchKey": "s3a://test-bucket/test-table/compactions/test-batch.json",
- "expiryTime": 1731931350000
-}
\ No newline at end of file
+ "createTime": 1731931260000
+}
diff --git a/java/compaction/compaction-job-creation-lambda/pom.xml b/java/compaction/compaction-job-creation-lambda/pom.xml
index 2c530b2ae2..d913500fda 100644
--- a/java/compaction/compaction-job-creation-lambda/pom.xml
+++ b/java/compaction/compaction-job-creation-lambda/pom.xml
@@ -72,6 +72,43 @@
            <artifactId>common-invoke-tables</artifactId>
            <version>${project.parent.version}</version>
+
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>localstack</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>testcontainers</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>junit-jupiter</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>sleeper</groupId>
+            <artifactId>core</artifactId>
+            <version>${project.parent.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>sleeper</groupId>
+            <artifactId>configuration</artifactId>
+            <version>${project.parent.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>sleeper</groupId>
+            <artifactId>parquet</artifactId>
+            <version>${project.parent.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
diff --git a/java/compaction/compaction-job-creation-lambda/src/main/java/sleeper/compaction/job/creation/lambda/CompactionJobDispatchLambda.java b/java/compaction/compaction-job-creation-lambda/src/main/java/sleeper/compaction/job/creation/lambda/CompactionJobDispatchLambda.java
new file mode 100644
index 0000000000..c91570bdb9
--- /dev/null
+++ b/java/compaction/compaction-job-creation-lambda/src/main/java/sleeper/compaction/job/creation/lambda/CompactionJobDispatchLambda.java
@@ -0,0 +1,104 @@
+/*
+ * Copyright 2022-2024 Crown Copyright
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package sleeper.compaction.job.creation.lambda;
+
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
+import com.amazonaws.services.lambda.runtime.Context;
+import com.amazonaws.services.lambda.runtime.RequestHandler;
+import com.amazonaws.services.lambda.runtime.events.SQSEvent;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import com.amazonaws.services.sqs.AmazonSQS;
+import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
+import com.amazonaws.services.sqs.model.SendMessageRequest;
+import org.apache.hadoop.conf.Configuration;
+
+import sleeper.compaction.core.job.CompactionJobSerDe;
+import sleeper.compaction.core.job.dispatch.CompactionJobDispatchRequestSerDe;
+import sleeper.compaction.core.job.dispatch.CompactionJobDispatcher;
+import sleeper.compaction.core.job.dispatch.CompactionJobDispatcher.ReadBatch;
+import sleeper.compaction.core.job.dispatch.CompactionJobDispatcher.ReturnRequestToPendingQueue;
+import sleeper.compaction.core.job.dispatch.CompactionJobDispatcher.SendJob;
+import sleeper.configuration.properties.S3InstanceProperties;
+import sleeper.configuration.properties.S3TableProperties;
+import sleeper.core.properties.instance.InstanceProperties;
+import sleeper.parquet.utils.HadoopConfigurationProvider;
+import sleeper.statestore.StateStoreFactory;
+
+import java.time.Instant;
+import java.util.function.Supplier;
+
+import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.COMPACTION_JOB_QUEUE_URL;
+import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.COMPACTION_PENDING_QUEUE_URL;
+import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.CONFIG_BUCKET;
+
+/**
+ * Sends compaction jobs in batches from the pending jobs queue, running in AWS Lambda.
+ * The jobs are created by {@link CreateCompactionJobsLambda}, then are sent in batches in this class.
+ * This lambda also handles waiting for input files to be assigned to the jobs,
+ * when that is done asynchronously. Runs batches with {@link CompactionJobDispatcher}.
+ */
+public class CompactionJobDispatchLambda implements RequestHandler<SQSEvent, Void> {
+
+ private final CompactionJobDispatcher dispatcher;
+ private final CompactionJobDispatchRequestSerDe serDe = new CompactionJobDispatchRequestSerDe();
+
+ public CompactionJobDispatchLambda() {
+ AmazonS3 s3 = AmazonS3ClientBuilder.defaultClient();
+ AmazonDynamoDB dynamoDB = AmazonDynamoDBClientBuilder.defaultClient();
+ AmazonSQS sqs = AmazonSQSClientBuilder.defaultClient();
+ String configBucket = System.getenv(CONFIG_BUCKET.toEnvironmentVariable());
+ InstanceProperties instanceProperties = S3InstanceProperties.loadFromBucket(s3, configBucket);
+ Configuration conf = HadoopConfigurationProvider.getConfigurationForLambdas(instanceProperties);
+ dispatcher = dispatcher(s3, dynamoDB, sqs, conf, instanceProperties, Instant::now);
+ }
+
+ @Override
+ public Void handleRequest(SQSEvent event, Context context) {
+ event.getRecords().forEach(message -> dispatcher.dispatch(serDe.fromJson(message.getBody())));
+ return null;
+ }
+
+ public static CompactionJobDispatcher dispatcher(
+            AmazonS3 s3, AmazonDynamoDB dynamoDB, AmazonSQS sqs, Configuration conf, InstanceProperties instanceProperties, Supplier<Instant> timeSupplier) {
+ CompactionJobSerDe compactionJobSerDe = new CompactionJobSerDe();
+ return new CompactionJobDispatcher(instanceProperties,
+ S3TableProperties.createProvider(instanceProperties, s3, dynamoDB),
+ StateStoreFactory.createProvider(instanceProperties, s3, dynamoDB, conf),
+ readBatch(s3, compactionJobSerDe),
+ sendJob(instanceProperties, sqs, compactionJobSerDe),
+ returnToQueue(instanceProperties, sqs), timeSupplier);
+ }
+
+ private static SendJob sendJob(InstanceProperties instanceProperties, AmazonSQS sqs, CompactionJobSerDe compactionJobSerDe) {
+ return compactionJob -> sqs.sendMessage(
+ instanceProperties.get(COMPACTION_JOB_QUEUE_URL),
+ compactionJobSerDe.toJson(compactionJob));
+ }
+
+ private static ReadBatch readBatch(AmazonS3 s3, CompactionJobSerDe compactionJobSerDe) {
+ return (bucketName, key) -> compactionJobSerDe.batchFromJson(s3.getObjectAsString(bucketName, key));
+ }
+
+ private static ReturnRequestToPendingQueue returnToQueue(InstanceProperties instanceProperties, AmazonSQS sqs) {
+ CompactionJobDispatchRequestSerDe serDe = new CompactionJobDispatchRequestSerDe();
+ return (request, delaySeconds) -> sqs.sendMessage(new SendMessageRequest()
+ .withQueueUrl(instanceProperties.get(COMPACTION_PENDING_QUEUE_URL))
+ .withMessageBody(serDe.toJson(request))
+ .withDelaySeconds(delaySeconds));
+ }
+}
diff --git a/java/compaction/compaction-job-creation-lambda/src/test/java/sleeper/compaction/job/creation/lambda/CompactionJobDispatchLambdaIT.java b/java/compaction/compaction-job-creation-lambda/src/test/java/sleeper/compaction/job/creation/lambda/CompactionJobDispatchLambdaIT.java
new file mode 100644
index 0000000000..be7d327b12
--- /dev/null
+++ b/java/compaction/compaction-job-creation-lambda/src/test/java/sleeper/compaction/job/creation/lambda/CompactionJobDispatchLambdaIT.java
@@ -0,0 +1,218 @@
+/*
+ * Copyright 2022-2024 Crown Copyright
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package sleeper.compaction.job.creation.lambda;
+
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3ClientBuilder;
+import com.amazonaws.services.sqs.AmazonSQS;
+import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
+import com.amazonaws.services.sqs.model.Message;
+import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
+import com.amazonaws.services.sqs.model.ReceiveMessageResult;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.localstack.LocalStackContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+
+import sleeper.compaction.core.job.CompactionJob;
+import sleeper.compaction.core.job.CompactionJobFactory;
+import sleeper.compaction.core.job.CompactionJobSerDe;
+import sleeper.compaction.core.job.dispatch.CompactionJobDispatchRequest;
+import sleeper.compaction.core.job.dispatch.CompactionJobDispatchRequestSerDe;
+import sleeper.compaction.core.job.dispatch.CompactionJobDispatcher;
+import sleeper.configuration.properties.S3InstanceProperties;
+import sleeper.configuration.properties.S3TableProperties;
+import sleeper.configuration.table.index.DynamoDBTableIndexCreator;
+import sleeper.core.CommonTestConstants;
+import sleeper.core.partition.PartitionTree;
+import sleeper.core.partition.PartitionsBuilder;
+import sleeper.core.properties.instance.InstanceProperties;
+import sleeper.core.properties.table.TableProperties;
+import sleeper.core.schema.Schema;
+import sleeper.core.statestore.FileReference;
+import sleeper.core.statestore.FileReferenceFactory;
+import sleeper.core.statestore.StateStore;
+import sleeper.core.statestore.StateStoreProvider;
+import sleeper.parquet.utils.HadoopConfigurationLocalStackUtils;
+import sleeper.statestore.StateStoreFactory;
+import sleeper.statestore.transactionlog.TransactionLogStateStoreCreator;
+
+import java.time.Instant;
+import java.util.List;
+import java.util.UUID;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static sleeper.configuration.testutils.LocalStackAwsV1ClientHelper.buildAwsV1Client;
+import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.COMPACTION_JOB_QUEUE_URL;
+import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.COMPACTION_PENDING_QUEUE_URL;
+import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.CONFIG_BUCKET;
+import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.DATA_BUCKET;
+import static sleeper.core.properties.table.TableProperty.COMPACTION_JOB_SEND_RETRY_DELAY_SECS;
+import static sleeper.core.properties.table.TableProperty.COMPACTION_JOB_SEND_TIMEOUT_SECS;
+import static sleeper.core.properties.testutils.InstancePropertiesTestHelper.createTestInstanceProperties;
+import static sleeper.core.properties.testutils.TablePropertiesTestHelper.createTestTableProperties;
+import static sleeper.core.schema.SchemaTestHelper.schemaWithKey;
+
+@Testcontainers
+public class CompactionJobDispatchLambdaIT {
+
+ @Container
+ public static LocalStackContainer localStackContainer = new LocalStackContainer(DockerImageName.parse(CommonTestConstants.LOCALSTACK_DOCKER_IMAGE))
+ .withServices(LocalStackContainer.Service.S3, LocalStackContainer.Service.SQS, LocalStackContainer.Service.DYNAMODB);
+
+ AmazonS3 s3 = buildAwsV1Client(localStackContainer, LocalStackContainer.Service.S3, AmazonS3ClientBuilder.standard());
+ AmazonDynamoDB dynamoDB = buildAwsV1Client(localStackContainer, LocalStackContainer.Service.DYNAMODB, AmazonDynamoDBClientBuilder.standard());
+ AmazonSQS sqs = buildAwsV1Client(localStackContainer, LocalStackContainer.Service.SQS, AmazonSQSClientBuilder.standard());
+ Configuration conf = HadoopConfigurationLocalStackUtils.getHadoopConfiguration(localStackContainer);
+
+ InstanceProperties instanceProperties = createInstance();
+ StateStoreProvider stateStoreProvider = StateStoreFactory.createProvider(instanceProperties, s3, dynamoDB, conf);
+ Schema schema = schemaWithKey("key");
+ PartitionTree partitions = new PartitionsBuilder(schema).singlePartition("root").buildTree();
+ TableProperties tableProperties = addTable(instanceProperties, schema, partitions);
+ FileReferenceFactory fileFactory = FileReferenceFactory.from(partitions);
+ CompactionJobFactory compactionFactory = new CompactionJobFactory(instanceProperties, tableProperties);
+ StateStore stateStore = stateStoreProvider.getStateStore(tableProperties);
+
+ @Test
+ void shouldSendCompactionJobsInABatchWhenAllFilesAreAssigned() {
+
+ // Given
+ FileReference file1 = fileFactory.rootFile("test1.parquet", 1234);
+ FileReference file2 = fileFactory.rootFile("test2.parquet", 5678);
+ CompactionJob job1 = compactionFactory.createCompactionJob("test-job-1", List.of(file1), "root");
+ CompactionJob job2 = compactionFactory.createCompactionJob("test-job-2", List.of(file2), "root");
+ stateStore.addFiles(List.of(file1, file2));
+ assignJobIds(List.of(job1, job2));
+
+ CompactionJobDispatchRequest request = generateBatchRequestAtTime(
+ "test-batch", Instant.parse("2024-11-15T10:30:00Z"));
+ putCompactionJobBatch(request, List.of(job1, job2));
+
+ // When
+ dispatchWithNoRetry(request);
+
+ // Then
+ assertThat(receiveCompactionJobs()).containsExactly(job1, job2);
+ }
+
+ @Test
+ void shouldReturnBatchToTheQueueIfTheFilesForTheBatchAreUnassigned() throws Exception {
+
+ // Given
+ FileReference file1 = fileFactory.rootFile("test3.parquet", 1234);
+ FileReference file2 = fileFactory.rootFile("test4.parquet", 5678);
+ CompactionJob job1 = compactionFactory.createCompactionJob("test-job-3", List.of(file1), "root");
+ CompactionJob job2 = compactionFactory.createCompactionJob("test-job-4", List.of(file2), "root");
+ stateStore.addFiles(List.of(file1, file2));
+
+ tableProperties.setNumber(COMPACTION_JOB_SEND_TIMEOUT_SECS, 123);
+ tableProperties.setNumber(COMPACTION_JOB_SEND_RETRY_DELAY_SECS, 0);
+ saveTableProperties();
+
+ CompactionJobDispatchRequest request = generateBatchRequestAtTime(
+ "test-batch", Instant.parse("2024-11-21T10:20:00Z"));
+ Instant retryTime = Instant.parse("2024-11-21T10:22:00Z");
+ putCompactionJobBatch(request, List.of(job1, job2));
+
+ // When
+ dispatchWithTimeAtRetryCheck(request, retryTime);
+
+ // Then
+ assertThat(recievePendingBatches()).containsExactly(request);
+ }
+
+ private InstanceProperties createInstance() {
+ InstanceProperties instanceProperties = createTestInstanceProperties();
+ instanceProperties.set(COMPACTION_JOB_QUEUE_URL, sqs.createQueue(UUID.randomUUID().toString()).getQueueUrl());
+ instanceProperties.set(COMPACTION_PENDING_QUEUE_URL, sqs.createQueue(UUID.randomUUID().toString()).getQueueUrl());
+
+ DynamoDBTableIndexCreator.create(dynamoDB, instanceProperties);
+ new TransactionLogStateStoreCreator(instanceProperties, dynamoDB).create();
+
+ s3.createBucket(instanceProperties.get(CONFIG_BUCKET));
+ s3.createBucket(instanceProperties.get(DATA_BUCKET));
+ S3InstanceProperties.saveToS3(s3, instanceProperties);
+
+ return instanceProperties;
+ }
+
+ private TableProperties addTable(InstanceProperties instanceProperties, Schema schema, PartitionTree partitions) {
+ TableProperties tableProperties = createTestTableProperties(instanceProperties, schema);
+ S3TableProperties.createStore(instanceProperties, s3, dynamoDB)
+ .createTable(tableProperties);
+ stateStoreProvider.getStateStore(tableProperties)
+ .initialise(partitions.getAllPartitions());
+ return tableProperties;
+ }
+
+ private void saveTableProperties() {
+ S3TableProperties.createStore(instanceProperties, s3, dynamoDB)
+ .save(tableProperties);
+ }
+
+    private void assignJobIds(List<CompactionJob> jobs) {
+ for (CompactionJob job : jobs) {
+ stateStore.assignJobIds(List.of(job.createAssignJobIdRequest()));
+ }
+ }
+
+ private CompactionJobDispatchRequest generateBatchRequestAtTime(String batchId, Instant timeNow) {
+ return CompactionJobDispatchRequest.forTableWithBatchIdAtTime(
+ instanceProperties, tableProperties, batchId, timeNow);
+ }
+
+    private void putCompactionJobBatch(CompactionJobDispatchRequest request, List<CompactionJob> jobs) {
+ s3.putObject(
+ instanceProperties.get(DATA_BUCKET),
+ request.getBatchKey(),
+ new CompactionJobSerDe().toJson(jobs));
+ }
+
+ private void dispatchWithNoRetry(CompactionJobDispatchRequest request) {
+ dispatcher(List.of()).dispatch(request);
+ }
+
+ private void dispatchWithTimeAtRetryCheck(CompactionJobDispatchRequest request, Instant time) {
+ dispatcher(List.of(time)).dispatch(request);
+ }
+
+    private CompactionJobDispatcher dispatcher(List<Instant> times) {
+ return CompactionJobDispatchLambda.dispatcher(s3, dynamoDB, sqs, conf, instanceProperties, times.iterator()::next);
+ }
+
+    private List<CompactionJob> receiveCompactionJobs() {
+ ReceiveMessageResult result = sqs.receiveMessage(new ReceiveMessageRequest()
+ .withQueueUrl(instanceProperties.get(COMPACTION_JOB_QUEUE_URL))
+ .withMaxNumberOfMessages(10));
+ return result.getMessages().stream()
+ .map(Message::getBody)
+ .map(new CompactionJobSerDe()::fromJson).toList();
+ }
+
+    private List<CompactionJobDispatchRequest> recievePendingBatches() {
+ ReceiveMessageResult result = sqs.receiveMessage(new ReceiveMessageRequest()
+ .withQueueUrl(instanceProperties.get(COMPACTION_PENDING_QUEUE_URL))
+ .withMaxNumberOfMessages(10));
+ return result.getMessages().stream()
+ .map(Message::getBody)
+ .map(new CompactionJobDispatchRequestSerDe()::fromJson).toList();
+ }
+}
diff --git a/java/compaction/compaction-job-creation/src/test/java/sleeper/compaction/job/creation/CreateCompactionJobsIT.java b/java/compaction/compaction-job-creation/src/test/java/sleeper/compaction/job/creation/CreateCompactionJobsIT.java
index cb6fd25d33..0726e2008f 100644
--- a/java/compaction/compaction-job-creation/src/test/java/sleeper/compaction/job/creation/CreateCompactionJobsIT.java
+++ b/java/compaction/compaction-job-creation/src/test/java/sleeper/compaction/job/creation/CreateCompactionJobsIT.java
@@ -81,7 +81,7 @@ public class CreateCompactionJobsIT {
@Container
public static LocalStackContainer localStackContainer = new LocalStackContainer(DockerImageName.parse(CommonTestConstants.LOCALSTACK_DOCKER_IMAGE)).withServices(
- LocalStackContainer.Service.S3, LocalStackContainer.Service.SQS, LocalStackContainer.Service.DYNAMODB, LocalStackContainer.Service.IAM);
+ LocalStackContainer.Service.S3, LocalStackContainer.Service.SQS, LocalStackContainer.Service.DYNAMODB);
private final AmazonS3 s3 = buildAwsV1Client(localStackContainer, LocalStackContainer.Service.S3, AmazonS3ClientBuilder.standard());
private final AmazonDynamoDB dynamoDB = buildAwsV1Client(localStackContainer, LocalStackContainer.Service.DYNAMODB, AmazonDynamoDBClientBuilder.standard());
diff --git a/java/core/src/main/java/sleeper/core/properties/instance/CdkDefinedInstanceProperty.java b/java/core/src/main/java/sleeper/core/properties/instance/CdkDefinedInstanceProperty.java
index dc60f5a9a1..3350b56752 100644
--- a/java/core/src/main/java/sleeper/core/properties/instance/CdkDefinedInstanceProperty.java
+++ b/java/core/src/main/java/sleeper/core/properties/instance/CdkDefinedInstanceProperty.java
@@ -328,6 +328,22 @@ public interface CdkDefinedInstanceProperty extends InstanceProperty {
.description("The ARN of the dead letter queue for compaction jobs.")
.propertyGroup(InstancePropertyGroup.COMPACTION)
.build();
+ CdkDefinedInstanceProperty COMPACTION_PENDING_QUEUE_URL = Index.propertyBuilder("sleeper.compaction.pending.queue.url")
+ .description("The URL of the queue for pending compaction job batches.")
+ .propertyGroup(InstancePropertyGroup.COMPACTION)
+ .build();
+ CdkDefinedInstanceProperty COMPACTION_PENDING_QUEUE_ARN = Index.propertyBuilder("sleeper.compaction.pending.queue.arn")
+ .description("The ARN of the queue for pending compaction job batches.")
+ .propertyGroup(InstancePropertyGroup.COMPACTION)
+ .build();
+ CdkDefinedInstanceProperty COMPACTION_PENDING_DLQ_URL = Index.propertyBuilder("sleeper.compaction.pending.dlq.url")
+ .description("The URL of the dead letter queue for pending compaction job batches.")
+ .propertyGroup(InstancePropertyGroup.COMPACTION)
+ .build();
+ CdkDefinedInstanceProperty COMPACTION_PENDING_DLQ_ARN = Index.propertyBuilder("sleeper.compaction.pending.dlq.arn")
+ .description("The ARN of the dead letter queue for pending compaction job batches.")
+ .propertyGroup(InstancePropertyGroup.COMPACTION)
+ .build();
CdkDefinedInstanceProperty COMPACTION_TASK_CREATION_LAMBDA_FUNCTION = Index.propertyBuilder("sleeper.compaction.task.creation.lambda.function")
.description("The function name of the compaction task creation lambda.")
diff --git a/java/core/src/main/java/sleeper/core/properties/instance/CompactionProperty.java b/java/core/src/main/java/sleeper/core/properties/instance/CompactionProperty.java
index a75e2accca..8e51119e83 100644
--- a/java/core/src/main/java/sleeper/core/properties/instance/CompactionProperty.java
+++ b/java/core/src/main/java/sleeper/core/properties/instance/CompactionProperty.java
@@ -293,7 +293,7 @@ public interface CompactionProperty {
"The batch will be sent if all input files have been successfully assigned to the jobs, otherwise " +
"the batch will be retried after a delay.")
.defaultValue("30")
- .validationPredicate(SleeperPropertyValueUtils::isPositiveInteger)
+ .validationPredicate(SleeperPropertyValueUtils::isNonNegativeInteger)
.propertyGroup(InstancePropertyGroup.COMPACTION).build();
UserDefinedInstanceProperty DEFAULT_SIZERATIO_COMPACTION_STRATEGY_RATIO = Index.propertyBuilder("sleeper.default.table.compaction.strategy.sizeratio.ratio")
.description("Used by the SizeRatioCompactionStrategy to decide if a group of files should be compacted.\n" +