From e3d92d7687228f84fe54e1dccb0b6ea1d31cdd8d Mon Sep 17 00:00:00 2001 From: patchwork01 <110390516+patchwork01@users.noreply.github.com> Date: Mon, 12 Aug 2024 09:51:12 +0000 Subject: [PATCH] Send message batches in parallel in AwsStateStoreCommitterDriver --- .../statestore/AwsStateStoreCommitterDriver.java | 15 +++------------ 1 file changed, 3 insertions(+), 12 deletions(-) diff --git a/java/system-test/system-test-drivers/src/main/java/sleeper/systemtest/drivers/statestore/AwsStateStoreCommitterDriver.java b/java/system-test/system-test-drivers/src/main/java/sleeper/systemtest/drivers/statestore/AwsStateStoreCommitterDriver.java index 81671394a9..bd174a429b 100644 --- a/java/system-test/system-test-drivers/src/main/java/sleeper/systemtest/drivers/statestore/AwsStateStoreCommitterDriver.java +++ b/java/system-test/system-test-drivers/src/main/java/sleeper/systemtest/drivers/statestore/AwsStateStoreCommitterDriver.java @@ -19,11 +19,11 @@ import com.amazonaws.services.sqs.model.SendMessageBatchRequest; import com.amazonaws.services.sqs.model.SendMessageBatchRequestEntry; +import sleeper.core.util.SplitIntoBatches; import sleeper.systemtest.dsl.instance.SystemTestInstanceContext; import sleeper.systemtest.dsl.statestore.StateStoreCommitMessage; import sleeper.systemtest.dsl.statestore.StateStoreCommitterDriver; -import java.util.ArrayList; import java.util.List; import java.util.UUID; import java.util.stream.Stream; @@ -43,17 +43,8 @@ public AwsStateStoreCommitterDriver(SystemTestInstanceContext instance, AmazonSQ @Override public void sendCommitMessages(Stream messages) { - List batch = new ArrayList<>(); - messages.forEach(message -> { - batch.add(message); - if (batch.size() == 10) { - sendMessageBatch(batch); - batch.clear(); - } - }); - if (!batch.isEmpty()) { - sendMessageBatch(batch); - } + SplitIntoBatches.toListsOfSize(10, messages) + .parallel().forEach(this::sendMessageBatch); } private void sendMessageBatch(List batch) {