From 47ce07736d6a567b2f69b37aa569eaa5e37de3e2 Mon Sep 17 00:00:00 2001 From: patchwork01 <110390516+patchwork01@users.noreply.github.com> Date: Thu, 25 Apr 2024 10:54:06 +0100 Subject: [PATCH 1/9] Send a message per table in TriggerGarbageCollectionClient --- .../TriggerGarbageCollectionClient.java | 24 +++++++++++++++---- 1 file changed, 19 insertions(+), 5 deletions(-) diff --git a/java/clients/src/main/java/sleeper/clients/status/update/TriggerGarbageCollectionClient.java b/java/clients/src/main/java/sleeper/clients/status/update/TriggerGarbageCollectionClient.java index b69a9011ec..2b2cb4ae0e 100644 --- a/java/clients/src/main/java/sleeper/clients/status/update/TriggerGarbageCollectionClient.java +++ b/java/clients/src/main/java/sleeper/clients/status/update/TriggerGarbageCollectionClient.java @@ -21,16 +21,18 @@ import com.amazonaws.services.s3.AmazonS3ClientBuilder; import com.amazonaws.services.sqs.AmazonSQS; import com.amazonaws.services.sqs.AmazonSQSClientBuilder; +import com.amazonaws.services.sqs.model.SendMessageBatchRequest; +import com.amazonaws.services.sqs.model.SendMessageBatchRequestEntry; import sleeper.configuration.properties.instance.InstanceProperties; import sleeper.configuration.table.index.DynamoDBTableIndex; -import sleeper.core.table.InvokeForTableRequest; -import sleeper.core.table.InvokeForTableRequestSerDe; import sleeper.core.table.TableIndex; import sleeper.core.table.TableNotFoundException; import sleeper.core.table.TableStatus; +import sleeper.core.util.SplitIntoBatches; import java.util.List; +import java.util.UUID; import java.util.stream.Stream; import static java.util.stream.Collectors.toUnmodifiableList; @@ -63,13 +65,25 @@ public static void main(String[] args) { .orElseThrow(() -> TableNotFoundException.withTableName(name))) .collect(toUnmodifiableList()); String queueUrl = instanceProperties.get(GARBAGE_COLLECTOR_QUEUE_URL); - InvokeForTableRequestSerDe serDe = new InvokeForTableRequestSerDe(); - InvokeForTableRequest.forTables(tables.stream(), tables.size(), - request -> sqsClient.sendMessage(queueUrl, serDe.toJson(request))); + SplitIntoBatches.reusingListOfSize(10, + tables.stream(), + batch -> sendMessageBatch(sqsClient, batch, queueUrl)); } finally { s3Client.shutdown(); dynamoClient.shutdown(); sqsClient.shutdown(); } } + + public static void sendMessageBatch(AmazonSQS sqsClient, List tables, String queueUrl) { + sqsClient.sendMessageBatch(new SendMessageBatchRequest() + .withQueueUrl(queueUrl) + .withEntries(tables.stream() + .map(table -> new SendMessageBatchRequestEntry() + .withMessageDeduplicationId(UUID.randomUUID().toString()) + .withId(table.getTableUniqueId()) + .withMessageGroupId(table.getTableUniqueId()) + .withMessageBody(table.getTableUniqueId())) + .collect(toUnmodifiableList()))); + } } From 699a096901cf3105fb4db915a87a97627dd5efb5 Mon Sep 17 00:00:00 2001 From: patchwork01 <110390516+patchwork01@users.noreply.github.com> Date: Thu, 25 Apr 2024 10:58:21 +0100 Subject: [PATCH 2/9] Delete InvokeForTableRequest --- .../core/table/InvokeForTableRequest.java | 92 ------------------- .../table/InvokeForTableRequestSerDe.java | 48 ---------- .../core/table/InvokeForTableRequestTest.java | 89 ------------------ 3 files changed, 229 deletions(-) delete mode 100644 java/core/src/main/java/sleeper/core/table/InvokeForTableRequest.java delete mode 100644 java/core/src/main/java/sleeper/core/table/InvokeForTableRequestSerDe.java delete mode 100644 java/core/src/test/java/sleeper/core/table/InvokeForTableRequestTest.java diff --git a/java/core/src/main/java/sleeper/core/table/InvokeForTableRequest.java b/java/core/src/main/java/sleeper/core/table/InvokeForTableRequest.java deleted file mode 100644 index 2689f33f89..0000000000 --- a/java/core/src/main/java/sleeper/core/table/InvokeForTableRequest.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Copyright 2022-2024 Crown Copyright - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package sleeper.core.table; - -import sleeper.core.util.SplitIntoBatches; - -import java.util.List; -import java.util.Objects; -import java.util.function.Consumer; -import java.util.stream.Stream; - -/** - * Model for an SQS message to invoke some operation for a batch of tables. Use {@link InvokeForTableRequestSerDe} to - * build SQS messages from objects of this class. - */ -public class InvokeForTableRequest { - - private final List tableIds; - - public InvokeForTableRequest(List tableIds) { - this.tableIds = tableIds; - } - - public List getTableIds() { - return tableIds; - } - - /** - * Creates batches of tables, and sends a request for each batch. - * - * @param tables a stream of tables to batch - * @param batchSize the batch size - * @param sendRequest a function to send a request - */ - public static void forTables(Stream tables, int batchSize, Consumer sendRequest) { - SplitIntoBatches.reusingListOfSize(batchSize, - tables.map(TableStatus::getTableUniqueId), - tableIds -> sendRequest.accept(new InvokeForTableRequest(tableIds))); - } - - /** - * Loads tables from the table index, creates batches of tables, and sends a request for each batch. - * - * @param offlineEnabled whether to include offline tables when creating batches - * @param tableIndex the table index - * @param batchSize the batch size - * @param sendRequest a function to send a request - */ - public static void forTablesWithOfflineEnabled( - boolean offlineEnabled, TableIndex tableIndex, int batchSize, Consumer sendRequest) { - forTables( - offlineEnabled ? tableIndex.streamAllTables() : tableIndex.streamOnlineTables(), - batchSize, sendRequest); - } - - @Override - public int hashCode() { - return Objects.hash(tableIds); - } - - @Override - public boolean equals(Object obj) { - if (this == obj) { - return true; - } - if (!(obj instanceof InvokeForTableRequest)) { - return false; - } - InvokeForTableRequest other = (InvokeForTableRequest) obj; - return Objects.equals(tableIds, other.tableIds); - } - - @Override - public String toString() { - return "InvokeForTableRequest{tableIds=" + tableIds + "}"; - } - -} diff --git a/java/core/src/main/java/sleeper/core/table/InvokeForTableRequestSerDe.java b/java/core/src/main/java/sleeper/core/table/InvokeForTableRequestSerDe.java deleted file mode 100644 index e785734731..0000000000 --- a/java/core/src/main/java/sleeper/core/table/InvokeForTableRequestSerDe.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Copyright 2022-2024 Crown Copyright - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package sleeper.core.table; - -import com.google.gson.Gson; -import com.google.gson.GsonBuilder; - -/** - * Serialisation and deserialisation of SQS messages to invoke some operation for a batch of tables. - */ -public class InvokeForTableRequestSerDe { - - private final Gson gson = new GsonBuilder().create(); - - /** - * Serialises a request for a batch of tables to a JSON string. - * - * @param request the request - * @return the JSON string - */ - public String toJson(InvokeForTableRequest request) { - return gson.toJson(request); - } - - /** - * Deserialises a request for a batch of tables from a JSON string. - * - * @param json the JSON string - * @return the request - */ - public InvokeForTableRequest fromJson(String json) { - return gson.fromJson(json, InvokeForTableRequest.class); - } -} diff --git a/java/core/src/test/java/sleeper/core/table/InvokeForTableRequestTest.java b/java/core/src/test/java/sleeper/core/table/InvokeForTableRequestTest.java deleted file mode 100644 index f4eb193e3e..0000000000 --- a/java/core/src/test/java/sleeper/core/table/InvokeForTableRequestTest.java +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Copyright 2022-2024 Crown Copyright - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package sleeper.core.table; - -import org.junit.jupiter.api.Test; - -import java.util.ArrayList; -import java.util.List; -import java.util.stream.Stream; - -import static org.assertj.core.api.Assertions.assertThat; - -public class InvokeForTableRequestTest { - - private final InvokeForTableRequestSerDe serDe = new InvokeForTableRequestSerDe(); - - @Test - void shouldSendRequestForTwoTables() { - List sent = new ArrayList<>(); - InvokeForTableRequest.forTables( - Stream.of(table("table-1"), table("table-2")), - 1, request -> sent.add(serDe.toJson(request))); - assertThat(sent).extracting(serDe::fromJson).containsExactly( - new InvokeForTableRequest(List.of("table-1")), - new InvokeForTableRequest(List.of("table-2"))); - } - - @Test - void shouldSendBatchesOf2() { - List sent = new ArrayList<>(); - InvokeForTableRequest.forTables( - Stream.of(table("table-1"), table("table-2")), - 2, request -> sent.add(serDe.toJson(request))); - assertThat(sent).extracting(serDe::fromJson).containsExactly( - new InvokeForTableRequest(List.of("table-1", "table-2"))); - } - - @Test - void shouldSendRequestForOnlyOnlineTable() { - // Given - TableIndex tableIndex = new InMemoryTableIndex(); - tableIndex.create(table("offline-table").takeOffline()); - tableIndex.create(table("online-table")); - - // When - List sent = new ArrayList<>(); - InvokeForTableRequest.forTablesWithOfflineEnabled(false, tableIndex, - 1, request -> sent.add(serDe.toJson(request))); - - // Then - assertThat(sent).extracting(serDe::fromJson).containsExactly( - new InvokeForTableRequest(List.of("online-table"))); - } - - @Test - void shouldSendRequestForAllTablesWhenOfflineEnabled() { - // Given - TableIndex tableIndex = new InMemoryTableIndex(); - tableIndex.create(table("offline-table").takeOffline()); - tableIndex.create(table("online-table")); - - // When - List sent = new ArrayList<>(); - InvokeForTableRequest.forTablesWithOfflineEnabled(true, tableIndex, - 1, request -> sent.add(serDe.toJson(request))); - - // Then - assertThat(sent).extracting(serDe::fromJson).containsExactly( - new InvokeForTableRequest(List.of("offline-table")), - new InvokeForTableRequest(List.of("online-table"))); - } - - private TableStatus table(String tableName) { - return TableStatusTestHelper.uniqueIdAndName(tableName, tableName); - } -} From f84c17ecce8ada560f516779d1d4d72aa583c5be Mon Sep 17 00:00:00 2001 From: patchwork01 <110390516+patchwork01@users.noreply.github.com> Date: Thu, 25 Apr 2024 11:44:54 +0100 Subject: [PATCH 3/9] Create InvokeForTables utility --- java/common/common-invoke-tables/pom.xml | 68 ++++++++++++ .../invoke/tables/InvokeForTables.java | 51 +++++++++ .../invoke/tables/InvokeForTablesIT.java | 101 ++++++++++++++++++ .../tables/LocalStackAwsV1ClientHelper.java | 39 +++++++ java/common/pom.xml | 1 + 5 files changed, 260 insertions(+) create mode 100644 java/common/common-invoke-tables/pom.xml create mode 100644 java/common/common-invoke-tables/src/main/java/sleeper/invoke/tables/InvokeForTables.java create mode 100644 java/common/common-invoke-tables/src/test/java/sleeper/invoke/tables/InvokeForTablesIT.java create mode 100644 java/common/common-invoke-tables/src/test/java/sleeper/invoke/tables/LocalStackAwsV1ClientHelper.java diff --git a/java/common/common-invoke-tables/pom.xml b/java/common/common-invoke-tables/pom.xml new file mode 100644 index 0000000000..eb8eb39a0b --- /dev/null +++ b/java/common/common-invoke-tables/pom.xml @@ -0,0 +1,68 @@ + + + + + + common + sleeper + 0.23.0-SNAPSHOT + + + 4.0.0 + + common-invoke-tables + + + + com.amazonaws + aws-java-sdk-sqs + ${aws-java-sdk.version} + + + + sleeper + core + ${project.parent.version} + + + + sleeper + core + ${project.parent.version} + test-jar + test + + + org.testcontainers + testcontainers + test + + + org.testcontainers + localstack + test + + + org.testcontainers + junit-jupiter + test + + + + \ No newline at end of file diff --git a/java/common/common-invoke-tables/src/main/java/sleeper/invoke/tables/InvokeForTables.java b/java/common/common-invoke-tables/src/main/java/sleeper/invoke/tables/InvokeForTables.java new file mode 100644 index 0000000000..fde0188a33 --- /dev/null +++ b/java/common/common-invoke-tables/src/main/java/sleeper/invoke/tables/InvokeForTables.java @@ -0,0 +1,51 @@ +/* + * Copyright 2022-2024 Crown Copyright + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package sleeper.invoke.tables; + +import com.amazonaws.services.sqs.AmazonSQS; +import com.amazonaws.services.sqs.model.SendMessageBatchRequest; +import com.amazonaws.services.sqs.model.SendMessageBatchRequestEntry; + +import sleeper.core.table.TableStatus; +import sleeper.core.util.SplitIntoBatches; + +import java.util.List; +import java.util.UUID; + +import static java.util.stream.Collectors.toUnmodifiableList; + +public class InvokeForTables { + + private InvokeForTables() { + } + + public static void sendOneMessagePerTable(AmazonSQS sqsClient, String queueUrl, List tables) { + SplitIntoBatches.reusingListOfSize(10, tables.stream(), + batch -> sendMessageBatch(sqsClient, queueUrl, batch)); + } + + private static void sendMessageBatch(AmazonSQS sqsClient, String queueUrl, List tablesBatch) { + sqsClient.sendMessageBatch(new SendMessageBatchRequest() + .withQueueUrl(queueUrl) + .withEntries(tablesBatch.stream() + .map(table -> new SendMessageBatchRequestEntry() + .withMessageDeduplicationId(UUID.randomUUID().toString()) + .withId(table.getTableUniqueId()) + .withMessageGroupId(table.getTableUniqueId()) + .withMessageBody(table.getTableUniqueId())) + .collect(toUnmodifiableList()))); + } +} diff --git a/java/common/common-invoke-tables/src/test/java/sleeper/invoke/tables/InvokeForTablesIT.java b/java/common/common-invoke-tables/src/test/java/sleeper/invoke/tables/InvokeForTablesIT.java new file mode 100644 index 0000000000..7623e1af02 --- /dev/null +++ b/java/common/common-invoke-tables/src/test/java/sleeper/invoke/tables/InvokeForTablesIT.java @@ -0,0 +1,101 @@ +/* + * Copyright 2022-2024 Crown Copyright + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package sleeper.invoke.tables; + +import com.amazonaws.services.sqs.AmazonSQS; +import com.amazonaws.services.sqs.AmazonSQSClientBuilder; +import com.amazonaws.services.sqs.model.CreateQueueRequest; +import com.amazonaws.services.sqs.model.CreateQueueResult; +import com.amazonaws.services.sqs.model.Message; +import com.amazonaws.services.sqs.model.ReceiveMessageRequest; +import com.amazonaws.services.sqs.model.ReceiveMessageResult; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.localstack.LocalStackContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; + +import sleeper.core.CommonTestConstants; + +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.stream.IntStream; + +import static java.util.stream.Collectors.toUnmodifiableList; +import static org.assertj.core.api.Assertions.assertThat; +import static sleeper.core.table.TableStatusTestHelper.uniqueIdAndName; +import static sleeper.invoke.tables.LocalStackAwsV1ClientHelper.buildAwsV1Client; + +@Testcontainers +public class InvokeForTablesIT { + + @Container + public static LocalStackContainer localStackContainer = new LocalStackContainer(DockerImageName.parse(CommonTestConstants.LOCALSTACK_DOCKER_IMAGE)) + .withServices(LocalStackContainer.Service.SQS); + + private final AmazonSQS sqsClient = buildAwsV1Client(localStackContainer, LocalStackContainer.Service.SQS, AmazonSQSClientBuilder.standard()); + + @Test + void shouldSendOneMessage() { + // Given + String queueUrl = createFifoQueueGetUrl(); + + // When + InvokeForTables.sendOneMessagePerTable(sqsClient, queueUrl, List.of( + uniqueIdAndName("table-id", "table-name"))); + + // Then + assertThat(receiveTableIdMessages(queueUrl, 2)) + .containsExactly("table-id"); + } + + @Test + void shouldSendMoreMessagesThanFitInABatch() { + // Given + String queueUrl = createFifoQueueGetUrl(); + + // When + InvokeForTables.sendOneMessagePerTable(sqsClient, queueUrl, + IntStream.rangeClosed(1, 11) + .mapToObj(i -> uniqueIdAndName("table-id-" + i, "table-name-" + i)) + .collect(toUnmodifiableList())); + + // Then + assertThat(receiveTableIdMessages(queueUrl, 10)).containsExactly( + "table-id-1", "table-id-2", "table-id-3", "table-id-4", "table-id-5", + "table-id-6", "table-id-7", "table-id-8", "table-id-9", "table-id-10"); + assertThat(receiveTableIdMessages(queueUrl, 10)).containsExactly( + "table-id-11"); + } + + private String createFifoQueueGetUrl() { + CreateQueueResult result = sqsClient.createQueue(new CreateQueueRequest() + .withQueueName(UUID.randomUUID().toString() + ".fifo") + .withAttributes(Map.of("FifoQueue", "true"))); + return result.getQueueUrl(); + } + + private List receiveTableIdMessages(String queueUrl, int maxMessages) { + ReceiveMessageResult result = sqsClient.receiveMessage( + new ReceiveMessageRequest(queueUrl) + .withMaxNumberOfMessages(maxMessages)); + return result.getMessages().stream() + .map(Message::getBody) + .collect(toUnmodifiableList()); + } + +} diff --git a/java/common/common-invoke-tables/src/test/java/sleeper/invoke/tables/LocalStackAwsV1ClientHelper.java b/java/common/common-invoke-tables/src/test/java/sleeper/invoke/tables/LocalStackAwsV1ClientHelper.java new file mode 100644 index 0000000000..7009da59f5 --- /dev/null +++ b/java/common/common-invoke-tables/src/test/java/sleeper/invoke/tables/LocalStackAwsV1ClientHelper.java @@ -0,0 +1,39 @@ +/* + * Copyright 2022-2024 Crown Copyright + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package sleeper.invoke.tables; + +import com.amazonaws.auth.AWSStaticCredentialsProvider; +import com.amazonaws.auth.BasicAWSCredentials; +import com.amazonaws.client.builder.AwsClientBuilder; +import org.testcontainers.containers.localstack.LocalStackContainer; + +public class LocalStackAwsV1ClientHelper { + + private LocalStackAwsV1ClientHelper() { + } + + public static , T> T buildAwsV1Client(LocalStackContainer localStackContainer, LocalStackContainer.Service service, B builder) { + return builder + .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration( + localStackContainer.getEndpointOverride(service).toString(), + localStackContainer.getRegion())) + .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials( + localStackContainer.getAccessKey(), localStackContainer.getSecretKey()))) + .build(); + } + +} diff --git a/java/common/pom.xml b/java/common/pom.xml index 8409bfdf62..e04ff6ce0b 100644 --- a/java/common/pom.xml +++ b/java/common/pom.xml @@ -29,6 +29,7 @@ common-job common-task + common-invoke-tables dynamodb-tools From 358d0f7605952964c86cfe7c63a1a04704a84e8d Mon Sep 17 00:00:00 2001 From: patchwork01 <110390516+patchwork01@users.noreply.github.com> Date: Thu, 25 Apr 2024 12:08:09 +0100 Subject: [PATCH 4/9] Invoke table operations with InvokeForTables --- java/clients/pom.xml | 5 ++ .../TriggerGarbageCollectionClient.java | 27 +------- .../invoke/tables/InvokeForTables.java | 15 ++++- .../invoke/tables/InvokeForTablesIT.java | 61 +++++++++++++++++-- .../compaction-job-creation-lambda/pom.xml | 5 ++ .../CreateCompactionJobsTriggerLambda.java | 25 +------- java/garbage-collector/pom.xml | 5 ++ .../GarbageCollectorTriggerLambda.java | 26 +------- java/metrics/pom.xml | 5 ++ .../metrics/TableMetricsTriggerLambda.java | 25 +------- java/splitter/splitter-lambda/pom.xml | 5 ++ .../FindPartitionsToSplitTriggerLambda.java | 25 +------- 12 files changed, 107 insertions(+), 122 deletions(-) diff --git a/java/clients/pom.xml b/java/clients/pom.xml index 1581690842..ce9125cfd0 100644 --- a/java/clients/pom.xml +++ b/java/clients/pom.xml @@ -136,6 +136,11 @@ ingest-batcher-store ${project.parent.version} + + sleeper + common-invoke-tables + ${project.parent.version} + sleeper diff --git a/java/clients/src/main/java/sleeper/clients/status/update/TriggerGarbageCollectionClient.java b/java/clients/src/main/java/sleeper/clients/status/update/TriggerGarbageCollectionClient.java index 2b2cb4ae0e..a9fcbb7115 100644 --- a/java/clients/src/main/java/sleeper/clients/status/update/TriggerGarbageCollectionClient.java +++ b/java/clients/src/main/java/sleeper/clients/status/update/TriggerGarbageCollectionClient.java @@ -21,18 +21,13 @@ import com.amazonaws.services.s3.AmazonS3ClientBuilder; import com.amazonaws.services.sqs.AmazonSQS; import com.amazonaws.services.sqs.AmazonSQSClientBuilder; -import com.amazonaws.services.sqs.model.SendMessageBatchRequest; -import com.amazonaws.services.sqs.model.SendMessageBatchRequestEntry; import sleeper.configuration.properties.instance.InstanceProperties; import sleeper.configuration.table.index.DynamoDBTableIndex; import sleeper.core.table.TableIndex; -import sleeper.core.table.TableNotFoundException; -import sleeper.core.table.TableStatus; -import sleeper.core.util.SplitIntoBatches; +import sleeper.invoke.tables.InvokeForTables; import java.util.List; -import java.util.UUID; import java.util.stream.Stream; import static java.util.stream.Collectors.toUnmodifiableList; @@ -60,30 +55,12 @@ public static void main(String[] args) { InstanceProperties instanceProperties = new InstanceProperties(); instanceProperties.loadFromS3GivenInstanceId(s3Client, instanceId); TableIndex tableIndex = new DynamoDBTableIndex(instanceProperties, dynamoClient); - List tables = tableNames.stream() - .map(name -> tableIndex.getTableByName(name) - .orElseThrow(() -> TableNotFoundException.withTableName(name))) - .collect(toUnmodifiableList()); String queueUrl = instanceProperties.get(GARBAGE_COLLECTOR_QUEUE_URL); - SplitIntoBatches.reusingListOfSize(10, - tables.stream(), - batch -> sendMessageBatch(sqsClient, batch, queueUrl)); + InvokeForTables.sendOneMessagePerTableByName(sqsClient, queueUrl, tableIndex, tableNames); } finally { s3Client.shutdown(); dynamoClient.shutdown(); sqsClient.shutdown(); } } - - public static void sendMessageBatch(AmazonSQS sqsClient, List tables, String queueUrl) { - sqsClient.sendMessageBatch(new SendMessageBatchRequest() - .withQueueUrl(queueUrl) - .withEntries(tables.stream() - .map(table -> new SendMessageBatchRequestEntry() - .withMessageDeduplicationId(UUID.randomUUID().toString()) - .withId(table.getTableUniqueId()) - .withMessageGroupId(table.getTableUniqueId()) - .withMessageBody(table.getTableUniqueId())) - .collect(toUnmodifiableList()))); - } } diff --git a/java/common/common-invoke-tables/src/main/java/sleeper/invoke/tables/InvokeForTables.java b/java/common/common-invoke-tables/src/main/java/sleeper/invoke/tables/InvokeForTables.java index fde0188a33..73f6f539c5 100644 --- a/java/common/common-invoke-tables/src/main/java/sleeper/invoke/tables/InvokeForTables.java +++ b/java/common/common-invoke-tables/src/main/java/sleeper/invoke/tables/InvokeForTables.java @@ -19,11 +19,14 @@ import com.amazonaws.services.sqs.model.SendMessageBatchRequest; import com.amazonaws.services.sqs.model.SendMessageBatchRequestEntry; +import sleeper.core.table.TableIndex; +import sleeper.core.table.TableNotFoundException; import sleeper.core.table.TableStatus; import sleeper.core.util.SplitIntoBatches; import java.util.List; import java.util.UUID; +import java.util.stream.Stream; import static java.util.stream.Collectors.toUnmodifiableList; @@ -32,11 +35,19 @@ public class InvokeForTables { private InvokeForTables() { } - public static void sendOneMessagePerTable(AmazonSQS sqsClient, String queueUrl, List tables) { - SplitIntoBatches.reusingListOfSize(10, tables.stream(), + public static void sendOneMessagePerTable(AmazonSQS sqsClient, String queueUrl, Stream tables) { + SplitIntoBatches.reusingListOfSize(10, tables, batch -> sendMessageBatch(sqsClient, queueUrl, batch)); } + public static void sendOneMessagePerTableByName( + AmazonSQS sqsClient, String queueUrl, TableIndex tableIndex, List tableNames) { + List tables = tableNames.stream().map(name -> tableIndex.getTableByName(name) + .orElseThrow(() -> TableNotFoundException.withTableName(name))) + .collect(toUnmodifiableList()); + sendOneMessagePerTable(sqsClient, queueUrl, tables.stream()); + } + private static void sendMessageBatch(AmazonSQS sqsClient, String queueUrl, List tablesBatch) { sqsClient.sendMessageBatch(new SendMessageBatchRequest() .withQueueUrl(queueUrl) diff --git a/java/common/common-invoke-tables/src/test/java/sleeper/invoke/tables/InvokeForTablesIT.java b/java/common/common-invoke-tables/src/test/java/sleeper/invoke/tables/InvokeForTablesIT.java index 7623e1af02..4c8be9885f 100644 --- a/java/common/common-invoke-tables/src/test/java/sleeper/invoke/tables/InvokeForTablesIT.java +++ b/java/common/common-invoke-tables/src/test/java/sleeper/invoke/tables/InvokeForTablesIT.java @@ -29,14 +29,19 @@ import org.testcontainers.utility.DockerImageName; import sleeper.core.CommonTestConstants; +import sleeper.core.table.InMemoryTableIndex; +import sleeper.core.table.TableIndex; +import sleeper.core.table.TableNotFoundException; import java.util.List; import java.util.Map; import java.util.UUID; import java.util.stream.IntStream; +import java.util.stream.Stream; import static java.util.stream.Collectors.toUnmodifiableList; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static sleeper.core.table.TableStatusTestHelper.uniqueIdAndName; import static sleeper.invoke.tables.LocalStackAwsV1ClientHelper.buildAwsV1Client; @@ -55,7 +60,7 @@ void shouldSendOneMessage() { String queueUrl = createFifoQueueGetUrl(); // When - InvokeForTables.sendOneMessagePerTable(sqsClient, queueUrl, List.of( + InvokeForTables.sendOneMessagePerTable(sqsClient, queueUrl, Stream.of( uniqueIdAndName("table-id", "table-name"))); // Then @@ -71,8 +76,7 @@ void shouldSendMoreMessagesThanFitInABatch() { // When InvokeForTables.sendOneMessagePerTable(sqsClient, queueUrl, IntStream.rangeClosed(1, 11) - .mapToObj(i -> uniqueIdAndName("table-id-" + i, "table-name-" + i)) - .collect(toUnmodifiableList())); + .mapToObj(i -> uniqueIdAndName("table-id-" + i, "table-name-" + i))); // Then assertThat(receiveTableIdMessages(queueUrl, 10)).containsExactly( @@ -82,6 +86,54 @@ void shouldSendMoreMessagesThanFitInABatch() { "table-id-11"); } + @Test + void shouldLookUpTableByName() { + // Given + String queueUrl = createFifoQueueGetUrl(); + TableIndex tableIndex = new InMemoryTableIndex(); + tableIndex.create(uniqueIdAndName("table-id", "table-name")); + + // When + InvokeForTables.sendOneMessagePerTableByName(sqsClient, queueUrl, tableIndex, List.of("table-name")); + + // Then + assertThat(receiveTableIdMessages(queueUrl, 2)) + .containsExactly("table-id"); + } + + @Test + void shouldFailLookUpTableByName() { + // Given + String queueUrl = createFifoQueueGetUrl(); + TableIndex tableIndex = new InMemoryTableIndex(); + + // When / Then + assertThatThrownBy(() -> InvokeForTables.sendOneMessagePerTableByName( + sqsClient, queueUrl, tableIndex, List.of("missing-table"))) + .isInstanceOf(TableNotFoundException.class); + assertThat(receiveTableIdMessages(queueUrl, 1)) + .isEmpty(); + } + + @Test + void shouldFailLookUpTableByNameOnSecondPage() { + // Given + String queueUrl = createFifoQueueGetUrl(); + TableIndex tableIndex = new InMemoryTableIndex(); + IntStream.rangeClosed(1, 11) + .mapToObj(i -> uniqueIdAndName("table-id-" + i, "table-name-" + i)) + .forEach(tableIndex::create); + + // When / Then + assertThatThrownBy(() -> InvokeForTables.sendOneMessagePerTableByName(sqsClient, queueUrl, tableIndex, + IntStream.rangeClosed(1, 12) + .mapToObj(i -> "table-name-" + i) + .collect(toUnmodifiableList()))) + .isInstanceOf(TableNotFoundException.class); + assertThat(receiveTableIdMessages(queueUrl, 10)) + .isEmpty(); + } + private String createFifoQueueGetUrl() { CreateQueueResult result = sqsClient.createQueue(new CreateQueueRequest() .withQueueName(UUID.randomUUID().toString() + ".fifo") @@ -92,7 +144,8 @@ private String createFifoQueueGetUrl() { private List receiveTableIdMessages(String queueUrl, int maxMessages) { ReceiveMessageResult result = sqsClient.receiveMessage( new ReceiveMessageRequest(queueUrl) - .withMaxNumberOfMessages(maxMessages)); + .withMaxNumberOfMessages(maxMessages) + .withWaitTimeSeconds(0)); return result.getMessages().stream() .map(Message::getBody) .collect(toUnmodifiableList()); diff --git a/java/compaction/compaction-job-creation-lambda/pom.xml b/java/compaction/compaction-job-creation-lambda/pom.xml index e656f2c57e..b778867611 100644 --- a/java/compaction/compaction-job-creation-lambda/pom.xml +++ b/java/compaction/compaction-job-creation-lambda/pom.xml @@ -67,6 +67,11 @@ configuration ${project.parent.version} + + sleeper + common-invoke-tables + ${project.parent.version} + com.amazonaws diff --git a/java/compaction/compaction-job-creation-lambda/src/main/java/sleeper/compaction/job/creation/lambda/CreateCompactionJobsTriggerLambda.java b/java/compaction/compaction-job-creation-lambda/src/main/java/sleeper/compaction/job/creation/lambda/CreateCompactionJobsTriggerLambda.java index 1fc3992fa2..4f7bdf8ce3 100644 --- a/java/compaction/compaction-job-creation-lambda/src/main/java/sleeper/compaction/job/creation/lambda/CreateCompactionJobsTriggerLambda.java +++ b/java/compaction/compaction-job-creation-lambda/src/main/java/sleeper/compaction/job/creation/lambda/CreateCompactionJobsTriggerLambda.java @@ -24,8 +24,6 @@ import com.amazonaws.services.s3.AmazonS3ClientBuilder; import com.amazonaws.services.sqs.AmazonSQS; import com.amazonaws.services.sqs.AmazonSQSClientBuilder; -import com.amazonaws.services.sqs.model.SendMessageBatchRequest; -import com.amazonaws.services.sqs.model.SendMessageBatchRequestEntry; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,15 +31,11 @@ import sleeper.configuration.properties.instance.InstanceProperties; import sleeper.configuration.table.index.DynamoDBTableIndex; import sleeper.core.table.TableIndex; -import sleeper.core.table.TableStatus; import sleeper.core.util.LoggedDuration; -import sleeper.core.util.SplitIntoBatches; +import sleeper.invoke.tables.InvokeForTables; import java.time.Instant; -import java.util.List; -import java.util.UUID; -import static java.util.stream.Collectors.toUnmodifiableList; import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.COMPACTION_JOB_CREATION_QUEUE_URL; import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.CONFIG_BUCKET; @@ -72,25 +66,10 @@ public Void handleRequest(ScheduledEvent event, Context context) { String queueUrl = instanceProperties.get(COMPACTION_JOB_CREATION_QUEUE_URL); TableIndex tableIndex = new DynamoDBTableIndex(instanceProperties, dynamoClient); - SplitIntoBatches.reusingListOfSize(10, - tableIndex.streamOnlineTables(), - tables -> sendMessageBatch(tables, queueUrl)); + InvokeForTables.sendOneMessagePerTable(sqsClient, queueUrl, tableIndex.streamOnlineTables()); Instant finishTime = Instant.now(); LOGGER.info("Lambda finished at {} (ran for {})", finishTime, LoggedDuration.withFullOutput(startTime, finishTime)); return null; } - - private void sendMessageBatch(List tables, String queueUrl) { - sqsClient.sendMessageBatch(new SendMessageBatchRequest() - .withQueueUrl(queueUrl) - .withEntries(tables.stream() - .map(table -> new SendMessageBatchRequestEntry() - .withMessageDeduplicationId(UUID.randomUUID().toString()) - .withId(table.getTableUniqueId()) - .withMessageGroupId(table.getTableUniqueId()) - .withMessageBody(table.getTableUniqueId())) - .collect(toUnmodifiableList()))); - } - } diff --git a/java/garbage-collector/pom.xml b/java/garbage-collector/pom.xml index f2b2ec2c14..09dc6e14e0 100644 --- a/java/garbage-collector/pom.xml +++ b/java/garbage-collector/pom.xml @@ -84,6 +84,11 @@ statestore ${project.parent.version} + + sleeper + common-invoke-tables + ${project.parent.version} + sleeper diff --git a/java/garbage-collector/src/main/java/sleeper/garbagecollector/GarbageCollectorTriggerLambda.java b/java/garbage-collector/src/main/java/sleeper/garbagecollector/GarbageCollectorTriggerLambda.java index be27c6e001..6d3354bc14 100644 --- a/java/garbage-collector/src/main/java/sleeper/garbagecollector/GarbageCollectorTriggerLambda.java +++ b/java/garbage-collector/src/main/java/sleeper/garbagecollector/GarbageCollectorTriggerLambda.java @@ -24,8 +24,6 @@ import com.amazonaws.services.s3.AmazonS3ClientBuilder; import com.amazonaws.services.sqs.AmazonSQS; import com.amazonaws.services.sqs.AmazonSQSClientBuilder; -import com.amazonaws.services.sqs.model.SendMessageBatchRequest; -import com.amazonaws.services.sqs.model.SendMessageBatchRequestEntry; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,15 +31,11 @@ import sleeper.configuration.properties.instance.InstanceProperties; import sleeper.configuration.table.index.DynamoDBTableIndex; import sleeper.core.table.TableIndex; -import sleeper.core.table.TableStatus; import sleeper.core.util.LoggedDuration; -import sleeper.core.util.SplitIntoBatches; +import sleeper.invoke.tables.InvokeForTables; import java.time.Instant; -import java.util.List; -import java.util.UUID; -import static java.util.stream.Collectors.toUnmodifiableList; import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.CONFIG_BUCKET; import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.GARBAGE_COLLECTOR_QUEUE_URL; import static sleeper.configuration.properties.instance.GarbageCollectionProperty.GARBAGE_COLLECT_OFFLINE_TABLES; @@ -73,25 +67,11 @@ public Void handleRequest(ScheduledEvent event, Context context) { String queueUrl = instanceProperties.get(GARBAGE_COLLECTOR_QUEUE_URL); boolean offlineEnabled = instanceProperties.getBoolean(GARBAGE_COLLECT_OFFLINE_TABLES); TableIndex tableIndex = new DynamoDBTableIndex(instanceProperties, dynamoClient); - SplitIntoBatches.reusingListOfSize(10, - offlineEnabled ? tableIndex.streamAllTables() : tableIndex.streamOnlineTables(), - tables -> sendMessageBatch(tables, queueUrl)); + InvokeForTables.sendOneMessagePerTable(sqsClient, queueUrl, + offlineEnabled ? tableIndex.streamAllTables() : tableIndex.streamOnlineTables()); Instant finishTime = Instant.now(); LOGGER.info("Lambda finished at {} (ran for {})", finishTime, LoggedDuration.withFullOutput(startTime, finishTime)); return null; } - - private void sendMessageBatch(List tables, String queueUrl) { - sqsClient.sendMessageBatch(new SendMessageBatchRequest() - .withQueueUrl(queueUrl) - .withEntries(tables.stream() - .map(table -> new SendMessageBatchRequestEntry() - .withMessageDeduplicationId(UUID.randomUUID().toString()) - .withId(table.getTableUniqueId()) - .withMessageGroupId(table.getTableUniqueId()) - .withMessageBody(table.getTableUniqueId())) - .collect(toUnmodifiableList()))); - } - } diff --git a/java/metrics/pom.xml b/java/metrics/pom.xml index e76c4033de..93ad47817a 100644 --- a/java/metrics/pom.xml +++ b/java/metrics/pom.xml @@ -58,6 +58,11 @@ statestore ${project.parent.version} + + sleeper + common-invoke-tables + ${project.parent.version} + sleeper diff --git a/java/metrics/src/main/java/sleeper/metrics/TableMetricsTriggerLambda.java b/java/metrics/src/main/java/sleeper/metrics/TableMetricsTriggerLambda.java index 287ba3876a..f020901b14 100644 --- a/java/metrics/src/main/java/sleeper/metrics/TableMetricsTriggerLambda.java +++ b/java/metrics/src/main/java/sleeper/metrics/TableMetricsTriggerLambda.java @@ -24,8 +24,6 @@ import com.amazonaws.services.s3.AmazonS3ClientBuilder; import com.amazonaws.services.sqs.AmazonSQS; import com.amazonaws.services.sqs.AmazonSQSClientBuilder; -import com.amazonaws.services.sqs.model.SendMessageBatchRequest; -import com.amazonaws.services.sqs.model.SendMessageBatchRequestEntry; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,15 +31,11 @@ import sleeper.configuration.properties.instance.InstanceProperties; import sleeper.configuration.table.index.DynamoDBTableIndex; import sleeper.core.table.TableIndex; -import sleeper.core.table.TableStatus; import sleeper.core.util.LoggedDuration; -import sleeper.core.util.SplitIntoBatches; +import sleeper.invoke.tables.InvokeForTables; import java.time.Instant; -import java.util.List; -import java.util.UUID; -import static java.util.stream.Collectors.toUnmodifiableList; import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.CONFIG_BUCKET; import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.TABLE_METRICS_QUEUE_URL; import static sleeper.configuration.properties.instance.CommonProperty.METRICS_FOR_OFFLINE_TABLES; @@ -70,24 +64,11 @@ public Void handleRequest(ScheduledEvent event, Context context) { TableIndex tableIndex = new DynamoDBTableIndex(instanceProperties, dynamoClient); String queueUrl = instanceProperties.get(TABLE_METRICS_QUEUE_URL); boolean offlineEnabled = instanceProperties.getBoolean(METRICS_FOR_OFFLINE_TABLES); - SplitIntoBatches.reusingListOfSize(10, - offlineEnabled ? tableIndex.streamAllTables() : tableIndex.streamOnlineTables(), - tables -> sendMessageBatch(tables, queueUrl)); + InvokeForTables.sendOneMessagePerTable(sqsClient, queueUrl, + offlineEnabled ? tableIndex.streamAllTables() : tableIndex.streamOnlineTables()); Instant finishTime = Instant.now(); LOGGER.info("Lambda finished at {} (ran for {})", finishTime, LoggedDuration.withFullOutput(startTime, finishTime)); return null; } - - private void sendMessageBatch(List tables, String queueUrl) { - sqsClient.sendMessageBatch(new SendMessageBatchRequest() - .withQueueUrl(queueUrl) - .withEntries(tables.stream() - .map(table -> new SendMessageBatchRequestEntry() - .withMessageDeduplicationId(UUID.randomUUID().toString()) - .withId(table.getTableUniqueId()) - .withMessageGroupId(table.getTableUniqueId()) - .withMessageBody(table.getTableUniqueId())) - .collect(toUnmodifiableList()))); - } } diff --git a/java/splitter/splitter-lambda/pom.xml b/java/splitter/splitter-lambda/pom.xml index 8f668ccb77..167dd43ae2 100644 --- a/java/splitter/splitter-lambda/pom.xml +++ b/java/splitter/splitter-lambda/pom.xml @@ -62,6 +62,11 @@ splitter-core ${project.parent.version} + + sleeper + common-invoke-tables + ${project.parent.version} + org.testcontainers diff --git a/java/splitter/splitter-lambda/src/main/java/sleeper/splitter/lambda/FindPartitionsToSplitTriggerLambda.java b/java/splitter/splitter-lambda/src/main/java/sleeper/splitter/lambda/FindPartitionsToSplitTriggerLambda.java index 56ea19c00a..f7fa22b1c9 100644 --- a/java/splitter/splitter-lambda/src/main/java/sleeper/splitter/lambda/FindPartitionsToSplitTriggerLambda.java +++ b/java/splitter/splitter-lambda/src/main/java/sleeper/splitter/lambda/FindPartitionsToSplitTriggerLambda.java @@ -24,8 +24,6 @@ import com.amazonaws.services.s3.AmazonS3ClientBuilder; import com.amazonaws.services.sqs.AmazonSQS; import com.amazonaws.services.sqs.AmazonSQSClientBuilder; -import com.amazonaws.services.sqs.model.SendMessageBatchRequest; -import com.amazonaws.services.sqs.model.SendMessageBatchRequestEntry; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,15 +31,11 @@ import sleeper.configuration.properties.instance.InstanceProperties; import sleeper.configuration.table.index.DynamoDBTableIndex; import sleeper.core.table.TableIndex; -import sleeper.core.table.TableStatus; import sleeper.core.util.LoggedDuration; -import sleeper.core.util.SplitIntoBatches; +import sleeper.invoke.tables.InvokeForTables; import java.time.Instant; -import java.util.List; -import java.util.UUID; -import static java.util.stream.Collectors.toUnmodifiableList; import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.CONFIG_BUCKET; import static sleeper.configuration.properties.instance.CdkDefinedInstanceProperty.FIND_PARTITIONS_TO_SPLIT_QUEUE_URL; @@ -71,25 +65,10 @@ public Void handleRequest(ScheduledEvent event, Context context) { String queueUrl = instanceProperties.get(FIND_PARTITIONS_TO_SPLIT_QUEUE_URL); TableIndex tableIndex = new DynamoDBTableIndex(instanceProperties, dynamoClient); - SplitIntoBatches.reusingListOfSize(10, - tableIndex.streamOnlineTables(), - tables -> sendMessageBatch(tables, queueUrl)); + InvokeForTables.sendOneMessagePerTable(sqsClient, queueUrl, tableIndex.streamOnlineTables()); Instant finishTime = Instant.now(); LOGGER.info("Lambda finished at {} (ran for {})", finishTime, LoggedDuration.withFullOutput(startTime, finishTime)); return null; } - - private void sendMessageBatch(List tables, String queueUrl) { - sqsClient.sendMessageBatch(new SendMessageBatchRequest() - .withQueueUrl(queueUrl) - .withEntries(tables.stream() - .map(table -> new SendMessageBatchRequestEntry() - .withMessageDeduplicationId(UUID.randomUUID().toString()) - .withId(table.getTableUniqueId()) - .withMessageGroupId(table.getTableUniqueId()) - .withMessageBody(table.getTableUniqueId())) - .collect(toUnmodifiableList()))); - } - } From ed87c97bac5c5666406977728b0d75b20c355f77 Mon Sep 17 00:00:00 2001 From: patchwork01 <110390516+patchwork01@users.noreply.github.com> Date: Thu, 25 Apr 2024 12:11:18 +0100 Subject: [PATCH 5/9] Add common-invoke-tables to chunks.yaml --- .github/config/chunks.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/config/chunks.yaml b/.github/config/chunks.yaml index d29a863959..d654ec72da 100644 --- a/.github/config/chunks.yaml +++ b/.github/config/chunks.yaml @@ -18,7 +18,7 @@ chunks: common: name: Common workflow: chunk-common.yaml - modules: [ core, configuration, sketches, parquet, common/common-job, common/common-task, build, common/dynamodb-tools ] + modules: [ core, configuration, sketches, parquet, common/common-job, common/common-task, common/common-invoke-tables, build, common/dynamodb-tools ] compaction: name: Compaction workflow: chunk-compaction.yaml From 0cce7e8589f829669dc138e0844fcd8fc716a083 Mon Sep 17 00:00:00 2001 From: patchwork01 <110390516+patchwork01@users.noreply.github.com> Date: Thu, 25 Apr 2024 12:21:27 +0100 Subject: [PATCH 6/9] Fix workflow triggers --- .github/workflows/chunk-cdk.yaml | 3 ++- .github/workflows/chunk-clients.yaml | 1 + .github/workflows/chunk-common.yaml | 1 + .github/workflows/chunk-compaction.yaml | 1 + .github/workflows/chunk-data.yaml | 5 +++-- 5 files changed, 8 insertions(+), 3 deletions(-) diff --git a/.github/workflows/chunk-cdk.yaml b/.github/workflows/chunk-cdk.yaml index 867515d3b7..07ffa5111d 100644 --- a/.github/workflows/chunk-cdk.yaml +++ b/.github/workflows/chunk-cdk.yaml @@ -14,10 +14,10 @@ on: - 'java/cdk-environment/**' - 'java/system-test/system-test-cdk/**' - 'java/system-test/system-test-configuration/**' + - 'java/system-test/system-test-data-generation/**' - 'java/system-test/system-test-dsl/**' - 'java/system-test/system-test-drivers/**' - 'java/system-test/system-test-suite/**' - - 'java/system-test/system-test-data-generation/**' - 'java/bulk-import/bulk-import-starter/**' - 'java/clients/**' - 'java/ingest/ingest-batcher-submitter/**' @@ -34,6 +34,7 @@ on: - 'java/ingest/ingest-runner/**' - 'java/compaction/compaction-status-store/**' - 'java/compaction/compaction-job-creation/**' + - 'java/common/common-invoke-tables/**' - 'java/common/common-job/**' - 'java/ingest/ingest-status-store/**' - 'java/sketches/**' diff --git a/.github/workflows/chunk-clients.yaml b/.github/workflows/chunk-clients.yaml index bb2b4f58ae..b8a4f335c0 100644 --- a/.github/workflows/chunk-clients.yaml +++ b/.github/workflows/chunk-clients.yaml @@ -13,6 +13,7 @@ on: - 'java/compaction/compaction-status-store/**' - 'java/splitter/splitter-core/**' - 'java/ingest/ingest-batcher-store/**' + - 'java/common/common-invoke-tables/**' - 'java/common/common-task/**' - 'java/ingest/ingest-batcher-core/**' - 'java/compaction/compaction-core/**' diff --git a/.github/workflows/chunk-common.yaml b/.github/workflows/chunk-common.yaml index 3b4eaf3253..7c3203fd08 100644 --- a/.github/workflows/chunk-common.yaml +++ b/.github/workflows/chunk-common.yaml @@ -15,6 +15,7 @@ on: - 'java/parquet/**' - 'java/common/common-job/**' - 'java/common/common-task/**' + - 'java/common/common-invoke-tables/**' - 'java/build/**' - 'java/common/dynamodb-tools/**' diff --git a/.github/workflows/chunk-compaction.yaml b/.github/workflows/chunk-compaction.yaml index 62d02a2977..4ebb2b081b 100644 --- a/.github/workflows/chunk-compaction.yaml +++ b/.github/workflows/chunk-compaction.yaml @@ -19,6 +19,7 @@ on: - 'java/splitter/splitter-core/**' - 'java/splitter/splitter-lambda/**' - 'java/common/common-task/**' + - 'java/common/common-invoke-tables/**' - 'java/ingest/ingest-runner/**' - 'java/common/common-job/**' - 'java/ingest/ingest-status-store/**' diff --git a/.github/workflows/chunk-data.yaml b/.github/workflows/chunk-data.yaml index fe15b910c1..0cfdf66537 100644 --- a/.github/workflows/chunk-data.yaml +++ b/.github/workflows/chunk-data.yaml @@ -8,12 +8,13 @@ on: - 'code-style/checkstyle*.xml' - 'code-style/spotbugs*.xml' - 'java/pom.xml' - - 'java/metrics/**' - 'java/garbage-collector/**' + - 'java/metrics/**' + - 'java/common/common-invoke-tables/**' - 'java/statestore/**' - - 'java/common/dynamodb-tools/**' - 'java/parquet/**' - 'java/configuration/**' + - 'java/common/dynamodb-tools/**' - 'java/core/**' jobs: From 41b80d22be5a96dee8de5cb39c0e8b801c63ae61 Mon Sep 17 00:00:00 2001 From: patchwork01 <110390516+patchwork01@users.noreply.github.com> Date: Thu, 25 Apr 2024 12:23:03 +0100 Subject: [PATCH 7/9] Remove duplicate LocalStackAwsV1ClientHelper --- java/common/common-invoke-tables/pom.xml | 7 ++++ .../invoke/tables/InvokeForTablesIT.java | 2 +- .../tables/LocalStackAwsV1ClientHelper.java | 39 ------------------- 3 files changed, 8 insertions(+), 40 deletions(-) delete mode 100644 java/common/common-invoke-tables/src/test/java/sleeper/invoke/tables/LocalStackAwsV1ClientHelper.java diff --git a/java/common/common-invoke-tables/pom.xml b/java/common/common-invoke-tables/pom.xml index eb8eb39a0b..24db271ea9 100644 --- a/java/common/common-invoke-tables/pom.xml +++ b/java/common/common-invoke-tables/pom.xml @@ -48,6 +48,13 @@ test-jar test + + sleeper + configuration + ${project.parent.version} + test-jar + test + org.testcontainers testcontainers diff --git a/java/common/common-invoke-tables/src/test/java/sleeper/invoke/tables/InvokeForTablesIT.java b/java/common/common-invoke-tables/src/test/java/sleeper/invoke/tables/InvokeForTablesIT.java index 4c8be9885f..cec2bcf374 100644 --- a/java/common/common-invoke-tables/src/test/java/sleeper/invoke/tables/InvokeForTablesIT.java +++ b/java/common/common-invoke-tables/src/test/java/sleeper/invoke/tables/InvokeForTablesIT.java @@ -42,8 +42,8 @@ import static java.util.stream.Collectors.toUnmodifiableList; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static sleeper.configuration.testutils.LocalStackAwsV1ClientHelper.buildAwsV1Client; import static sleeper.core.table.TableStatusTestHelper.uniqueIdAndName; -import static sleeper.invoke.tables.LocalStackAwsV1ClientHelper.buildAwsV1Client; @Testcontainers public class InvokeForTablesIT { diff --git a/java/common/common-invoke-tables/src/test/java/sleeper/invoke/tables/LocalStackAwsV1ClientHelper.java b/java/common/common-invoke-tables/src/test/java/sleeper/invoke/tables/LocalStackAwsV1ClientHelper.java deleted file mode 100644 index 7009da59f5..0000000000 --- a/java/common/common-invoke-tables/src/test/java/sleeper/invoke/tables/LocalStackAwsV1ClientHelper.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Copyright 2022-2024 Crown Copyright - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package sleeper.invoke.tables; - -import com.amazonaws.auth.AWSStaticCredentialsProvider; -import com.amazonaws.auth.BasicAWSCredentials; -import com.amazonaws.client.builder.AwsClientBuilder; -import org.testcontainers.containers.localstack.LocalStackContainer; - -public class LocalStackAwsV1ClientHelper { - - private LocalStackAwsV1ClientHelper() { - } - - public static , T> T buildAwsV1Client(LocalStackContainer localStackContainer, LocalStackContainer.Service service, B builder) { - return builder - .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration( - localStackContainer.getEndpointOverride(service).toString(), - localStackContainer.getRegion())) - .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials( - localStackContainer.getAccessKey(), localStackContainer.getSecretKey()))) - .build(); - } - -} From 641bf0b7a71f06302152232401ecfb49efc675c0 Mon Sep 17 00:00:00 2001 From: patchwork01 <110390516+patchwork01@users.noreply.github.com> Date: Fri, 26 Apr 2024 10:19:07 +0100 Subject: [PATCH 8/9] Clarify test sending more messages than SQS send batch size --- .../java/sleeper/invoke/tables/InvokeForTablesIT.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/java/common/common-invoke-tables/src/test/java/sleeper/invoke/tables/InvokeForTablesIT.java b/java/common/common-invoke-tables/src/test/java/sleeper/invoke/tables/InvokeForTablesIT.java index cec2bcf374..e9c27584f0 100644 --- a/java/common/common-invoke-tables/src/test/java/sleeper/invoke/tables/InvokeForTablesIT.java +++ b/java/common/common-invoke-tables/src/test/java/sleeper/invoke/tables/InvokeForTablesIT.java @@ -69,16 +69,16 @@ void shouldSendOneMessage() { } @Test - void shouldSendMoreMessagesThanFitInABatch() { - // Given + void shouldSendMoreMessagesThanFitInAnSqsSendMessageBatch() { + // Given a FIFO queue String queueUrl = createFifoQueueGetUrl(); - // When + // When we send more than the SQS hard limit of 10 messages to send in a single batch InvokeForTables.sendOneMessagePerTable(sqsClient, queueUrl, IntStream.rangeClosed(1, 11) .mapToObj(i -> uniqueIdAndName("table-id-" + i, "table-name-" + i))); - // Then + // Then we can receive those messages assertThat(receiveTableIdMessages(queueUrl, 10)).containsExactly( "table-id-1", "table-id-2", "table-id-3", "table-id-4", "table-id-5", "table-id-6", "table-id-7", "table-id-8", "table-id-9", "table-id-10"); From 661b57faa6a4e36ccb77a50d6ee21ad66ba3acdf Mon Sep 17 00:00:00 2001 From: patchwork01 <110390516+patchwork01@users.noreply.github.com> Date: Fri, 26 Apr 2024 10:21:39 +0100 Subject: [PATCH 9/9] Comment explaining maximum batch size --- .../src/main/java/sleeper/invoke/tables/InvokeForTables.java | 1 + 1 file changed, 1 insertion(+) diff --git a/java/common/common-invoke-tables/src/main/java/sleeper/invoke/tables/InvokeForTables.java b/java/common/common-invoke-tables/src/main/java/sleeper/invoke/tables/InvokeForTables.java index 73f6f539c5..70beac9f10 100644 --- a/java/common/common-invoke-tables/src/main/java/sleeper/invoke/tables/InvokeForTables.java +++ b/java/common/common-invoke-tables/src/main/java/sleeper/invoke/tables/InvokeForTables.java @@ -36,6 +36,7 @@ private InvokeForTables() { } public static void sendOneMessagePerTable(AmazonSQS sqsClient, String queueUrl, Stream tables) { + // Limit to stay under the maximum size for an SQS sendMessageBatch call. SplitIntoBatches.reusingListOfSize(10, tables, batch -> sendMessageBatch(sqsClient, queueUrl, batch)); }