Skip to content

Commit

Permalink
Send message batches in parallel in AwsStateStoreCommitterDriver
Browse files Browse the repository at this point in the history
  • Loading branch information
patchwork01 committed Aug 12, 2024
1 parent 09698a9 commit e3d92d7
Showing 1 changed file with 3 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@
import com.amazonaws.services.sqs.model.SendMessageBatchRequest;
import com.amazonaws.services.sqs.model.SendMessageBatchRequestEntry;

import sleeper.core.util.SplitIntoBatches;
import sleeper.systemtest.dsl.instance.SystemTestInstanceContext;
import sleeper.systemtest.dsl.statestore.StateStoreCommitMessage;
import sleeper.systemtest.dsl.statestore.StateStoreCommitterDriver;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.stream.Stream;
Expand All @@ -43,17 +43,8 @@ public AwsStateStoreCommitterDriver(SystemTestInstanceContext instance, AmazonSQ

@Override
public void sendCommitMessages(Stream<StateStoreCommitMessage> messages) {
List<StateStoreCommitMessage> batch = new ArrayList<>();
messages.forEach(message -> {
batch.add(message);
if (batch.size() == 10) {
sendMessageBatch(batch);
batch.clear();
}
});
if (!batch.isEmpty()) {
sendMessageBatch(batch);
}
SplitIntoBatches.toListsOfSize(10, messages)
.parallel().forEach(this::sendMessageBatch);
}

private void sendMessageBatch(List<StateStoreCommitMessage> batch) {
Expand Down

0 comments on commit e3d92d7

Please sign in to comment.