Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue 3689 - Lambda code for compaction job dispatch #3749

Merged
merged 12 commits into from
Nov 21, 2024
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@
*/
package sleeper.compaction.core.job.dispatch;

import java.time.Instant;

public class CompactionJobBatchExpiredException extends RuntimeException {

public CompactionJobBatchExpiredException(CompactionJobDispatchRequest request) {
super("Dispatch request for table " + request.getTableId() + " expired at " + request.getExpiryTime() + ", batch key: " + request.getBatchKey());
public CompactionJobBatchExpiredException(CompactionJobDispatchRequest request, Instant expiryTime) {
super("Dispatch request for table " + request.getTableId() + " expired at " + expiryTime + ", batch key: " + request.getBatchKey());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,31 +19,28 @@
import sleeper.core.properties.table.TableProperties;
import sleeper.core.table.TableFilePaths;

import java.time.Duration;
import java.time.Instant;
import java.util.Objects;

import static sleeper.core.properties.table.TableProperty.COMPACTION_JOB_SEND_TIMEOUT_SECS;
import static sleeper.core.properties.table.TableProperty.TABLE_ID;

public class CompactionJobDispatchRequest {

private final String tableId;
private final String batchKey;
private final Instant expiryTime;
private final Instant createTime;

private CompactionJobDispatchRequest(String tableId, String batchKey, Instant expiryTime) {
private CompactionJobDispatchRequest(String tableId, String batchKey, Instant createTime) {
this.tableId = tableId;
this.batchKey = batchKey;
this.expiryTime = expiryTime;
this.createTime = createTime;
}

public static CompactionJobDispatchRequest forTableWithBatchIdAtTime(
InstanceProperties instanceProperties, TableProperties tableProperties, String batchId, Instant timeNow) {
String batchKey = TableFilePaths.buildDataFilePathPrefix(instanceProperties, tableProperties)
.constructCompactionJobBatchPath(batchId);
Duration sendTimeout = Duration.ofSeconds(tableProperties.getInt(COMPACTION_JOB_SEND_TIMEOUT_SECS));
return new CompactionJobDispatchRequest(tableProperties.get(TABLE_ID), batchKey, timeNow.plus(sendTimeout));
return new CompactionJobDispatchRequest(tableProperties.get(TABLE_ID), batchKey, timeNow);
}

public String getTableId() {
Expand All @@ -54,13 +51,13 @@ public String getBatchKey() {
return batchKey;
}

public Instant getExpiryTime() {
return expiryTime;
public Instant getCreateTime() {
return createTime;
}

@Override
public int hashCode() {
return Objects.hash(tableId, batchKey, expiryTime);
return Objects.hash(tableId, batchKey, createTime);
}

@Override
Expand All @@ -72,11 +69,11 @@ public boolean equals(Object obj) {
return false;
}
CompactionJobDispatchRequest other = (CompactionJobDispatchRequest) obj;
return Objects.equals(tableId, other.tableId) && Objects.equals(batchKey, other.batchKey) && Objects.equals(expiryTime, other.expiryTime);
return Objects.equals(tableId, other.tableId) && Objects.equals(batchKey, other.batchKey) && Objects.equals(createTime, other.createTime);
}

@Override
public String toString() {
return "CompactionJobDispatchRequest{tableId=" + tableId + ", batchKey=" + batchKey + ", expiryTime=" + expiryTime + "}";
return "CompactionJobDispatchRequest{tableId=" + tableId + ", batchKey=" + batchKey + ", createTime=" + createTime + "}";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@
import sleeper.core.statestore.StateStore;
import sleeper.core.statestore.StateStoreProvider;

import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.function.Supplier;

import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.DATA_BUCKET;
import static sleeper.core.properties.table.TableProperty.COMPACTION_JOB_SEND_RETRY_DELAY_SECS;
import static sleeper.core.properties.table.TableProperty.COMPACTION_JOB_SEND_TIMEOUT_SECS;

public class CompactionJobDispatcher {

Expand Down Expand Up @@ -60,10 +62,12 @@ public void dispatch(CompactionJobDispatchRequest request) {
sendJob.send(job);
}
} else {
if (timeSupplier.get().isAfter(request.getExpiryTime())) {
throw new CompactionJobBatchExpiredException(request);
}
TableProperties tableProperties = tablePropertiesProvider.getById(request.getTableId());
Instant expiryTime = request.getCreateTime().plus(
Duration.ofSeconds(tableProperties.getInt(COMPACTION_JOB_SEND_TIMEOUT_SECS)));
if (timeSupplier.get().isAfter(expiryTime)) {
throw new CompactionJobBatchExpiredException(request, expiryTime);
}
returnToPendingQueue.sendWithDelay(request, tableProperties.getInt(COMPACTION_JOB_SEND_RETRY_DELAY_SECS));
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"tableId": "test-table",
"batchKey": "s3a://test-bucket/test-table/compactions/test-batch.json",
"expiryTime": 1731931350000
}
"createTime": 1731931260000
}
37 changes: 37 additions & 0 deletions java/compaction/compaction-job-creation-lambda/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,43 @@
<artifactId>common-invoke-tables</artifactId>
<version>${project.parent.version}</version>
</dependency>
<!-- Test dependencies -->
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>localstack</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>sleeper</groupId>
<artifactId>core</artifactId>
<version>${project.parent.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>sleeper</groupId>
<artifactId>configuration</artifactId>
<version>${project.parent.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>sleeper</groupId>
<artifactId>parquet</artifactId>
<version>${project.parent.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Copyright 2022-2024 Crown Copyright
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package sleeper.compaction.job.creation.lambda;

import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.SendMessageRequest;
import org.apache.hadoop.conf.Configuration;

import sleeper.compaction.core.job.CompactionJobSerDe;
import sleeper.compaction.core.job.dispatch.CompactionJobDispatchRequestSerDe;
import sleeper.compaction.core.job.dispatch.CompactionJobDispatcher;
import sleeper.compaction.core.job.dispatch.CompactionJobDispatcher.ReadBatch;
import sleeper.compaction.core.job.dispatch.CompactionJobDispatcher.ReturnRequestToPendingQueue;
import sleeper.compaction.core.job.dispatch.CompactionJobDispatcher.SendJob;
import sleeper.configuration.properties.S3InstanceProperties;
import sleeper.configuration.properties.S3TableProperties;
import sleeper.core.properties.instance.InstanceProperties;
import sleeper.parquet.utils.HadoopConfigurationProvider;
import sleeper.statestore.StateStoreFactory;

import java.time.Instant;
import java.util.function.Supplier;

import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.COMPACTION_JOB_QUEUE_URL;
import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.COMPACTION_PENDING_QUEUE_URL;
import static sleeper.core.properties.instance.CdkDefinedInstanceProperty.CONFIG_BUCKET;

/**
 * A lambda handler that receives SQS events whose message bodies are JSON compaction job dispatch requests.
 * Each message body is deserialised with {@link CompactionJobDispatchRequestSerDe} and handed to a
 * {@link CompactionJobDispatcher}, which is wired up with S3, DynamoDB and SQS clients.
 */
public class CompactionJobDispatchLambda implements RequestHandler<SQSEvent, Void> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be good to have some Javadoc for this class.


private final CompactionJobDispatcher dispatcher;
private final CompactionJobDispatchRequestSerDe serDe = new CompactionJobDispatchRequestSerDe();

/**
 * Creates the handler using default AWS clients. The instance properties are loaded from the S3 bucket
 * named by the environment variable for {@code CONFIG_BUCKET}, and the dispatcher reads the current time
 * from {@link Instant#now()}.
 * <p>
 * This constructor must be public. The AWS Lambda Java runtime instantiates the handler class through a
 * public zero-argument constructor, so a private constructor would prevent the function from initialising.
 */
public CompactionJobDispatchLambda() {
    AmazonS3 s3 = AmazonS3ClientBuilder.defaultClient();
    AmazonDynamoDB dynamoDB = AmazonDynamoDBClientBuilder.defaultClient();
    AmazonSQS sqs = AmazonSQSClientBuilder.defaultClient();
    String configBucket = System.getenv(CONFIG_BUCKET.toEnvironmentVariable());
    InstanceProperties instanceProperties = S3InstanceProperties.loadFromBucket(s3, configBucket);
    Configuration conf = HadoopConfigurationProvider.getConfigurationForLambdas(instanceProperties);
    dispatcher = dispatcher(s3, dynamoDB, sqs, conf, instanceProperties, Instant::now);
}

/**
 * Dispatches the request held in each SQS message of the event, in the order the records are listed.
 *
 * @param  event   the SQS event, where each record body is a JSON dispatch request
 * @param  context the lambda context (not used)
 * @return         always null
 */
@Override
public Void handleRequest(SQSEvent event, Context context) {
    for (SQSEvent.SQSMessage message : event.getRecords()) {
        String body = message.getBody();
        dispatcher.dispatch(serDe.fromJson(body));
    }
    return null;
}

/**
 * Builds a dispatcher backed by the given AWS clients.
 * Batches are read from S3, jobs are sent to the compaction job queue, and requests that need
 * retrying are sent back to the pending queue.
 *
 * @param  s3                 the S3 client
 * @param  dynamoDB           the DynamoDB client
 * @param  sqs                the SQS client
 * @param  conf               the Hadoop configuration passed to the state store provider
 * @param  instanceProperties the instance properties
 * @param  timeSupplier       supplies the current time to the dispatcher
 * @return                    the dispatcher
 */
public static CompactionJobDispatcher dispatcher(
        AmazonS3 s3, AmazonDynamoDB dynamoDB, AmazonSQS sqs, Configuration conf, InstanceProperties instanceProperties, Supplier<Instant> timeSupplier) {
    // One serialiser is shared between reading batches and sending jobs
    CompactionJobSerDe jobSerDe = new CompactionJobSerDe();
    ReadBatch batchReader = readBatch(s3, jobSerDe);
    SendJob jobSender = sendJob(instanceProperties, sqs, jobSerDe);
    ReturnRequestToPendingQueue pendingQueueSender = returnToQueue(instanceProperties, sqs);
    return new CompactionJobDispatcher(
            instanceProperties,
            S3TableProperties.createProvider(instanceProperties, s3, dynamoDB),
            StateStoreFactory.createProvider(instanceProperties, s3, dynamoDB, conf),
            batchReader,
            jobSender,
            pendingQueueSender,
            timeSupplier);
}

/**
 * Creates the callback that serialises a compaction job to JSON and sends it to the compaction job queue.
 * The queue URL is read from the instance properties each time a job is sent.
 */
private static SendJob sendJob(InstanceProperties instanceProperties, AmazonSQS sqs, CompactionJobSerDe compactionJobSerDe) {
    return job -> {
        String queueUrl = instanceProperties.get(COMPACTION_JOB_QUEUE_URL);
        String messageBody = compactionJobSerDe.toJson(job);
        sqs.sendMessage(queueUrl, messageBody);
    };
}

/**
 * Creates the callback that fetches an object from S3 as a string and deserialises it as a batch of compaction jobs.
 */
private static ReadBatch readBatch(AmazonS3 s3, CompactionJobSerDe compactionJobSerDe) {
    return (bucketName, key) -> {
        String json = s3.getObjectAsString(bucketName, key);
        return compactionJobSerDe.batchFromJson(json);
    };
}

/**
 * Creates the callback that serialises a dispatch request to JSON and sends it to the pending queue
 * with the given delivery delay in seconds.
 */
private static ReturnRequestToPendingQueue returnToQueue(InstanceProperties instanceProperties, AmazonSQS sqs) {
    CompactionJobDispatchRequestSerDe requestSerDe = new CompactionJobDispatchRequestSerDe();
    return (request, delaySeconds) -> {
        SendMessageRequest delayedMessage = new SendMessageRequest()
                .withQueueUrl(instanceProperties.get(COMPACTION_PENDING_QUEUE_URL))
                .withMessageBody(requestSerDe.toJson(request))
                .withDelaySeconds(delaySeconds);
        sqs.sendMessage(delayedMessage);
    };
}
}
Loading
Loading