Skip to content

Commit

Permalink
Use v2 SQS client to drain compaction jobs queue
Browse files Browse the repository at this point in the history
  • Loading branch information
patchwork01 committed Dec 4, 2024
1 parent 2ba1a90 commit 3c7b66b
Showing 1 changed file with 22 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,17 @@
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.model.DeleteMessageBatchRequest;
import com.amazonaws.services.sqs.model.DeleteMessageBatchRequestEntry;
import com.amazonaws.services.sqs.model.DeleteMessageBatchResult;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.ReceiveMessageResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.autoscaling.AutoScalingClient;
import software.amazon.awssdk.services.ec2.Ec2Client;
import software.amazon.awssdk.services.ecs.EcsClient;
import software.amazon.awssdk.services.lambda.LambdaClient;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequestEntry;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchResponse;
import software.amazon.awssdk.services.sqs.model.Message;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse;

import sleeper.clients.deploy.InvokeLambda;
import sleeper.compaction.core.job.CompactionJob;
Expand Down Expand Up @@ -69,6 +68,7 @@ public class AwsCompactionDriver implements CompactionDriver {
private final AmazonDynamoDB dynamoDBClient;
private final AmazonS3 s3Client;
private final AmazonSQS sqsClient;
private final SqsClient sqsClientV2;
private final EcsClient ecsClient;
private final AutoScalingClient asClient;
private final Ec2Client ec2Client;
Expand All @@ -80,6 +80,7 @@ public AwsCompactionDriver(SystemTestInstanceContext instance, SystemTestClients
this.dynamoDBClient = clients.getDynamoDB();
this.s3Client = clients.getS3();
this.sqsClient = clients.getSqs();
this.sqsClientV2 = clients.getSqsV2();
this.ecsClient = clients.getEcs();
this.asClient = clients.getAutoScaling();
this.ec2Client = clients.getEc2();
Expand Down Expand Up @@ -164,26 +165,27 @@ public List<CompactionJob> drainJobsQueueForWholeInstance() {
}

private List<CompactionJob> receiveJobs(String queueUrl) {
ReceiveMessageResult receiveResult = sqsClient.receiveMessage(new ReceiveMessageRequest()
.withQueueUrl(queueUrl)
.withMaxNumberOfMessages(10)
.withWaitTimeSeconds(5));
List<Message> messages = receiveResult.getMessages();
ReceiveMessageResponse receiveResult = sqsClientV2.receiveMessage(request -> request
.queueUrl(queueUrl)
.maxNumberOfMessages(10)
.waitTimeSeconds(5));
List<Message> messages = receiveResult.messages();
if (messages.isEmpty()) {
return List.of();
}
DeleteMessageBatchResult deleteResult = sqsClient.deleteMessageBatch(new DeleteMessageBatchRequest()
.withQueueUrl(queueUrl)
.withEntries(messages.stream()
.map(message -> new DeleteMessageBatchRequestEntry()
.withId(message.getMessageId())
.withReceiptHandle(message.getReceiptHandle()))
DeleteMessageBatchResponse deleteResult = sqsClientV2.deleteMessageBatch(request -> request
.queueUrl(queueUrl)
.entries(messages.stream()
.map(message -> DeleteMessageBatchRequestEntry.builder()
.id(message.messageId())
.receiptHandle(message.receiptHandle())
.build())
.toList()));
if (!deleteResult.getFailed().isEmpty()) {
throw new RuntimeException("Failed deleting compaction job messages: " + deleteResult.getFailed());
if (!deleteResult.failed().isEmpty()) {
throw new RuntimeException("Failed deleting compaction job messages: " + deleteResult.failed());
}
return messages.stream()
.map(Message::getBody)
.map(Message::body)
.map(serDe::fromJson)
.toList();
}
Expand Down

0 comments on commit 3c7b66b

Please sign in to comment.